Source code: org/activemq/transport/peer/PeerTransportTest.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.activemq.transport.peer;
20 import javax.jms.Connection;
21 import javax.jms.DeliveryMode;
22 import javax.jms.Destination;
23 import javax.jms.JMSException;
24 import javax.jms.Message;
25 import javax.jms.MessageConsumer;
26 import javax.jms.MessageListener;
27 import javax.jms.MessageProducer;
28 import javax.jms.Session;
29 import javax.jms.TextMessage;
30 import junit.framework.TestCase;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.activemq.ActiveMQConnectionFactory;
34 import org.activemq.message.ActiveMQQueue;
35 import org.activemq.message.ActiveMQTextMessage;
36 import org.activemq.message.ActiveMQTopic;
37 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
38
39 /**
40 * @version $Revision: 1.1.1.1 $
41 */
42 public class PeerTransportTest extends TestCase implements MessageListener {
43 protected Log log = LogFactory.getLog(getClass());
44 protected Destination destination;
45 protected boolean topic = true;
46 protected SynchronizedInt receivedMessageCount = new SynchronizedInt(0);
47 protected static int MESSAGE_COUNT = 50;
48 protected static int NUMBER_IN_CLUSTER = 3;
49 protected int deliveryMode = DeliveryMode.NON_PERSISTENT;
50 protected MessageProducer[] producers;
51 protected Connection[] connections;
52
53 protected void setUp() throws Exception {
54 connections = new Connection[NUMBER_IN_CLUSTER];
55 producers = new MessageProducer[NUMBER_IN_CLUSTER];
56 Destination destination = createDestination();
57 int portStart = 50000;
58
59 String root = System.getProperty("activemq.store.dir");
60
61 for (int i = 0;i < NUMBER_IN_CLUSTER;i++) {
62 System.setProperty("activemq.store.dir", root + "_broker_" + i);
63 connections[i] = createConnection();
64 connections[i].setClientID("ClusterTest" + i);
65 connections[i].start();
66 Session session = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
67 producers[i] = session.createProducer(destination);
68 producers[i].setDeliveryMode(deliveryMode);
69 MessageConsumer consumer = createMessageConsumer(session, destination);
70 consumer.setMessageListener(this);
71 }
72 System.out.println("Sleeping to ensure cluster is fully connected");
73 Thread.sleep(5000);
74 }
75
76 protected void tearDown() throws Exception {
77 if (connections != null) {
78 for (int i = 0;i < connections.length;i++) {
79 connections[i].close();
80 }
81 }
82 }
83
84 protected MessageConsumer createMessageConsumer(Session session, Destination destination) throws JMSException {
85 return session.createConsumer(destination);
86 }
87
88 protected int expectedReceiveCount() {
89 return MESSAGE_COUNT * NUMBER_IN_CLUSTER * NUMBER_IN_CLUSTER;
90 }
91
92 protected Connection createConnection() throws JMSException {
93 System.err.println("creating connection ....");
94 ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("peer://" + getClass().getName());
95 return fac.createConnection();
96 }
97
98 protected Destination createDestination() {
99 return createDestination(getClass().getName());
100 }
101
102 protected Destination createDestination(String name) {
103 if (topic) {
104 return new ActiveMQTopic(name);
105 }
106 else {
107 return new ActiveMQQueue(name);
108 }
109 }
110
111 /**
112 * @param msg
113 */
114 public void onMessage(Message msg) {
115 //System.out.println("GOT: " + msg);
116 receivedMessageCount.increment();
117 synchronized (receivedMessageCount) {
118 if (receivedMessageCount.get() >= expectedReceiveCount()) {
119 receivedMessageCount.notify();
120 }
121 }
122 }
123
124 /**
125 * @throws Exception
126 */
127 public void testSendReceive() throws Exception {
128 for (int i = 0;i < MESSAGE_COUNT;i++) {
129 TextMessage textMessage = new ActiveMQTextMessage();
130 textMessage.setText("MSG-NO:" + i);
131 for (int x = 0;x < producers.length;x++) {
132 producers[x].send(textMessage);
133 }
134 }
135 synchronized (receivedMessageCount) {
136 if (receivedMessageCount.get() < expectedReceiveCount()) {
137 receivedMessageCount.wait(20000);
138 }
139 }
140 //sleep a little - to check we don't get too many messages
141 Thread.sleep(2000);
142 System.err.println("GOT: " + receivedMessageCount.get());
143 assertEquals("Expected message count not correct", expectedReceiveCount(), receivedMessageCount.get());
144 }
145 }