Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/usecases/AvailableConsumerTest.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activemq.usecases;
19  
20  import junit.framework.TestCase;
21  import org.activemq.ActiveMQConnectionFactory;
22  import org.activemq.broker.BrokerContainer;
23  import org.activemq.broker.impl.BrokerContainerImpl;
24  import org.activemq.store.vm.VMPersistenceAdapter;
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  
28  import javax.jms.Connection;
29  import javax.jms.DeliveryMode;
30  import javax.jms.JMSException;
31  import javax.jms.Message;
32  import javax.jms.MessageConsumer;
33  import javax.jms.MessageListener;
34  import javax.jms.MessageProducer;
35  import javax.jms.Queue;
36  import javax.jms.Session;
37  
38  /**
39   * TODO this test case relies on a perfect distribution of messages, dispatched one by one to each consumer.
40   * So this test case can only really work with an explicit setting of 'prefetch = 1' or something similar.
41   * The default out of the box dispatcher eagerly dispatches messages as quickly as possible.
42   * 
43   * Ensures that if there is a network of brokers that a message is always dispatched to an available consumer * regardless of which broker it is on.
44   *
45   * @version $Revision: 1.1 $
46   */
47  public class AvailableConsumerTest extends TestCase {
48      private static final transient Log log = LogFactory.getLog(AvailableConsumerTest.class);
49      private static final long BROKER_INITIALIZATION_DELAY = 7000;
50  
51      public void testOneBroker() throws Throwable {
52          String[] urls = new String[]{"tcp://localhost:9000"};
53          runSimulation(urls, 2, "QUEUE_NAME");
54          //runSimulation(urls, 5, "QUEUE_NAME");
55          //runSimulation(urls, 10, "QUEUE_NAME");
56      }
57  
58      public void testTwoBrokers() throws Throwable {
59          String[] urls = new String[]{"tcp://localhost:9000", "tcp://localhost:9001"};
60          runSimulation(urls, 2, "QUEUE_NAME");
61          runSimulation(urls, 5, "QUEUE_NAME");
62          runSimulation(urls, 10, "QUEUE_NAME");
63      }
64  
65      public void testTenBrokers() throws Throwable {
66          String[] urls = new String[]{
67              "tcp://localhost:9000", "tcp://localhost:9001", "tcp://localhost:9002", "tcp://localhost:9003", "tcp://localhost:9004",
68              "tcp://localhost:9005", "tcp://localhost:9006", "tcp://localhost:9007", "tcp://localhost:9008", "tcp://localhost:9009"
69          };
70          runSimulation(urls, 2, "QUEUE_NAME");
71          runSimulation(urls, 5, "QUEUE_NAME");
72          runSimulation(urls, 10, "QUEUE_NAME");
73      }
74  
75      private void runSimulation(String[] brokerURLs, int numConsumers, String queue) throws Throwable {
76          assertTrue("More than one consumer is required", numConsumers > 1);
77  
78          BrokerThread[] brokers = null;
79          BlockingConsumer[] consumers = null;
80          MessagePublisher[] publishers = null;
81  
82          try {
83              String reliableURL = createReliableURL(brokerURLs);
84              brokers = createBrokers(brokerURLs);
85              consumers = createConsumers(reliableURL, numConsumers, queue);
86              publishers = createPublishers(reliableURL, 1, queue);
87  
88              // Now get all of the consumers blocked except for one
89              {
90                  for (int i = 0; i < consumers.length - 1; i++) {
91                      publishers[0].sendMessage();
92                  }
93                  waitUntilNumBlocked(consumers, consumers.length - 1);
94              }
95  
96              // Now send one more message which should cause all of the consumers to be blocked
97              {
98                  publishers[0].sendMessage();
99                  waitUntilNumBlocked(consumers, consumers.length);
100             }
101 
102             // Unblock a consumer and make sure it is unblocked
103             {
104                 synchronized (consumers[0]) {
105                     consumers[0].notifyAll();
106                 }
107                 waitUntilNumBlocked(consumers, consumers.length - 1);
108             }
109 
110             // Send another message and make sure it is blocked again
111             {
112                 publishers[0].sendMessage();
113                 waitUntilNumBlocked(consumers, consumers.length);
114             }
115 
116             // Finally queue up a message for each consumer but one, then unblock them all and make sure one is still unblocked
117             {
118                 for (int i = 0; i < consumers.length - 1; i++) {
119                     publishers[0].sendMessage();
120                 }
121 
122                 for (int i = 0; i < consumers.length; i++) {
123                     synchronized (consumers[i]) {
124                         consumers[i].notifyAll();
125                     }
126                 }
127 
128                 waitUntilNumBlocked(consumers, consumers.length - 1);
129             }
130         }
131         finally {
132             cleanupSimulation(brokers, publishers, consumers);
133         }
134     }
135 
136     private static void cleanupSimulation(BrokerThread[] brokers, MessagePublisher[] publishers, BlockingConsumer[] consumers) {
137         try {
138             for (int i = 0; i < publishers.length; i++) {
139                 publishers[i].done();
140             }
141         }
142         catch (Throwable t) {
143             log.warn("Non-fatal error during cleanup", t);
144         }
145 
146         try {
147             for (int i = 0; i < consumers.length; i++) {
148                 // Unblock it in case it is blocked
149                 synchronized (consumers[i]) {
150                     consumers[i].notifyAll();
151                 }
152 
153                 consumers[i].done();
154             }
155         }
156         catch (Throwable t) {
157             log.warn("Non-fatal error during cleanup", t);
158         }
159 
160         try {
161             for (int i = 0; i < brokers.length; i++) {
162                 brokers[i].done();
163             }
164         }
165         catch (Throwable t) {
166             log.warn("Non-fatal error during cleanup", t);
167         }
168     }
169 
170     private static BrokerThread[] createBrokers(String[] brokerURLs) throws InterruptedException {
171         BrokerThread[] threads = new BrokerThread[brokerURLs.length];
172         for (int i = 0; i < threads.length; i++) {
173             threads[i] = new BrokerThread(Integer.toString(i), brokerURLs[i], brokerURLs);
174             threads[i].start();
175         }
176 
177 // Delay so that the brokers have a chance to come up fully and connect to each other
178         log.debug("Created " + threads.length + " brokers, giving them time to initialize...");
179         Object temp = new Object();
180         synchronized (temp) {
181             temp.wait(BROKER_INITIALIZATION_DELAY * brokerURLs.length);
182         }
183         log.debug("Brokers should be initialized now");
184 
185         return threads;
186     }
187 
188     private static BlockingConsumer[] createConsumers(String brokerURL, int numConsumers, String queue) throws JMSException {
189         BlockingConsumer[] consumers = new BlockingConsumer[numConsumers];
190         for (int i = 0; i < consumers.length; i++) {
191             consumers[i] = new BlockingConsumer(brokerURL, queue);
192         }
193 
194         return consumers;
195     }
196 
197     private static MessagePublisher[] createPublishers(String brokerURL, int numPublishers, String queue) throws JMSException {
198         MessagePublisher[] publishers = new MessagePublisher[numPublishers];
199         for (int i = 0; i < publishers.length; i++) {
200             publishers[i] = new MessagePublisher(brokerURL, queue);
201         }
202 
203         return publishers;
204     }
205 
206     private static String createReliableURL(String[] brokerURLs) {
207         StringBuffer sb = new StringBuffer("reliable:");
208         for (int i = 0; i < brokerURLs.length; i++) {
209             if (i != 0) {
210                 sb.append(',');
211             }
212 
213             sb.append(brokerURLs[i]);
214         }
215 
216         return sb.toString();
217     }
218 
219     private static void waitUntilNumBlocked(BlockingConsumer[] consumers, int expectedNumBlocked) throws InterruptedException {
220         boolean found = false;
221         int maxIterations = 50;
222         for (int iteration = 0; iteration < maxIterations; iteration++) {
223             int numBlocked = 0;
224             for (int i = 0; i < consumers.length; i++) {
225                 numBlocked += consumers[i].isBlocked() ? 1 : 0;
226             }
227 
228             if (numBlocked == expectedNumBlocked) {
229                 found = true;
230                 break;
231             }
232 
233             log.debug("Waiting for " + expectedNumBlocked + " consumers to block, currently only " + numBlocked + " are blocked.");
234             Object temp = new Object();
235             synchronized (temp) {
236                 temp.wait(250);
237             }
238         }
239 
240         assertTrue("Never saw " + expectedNumBlocked + " consumers blocked", found);
241     }
242 
243     private static final class BrokerThread extends Thread {
244         private final String m_id;
245         private final String m_myURL;
246         private final String[] m_linkedURLs;
247         private BrokerContainer m_container;
248 
249         public BrokerThread(String id, String myURL, String[] linkedURLs) {
250             m_id = id;
251             m_myURL = myURL;
252             m_linkedURLs = linkedURLs;
253         }
254 
255         public void run() {
256             try {
257                 m_container = new BrokerContainerImpl(m_id);
258                 m_container.setPersistenceAdapter(new VMPersistenceAdapter());
259                 m_container.addConnector(m_myURL);
260 
261                 for (int i = 0; i < m_linkedURLs.length; i++) {
262                     if (!m_myURL.equals(m_linkedURLs[i])) {
263                         m_container.addNetworkConnector("reliable:" + m_linkedURLs[i]);
264                     }
265                 }
266 
267                 m_container.start();
268             }
269             catch (JMSException e) {
270                 log.error("Unexpected exception", e);
271             }
272         }
273 
274         public void done() {
275             try {
276                 m_container.stop();
277             }
278             catch (JMSException e) {
279                 log.error("Unexpected exception", e);
280             }
281         }
282     }
283 
284     private static final class MessagePublisher {
285         private final String m_url;
286         private final Connection m_connection;
287         private final Session m_session;
288         private final Queue m_queue;
289         private final MessageProducer m_producer;
290 
291         public MessagePublisher(String url, String queue) throws JMSException {
292             this(url, queue, DeliveryMode.PERSISTENT);
293         }
294 
295         public MessagePublisher(String url, String queue, int deliveryMode) throws JMSException {
296             m_url = url;
297 
298             m_connection = new ActiveMQConnectionFactory(m_url).createConnection();
299             m_connection.start();
300 
301             m_session = m_connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
302             m_queue = m_session.createQueue(queue);
303 
304             m_producer = m_session.createProducer(m_queue);
305             m_producer.setDeliveryMode(deliveryMode);
306         }
307 
308         public void done() {
309             try {
310                 m_producer.close();
311             }
312             catch (Throwable ignored) {
313             }
314 
315             try {
316                 m_session.close();
317             }
318             catch (Throwable ignored) {
319             }
320 
321             try {
322                 m_connection.close();
323             }
324             catch (Throwable ignored) {
325             }
326         }
327 
328         public void sendMessage() throws JMSException {
329             Message message = m_session.createMessage();
330             m_producer.send(message);
331         }
332     }
333 
334     private static final class BlockingConsumer implements MessageListener {
335         private final String m_url;
336         private final Connection m_connection;
337         private final Session m_session;
338         private final Queue m_queue;
339         private final MessageConsumer m_consumer;
340         private boolean m_blocked;
341 
342         public BlockingConsumer(String url, String queue) throws JMSException {
343             m_url = url;
344 
345             m_connection = new ActiveMQConnectionFactory(m_url).createConnection();
346             m_connection.start();
347 
348             m_session = m_connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
349             m_queue = m_session.createQueue(queue);
350             m_consumer = m_session.createConsumer(m_queue);
351             m_consumer.setMessageListener(this);
352             m_blocked = false;
353         }
354 
355         public boolean isBlocked() {
356             return m_blocked;
357         }
358 
359         public void done() {
360             try {
361                 m_consumer.setMessageListener(null);
362             }
363             catch (Throwable ignored) {
364             }
365 
366             try {
367                 m_consumer.close();
368             }
369             catch (Throwable ignored) {
370             }
371 
372             try {
373                 m_session.close();
374             }
375             catch (Throwable ignored) {
376             }
377 
378             try {
379                 m_connection.close();
380             }
381             catch (Throwable ignored) {
382             }
383         }
384 
385         public void onMessage(Message message) {
386             m_blocked = true;
387 
388             synchronized (this) {
389                 try {
390                     wait();
391                 }
392                 catch (InterruptedException e) {
393                     log.error("Unexpected InterruptedException during onMessage", e);
394                 }
395             }
396 
397             m_blocked = false;
398         }
399     }
400 }