Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/usecases/SystemTest.java


1   package org.activemq.usecases;
2   
3   import junit.framework.TestCase;
4   import org.apache.jorphan.logging.LoggingManager;
5   import org.apache.log.Logger;
6   
7   import javax.jms.*;
8   
9   import java.util.Collections;
10  import java.util.HashMap;
11  import java.util.Map;
12  
13  import org.apache.derby.jdbc.EmbeddedDataSource;
14  
15  import org.activemq.ActiveMQConnection;
16  import org.activemq.ActiveMQConnectionFactory;
17  import org.activemq.message.ActiveMQMessage;
18  import org.activemq.util.IdGenerator;
19  import org.activemq.broker.impl.BrokerConnectorImpl;
20  import org.activemq.broker.impl.BrokerContainerImpl;
21  import org.activemq.io.impl.DefaultWireFormat;
22  import org.activemq.store.PersistenceAdapter;
23  import org.activemq.store.jdbc.JDBCPersistenceAdapter;
24  
25  public class SystemTest extends TestCase implements MessageListener {
26  
27      private static final Logger log = LoggingManager.getLoggerForClass();
28  
29      public static final String ACTIVEMQ_SERVER = "ActiveMQ Server";
30      public static final boolean TRANSACTED_FALSE = false;
31      public static final String TOOL_DEFAULT = "TOOL.DEFAULT";
32      public static final String LAST_MESSAGE = "LAST";
33  
34      private static int msgCounter = 0;
35      public static Map ProducerMap = Collections.synchronizedMap(new HashMap());
36  
37      private int producerCount = 0;
38      private int consumerCount = 0;
39      private int subjectCount = 0;
40      private int messageCount = 0;
41      private boolean isPersistent = true;
42      private boolean isDurable = true;
43      private boolean isTopic = true;
44      private boolean isEmbeddedBroker = true;
45      private boolean init = false;
46      private boolean testStillRunning = true;
47      private BrokerContainerImpl broker = null;
48      private String brokerUrl = null;
49      private MessageConsumer consumer = null;
50  
51      /**
52       * Default constructor
53       */
54      protected SystemTest(){
55          super();
56          brokerUrl = "tcp://localhost:6099";
57          isEmbeddedBroker = true;
58          ProducerMap.clear();
59          msgCounter = 0;
60          init = false;
61          consumer = null;
62          testStillRunning = true;
63      }
64  
65      /**
66       * Constructor
67       *
68       * @param isTopic - true when topic, false when queue.
69       * @parm isPersistent - true when the delivery mode is persistent.
70       * @param isDurable - true when the suscriber is durable(For topic only).
71       * @param producerCount - number of producers.
72       * @param consumerCount - number of consumers.
73       * @param subjectCount - number of destinations.
74       * @param messageCount - number of messages to be delivered.
75       * @param testTitle - test title/name.
76       *
77       */
78      protected SystemTest(boolean isTopic,
79                           boolean isPersistent,
80                           boolean isDurable,
81                           int producerCount,
82                           int consumerCount,
83                           int subjectCount,
84                           int messageCount,
85                           String testTitle){
86          super();
87          this.isTopic = isTopic;
88          this.isPersistent = isPersistent;
89          this.isDurable = isDurable;
90          this.producerCount = producerCount;
91          this.consumerCount = consumerCount;
92          this.subjectCount = subjectCount;
93          this.messageCount = messageCount;
94          this.testParameterSettings(testTitle);
95  
96          brokerUrl = "tcp://localhost:6099";
97          isEmbeddedBroker = true;
98          ProducerMap.clear();
99          msgCounter = 0;
100         init = false;
101         consumer = null;
102         testStillRunning = true;
103     }
104 
105     /*
106      * Producer section
107      */
108 
109     /**
110      * Creates the message producer threads.
111      */
112     protected void publish() throws JMSException {
113         String subjects[] = getSubjects();
114 
115         for (int i = 0; i < producerCount; i++) {
116             final int x = i;
117             final String subject = subjects[i % subjects.length];
118 
119             Thread thread = new Thread() {
120                 public void run() {
121                     try {
122                         publish(x, subject);
123 
124                     } catch (Exception e) {
125                         e.printStackTrace();
126                     }
127                 }
128             };
129 
130             thread.start();
131         }
132     }
133 
134     /**
135      * Creates the producer and send the messages.
136      *
137      * @param x - producer number.
138      * @param subject -  the destination where the messages will be sent.
139      */
140     protected void publish(int x, String subject) throws Exception {
141         MessageProducer publisher = null;
142         Connection connection = createConnectionFactory(brokerUrl, isEmbeddedBroker);
143 
144         if (isPersistent) {
145             IdGenerator idGenerator = new IdGenerator();
146             connection.setClientID(idGenerator.generateId());
147         }
148 
149         Session session = createSession(connection, TRANSACTED_FALSE);
150         Destination destination = createDestination(session, subject, isTopic);
151         publisher = session.createProducer(destination);
152 
153         if (isPersistent) {
154             publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
155         } else {
156             publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
157         }
158 
159         StringBuffer sb = new StringBuffer();
160         sb.append("PROD");
161         sb.append(x);
162         sb.append("#BODY");
163 
164         //Sending messages
165         for (int i = 0; i < messageCount - 1; ++i) {
166             TextMessage message = session.createTextMessage(sb.toString());
167             publisher.send(message);
168         }
169         sb.delete(0, sb.length());
170 
171         //Sending the last message
172         sb.append("PROD");
173         sb.append(x);
174         sb.append("#");
175         sb.append(LAST_MESSAGE);
176         TextMessage message = session.createTextMessage(sb.toString());
177 
178         publisher.send(message);
179     }
180 
181     /*
182      * Consumer section
183      */
184 
185     /**
186      * Generates the topic/queue destinations.
187      *
188      * @return String[] - topic/queue destination name.
189      */
190     protected String[] getSubjects() {
191         //Create the subjects.
192         String[] subjects = new String[subjectCount];
193 
194         //Appended to the subject to determine if its a queue or topic.
195         String prefix = null;
196         if (this.isTopic) {
197             prefix = ".TOPIC";
198         } else {
199             prefix = ".QUEUE";
200         }
201 
202         for (int i = 0; i < subjects.length; i++) {
203             subjects[i] = TOOL_DEFAULT + prefix + i;
204         }
205 
206         return subjects;
207     }
208 
209     /**
210      * Suscribes the consumers to the topic/queue destinations.
211      */
212     protected void subscribe() throws JMSException {
213         String subjects[] = getSubjects();
214 
215         for (int i = 0; i < consumerCount; i++) {
216             String subject = subjects[i % subjectCount];
217             subscribe(subject);
218         }
219     }
220 
221     /**
222      * Suscribes the consumer to the topic/queue specified by the subject.
223      *
224      * @param subject - the Destination where the consumer waits upon for messages.
225      */
226     protected void subscribe(String subject) throws JMSException {
227         Connection connection = createConnectionFactory(brokerUrl, isEmbeddedBroker);
228 
229         if (isDurable) {
230             IdGenerator idGenerator = new IdGenerator();
231             connection.setClientID(idGenerator.generateId());
232         }
233 
234         //Start the connection before receiving messages.
235         connection.start();
236         Session session = createSession(connection, TRANSACTED_FALSE);
237         Destination destination = createDestination(session, subject, isTopic);
238 
239         if (isDurable && isTopic) {
240             consumer = session.createDurableSubscriber((Topic) destination, getClass().getName());
241         } else {
242             consumer = session.createConsumer(destination);
243         }
244 
245         consumer.setMessageListener(this);
246     }
247 
248     /**
249      * Processes the received message.
250      *
251      * @param message - message received by the listener.
252      */
253     public void onMessage(Message message) {
254         try {
255             ActiveMQMessage amsg = (ActiveMQMessage) message;
256             TextMessage textMessage = (TextMessage) message;
257 
258             StringBuffer sb = new StringBuffer();
259             sb.append(textMessage.getText());
260             sb.append("#");
261             sb.append(amsg.getConsumerIdentifer());
262 
263             msgCounter++;
264             String strMsgCounter = String.valueOf(msgCounter);
265             ProducerMap.put(strMsgCounter, sb.toString());
266 
267         } catch (JMSException e) {
268             //log.error("Unable to force deserialize the content", e);
269         }
270     }
271 
272     /**
273      * Validates the result of the producing and consumption of messages by the server.
274      * It checks for duplicate messages, message count and order.
275      */
276     protected synchronized void timerLoop() {
277         System.out.println("MessagingSystemTest.timerLoop() * started * ");
278         Map ProducerTextMap = new HashMap();
279         Map currentProducerMap = null;
280         String producerName = null;
281         String msgBody = null;
282         String consumerName = null;
283         String mapKey = null;
284         int expectedNoOfMessages = messageCount;
285         boolean dowhile = true;
286 
287         while (dowhile) {
288             try {
289                 Thread.sleep(1000);
290             } catch (InterruptedException e) {
291                 e.printStackTrace();
292             }
293 
294             //Retrieve the map containing the received messages data
295             currentProducerMap = resetProducerMap();
296 
297             if (currentProducerMap.size() == 0) {
298                 dowhile = false;
299             }
300 
301             //Put the map values to another map for parsing.
302             for (int i = 1; i <= currentProducerMap.size(); i++) {
303                 String ProdMsg = (String) currentProducerMap.get(String.valueOf(i));
304                 producerName = ProdMsg.substring(0, ProdMsg.indexOf("#"));
305                 msgBody = ProdMsg.substring(ProdMsg.indexOf("#") + 1, ProdMsg.lastIndexOf("#"));
306                 consumerName = ProdMsg.substring(ProdMsg.lastIndexOf("#"), ProdMsg.length());
307 
308                 if (isTopic) {
309                     mapKey = consumerName + producerName;
310                 } else {
311                     mapKey = producerName;
312                 }
313 
314                 if (ProducerTextMap.containsKey(mapKey)) {
315                     //Increment the counter value
316                     Integer value = (Integer) ProducerTextMap.get(mapKey);
317                     ProducerTextMap.put(mapKey, new Integer(value.intValue() + 1));
318                 } else {
319                     //Put the Producer Name in the map
320                     ProducerTextMap.put(mapKey, new Integer(1));
321                 }
322 
323                 Integer messageCounter = (Integer) ProducerTextMap.get(mapKey);
324 
325                 //Checks for duplicate messages.
326                 if (messageCounter.intValue() > expectedNoOfMessages) {
327                     assertTrue("Should not have received duplicate messages!", messageCounter.intValue() <= expectedNoOfMessages);
328                     if (messageCounter.intValue() > expectedNoOfMessages) {
329                         System.out.println("Should not have received duplicate message!");
330                     }
331                     break;
332                 } else if (LAST_MESSAGE.equals(msgBody)) {
333 
334                     System.out.println("entered MsgBody.equals(LAST_MESSAGE)..." + mapKey +
335                                        " " + messageCounter.intValue() +"=" + expectedNoOfMessages);
336 
337                     // Validates that the messages received is equal to the number
338                     // of expected messages
339                     if (messageCounter.intValue() != expectedNoOfMessages) {
340                         System.out.println("entered messageCounter.intValue() != expectedNoOfMessages...");
341 
342                         // Checks for message order.
343                         assertTrue("Should have received messages in order!", messageCounter.intValue() == expectedNoOfMessages);
344                         if (messageCounter.intValue() != expectedNoOfMessages) {
345                             System.out.println("Should have received messages in order!");
346                         }
347                         break;
348                     } else if (currentProducerMap.size() == i) {
349                         System.out.println("MessagingSystemTest.timerLoop() says system_test_pass!!!... ");
350                         break;
351                     }
352                 }
353             }
354         }
355 
356         testStillRunning = false;
357         System.out.println("MessagingSystemTest.timerLoop() * ended * ");
358     }
359 
360     /**
361      * Returns the message entries and clears the map for another set
362      * of messages to be processed.
363      *
364      * @return Map - messages to be processed.
365      */
366     protected synchronized Map resetProducerMap() {
367         Map copy = Collections.synchronizedMap(new HashMap(ProducerMap));
368         ProducerMap.clear();
369         msgCounter = 0;
370 
371         return copy;
372     }
373 
374     /*
375      * Utility section
376      */
377 
378     /**
379      * Creates the connection to the broker.
380      *
381      * @param url            - broker url.
382      * @param embeddedBroker - true if an embedded broker will be used.
383      * @return Connection - broker connection.
384      */
385     private static Connection createConnectionFactory(String url,
386                                                      boolean embeddedBroker) throws JMSException {
387         //Used to create a session from the default MQ server ActiveMQConnectionFactory.
388         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
389 
390         if (embeddedBroker) {
391             factory.setUseEmbeddedBroker(true);
392         }
393 
394         factory.setTurboBoost(true);
395         ActiveMQConnection c = (ActiveMQConnection) factory.createConnection();
396 
397         c.getPrefetchPolicy().setQueuePrefetch(1000);
398         c.getPrefetchPolicy().setQueueBrowserPrefetch(1000);
399         c.getPrefetchPolicy().setTopicPrefetch(1000);
400         c.getPrefetchPolicy().setDurableTopicPrefetch(1000);
401 
402         return c;
403     }
404 
405     /**
406      * Creates the connection session.
407      *
408      * @param connection   - broker connection.
409      * @param isTransacted - true if the session will be session transacted.
410      *                     otherwise the the session will be using auto acknowledge.
411      * @return Session - connection session.
412      */
413     private static Session createSession(Connection connection,
414                                         boolean isTransacted) throws JMSException {
415         if (isTransacted) {
416             return connection.createSession(true, Session.SESSION_TRANSACTED);
417         } else {
418             return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
419         }
420     }
421 
422     /**
423      * Creates the session destination.
424      *
425      * @param session - connection session.
426      * @param subject - destination name.
427      * @param isTopic - true if the destination is a topic,
428      *                otherwise the destination is a queue.
429      * @return Destination - session destination.
430      */
431     private static Destination createDestination(Session session,
432                                                 String subject,
433                                                 boolean isTopic) throws JMSException {
434         if (isTopic) {
435             return session.createTopic(subject);
436         } else {
437             return session.createQueue(subject);
438         }
439     }
440 
441     /*
442      * Unit test section
443      */
444 
445     /**
446      * Sets up the resources of the unit test.
447      *
448      * @throws Exception
449      */
450     protected void setUp() throws Exception {
451         //Set up the broker
452         createBroker();
453     }
454 
455     /**
456      * Clears up the resources used in the unit test.
457      */
458     protected void tearDown() throws Exception {
459         //Shut down the broker
460         destoryBroker();
461     }
462 
463     /**
464      * Executes the unit test by running the producers and consumers.
465      * It checks for duplicate messages, message count and order.
466      */
467     protected void doTest() throws Exception {
468         System.out.println("MessagingSystemTest.doTest() * start *");
469 
470         //Set up the consumers
471         subscribe();
472 
473         System.out.println("MessagingSystemTest.doTest() after suscribe()...");
474 
475         //Set up the producers
476         publish();
477 
478         System.out.println("MessagingSystemTest.doTest() after publish()...");
479 
480         //Run the test
481         Thread timer = new Thread() {
482             public void run() {
483                 timerLoop();
484             }
485         };
486         timer.setPriority(Thread.MIN_PRIORITY);
487         timer.start();
488 
489         while (testStillRunning) {
490             try {
491                 if (Thread.currentThread() == timer) {
492                     Thread.sleep(1000);
493                 }
494             } catch (InterruptedException e) {
495                 e.printStackTrace();
496             }
497         }
498         System.out.println("MessagingSystemTest.doTest() * end *");
499     }
500 
501     /**
502      * Sets up and starts the broker.
503      */
504     private void createBroker() throws Exception {
505         broker = new BrokerContainerImpl("localhost");
506         broker.addConnector(new BrokerConnectorImpl(broker, "vm://localhost", new DefaultWireFormat()));
507         broker.setPersistenceAdapter(createPersistenceAdapter());
508         broker.start();
509     }
510 
511     /**
512      * Closes the broker.
513      */
514     private void destoryBroker() throws Exception {
515         if (broker != null) {
516             broker.stop();
517         }
518 
519         consumer = null;
520         broker = null;
521     }
522 
523     /**
524      * Returns the persistence adapter.
525      * Sets up the testing database to be used when the messages are persistent.
526      * It attempts to recreate the tables everytime the test is executed.
527      *
528      * @return PersistenceAdapter - persistence adapter.
529      */
530     protected PersistenceAdapter createPersistenceAdapter() {
531         EmbeddedDataSource ds = new EmbeddedDataSource();
532         ds.setDatabaseName("testdb");
533         if (!init) {
534             ds.setCreateDatabase("create");
535         }
536 
537         JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter(ds, new DefaultWireFormat());
538 
539         if (!init) {
540             persistenceAdapter.setDropTablesOnStartup(true);
541         }
542 
543         init = true;
544 
545         return persistenceAdapter;
546     }
547 
548     /**
549      * Prints the test settings.
550      *
551      * @param strTestTitle - unit test name.
552      */
553     public void testParameterSettings(String strTestTitle) {
554         System.out.println(strTestTitle);
555         System.out.println("============================================================");
556         System.out.println("Test settings:");
557         System.out.println("isTopic=" + new Boolean(isTopic).toString());
558         System.out.println("isPersistent=" + new Boolean(isPersistent).toString());
559         System.out.println("isDurable=" + new Boolean(isDurable).toString());
560         System.out.println("producerCount=" + producerCount);
561         System.out.println("consumerCount=" + consumerCount);
562         System.out.println("subjectCount=" + subjectCount);
563         System.out.println("messageCount=" + messageCount);
564         System.out.println("");
565     }
566 }