Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/transport/TopicClusterTest.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.activemq.transport;
20  import javax.jms.Connection;
21  import javax.jms.DeliveryMode;
22  import javax.jms.Destination;
23  import javax.jms.JMSException;
24  import javax.jms.Message;
25  import javax.jms.MessageConsumer;
26  import javax.jms.MessageListener;
27  import javax.jms.MessageProducer;
28  import javax.jms.Session;
29  import javax.jms.TextMessage;
30  import junit.framework.TestCase;
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.activemq.ActiveMQConnectionFactory;
34  import org.activemq.broker.BrokerContainer;
35  import org.activemq.broker.impl.BrokerContainerImpl;
36  import org.activemq.message.ActiveMQQueue;
37  import org.activemq.message.ActiveMQTextMessage;
38  import org.activemq.message.ActiveMQTopic;
39  import org.activemq.transport.multicast.MulticastDiscoveryAgent;
40  import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
41  
42  /**
43   * @version $Revision: 1.1.1.1 $
44   */
45  public class TopicClusterTest extends TestCase implements MessageListener {
46      protected Log log = LogFactory.getLog(getClass());
47      protected Destination destination;
48      protected boolean topic = true;
49      protected SynchronizedInt receivedMessageCount = new SynchronizedInt(0);
50      protected static int MESSAGE_COUNT = 50;
51      protected static int NUMBER_IN_CLUSTER = 3;
52      protected int deliveryMode = DeliveryMode.NON_PERSISTENT;
53      protected MessageProducer[] producers;
54      protected Connection[] connections;
55  
56      protected void setUp() throws Exception {
57          connections = new Connection[NUMBER_IN_CLUSTER];
58          producers = new MessageProducer[NUMBER_IN_CLUSTER];
59          Destination destination = createDestination();
60          int portStart = 50000;
61          String root = System.getProperty("activemq.store.dir");
62          try {
63              for (int i = 0;i < NUMBER_IN_CLUSTER;i++) {
64  
65                  System.setProperty("activemq.store.dir", root + "_broker_" + i);
66  
67                  connections[i] = createConnection("broker(" + i + ")");
68                  connections[i].setClientID("ClusterTest" + i);
69                  connections[i].start();
70                  Session session = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
71                  producers[i] = session.createProducer(destination);
72                  producers[i].setDeliveryMode(deliveryMode);
73                  MessageConsumer consumer = createMessageConsumer(session,destination);
74                  consumer.setMessageListener(this);
75  
76              }
77              System.out.println("Sleeping to ensure cluster is fully connected");
78              Thread.sleep(5000);
79          } finally {
80              System.setProperty("activemq.store.dir", root);
81          }
82      }
83  
84      protected void tearDown() throws Exception {
85          if (connections != null) {
86              for (int i = 0;i < connections.length;i++) {
87                  connections[i].close();
88              }
89          }
90      }
91      
92      protected MessageConsumer createMessageConsumer(Session session, Destination destination) throws JMSException{
93          return session.createConsumer(destination);
94      }
95  
96      protected ActiveMQConnectionFactory createGenericClusterFactory(String brokerName) throws JMSException {
97          BrokerContainer container = new BrokerContainerImpl(brokerName);
98        
99          MulticastDiscoveryAgent agent = new MulticastDiscoveryAgent(getClass().getName());
100         container.setDiscoveryAgent(agent);
101         String url = "tcp://localhost:0";
102         container.addConnector(url);
103         container.addNetworkConnector(new DiscoveryNetworkConnector(container));
104         container.start();
105         //embedded brokers are resolved by url - so make url unique
106         //this confused me tests for a while :-)
107         return new ActiveMQConnectionFactory(container,"vm://"+brokerName);
108     }
109 
110     protected int expectedReceiveCount() {
111         return MESSAGE_COUNT * NUMBER_IN_CLUSTER * NUMBER_IN_CLUSTER;
112     }
113 
114     protected Connection createConnection(String name) throws JMSException {
115         return createGenericClusterFactory(name).createConnection();
116     }
117 
118     protected Destination createDestination() {
119         return createDestination(getClass().getName());
120     }
121 
122     protected Destination createDestination(String name) {
123         if (topic) {
124             return new ActiveMQTopic(name);
125         }
126         else {
127             return new ActiveMQQueue(name);
128         }
129     }
130 
131 
132     /**
133      * @param msg
134      */
135     public void onMessage(Message msg) {
136         //System.out.println("GOT: " + msg);
137         receivedMessageCount.increment();
138         synchronized (receivedMessageCount) {
139             if (receivedMessageCount.get() >= expectedReceiveCount()) {
140                 receivedMessageCount.notify();
141             }
142         }
143     }
144 
145     /**
146      * @throws Exception
147      */
148     public void testSendReceive() throws Exception {
149         for (int i = 0;i < MESSAGE_COUNT;i++) {
150             TextMessage textMessage = new ActiveMQTextMessage();
151             textMessage.setText("MSG-NO:" + i);
152             for (int x = 0;x < producers.length;x++) {
153                 producers[x].send(textMessage);
154             }
155         }
156         synchronized (receivedMessageCount) {
157             if (receivedMessageCount.get() < expectedReceiveCount()) {
158                 receivedMessageCount.wait(20000);
159             }
160         }
161         //sleep a little - to check we don't get too many messages
162         Thread.sleep(2000);
163         System.err.println("GOT: " + receivedMessageCount.get());
164         assertEquals("Expected message count not correct", expectedReceiveCount(), receivedMessageCount.get());
165     }
166 
167 }