Source code: org/activemq/bug/AMQ317Test.java
1 /**
2 *
3 * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activemq.bug;
19
20 import org.activemq.test.EmbeddedBrokerTestSupport;
21
22 import javax.jms.Connection;
23 import javax.jms.Message;
24 import javax.jms.MessageConsumer;
25 import javax.jms.MessageListener;
26 import javax.jms.Session;
27
28 /**
29 * Test case for AMQ-317
30 *
31 * @author Jason Anderson
32 * @version $Revision: 1.1 $
33 */
34 public class AMQ317Test extends EmbeddedBrokerTestSupport {
35
36 protected long[] received = new long[1];
37 private long group = 10;
38 protected long publishSleepTime = 0;
39 protected boolean verbose;
40
41 public void testAMQ317() throws Exception {
42 Connection connection = connectionFactory.createConnection();
43 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
44 MessageConsumer consumer = session.createConsumer(destination);
45 consumer.setMessageListener(new MessageListener() {
46 public void onMessage(Message message) {
47 if (verbose) {
48 System.out.println("Received: " + message);
49 }
50
51 synchronized (received) {
52 received[0] += 1;
53 }
54
55 if (verbose) {
56 System.out.println("Now value is: " + received[0]);
57 }
58 }
59 });
60 connection.start();
61
62 final int total = 1000000;
63
64 new Thread(new Runnable() {
65 public void run() {
66 for (int i = 0; i < total; i++) {
67 template.convertAndSend(getDestinationString(), "test message");
68 if (publishSleepTime > 0) {
69 try {
70 Thread.sleep(publishSleepTime);
71 }
72 catch (InterruptedException e) {
73 System.out.println("Caught: " + e);
74 }
75 }
76 }
77 }
78 }).start();
79
80 long last = 0;
81 long cur = 0;
82
83 do {
84 synchronized (received) {
85 cur = received[0];
86 }
87 if (cur % group == 0) {
88 System.out.println("received = " + cur + " diff = " + (cur - last));
89 last = cur;
90 }
91 Thread.sleep(1000);
92 }
93 while (cur < total);
94 }
95 }