Source code: org/activemq/usecases/AvailableConsumerTest.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activemq.usecases;
19
20 import junit.framework.TestCase;
21 import org.activemq.ActiveMQConnectionFactory;
22 import org.activemq.broker.BrokerContainer;
23 import org.activemq.broker.impl.BrokerContainerImpl;
24 import org.activemq.store.vm.VMPersistenceAdapter;
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27
28 import javax.jms.Connection;
29 import javax.jms.DeliveryMode;
30 import javax.jms.JMSException;
31 import javax.jms.Message;
32 import javax.jms.MessageConsumer;
33 import javax.jms.MessageListener;
34 import javax.jms.MessageProducer;
35 import javax.jms.Queue;
36 import javax.jms.Session;
37
38 /**
39 * TODO this test case relies on a perfect distribution of messages, dispatched one by one to each consumer.
40 * So this test case can only really work with an explicit setting of 'prefetch = 1' or something similar.
41 * The default out of the box dispatcher eagerly dispatches messages as quickly as possible.
42 *
43 * Ensures that if there is a network of brokers that a message is always dispatched to an available consumer * regardless of which broker it is on.
44 *
45 * @version $Revision: 1.1 $
46 */
47 public class AvailableConsumerTest extends TestCase {
48 private static final transient Log log = LogFactory.getLog(AvailableConsumerTest.class);
49 private static final long BROKER_INITIALIZATION_DELAY = 7000;
50
51 public void testOneBroker() throws Throwable {
52 String[] urls = new String[]{"tcp://localhost:9000"};
53 runSimulation(urls, 2, "QUEUE_NAME");
54 //runSimulation(urls, 5, "QUEUE_NAME");
55 //runSimulation(urls, 10, "QUEUE_NAME");
56 }
57
58 public void testTwoBrokers() throws Throwable {
59 String[] urls = new String[]{"tcp://localhost:9000", "tcp://localhost:9001"};
60 runSimulation(urls, 2, "QUEUE_NAME");
61 runSimulation(urls, 5, "QUEUE_NAME");
62 runSimulation(urls, 10, "QUEUE_NAME");
63 }
64
65 public void testTenBrokers() throws Throwable {
66 String[] urls = new String[]{
67 "tcp://localhost:9000", "tcp://localhost:9001", "tcp://localhost:9002", "tcp://localhost:9003", "tcp://localhost:9004",
68 "tcp://localhost:9005", "tcp://localhost:9006", "tcp://localhost:9007", "tcp://localhost:9008", "tcp://localhost:9009"
69 };
70 runSimulation(urls, 2, "QUEUE_NAME");
71 runSimulation(urls, 5, "QUEUE_NAME");
72 runSimulation(urls, 10, "QUEUE_NAME");
73 }
74
75 private void runSimulation(String[] brokerURLs, int numConsumers, String queue) throws Throwable {
76 assertTrue("More than one consumer is required", numConsumers > 1);
77
78 BrokerThread[] brokers = null;
79 BlockingConsumer[] consumers = null;
80 MessagePublisher[] publishers = null;
81
82 try {
83 String reliableURL = createReliableURL(brokerURLs);
84 brokers = createBrokers(brokerURLs);
85 consumers = createConsumers(reliableURL, numConsumers, queue);
86 publishers = createPublishers(reliableURL, 1, queue);
87
88 // Now get all of the consumers blocked except for one
89 {
90 for (int i = 0; i < consumers.length - 1; i++) {
91 publishers[0].sendMessage();
92 }
93 waitUntilNumBlocked(consumers, consumers.length - 1);
94 }
95
96 // Now send one more message which should cause all of the consumers to be blocked
97 {
98 publishers[0].sendMessage();
99 waitUntilNumBlocked(consumers, consumers.length);
100 }
101
102 // Unblock a consumer and make sure it is unblocked
103 {
104 synchronized (consumers[0]) {
105 consumers[0].notifyAll();
106 }
107 waitUntilNumBlocked(consumers, consumers.length - 1);
108 }
109
110 // Send another message and make sure it is blocked again
111 {
112 publishers[0].sendMessage();
113 waitUntilNumBlocked(consumers, consumers.length);
114 }
115
116 // Finally queue up a message for each consumer but one, then unblock them all and make sure one is still unblocked
117 {
118 for (int i = 0; i < consumers.length - 1; i++) {
119 publishers[0].sendMessage();
120 }
121
122 for (int i = 0; i < consumers.length; i++) {
123 synchronized (consumers[i]) {
124 consumers[i].notifyAll();
125 }
126 }
127
128 waitUntilNumBlocked(consumers, consumers.length - 1);
129 }
130 }
131 finally {
132 cleanupSimulation(brokers, publishers, consumers);
133 }
134 }
135
136 private static void cleanupSimulation(BrokerThread[] brokers, MessagePublisher[] publishers, BlockingConsumer[] consumers) {
137 try {
138 for (int i = 0; i < publishers.length; i++) {
139 publishers[i].done();
140 }
141 }
142 catch (Throwable t) {
143 log.warn("Non-fatal error during cleanup", t);
144 }
145
146 try {
147 for (int i = 0; i < consumers.length; i++) {
148 // Unblock it in case it is blocked
149 synchronized (consumers[i]) {
150 consumers[i].notifyAll();
151 }
152
153 consumers[i].done();
154 }
155 }
156 catch (Throwable t) {
157 log.warn("Non-fatal error during cleanup", t);
158 }
159
160 try {
161 for (int i = 0; i < brokers.length; i++) {
162 brokers[i].done();
163 }
164 }
165 catch (Throwable t) {
166 log.warn("Non-fatal error during cleanup", t);
167 }
168 }
169
170 private static BrokerThread[] createBrokers(String[] brokerURLs) throws InterruptedException {
171 BrokerThread[] threads = new BrokerThread[brokerURLs.length];
172 for (int i = 0; i < threads.length; i++) {
173 threads[i] = new BrokerThread(Integer.toString(i), brokerURLs[i], brokerURLs);
174 threads[i].start();
175 }
176
177 // Delay so that the brokers have a chance to come up fully and connect to each other
178 log.debug("Created " + threads.length + " brokers, giving them time to initialize...");
179 Object temp = new Object();
180 synchronized (temp) {
181 temp.wait(BROKER_INITIALIZATION_DELAY * brokerURLs.length);
182 }
183 log.debug("Brokers should be initialized now");
184
185 return threads;
186 }
187
188 private static BlockingConsumer[] createConsumers(String brokerURL, int numConsumers, String queue) throws JMSException {
189 BlockingConsumer[] consumers = new BlockingConsumer[numConsumers];
190 for (int i = 0; i < consumers.length; i++) {
191 consumers[i] = new BlockingConsumer(brokerURL, queue);
192 }
193
194 return consumers;
195 }
196
197 private static MessagePublisher[] createPublishers(String brokerURL, int numPublishers, String queue) throws JMSException {
198 MessagePublisher[] publishers = new MessagePublisher[numPublishers];
199 for (int i = 0; i < publishers.length; i++) {
200 publishers[i] = new MessagePublisher(brokerURL, queue);
201 }
202
203 return publishers;
204 }
205
206 private static String createReliableURL(String[] brokerURLs) {
207 StringBuffer sb = new StringBuffer("reliable:");
208 for (int i = 0; i < brokerURLs.length; i++) {
209 if (i != 0) {
210 sb.append(',');
211 }
212
213 sb.append(brokerURLs[i]);
214 }
215
216 return sb.toString();
217 }
218
219 private static void waitUntilNumBlocked(BlockingConsumer[] consumers, int expectedNumBlocked) throws InterruptedException {
220 boolean found = false;
221 int maxIterations = 50;
222 for (int iteration = 0; iteration < maxIterations; iteration++) {
223 int numBlocked = 0;
224 for (int i = 0; i < consumers.length; i++) {
225 numBlocked += consumers[i].isBlocked() ? 1 : 0;
226 }
227
228 if (numBlocked == expectedNumBlocked) {
229 found = true;
230 break;
231 }
232
233 log.debug("Waiting for " + expectedNumBlocked + " consumers to block, currently only " + numBlocked + " are blocked.");
234 Object temp = new Object();
235 synchronized (temp) {
236 temp.wait(250);
237 }
238 }
239
240 assertTrue("Never saw " + expectedNumBlocked + " consumers blocked", found);
241 }
242
243 private static final class BrokerThread extends Thread {
244 private final String m_id;
245 private final String m_myURL;
246 private final String[] m_linkedURLs;
247 private BrokerContainer m_container;
248
249 public BrokerThread(String id, String myURL, String[] linkedURLs) {
250 m_id = id;
251 m_myURL = myURL;
252 m_linkedURLs = linkedURLs;
253 }
254
255 public void run() {
256 try {
257 m_container = new BrokerContainerImpl(m_id);
258 m_container.setPersistenceAdapter(new VMPersistenceAdapter());
259 m_container.addConnector(m_myURL);
260
261 for (int i = 0; i < m_linkedURLs.length; i++) {
262 if (!m_myURL.equals(m_linkedURLs[i])) {
263 m_container.addNetworkConnector("reliable:" + m_linkedURLs[i]);
264 }
265 }
266
267 m_container.start();
268 }
269 catch (JMSException e) {
270 log.error("Unexpected exception", e);
271 }
272 }
273
274 public void done() {
275 try {
276 m_container.stop();
277 }
278 catch (JMSException e) {
279 log.error("Unexpected exception", e);
280 }
281 }
282 }
283
284 private static final class MessagePublisher {
285 private final String m_url;
286 private final Connection m_connection;
287 private final Session m_session;
288 private final Queue m_queue;
289 private final MessageProducer m_producer;
290
291 public MessagePublisher(String url, String queue) throws JMSException {
292 this(url, queue, DeliveryMode.PERSISTENT);
293 }
294
295 public MessagePublisher(String url, String queue, int deliveryMode) throws JMSException {
296 m_url = url;
297
298 m_connection = new ActiveMQConnectionFactory(m_url).createConnection();
299 m_connection.start();
300
301 m_session = m_connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
302 m_queue = m_session.createQueue(queue);
303
304 m_producer = m_session.createProducer(m_queue);
305 m_producer.setDeliveryMode(deliveryMode);
306 }
307
308 public void done() {
309 try {
310 m_producer.close();
311 }
312 catch (Throwable ignored) {
313 }
314
315 try {
316 m_session.close();
317 }
318 catch (Throwable ignored) {
319 }
320
321 try {
322 m_connection.close();
323 }
324 catch (Throwable ignored) {
325 }
326 }
327
328 public void sendMessage() throws JMSException {
329 Message message = m_session.createMessage();
330 m_producer.send(message);
331 }
332 }
333
334 private static final class BlockingConsumer implements MessageListener {
335 private final String m_url;
336 private final Connection m_connection;
337 private final Session m_session;
338 private final Queue m_queue;
339 private final MessageConsumer m_consumer;
340 private boolean m_blocked;
341
342 public BlockingConsumer(String url, String queue) throws JMSException {
343 m_url = url;
344
345 m_connection = new ActiveMQConnectionFactory(m_url).createConnection();
346 m_connection.start();
347
348 m_session = m_connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
349 m_queue = m_session.createQueue(queue);
350 m_consumer = m_session.createConsumer(m_queue);
351 m_consumer.setMessageListener(this);
352 m_blocked = false;
353 }
354
355 public boolean isBlocked() {
356 return m_blocked;
357 }
358
359 public void done() {
360 try {
361 m_consumer.setMessageListener(null);
362 }
363 catch (Throwable ignored) {
364 }
365
366 try {
367 m_consumer.close();
368 }
369 catch (Throwable ignored) {
370 }
371
372 try {
373 m_session.close();
374 }
375 catch (Throwable ignored) {
376 }
377
378 try {
379 m_connection.close();
380 }
381 catch (Throwable ignored) {
382 }
383 }
384
385 public void onMessage(Message message) {
386 m_blocked = true;
387
388 synchronized (this) {
389 try {
390 wait();
391 }
392 catch (InterruptedException e) {
393 log.error("Unexpected InterruptedException during onMessage", e);
394 }
395 }
396
397 m_blocked = false;
398 }
399 }
400 }