Docjar: A Java Source and Document Engine
com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/usecases/BecksNetworkTest.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activemq.usecases;
19  
20  import junit.framework.TestCase;
21  import org.activemq.ActiveMQConnectionFactory;
22  import org.activemq.broker.BrokerContainer;
23  import org.activemq.broker.impl.BrokerContainerImpl;
24  import org.activemq.store.vm.VMPersistenceAdapter;
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  
28  import javax.jms.*;
29  import java.util.HashSet;
30  import java.util.Iterator;
31  import java.util.Set;
32  import java.util.SortedMap;
33  import java.util.TreeMap;
34  
35  /**
36   * @author bbeck
37   * @version $Revision: 1.1 $
38   */
39  public class BecksNetworkTest extends TestCase {
40      private static final Log log = LogFactory.getLog(BecksNetworkTest.class);
41  
42      private static final int NUM_BROKERS = 10;
43      private static final int NUM_PRODUCERS = 10;
44      private static final int NUM_CONSUMERS = 1;
45      private static final int NUM_MESSAGES = 1;
46      private static final long MESSAGE_SEND_DELAY = 100;
47      private static final long MESSAGE_RECEIVE_DELAY = 50;
48      private static final int BASE_PORT = 9500;
49      private static final String QUEUE_NAME = "QUEUE";
50      private static final String MESSAGE_PRODUCER_KEY = "PRODUCER";
51      private static final String MESSAGE_BODY_KEY = "BODY";
52  
53      public void testCase() throws Throwable {
54          main(new String[]{});
55      }
56  
57      public static void main(String[] args) throws Throwable {
58          String[] addresses = new String[NUM_BROKERS];
59          for (int i = 0; i < NUM_BROKERS; i++) {
60              addresses[i] = "tcp://localhost:" + (BASE_PORT + i);
61          }
62  
63          log.info("Starting brokers");
64          BrokerContainer[] brokers = startBrokers(addresses);
65          String reliableURL = createAddressString(addresses, "reliable:", null);
66  
67          log.info("Creating simulation state");
68          final SimulationState state = new SimulationState(NUM_PRODUCERS * NUM_MESSAGES);
69          Thread stateWatcher = new Thread("Simulation State Watcher Thread") {
70              public void run() {
71                  while (state.getState() != SimulationState.FINISHED) {
72                      log.info("State: " + state);
73  
74                      synchronized (this) {
75                          try {
76                              wait(5000);
77                          }
78                          catch (InterruptedException ignored) {
79                          }
80                      }
81                  }
82              }
83          };
84          stateWatcher.setDaemon(true);
85          stateWatcher.start();
86  
87          log.info("Starting components");
88          MessageProducerComponent[] producers = new MessageProducerComponent[NUM_PRODUCERS];
89          MessageConsumerComponent[] consumers = new MessageConsumerComponent[NUM_CONSUMERS];
90          {
91              for (int i = 0; i < NUM_PRODUCERS; i++) {
92                  producers[i] = new MessageProducerComponent(state, "MessageProducer[" + i + "]", reliableURL, NUM_MESSAGES);
93                  producers[i].start();
94              }
95  
96              for (int i = 0; i < NUM_CONSUMERS; i++) {
97                  consumers[i] = new MessageConsumerComponent(state, "MessageConsumer[" + i + "]", reliableURL);
98                  consumers[i].start();
99              }
100         }
101 
102         // Start the simulation
103         log.info("##### Starting the simulation...");
104         state.setState(SimulationState.RUNNING);
105 
106         for (int i = 0; i < producers.length; i++) {
107             producers[i].join();
108         }
109         log.info("Producers finished");
110 
111         for (int i = 0; i < consumers.length; i++) {
112             consumers[i].join();
113             log.info(consumers[i].getId() + " consumed " + consumers[i].getNumberOfMessagesConsumed() + " messages.");
114         }
115 
116         log.info("Consumers finished");
117         log.info("State: " + state);
118 
119         state.waitForSimulationState(SimulationState.FINISHED);
120 
121         log.info("Stopping brokers");
122         for (int i = 0; i < brokers.length; i++) {
123             brokers[i].stop();
124         }
125     }
126 
127     private static BrokerContainer[] startBrokers(String[] addresses) throws JMSException {
128         BrokerContainer[] containers = new BrokerContainer[addresses.length];
129         for (int i = 0; i < containers.length; i++) {
130             containers[i] = new BrokerContainerImpl(Integer.toString(i));
131             containers[i].setPersistenceAdapter(new VMPersistenceAdapter());
132             containers[i].addConnector(addresses[i]);
133 
134             for (int j = 0; j < addresses.length; j++) {
135                 if (i == j) {
136                     continue;
137                 }
138 
139                 containers[i].addNetworkConnector("reliable:" + addresses[j]);
140             }
141 
142             containers[i].start();
143 
144             log.debug("Created broker on " + addresses[i]);
145         }
146 
147         // Delay so this broker has a chance to come up fully...
148         try {
149             Thread.sleep(2000 * containers.length);
150         }
151         catch (InterruptedException ignored) {
152         }
153 
154         return containers;
155     }
156 
157     /*
158     private static BrokerContainer[] startBrokers(String[] addresses) throws JMSException, IOException
159     {
160     for(int i = 0; i < addresses.length; i++) {
161                     File temp = File.createTempFile("broker_" + i + "_", ".xml");
162                     temp.deleteOnExit();
163 
164 
165                     PrintWriter fout = new PrintWriter(new FileWriter(temp));
166                     fout.println("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
167                     fout.println("<!DOCTYPE beans PUBLIC  \"-//ACTIVEMQ//DTD//EN\" \"http://activemq.codehaus.org/dtd/activemq.dtd\">");
168                     fout.println("<beans>");
169                     fout.println("  <broker name=\"" + "receiver" + i + "\">");
170                     fout.println("          <connector>");
171                     fout.println("                  <tcpServerTransport uri=\"" + addresses[i] + "\"/>");
172                     fout.println("          </connector>");
173 
174                     if(addresses.length > 1) {
175                             String otherAddresses = createAddressString(addresses, "list:", addresses[i]);
176                             otherAddresses = "tcp://localhost:9000";
177 
178             fout.println("          <networkConnector>");
179             fout.println("                  <networkChannel uri=\"" + otherAddresses + "\"/>");
180             fout.println("          </networkConnector>");
181                     }
182 
183                     fout.println("          <persistence>");
184                     fout.println("                  <vmPersistence/>");
185                     fout.println("          </persistence>");
186                     fout.println("  </broker>");
187                     fout.println("</beans>");
188                     fout.close();
189 
190                     ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://" + i);
191                     factory.setUseEmbeddedBroker(true);
192                     factory.setBrokerXmlConfig("file:" + temp.getAbsolutePath());
193                     factory.setBrokerName("broker-" + addresses[i]);
194                     factory.start();
195 
196                     Connection c = factory.createConnection();
197                     c.start();
198     }
199 
200             // Delay so this broker has a chance to come up fully...
201             try { Thread.sleep(2000*addresses.length); }
202             catch(InterruptedException ignored) {}
203 
204             return null;
205     }
206     */
207 
208     private static String createAddressString(String[] addresses, String prefix, String addressToSkip) {
209         StringBuffer sb = new StringBuffer(prefix);
210         boolean first = true;
211 
212         for (int i = 0; i < addresses.length; i++) {
213             if (addressToSkip != null && addressToSkip.equals(addresses[i])) {
214                 continue;
215             }
216 
217             if (!first) {
218                 sb.append(',');
219             }
220 
221             sb.append(addresses[i]);
222             first = false;
223         }
224 
225         return sb.toString();
226     }
227 
228     private static final class SimulationState {
229         public static final int INITIALIZED = 1;
230         public static final int RUNNING = 2;
231         public static final int FINISHED = 3;
232 
233         private final Object m_stateLock;
234         private int m_state;
235         private final int m_numExpectedMessages;
236         private final Set m_messagesProduced;
237         private final Set m_messagesConsumed;
238 
239         public SimulationState(int numMessages) {
240             m_stateLock = new Object();
241             synchronized (m_stateLock) {
242                 m_state = INITIALIZED;
243                 m_numExpectedMessages = numMessages;
244                 m_messagesProduced = new HashSet();
245                 m_messagesConsumed = new HashSet();
246             }
247         }
248 
249         public int getState() {
250             synchronized (m_stateLock) {
251                 return m_state;
252             }
253         }
254 
255         public void setState(int newState) {
256             synchronized (m_stateLock) {
257                 m_state = newState;
258                 m_stateLock.notifyAll();
259             }
260         }
261 
262         public void waitForSimulationState(int state) throws InterruptedException {
263             synchronized (m_stateLock) {
264                 while (m_state != state) {
265                     m_stateLock.wait();
266                 }
267             }
268         }
269 
270         public void onMessageProduced(String producerId, String messageBody) {
271             log.debug("-> onMessageProduced(" + producerId + ", " + messageBody + ")");
272 
273             synchronized (m_stateLock) {
274                 if (m_state == INITIALIZED) {
275                     throw new RuntimeException("Message produced before the simulation has begun: messageBody=" + messageBody);
276                 }
277 
278                 if (m_state == FINISHED) {
279                     throw new RuntimeException("Message produced after the simulation has finished: messageBody=" + messageBody);
280                 }
281 
282                 if (!m_messagesProduced.add(messageBody)) {
283                     throw new RuntimeException("Duplicate message produced: messageBody=" + messageBody);
284                 }
285             }
286         }
287 
288         public void onMessageConsumed(String consumerId, String messageBody) {
289             log.debug("<- onMessageConsumed(" + consumerId + ", " + messageBody + ")");
290 
291             synchronized (m_stateLock) {
292                 if (m_state != RUNNING) {
293                     throw new RuntimeException("Message consumed while the simulation wasn't running: state = " + m_state + ", messageBody=" + messageBody);
294                 }
295 
296                 if (!m_messagesProduced.contains(messageBody)) {
297                     throw new RuntimeException("Message consumed that wasn't produced: messageBody=" + messageBody);
298                 }
299 
300                 if (!m_messagesConsumed.add(messageBody)) {
301                     throw new RuntimeException("Message consumed more than once: messageBody=" + messageBody);
302                 }
303 
304                 if (m_messagesConsumed.size() == m_numExpectedMessages) {
305                     setState(FINISHED);
306                     log.info("All expected messages have been consumed, finishing simulation.");
307                 }
308             }
309         }
310 
311         public String toString() {
312             synchronized (m_stateLock) {
313                 SortedMap unconsumed = new TreeMap();
314                 for (Iterator iter = m_messagesProduced.iterator(); iter.hasNext();) {
315                     String message = (String) iter.next();
316                     int colonIndex = message.indexOf(':');
317                     String producerId = message.substring(0, colonIndex);
318 
319                     Integer numMessages = (Integer) unconsumed.get(producerId);
320                     numMessages = (numMessages == null) ? new Integer(1) : new Integer(numMessages.intValue() + 1);
321                     unconsumed.put(producerId, numMessages);
322                 }
323 
324                 for (Iterator iter = m_messagesConsumed.iterator(); iter.hasNext();) {
325                     String message = (String) iter.next();
326                     int colonIndex = message.indexOf(':');
327                     String producerId = message.substring(0, colonIndex);
328 
329                     Integer numMessages = (Integer) unconsumed.get(producerId);
330                     numMessages = (numMessages == null) ? new Integer(-1) : new Integer(numMessages.intValue() - 1);
331                     if (numMessages.intValue() == 0) {
332                         unconsumed.remove(producerId);
333                     }
334                     else {
335                         unconsumed.put(producerId, numMessages);
336                     }
337                 }
338 
339 
340                 return "SimulationState["
341                         + "state=" + m_state + " "
342                         + "numExpectedMessages=" + m_numExpectedMessages + " "
343                         + "numMessagesProduced=" + m_messagesProduced.size() + " "
344                         + "numMessagesConsumed=" + m_messagesConsumed.size() + " "
345                         + "unconsumed=" + unconsumed;
346             }
347         }
348     }
349 
350     private static abstract class SimulationComponent extends Thread {
351         protected final SimulationState m_simulationState;
352         protected final String m_id;
353 
354         protected abstract void _initialize() throws Throwable;
355 
356         protected abstract void _run() throws Throwable;
357 
358         protected abstract void _cleanup() throws Throwable;
359 
360         public SimulationComponent(SimulationState state, String id) {
361             super(id);
362 
363             m_simulationState = state;
364             m_id = id;
365         }
366 
367         public String getId() {
368             return m_id;
369         }
370 
371         public final void run() {
372             try {
373                 try {
374                     _initialize();
375                 }
376                 catch (Throwable t) {
377                     log.error("Error during initialization", t);
378                     return;
379                 }
380 
381                 try {
382                     if (m_simulationState.getState() == SimulationState.FINISHED) {
383                         log.info(m_id + " : NO NEED TO WAIT FOR RUNNING - already FINISHED");
384                     }
385                     else {
386                         log.info(m_id + ": WAITING for RUNNING started");
387                         m_simulationState.waitForSimulationState(SimulationState.RUNNING);
388                         log.info(m_id + ": WAITING for RUNNING finished");
389                     }
390                 }
391                 catch (InterruptedException e) {
392                     log.error("Interrupted during wait for the simulation to begin", e);
393                     return;
394                 }
395 
396                 try {
397                     _run();
398                 }
399                 catch (Throwable t) {
400                     log.error("Error during running", t);
401                 }
402             }
403             finally {
404                 try {
405                     _cleanup();
406                 }
407                 catch (Throwable t) {
408                     log.error("Error during cleanup", t);
409                 }
410             }
411 
412         }
413     }
414 
415     private static abstract class JMSComponent extends SimulationComponent {
416         protected final String m_url;
417         protected Connection m_connection;
418         protected Session m_session;
419         protected Queue m_queue;
420 
421         public JMSComponent(SimulationState state, String id, String url) {
422             super(state, id);
423 
424             m_url = url;
425         }
426 
427         protected void _initialize() throws JMSException {
428             m_connection = new ActiveMQConnectionFactory(m_url).createConnection();
429             m_connection.start();
430 
431             m_session = m_connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
432             m_queue = m_session.createQueue(QUEUE_NAME);
433         }
434 
435         protected void _cleanup() throws JMSException {
436             if (m_session != null) {
437                 m_session.close();
438             }
439 
440             if (m_connection != null) {
441                 m_connection.close();
442             }
443         }
444     }
445 
446     private static final class MessageProducerComponent extends JMSComponent {
447         private final int m_numMessagesToSend;
448         private MessageProducer m_producer;
449 
450         public MessageProducerComponent(SimulationState state, String id, String url, int numMessages) {
451             super(state, id, url);
452 
453             m_numMessagesToSend = numMessages;
454         }
455 
456         protected void _initialize() throws JMSException {
457             super._initialize();
458 
459             m_producer = m_session.createProducer(m_queue);
460             m_producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
461         }
462 
463         protected void _cleanup() throws JMSException {
464             if (m_producer != null) {
465                 m_producer.close();
466             }
467 
468             super._cleanup();
469         }
470 
471         public void _run() throws JMSException, InterruptedException {
472             log.debug(m_id + ": started");
473             for (int num = 0; num < m_numMessagesToSend; num++) {
474                 String messageBody = createMessageBody(m_id, num);
475 
476                 MapMessage message = m_session.createMapMessage();
477                 message.setString(MESSAGE_PRODUCER_KEY, m_id);
478                 message.setString(MESSAGE_BODY_KEY, messageBody);
479 
480                 // Pretend to be doing some work....
481                 Thread.sleep(MESSAGE_SEND_DELAY);
482 
483                 m_simulationState.onMessageProduced(m_id, messageBody);
484                 m_producer.send(message);
485             }
486         }
487 
488         private static String createMessageBody(String id, int num) {
489             return id + ":" + Integer.toString(num);
490         }
491     }
492 
493     private static final class MessageConsumerComponent extends JMSComponent implements MessageListener {
494         private final Object m_stateLock;
495         private boolean m_inOnMessage;
496 
497         private MessageConsumer m_consumer;
498         private int m_numMessagesConsumed;
499 
500         public MessageConsumerComponent(SimulationState state, String id, String url) {
501             super(state, id, url);
502             m_stateLock = new Object();
503             m_inOnMessage = false;
504         }
505 
506         protected void _initialize() throws JMSException {
507             super._initialize();
508 
509             m_consumer = m_session.createConsumer(m_queue);
510             m_consumer.setMessageListener(this);
511 
512             m_numMessagesConsumed = 0;
513         }
514 
515         protected void _cleanup() throws JMSException {
516             if (m_consumer != null) {
517                 m_consumer.close();
518             }
519 
520             super._cleanup();
521         }
522 
523         public void _run() throws InterruptedException {
524             log.info(m_id + ": WAITING for FINISHED started");
525             m_simulationState.waitForSimulationState(SimulationState.FINISHED);
526             log.info(m_id + ": WAITING for FINISHED finished");
527         }
528 
529         public int getNumberOfMessagesConsumed() {
530             return m_numMessagesConsumed;
531         }
532 
533         public void onMessage(Message msg) {
534             synchronized (m_stateLock) {
535                 if (m_inOnMessage) {
536                     log.error("Already in onMessage!!!");
537                 }
538 
539                 m_inOnMessage = true;
540             }
541 
542             try {
543                 MapMessage message = (MapMessage) msg;
544                 String messageBody = message.getString(MESSAGE_BODY_KEY);
545 
546                 m_simulationState.onMessageConsumed(m_id, messageBody);
547                 m_numMessagesConsumed++;
548 
549                 // Pretend to be doing some work....
550                 Thread.sleep(MESSAGE_RECEIVE_DELAY);
551             }
552             catch (Throwable t) {
553                 log.error("Unexpected error during onMessage: message=" + msg, t);
554             }
555             finally {
556                 synchronized (m_stateLock) {
557                     if (!m_inOnMessage) {
558                         log.error("Not already in onMessage!!!");
559                     }
560 
561                     m_inOnMessage = false;
562                 }
563             }
564         }
565     }
566 }