Source code: org/activemq/usecases/BecksNetworkTest.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activemq.usecases;
19
20 import junit.framework.TestCase;
21 import org.activemq.ActiveMQConnectionFactory;
22 import org.activemq.broker.BrokerContainer;
23 import org.activemq.broker.impl.BrokerContainerImpl;
24 import org.activemq.store.vm.VMPersistenceAdapter;
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27
28 import javax.jms.*;
29 import java.util.HashSet;
30 import java.util.Iterator;
31 import java.util.Set;
32 import java.util.SortedMap;
33 import java.util.TreeMap;
34
35 /**
36 * @author bbeck
37 * @version $Revision: 1.1 $
38 */
39 public class BecksNetworkTest extends TestCase {
40 private static final Log log = LogFactory.getLog(BecksNetworkTest.class);
41
42 private static final int NUM_BROKERS = 10;
43 private static final int NUM_PRODUCERS = 10;
44 private static final int NUM_CONSUMERS = 1;
45 private static final int NUM_MESSAGES = 1;
46 private static final long MESSAGE_SEND_DELAY = 100;
47 private static final long MESSAGE_RECEIVE_DELAY = 50;
48 private static final int BASE_PORT = 9500;
49 private static final String QUEUE_NAME = "QUEUE";
50 private static final String MESSAGE_PRODUCER_KEY = "PRODUCER";
51 private static final String MESSAGE_BODY_KEY = "BODY";
52
53 public void testCase() throws Throwable {
54 main(new String[]{});
55 }
56
57 public static void main(String[] args) throws Throwable {
58 String[] addresses = new String[NUM_BROKERS];
59 for (int i = 0; i < NUM_BROKERS; i++) {
60 addresses[i] = "tcp://localhost:" + (BASE_PORT + i);
61 }
62
63 log.info("Starting brokers");
64 BrokerContainer[] brokers = startBrokers(addresses);
65 String reliableURL = createAddressString(addresses, "reliable:", null);
66
67 log.info("Creating simulation state");
68 final SimulationState state = new SimulationState(NUM_PRODUCERS * NUM_MESSAGES);
69 Thread stateWatcher = new Thread("Simulation State Watcher Thread") {
70 public void run() {
71 while (state.getState() != SimulationState.FINISHED) {
72 log.info("State: " + state);
73
74 synchronized (this) {
75 try {
76 wait(5000);
77 }
78 catch (InterruptedException ignored) {
79 }
80 }
81 }
82 }
83 };
84 stateWatcher.setDaemon(true);
85 stateWatcher.start();
86
87 log.info("Starting components");
88 MessageProducerComponent[] producers = new MessageProducerComponent[NUM_PRODUCERS];
89 MessageConsumerComponent[] consumers = new MessageConsumerComponent[NUM_CONSUMERS];
90 {
91 for (int i = 0; i < NUM_PRODUCERS; i++) {
92 producers[i] = new MessageProducerComponent(state, "MessageProducer[" + i + "]", reliableURL, NUM_MESSAGES);
93 producers[i].start();
94 }
95
96 for (int i = 0; i < NUM_CONSUMERS; i++) {
97 consumers[i] = new MessageConsumerComponent(state, "MessageConsumer[" + i + "]", reliableURL);
98 consumers[i].start();
99 }
100 }
101
102 // Start the simulation
103 log.info("##### Starting the simulation...");
104 state.setState(SimulationState.RUNNING);
105
106 for (int i = 0; i < producers.length; i++) {
107 producers[i].join();
108 }
109 log.info("Producers finished");
110
111 for (int i = 0; i < consumers.length; i++) {
112 consumers[i].join();
113 log.info(consumers[i].getId() + " consumed " + consumers[i].getNumberOfMessagesConsumed() + " messages.");
114 }
115
116 log.info("Consumers finished");
117 log.info("State: " + state);
118
119 state.waitForSimulationState(SimulationState.FINISHED);
120
121 log.info("Stopping brokers");
122 for (int i = 0; i < brokers.length; i++) {
123 brokers[i].stop();
124 }
125 }
126
127 private static BrokerContainer[] startBrokers(String[] addresses) throws JMSException {
128 BrokerContainer[] containers = new BrokerContainer[addresses.length];
129 for (int i = 0; i < containers.length; i++) {
130 containers[i] = new BrokerContainerImpl(Integer.toString(i));
131 containers[i].setPersistenceAdapter(new VMPersistenceAdapter());
132 containers[i].addConnector(addresses[i]);
133
134 for (int j = 0; j < addresses.length; j++) {
135 if (i == j) {
136 continue;
137 }
138
139 containers[i].addNetworkConnector("reliable:" + addresses[j]);
140 }
141
142 containers[i].start();
143
144 log.debug("Created broker on " + addresses[i]);
145 }
146
147 // Delay so this broker has a chance to come up fully...
148 try {
149 Thread.sleep(2000 * containers.length);
150 }
151 catch (InterruptedException ignored) {
152 }
153
154 return containers;
155 }
156
157 /*
158 private static BrokerContainer[] startBrokers(String[] addresses) throws JMSException, IOException
159 {
160 for(int i = 0; i < addresses.length; i++) {
161 File temp = File.createTempFile("broker_" + i + "_", ".xml");
162 temp.deleteOnExit();
163
164
165 PrintWriter fout = new PrintWriter(new FileWriter(temp));
166 fout.println("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
167 fout.println("<!DOCTYPE beans PUBLIC \"-//ACTIVEMQ//DTD//EN\" \"http://activemq.codehaus.org/dtd/activemq.dtd\">");
168 fout.println("<beans>");
169 fout.println(" <broker name=\"" + "receiver" + i + "\">");
170 fout.println(" <connector>");
171 fout.println(" <tcpServerTransport uri=\"" + addresses[i] + "\"/>");
172 fout.println(" </connector>");
173
174 if(addresses.length > 1) {
175 String otherAddresses = createAddressString(addresses, "list:", addresses[i]);
176 otherAddresses = "tcp://localhost:9000";
177
178 fout.println(" <networkConnector>");
179 fout.println(" <networkChannel uri=\"" + otherAddresses + "\"/>");
180 fout.println(" </networkConnector>");
181 }
182
183 fout.println(" <persistence>");
184 fout.println(" <vmPersistence/>");
185 fout.println(" </persistence>");
186 fout.println(" </broker>");
187 fout.println("</beans>");
188 fout.close();
189
190 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://" + i);
191 factory.setUseEmbeddedBroker(true);
192 factory.setBrokerXmlConfig("file:" + temp.getAbsolutePath());
193 factory.setBrokerName("broker-" + addresses[i]);
194 factory.start();
195
196 Connection c = factory.createConnection();
197 c.start();
198 }
199
200 // Delay so this broker has a chance to come up fully...
201 try { Thread.sleep(2000*addresses.length); }
202 catch(InterruptedException ignored) {}
203
204 return null;
205 }
206 */
207
208 private static String createAddressString(String[] addresses, String prefix, String addressToSkip) {
209 StringBuffer sb = new StringBuffer(prefix);
210 boolean first = true;
211
212 for (int i = 0; i < addresses.length; i++) {
213 if (addressToSkip != null && addressToSkip.equals(addresses[i])) {
214 continue;
215 }
216
217 if (!first) {
218 sb.append(',');
219 }
220
221 sb.append(addresses[i]);
222 first = false;
223 }
224
225 return sb.toString();
226 }
227
228 private static final class SimulationState {
229 public static final int INITIALIZED = 1;
230 public static final int RUNNING = 2;
231 public static final int FINISHED = 3;
232
233 private final Object m_stateLock;
234 private int m_state;
235 private final int m_numExpectedMessages;
236 private final Set m_messagesProduced;
237 private final Set m_messagesConsumed;
238
239 public SimulationState(int numMessages) {
240 m_stateLock = new Object();
241 synchronized (m_stateLock) {
242 m_state = INITIALIZED;
243 m_numExpectedMessages = numMessages;
244 m_messagesProduced = new HashSet();
245 m_messagesConsumed = new HashSet();
246 }
247 }
248
249 public int getState() {
250 synchronized (m_stateLock) {
251 return m_state;
252 }
253 }
254
255 public void setState(int newState) {
256 synchronized (m_stateLock) {
257 m_state = newState;
258 m_stateLock.notifyAll();
259 }
260 }
261
262 public void waitForSimulationState(int state) throws InterruptedException {
263 synchronized (m_stateLock) {
264 while (m_state != state) {
265 m_stateLock.wait();
266 }
267 }
268 }
269
270 public void onMessageProduced(String producerId, String messageBody) {
271 log.debug("-> onMessageProduced(" + producerId + ", " + messageBody + ")");
272
273 synchronized (m_stateLock) {
274 if (m_state == INITIALIZED) {
275 throw new RuntimeException("Message produced before the simulation has begun: messageBody=" + messageBody);
276 }
277
278 if (m_state == FINISHED) {
279 throw new RuntimeException("Message produced after the simulation has finished: messageBody=" + messageBody);
280 }
281
282 if (!m_messagesProduced.add(messageBody)) {
283 throw new RuntimeException("Duplicate message produced: messageBody=" + messageBody);
284 }
285 }
286 }
287
288 public void onMessageConsumed(String consumerId, String messageBody) {
289 log.debug("<- onMessageConsumed(" + consumerId + ", " + messageBody + ")");
290
291 synchronized (m_stateLock) {
292 if (m_state != RUNNING) {
293 throw new RuntimeException("Message consumed while the simulation wasn't running: state = " + m_state + ", messageBody=" + messageBody);
294 }
295
296 if (!m_messagesProduced.contains(messageBody)) {
297 throw new RuntimeException("Message consumed that wasn't produced: messageBody=" + messageBody);
298 }
299
300 if (!m_messagesConsumed.add(messageBody)) {
301 throw new RuntimeException("Message consumed more than once: messageBody=" + messageBody);
302 }
303
304 if (m_messagesConsumed.size() == m_numExpectedMessages) {
305 setState(FINISHED);
306 log.info("All expected messages have been consumed, finishing simulation.");
307 }
308 }
309 }
310
311 public String toString() {
312 synchronized (m_stateLock) {
313 SortedMap unconsumed = new TreeMap();
314 for (Iterator iter = m_messagesProduced.iterator(); iter.hasNext();) {
315 String message = (String) iter.next();
316 int colonIndex = message.indexOf(':');
317 String producerId = message.substring(0, colonIndex);
318
319 Integer numMessages = (Integer) unconsumed.get(producerId);
320 numMessages = (numMessages == null) ? new Integer(1) : new Integer(numMessages.intValue() + 1);
321 unconsumed.put(producerId, numMessages);
322 }
323
324 for (Iterator iter = m_messagesConsumed.iterator(); iter.hasNext();) {
325 String message = (String) iter.next();
326 int colonIndex = message.indexOf(':');
327 String producerId = message.substring(0, colonIndex);
328
329 Integer numMessages = (Integer) unconsumed.get(producerId);
330 numMessages = (numMessages == null) ? new Integer(-1) : new Integer(numMessages.intValue() - 1);
331 if (numMessages.intValue() == 0) {
332 unconsumed.remove(producerId);
333 }
334 else {
335 unconsumed.put(producerId, numMessages);
336 }
337 }
338
339
340 return "SimulationState["
341 + "state=" + m_state + " "
342 + "numExpectedMessages=" + m_numExpectedMessages + " "
343 + "numMessagesProduced=" + m_messagesProduced.size() + " "
344 + "numMessagesConsumed=" + m_messagesConsumed.size() + " "
345 + "unconsumed=" + unconsumed;
346 }
347 }
348 }
349
350 private static abstract class SimulationComponent extends Thread {
351 protected final SimulationState m_simulationState;
352 protected final String m_id;
353
354 protected abstract void _initialize() throws Throwable;
355
356 protected abstract void _run() throws Throwable;
357
358 protected abstract void _cleanup() throws Throwable;
359
360 public SimulationComponent(SimulationState state, String id) {
361 super(id);
362
363 m_simulationState = state;
364 m_id = id;
365 }
366
367 public String getId() {
368 return m_id;
369 }
370
371 public final void run() {
372 try {
373 try {
374 _initialize();
375 }
376 catch (Throwable t) {
377 log.error("Error during initialization", t);
378 return;
379 }
380
381 try {
382 if (m_simulationState.getState() == SimulationState.FINISHED) {
383 log.info(m_id + " : NO NEED TO WAIT FOR RUNNING - already FINISHED");
384 }
385 else {
386 log.info(m_id + ": WAITING for RUNNING started");
387 m_simulationState.waitForSimulationState(SimulationState.RUNNING);
388 log.info(m_id + ": WAITING for RUNNING finished");
389 }
390 }
391 catch (InterruptedException e) {
392 log.error("Interrupted during wait for the simulation to begin", e);
393 return;
394 }
395
396 try {
397 _run();
398 }
399 catch (Throwable t) {
400 log.error("Error during running", t);
401 }
402 }
403 finally {
404 try {
405 _cleanup();
406 }
407 catch (Throwable t) {
408 log.error("Error during cleanup", t);
409 }
410 }
411
412 }
413 }
414
415 private static abstract class JMSComponent extends SimulationComponent {
416 protected final String m_url;
417 protected Connection m_connection;
418 protected Session m_session;
419 protected Queue m_queue;
420
421 public JMSComponent(SimulationState state, String id, String url) {
422 super(state, id);
423
424 m_url = url;
425 }
426
427 protected void _initialize() throws JMSException {
428 m_connection = new ActiveMQConnectionFactory(m_url).createConnection();
429 m_connection.start();
430
431 m_session = m_connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
432 m_queue = m_session.createQueue(QUEUE_NAME);
433 }
434
435 protected void _cleanup() throws JMSException {
436 if (m_session != null) {
437 m_session.close();
438 }
439
440 if (m_connection != null) {
441 m_connection.close();
442 }
443 }
444 }
445
446 private static final class MessageProducerComponent extends JMSComponent {
447 private final int m_numMessagesToSend;
448 private MessageProducer m_producer;
449
450 public MessageProducerComponent(SimulationState state, String id, String url, int numMessages) {
451 super(state, id, url);
452
453 m_numMessagesToSend = numMessages;
454 }
455
456 protected void _initialize() throws JMSException {
457 super._initialize();
458
459 m_producer = m_session.createProducer(m_queue);
460 m_producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
461 }
462
463 protected void _cleanup() throws JMSException {
464 if (m_producer != null) {
465 m_producer.close();
466 }
467
468 super._cleanup();
469 }
470
471 public void _run() throws JMSException, InterruptedException {
472 log.debug(m_id + ": started");
473 for (int num = 0; num < m_numMessagesToSend; num++) {
474 String messageBody = createMessageBody(m_id, num);
475
476 MapMessage message = m_session.createMapMessage();
477 message.setString(MESSAGE_PRODUCER_KEY, m_id);
478 message.setString(MESSAGE_BODY_KEY, messageBody);
479
480 // Pretend to be doing some work....
481 Thread.sleep(MESSAGE_SEND_DELAY);
482
483 m_simulationState.onMessageProduced(m_id, messageBody);
484 m_producer.send(message);
485 }
486 }
487
488 private static String createMessageBody(String id, int num) {
489 return id + ":" + Integer.toString(num);
490 }
491 }
492
493 private static final class MessageConsumerComponent extends JMSComponent implements MessageListener {
494 private final Object m_stateLock;
495 private boolean m_inOnMessage;
496
497 private MessageConsumer m_consumer;
498 private int m_numMessagesConsumed;
499
500 public MessageConsumerComponent(SimulationState state, String id, String url) {
501 super(state, id, url);
502 m_stateLock = new Object();
503 m_inOnMessage = false;
504 }
505
506 protected void _initialize() throws JMSException {
507 super._initialize();
508
509 m_consumer = m_session.createConsumer(m_queue);
510 m_consumer.setMessageListener(this);
511
512 m_numMessagesConsumed = 0;
513 }
514
515 protected void _cleanup() throws JMSException {
516 if (m_consumer != null) {
517 m_consumer.close();
518 }
519
520 super._cleanup();
521 }
522
523 public void _run() throws InterruptedException {
524 log.info(m_id + ": WAITING for FINISHED started");
525 m_simulationState.waitForSimulationState(SimulationState.FINISHED);
526 log.info(m_id + ": WAITING for FINISHED finished");
527 }
528
529 public int getNumberOfMessagesConsumed() {
530 return m_numMessagesConsumed;
531 }
532
533 public void onMessage(Message msg) {
534 synchronized (m_stateLock) {
535 if (m_inOnMessage) {
536 log.error("Already in onMessage!!!");
537 }
538
539 m_inOnMessage = true;
540 }
541
542 try {
543 MapMessage message = (MapMessage) msg;
544 String messageBody = message.getString(MESSAGE_BODY_KEY);
545
546 m_simulationState.onMessageConsumed(m_id, messageBody);
547 m_numMessagesConsumed++;
548
549 // Pretend to be doing some work....
550 Thread.sleep(MESSAGE_RECEIVE_DELAY);
551 }
552 catch (Throwable t) {
553 log.error("Unexpected error during onMessage: message=" + msg, t);
554 }
555 finally {
556 synchronized (m_stateLock) {
557 if (!m_inOnMessage) {
558 log.error("Not already in onMessage!!!");
559 }
560
561 m_inOnMessage = false;
562 }
563 }
564 }
565 }
566 }