Source code: org/activemq/usecases/SystemTest.java
1 package org.activemq.usecases;
2
3 import junit.framework.TestCase;
4 import org.apache.jorphan.logging.LoggingManager;
5 import org.apache.log.Logger;
6
7 import javax.jms.*;
8
9 import java.util.Collections;
10 import java.util.HashMap;
11 import java.util.Map;
12
13 import org.apache.derby.jdbc.EmbeddedDataSource;
14
15 import org.activemq.ActiveMQConnection;
16 import org.activemq.ActiveMQConnectionFactory;
17 import org.activemq.message.ActiveMQMessage;
18 import org.activemq.util.IdGenerator;
19 import org.activemq.broker.impl.BrokerConnectorImpl;
20 import org.activemq.broker.impl.BrokerContainerImpl;
21 import org.activemq.io.impl.DefaultWireFormat;
22 import org.activemq.store.PersistenceAdapter;
23 import org.activemq.store.jdbc.JDBCPersistenceAdapter;
24
25 public class SystemTest extends TestCase implements MessageListener {
26
27 private static final Logger log = LoggingManager.getLoggerForClass();
28
29 public static final String ACTIVEMQ_SERVER = "ActiveMQ Server";
30 public static final boolean TRANSACTED_FALSE = false;
31 public static final String TOOL_DEFAULT = "TOOL.DEFAULT";
32 public static final String LAST_MESSAGE = "LAST";
33
34 private static int msgCounter = 0;
35 public static Map ProducerMap = Collections.synchronizedMap(new HashMap());
36
37 private int producerCount = 0;
38 private int consumerCount = 0;
39 private int subjectCount = 0;
40 private int messageCount = 0;
41 private boolean isPersistent = true;
42 private boolean isDurable = true;
43 private boolean isTopic = true;
44 private boolean isEmbeddedBroker = true;
45 private boolean init = false;
46 private boolean testStillRunning = true;
47 private BrokerContainerImpl broker = null;
48 private String brokerUrl = null;
49 private MessageConsumer consumer = null;
50
/**
 * Creates a test instance with the default broker settings.
 * Besides initialising the instance fields, this also clears the shared
 * received-message map and resets the shared message counter.
 */
protected SystemTest() {
    super();

    // Per-instance defaults.
    this.brokerUrl = "tcp://localhost:6099";
    this.isEmbeddedBroker = true;
    this.init = false;
    this.consumer = null;
    this.testStillRunning = true;

    // State shared between all instances.
    ProducerMap.clear();
    msgCounter = 0;
}
64
/**
 * Creates a test instance for the given scenario and prints its settings.
 * Besides initialising the instance fields, this also clears the shared
 * received-message map and resets the shared message counter.
 *
 * @param isTopic - true when topic, false when queue.
 * @param isPersistent - true when the delivery mode is persistent.
 * @param isDurable - true when the subscriber is durable (for topic only).
 * @param producerCount - number of producers.
 * @param consumerCount - number of consumers.
 * @param subjectCount - number of destinations.
 * @param messageCount - number of messages to be delivered.
 * @param testTitle - test title/name.
 */
protected SystemTest(boolean isTopic,
                     boolean isPersistent,
                     boolean isDurable,
                     int producerCount,
                     int consumerCount,
                     int subjectCount,
                     int messageCount,
                     String testTitle) {
    super();

    // Scenario flags.
    this.isTopic = isTopic;
    this.isPersistent = isPersistent;
    this.isDurable = isDurable;

    // Scenario sizes.
    this.producerCount = producerCount;
    this.consumerCount = consumerCount;
    this.subjectCount = subjectCount;
    this.messageCount = messageCount;

    // Report the scenario once all of its parameters are in place.
    this.testParameterSettings(testTitle);

    // Per-instance defaults.
    this.brokerUrl = "tcp://localhost:6099";
    this.isEmbeddedBroker = true;

    // State shared between all instances.
    ProducerMap.clear();
    msgCounter = 0;

    this.init = false;
    this.consumer = null;
    this.testStillRunning = true;
}
104
105 /*
106 * Producer section
107 */
108
109 /**
110 * Creates the message producer threads.
111 */
112 protected void publish() throws JMSException {
113 String subjects[] = getSubjects();
114
115 for (int i = 0; i < producerCount; i++) {
116 final int x = i;
117 final String subject = subjects[i % subjects.length];
118
119 Thread thread = new Thread() {
120 public void run() {
121 try {
122 publish(x, subject);
123
124 } catch (Exception e) {
125 e.printStackTrace();
126 }
127 }
128 };
129
130 thread.start();
131 }
132 }
133
134 /**
135 * Creates the producer and send the messages.
136 *
137 * @param x - producer number.
138 * @param subject - the destination where the messages will be sent.
139 */
140 protected void publish(int x, String subject) throws Exception {
141 MessageProducer publisher = null;
142 Connection connection = createConnectionFactory(brokerUrl, isEmbeddedBroker);
143
144 if (isPersistent) {
145 IdGenerator idGenerator = new IdGenerator();
146 connection.setClientID(idGenerator.generateId());
147 }
148
149 Session session = createSession(connection, TRANSACTED_FALSE);
150 Destination destination = createDestination(session, subject, isTopic);
151 publisher = session.createProducer(destination);
152
153 if (isPersistent) {
154 publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
155 } else {
156 publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
157 }
158
159 StringBuffer sb = new StringBuffer();
160 sb.append("PROD");
161 sb.append(x);
162 sb.append("#BODY");
163
164 //Sending messages
165 for (int i = 0; i < messageCount - 1; ++i) {
166 TextMessage message = session.createTextMessage(sb.toString());
167 publisher.send(message);
168 }
169 sb.delete(0, sb.length());
170
171 //Sending the last message
172 sb.append("PROD");
173 sb.append(x);
174 sb.append("#");
175 sb.append(LAST_MESSAGE);
176 TextMessage message = session.createTextMessage(sb.toString());
177
178 publisher.send(message);
179 }
180
181 /*
182 * Consumer section
183 */
184
185 /**
186 * Generates the topic/queue destinations.
187 *
188 * @return String[] - topic/queue destination name.
189 */
190 protected String[] getSubjects() {
191 //Create the subjects.
192 String[] subjects = new String[subjectCount];
193
194 //Appended to the subject to determine if its a queue or topic.
195 String prefix = null;
196 if (this.isTopic) {
197 prefix = ".TOPIC";
198 } else {
199 prefix = ".QUEUE";
200 }
201
202 for (int i = 0; i < subjects.length; i++) {
203 subjects[i] = TOOL_DEFAULT + prefix + i;
204 }
205
206 return subjects;
207 }
208
209 /**
210 * Suscribes the consumers to the topic/queue destinations.
211 */
212 protected void subscribe() throws JMSException {
213 String subjects[] = getSubjects();
214
215 for (int i = 0; i < consumerCount; i++) {
216 String subject = subjects[i % subjectCount];
217 subscribe(subject);
218 }
219 }
220
221 /**
222 * Suscribes the consumer to the topic/queue specified by the subject.
223 *
224 * @param subject - the Destination where the consumer waits upon for messages.
225 */
226 protected void subscribe(String subject) throws JMSException {
227 Connection connection = createConnectionFactory(brokerUrl, isEmbeddedBroker);
228
229 if (isDurable) {
230 IdGenerator idGenerator = new IdGenerator();
231 connection.setClientID(idGenerator.generateId());
232 }
233
234 //Start the connection before receiving messages.
235 connection.start();
236 Session session = createSession(connection, TRANSACTED_FALSE);
237 Destination destination = createDestination(session, subject, isTopic);
238
239 if (isDurable && isTopic) {
240 consumer = session.createDurableSubscriber((Topic) destination, getClass().getName());
241 } else {
242 consumer = session.createConsumer(destination);
243 }
244
245 consumer.setMessageListener(this);
246 }
247
248 /**
249 * Processes the received message.
250 *
251 * @param message - message received by the listener.
252 */
253 public void onMessage(Message message) {
254 try {
255 ActiveMQMessage amsg = (ActiveMQMessage) message;
256 TextMessage textMessage = (TextMessage) message;
257
258 StringBuffer sb = new StringBuffer();
259 sb.append(textMessage.getText());
260 sb.append("#");
261 sb.append(amsg.getConsumerIdentifer());
262
263 msgCounter++;
264 String strMsgCounter = String.valueOf(msgCounter);
265 ProducerMap.put(strMsgCounter, sb.toString());
266
267 } catch (JMSException e) {
268 //log.error("Unable to force deserialize the content", e);
269 }
270 }
271
272 /**
273 * Validates the result of the producing and consumption of messages by the server.
274 * It checks for duplicate messages, message count and order.
275 */
276 protected synchronized void timerLoop() {
277 System.out.println("MessagingSystemTest.timerLoop() * started * ");
278 Map ProducerTextMap = new HashMap();
279 Map currentProducerMap = null;
280 String producerName = null;
281 String msgBody = null;
282 String consumerName = null;
283 String mapKey = null;
284 int expectedNoOfMessages = messageCount;
285 boolean dowhile = true;
286
287 while (dowhile) {
288 try {
289 Thread.sleep(1000);
290 } catch (InterruptedException e) {
291 e.printStackTrace();
292 }
293
294 //Retrieve the map containing the received messages data
295 currentProducerMap = resetProducerMap();
296
297 if (currentProducerMap.size() == 0) {
298 dowhile = false;
299 }
300
301 //Put the map values to another map for parsing.
302 for (int i = 1; i <= currentProducerMap.size(); i++) {
303 String ProdMsg = (String) currentProducerMap.get(String.valueOf(i));
304 producerName = ProdMsg.substring(0, ProdMsg.indexOf("#"));
305 msgBody = ProdMsg.substring(ProdMsg.indexOf("#") + 1, ProdMsg.lastIndexOf("#"));
306 consumerName = ProdMsg.substring(ProdMsg.lastIndexOf("#"), ProdMsg.length());
307
308 if (isTopic) {
309 mapKey = consumerName + producerName;
310 } else {
311 mapKey = producerName;
312 }
313
314 if (ProducerTextMap.containsKey(mapKey)) {
315 //Increment the counter value
316 Integer value = (Integer) ProducerTextMap.get(mapKey);
317 ProducerTextMap.put(mapKey, new Integer(value.intValue() + 1));
318 } else {
319 //Put the Producer Name in the map
320 ProducerTextMap.put(mapKey, new Integer(1));
321 }
322
323 Integer messageCounter = (Integer) ProducerTextMap.get(mapKey);
324
325 //Checks for duplicate messages.
326 if (messageCounter.intValue() > expectedNoOfMessages) {
327 assertTrue("Should not have received duplicate messages!", messageCounter.intValue() <= expectedNoOfMessages);
328 if (messageCounter.intValue() > expectedNoOfMessages) {
329 System.out.println("Should not have received duplicate message!");
330 }
331 break;
332 } else if (LAST_MESSAGE.equals(msgBody)) {
333
334 System.out.println("entered MsgBody.equals(LAST_MESSAGE)..." + mapKey +
335 " " + messageCounter.intValue() +"=" + expectedNoOfMessages);
336
337 // Validates that the messages received is equal to the number
338 // of expected messages
339 if (messageCounter.intValue() != expectedNoOfMessages) {
340 System.out.println("entered messageCounter.intValue() != expectedNoOfMessages...");
341
342 // Checks for message order.
343 assertTrue("Should have received messages in order!", messageCounter.intValue() == expectedNoOfMessages);
344 if (messageCounter.intValue() != expectedNoOfMessages) {
345 System.out.println("Should have received messages in order!");
346 }
347 break;
348 } else if (currentProducerMap.size() == i) {
349 System.out.println("MessagingSystemTest.timerLoop() says system_test_pass!!!... ");
350 break;
351 }
352 }
353 }
354 }
355
356 testStillRunning = false;
357 System.out.println("MessagingSystemTest.timerLoop() * ended * ");
358 }
359
360 /**
361 * Returns the message entries and clears the map for another set
362 * of messages to be processed.
363 *
364 * @return Map - messages to be processed.
365 */
366 protected synchronized Map resetProducerMap() {
367 Map copy = Collections.synchronizedMap(new HashMap(ProducerMap));
368 ProducerMap.clear();
369 msgCounter = 0;
370
371 return copy;
372 }
373
374 /*
375 * Utility section
376 */
377
378 /**
379 * Creates the connection to the broker.
380 *
381 * @param url - broker url.
382 * @param embeddedBroker - true if an embedded broker will be used.
383 * @return Connection - broker connection.
384 */
385 private static Connection createConnectionFactory(String url,
386 boolean embeddedBroker) throws JMSException {
387 //Used to create a session from the default MQ server ActiveMQConnectionFactory.
388 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
389
390 if (embeddedBroker) {
391 factory.setUseEmbeddedBroker(true);
392 }
393
394 factory.setTurboBoost(true);
395 ActiveMQConnection c = (ActiveMQConnection) factory.createConnection();
396
397 c.getPrefetchPolicy().setQueuePrefetch(1000);
398 c.getPrefetchPolicy().setQueueBrowserPrefetch(1000);
399 c.getPrefetchPolicy().setTopicPrefetch(1000);
400 c.getPrefetchPolicy().setDurableTopicPrefetch(1000);
401
402 return c;
403 }
404
405 /**
406 * Creates the connection session.
407 *
408 * @param connection - broker connection.
409 * @param isTransacted - true if the session will be session transacted.
410 * otherwise the the session will be using auto acknowledge.
411 * @return Session - connection session.
412 */
413 private static Session createSession(Connection connection,
414 boolean isTransacted) throws JMSException {
415 if (isTransacted) {
416 return connection.createSession(true, Session.SESSION_TRANSACTED);
417 } else {
418 return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
419 }
420 }
421
422 /**
423 * Creates the session destination.
424 *
425 * @param session - connection session.
426 * @param subject - destination name.
427 * @param isTopic - true if the destination is a topic,
428 * otherwise the destination is a queue.
429 * @return Destination - session destination.
430 */
431 private static Destination createDestination(Session session,
432 String subject,
433 boolean isTopic) throws JMSException {
434 if (isTopic) {
435 return session.createTopic(subject);
436 } else {
437 return session.createQueue(subject);
438 }
439 }
440
441 /*
442 * Unit test section
443 */
444
445 /**
446 * Sets up the resources of the unit test.
447 *
448 * @throws Exception
449 */
450 protected void setUp() throws Exception {
451 //Set up the broker
452 createBroker();
453 }
454
455 /**
456 * Clears up the resources used in the unit test.
457 */
458 protected void tearDown() throws Exception {
459 //Shut down the broker
460 destoryBroker();
461 }
462
463 /**
464 * Executes the unit test by running the producers and consumers.
465 * It checks for duplicate messages, message count and order.
466 */
467 protected void doTest() throws Exception {
468 System.out.println("MessagingSystemTest.doTest() * start *");
469
470 //Set up the consumers
471 subscribe();
472
473 System.out.println("MessagingSystemTest.doTest() after suscribe()...");
474
475 //Set up the producers
476 publish();
477
478 System.out.println("MessagingSystemTest.doTest() after publish()...");
479
480 //Run the test
481 Thread timer = new Thread() {
482 public void run() {
483 timerLoop();
484 }
485 };
486 timer.setPriority(Thread.MIN_PRIORITY);
487 timer.start();
488
489 while (testStillRunning) {
490 try {
491 if (Thread.currentThread() == timer) {
492 Thread.sleep(1000);
493 }
494 } catch (InterruptedException e) {
495 e.printStackTrace();
496 }
497 }
498 System.out.println("MessagingSystemTest.doTest() * end *");
499 }
500
501 /**
502 * Sets up and starts the broker.
503 */
504 private void createBroker() throws Exception {
505 broker = new BrokerContainerImpl("localhost");
506 broker.addConnector(new BrokerConnectorImpl(broker, "vm://localhost", new DefaultWireFormat()));
507 broker.setPersistenceAdapter(createPersistenceAdapter());
508 broker.start();
509 }
510
511 /**
512 * Closes the broker.
513 */
514 private void destoryBroker() throws Exception {
515 if (broker != null) {
516 broker.stop();
517 }
518
519 consumer = null;
520 broker = null;
521 }
522
523 /**
524 * Returns the persistence adapter.
525 * Sets up the testing database to be used when the messages are persistent.
526 * It attempts to recreate the tables everytime the test is executed.
527 *
528 * @return PersistenceAdapter - persistence adapter.
529 */
530 protected PersistenceAdapter createPersistenceAdapter() {
531 EmbeddedDataSource ds = new EmbeddedDataSource();
532 ds.setDatabaseName("testdb");
533 if (!init) {
534 ds.setCreateDatabase("create");
535 }
536
537 JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter(ds, new DefaultWireFormat());
538
539 if (!init) {
540 persistenceAdapter.setDropTablesOnStartup(true);
541 }
542
543 init = true;
544
545 return persistenceAdapter;
546 }
547
548 /**
549 * Prints the test settings.
550 *
551 * @param strTestTitle - unit test name.
552 */
553 public void testParameterSettings(String strTestTitle) {
554 System.out.println(strTestTitle);
555 System.out.println("============================================================");
556 System.out.println("Test settings:");
557 System.out.println("isTopic=" + new Boolean(isTopic).toString());
558 System.out.println("isPersistent=" + new Boolean(isPersistent).toString());
559 System.out.println("isDurable=" + new Boolean(isDurable).toString());
560 System.out.println("producerCount=" + producerCount);
561 System.out.println("consumerCount=" + consumerCount);
562 System.out.println("subjectCount=" + subjectCount);
563 System.out.println("messageCount=" + messageCount);
564 System.out.println("");
565 }
566 }