Source code: org/activemq/transport/TopicClusterTest.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.activemq.transport;
20 import javax.jms.Connection;
21 import javax.jms.DeliveryMode;
22 import javax.jms.Destination;
23 import javax.jms.JMSException;
24 import javax.jms.Message;
25 import javax.jms.MessageConsumer;
26 import javax.jms.MessageListener;
27 import javax.jms.MessageProducer;
28 import javax.jms.Session;
29 import javax.jms.TextMessage;
30 import junit.framework.TestCase;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.activemq.ActiveMQConnectionFactory;
34 import org.activemq.broker.BrokerContainer;
35 import org.activemq.broker.impl.BrokerContainerImpl;
36 import org.activemq.message.ActiveMQQueue;
37 import org.activemq.message.ActiveMQTextMessage;
38 import org.activemq.message.ActiveMQTopic;
39 import org.activemq.transport.multicast.MulticastDiscoveryAgent;
40 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
41
42 /**
43 * @version $Revision: 1.1.1.1 $
44 */
45 public class TopicClusterTest extends TestCase implements MessageListener {
46 protected Log log = LogFactory.getLog(getClass());
47 protected Destination destination;
48 protected boolean topic = true;
49 protected SynchronizedInt receivedMessageCount = new SynchronizedInt(0);
50 protected static int MESSAGE_COUNT = 50;
51 protected static int NUMBER_IN_CLUSTER = 3;
52 protected int deliveryMode = DeliveryMode.NON_PERSISTENT;
53 protected MessageProducer[] producers;
54 protected Connection[] connections;
55
56 protected void setUp() throws Exception {
57 connections = new Connection[NUMBER_IN_CLUSTER];
58 producers = new MessageProducer[NUMBER_IN_CLUSTER];
59 Destination destination = createDestination();
60 int portStart = 50000;
61 String root = System.getProperty("activemq.store.dir");
62 try {
63 for (int i = 0;i < NUMBER_IN_CLUSTER;i++) {
64
65 System.setProperty("activemq.store.dir", root + "_broker_" + i);
66
67 connections[i] = createConnection("broker(" + i + ")");
68 connections[i].setClientID("ClusterTest" + i);
69 connections[i].start();
70 Session session = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
71 producers[i] = session.createProducer(destination);
72 producers[i].setDeliveryMode(deliveryMode);
73 MessageConsumer consumer = createMessageConsumer(session,destination);
74 consumer.setMessageListener(this);
75
76 }
77 System.out.println("Sleeping to ensure cluster is fully connected");
78 Thread.sleep(5000);
79 } finally {
80 System.setProperty("activemq.store.dir", root);
81 }
82 }
83
84 protected void tearDown() throws Exception {
85 if (connections != null) {
86 for (int i = 0;i < connections.length;i++) {
87 connections[i].close();
88 }
89 }
90 }
91
92 protected MessageConsumer createMessageConsumer(Session session, Destination destination) throws JMSException{
93 return session.createConsumer(destination);
94 }
95
96 protected ActiveMQConnectionFactory createGenericClusterFactory(String brokerName) throws JMSException {
97 BrokerContainer container = new BrokerContainerImpl(brokerName);
98
99 MulticastDiscoveryAgent agent = new MulticastDiscoveryAgent(getClass().getName());
100 container.setDiscoveryAgent(agent);
101 String url = "tcp://localhost:0";
102 container.addConnector(url);
103 container.addNetworkConnector(new DiscoveryNetworkConnector(container));
104 container.start();
105 //embedded brokers are resolved by url - so make url unique
106 //this confused me tests for a while :-)
107 return new ActiveMQConnectionFactory(container,"vm://"+brokerName);
108 }
109
110 protected int expectedReceiveCount() {
111 return MESSAGE_COUNT * NUMBER_IN_CLUSTER * NUMBER_IN_CLUSTER;
112 }
113
114 protected Connection createConnection(String name) throws JMSException {
115 return createGenericClusterFactory(name).createConnection();
116 }
117
118 protected Destination createDestination() {
119 return createDestination(getClass().getName());
120 }
121
122 protected Destination createDestination(String name) {
123 if (topic) {
124 return new ActiveMQTopic(name);
125 }
126 else {
127 return new ActiveMQQueue(name);
128 }
129 }
130
131
132 /**
133 * @param msg
134 */
135 public void onMessage(Message msg) {
136 //System.out.println("GOT: " + msg);
137 receivedMessageCount.increment();
138 synchronized (receivedMessageCount) {
139 if (receivedMessageCount.get() >= expectedReceiveCount()) {
140 receivedMessageCount.notify();
141 }
142 }
143 }
144
145 /**
146 * @throws Exception
147 */
148 public void testSendReceive() throws Exception {
149 for (int i = 0;i < MESSAGE_COUNT;i++) {
150 TextMessage textMessage = new ActiveMQTextMessage();
151 textMessage.setText("MSG-NO:" + i);
152 for (int x = 0;x < producers.length;x++) {
153 producers[x].send(textMessage);
154 }
155 }
156 synchronized (receivedMessageCount) {
157 if (receivedMessageCount.get() < expectedReceiveCount()) {
158 receivedMessageCount.wait(20000);
159 }
160 }
161 //sleep a little - to check we don't get too many messages
162 Thread.sleep(2000);
163 System.err.println("GOT: " + receivedMessageCount.get());
164 assertEquals("Expected message count not correct", expectedReceiveCount(), receivedMessageCount.get());
165 }
166
167 }