Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/transport/peer/PeerTransportTest.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.activemq.transport.peer;
20  import javax.jms.Connection;
21  import javax.jms.DeliveryMode;
22  import javax.jms.Destination;
23  import javax.jms.JMSException;
24  import javax.jms.Message;
25  import javax.jms.MessageConsumer;
26  import javax.jms.MessageListener;
27  import javax.jms.MessageProducer;
28  import javax.jms.Session;
29  import javax.jms.TextMessage;
30  import junit.framework.TestCase;
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.activemq.ActiveMQConnectionFactory;
34  import org.activemq.message.ActiveMQQueue;
35  import org.activemq.message.ActiveMQTextMessage;
36  import org.activemq.message.ActiveMQTopic;
37  import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
38  
39  /**
40   * @version $Revision: 1.1.1.1 $
41   */
42  public class PeerTransportTest extends TestCase implements MessageListener {
43      protected Log log = LogFactory.getLog(getClass());
44      protected Destination destination;
45      protected boolean topic = true;
46      protected SynchronizedInt receivedMessageCount = new SynchronizedInt(0);
47      protected static int MESSAGE_COUNT = 50;
48      protected static int NUMBER_IN_CLUSTER = 3;
49      protected int deliveryMode = DeliveryMode.NON_PERSISTENT;
50      protected MessageProducer[] producers;
51      protected Connection[] connections;
52  
53      protected void setUp() throws Exception {
54          connections = new Connection[NUMBER_IN_CLUSTER];
55          producers = new MessageProducer[NUMBER_IN_CLUSTER];
56          Destination destination = createDestination();
57          int portStart = 50000;
58  
59          String root = System.getProperty("activemq.store.dir");
60  
61          for (int i = 0;i < NUMBER_IN_CLUSTER;i++) {
62              System.setProperty("activemq.store.dir", root + "_broker_" + i);
63              connections[i] = createConnection();
64              connections[i].setClientID("ClusterTest" + i);
65              connections[i].start();
66              Session session = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
67              producers[i] = session.createProducer(destination);
68              producers[i].setDeliveryMode(deliveryMode);
69              MessageConsumer consumer = createMessageConsumer(session, destination);
70              consumer.setMessageListener(this);
71          }
72          System.out.println("Sleeping to ensure cluster is fully connected");
73          Thread.sleep(5000);
74      }
75  
76      protected void tearDown() throws Exception {
77          if (connections != null) {
78              for (int i = 0;i < connections.length;i++) {
79                  connections[i].close();
80              }
81          }
82      }
83  
84      protected MessageConsumer createMessageConsumer(Session session, Destination destination) throws JMSException {
85          return session.createConsumer(destination);
86      }
87  
88      protected int expectedReceiveCount() {
89          return MESSAGE_COUNT * NUMBER_IN_CLUSTER * NUMBER_IN_CLUSTER;
90      }
91  
92      protected Connection createConnection() throws JMSException {
93          System.err.println("creating connection ....");
94          ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("peer://" + getClass().getName());
95          return fac.createConnection();
96      }
97  
98      protected Destination createDestination() {
99          return createDestination(getClass().getName());
100     }
101 
102     protected Destination createDestination(String name) {
103         if (topic) {
104             return new ActiveMQTopic(name);
105         }
106         else {
107             return new ActiveMQQueue(name);
108         }
109     }
110 
111     /**
112      * @param msg
113      */
114     public void onMessage(Message msg) {
115         //System.out.println("GOT: " + msg);
116         receivedMessageCount.increment();
117         synchronized (receivedMessageCount) {
118             if (receivedMessageCount.get() >= expectedReceiveCount()) {
119                 receivedMessageCount.notify();
120             }
121         }
122     }
123 
124     /**
125      * @throws Exception
126      */
127     public void testSendReceive() throws Exception {
128         for (int i = 0;i < MESSAGE_COUNT;i++) {
129             TextMessage textMessage = new ActiveMQTextMessage();
130             textMessage.setText("MSG-NO:" + i);
131             for (int x = 0;x < producers.length;x++) {
132                 producers[x].send(textMessage);
133             }
134         }
135         synchronized (receivedMessageCount) {
136             if (receivedMessageCount.get() < expectedReceiveCount()) {
137                 receivedMessageCount.wait(20000);
138             }
139         }
140         //sleep a little - to check we don't get too many messages
141         Thread.sleep(2000);
142         System.err.println("GOT: " + receivedMessageCount.get());
143         assertEquals("Expected message count not correct", expectedReceiveCount(), receivedMessageCount.get());
144     }
145 }