Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/transport/remote/RemoteTransportTest.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.activemq.transport.remote;
20  import java.util.ArrayList;
21  
22  import javax.jms.Connection;
23  import javax.jms.DeliveryMode;
24  import javax.jms.Message;
25  import javax.jms.MessageConsumer;
26  import javax.jms.MessageProducer;
27  import javax.jms.Queue;
28  import javax.jms.Session;
29  import javax.jms.Topic;
30  
31  import junit.framework.TestCase;
32  
33  import org.activemq.ActiveMQConnection;
34  import org.activemq.ActiveMQConnectionFactory;
35  import org.activemq.broker.BrokerContainer;
36  import org.activemq.broker.impl.BrokerContainerImpl;
37  
38  /**
39   * @version $Revision: 1.1.1.1 $
40   */
41  public class RemoteTransportTest extends TestCase{
42      protected BrokerContainer remoteBroker;
43      protected Connection producerConnection;
44      protected Connection consumerConnection;
45      protected MessageConsumer consumer;
46      protected MessageProducer producer;
47      protected Session producerSession;
48      private Session consumerSession;
49      
50      
51      protected void setUp() throws Exception{
52          String URL = "reliable://" + ActiveMQConnection.DEFAULT_BROKER_URL;
53         
54  
55          remoteBroker = new BrokerContainerImpl("remoteBroker");
56          remoteBroker.addConnector(ActiveMQConnection.DEFAULT_BROKER_URL);
57          remoteBroker.start();
58          
59          //ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("remote://" + URL + "?brokerName=receiver");
60          ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("remote://" + URL );
61          consumerConnection = fac.createConnection();
62          consumerConnection.setClientID("receiver");
63          consumerConnection.start();
64          
65          fac = new ActiveMQConnectionFactory("remote://" + URL + "?brokerName=sender");
66          producerConnection = fac.createConnection();
67          producerConnection.start();
68          
69      }
70      
71      private void doDurableTestSetup() throws Exception{
72          consumerSession = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
73          Topic destination = consumerSession.createTopic(getName());
74          consumer = consumerSession.createDurableSubscriber(destination, destination.toString());
75         
76          producerSession = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
77          producer = producerSession.createProducer(destination);
78      
79      }
80          
81      private void doTopicTestSetup() throws Exception{
82          consumerSession = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
83          Topic destination = consumerSession.createTopic(getName());
84          consumer = consumerSession.createConsumer(destination);
85         
86          producerSession = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
87          producer = producerSession.createProducer(destination);
88          producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
89      }
90      
91      private void doQueueTestSetup() throws Exception{
92          consumerSession = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
93          Queue destination = consumerSession.createQueue(getName());
94          consumer = consumerSession.createConsumer(destination);
95         
96          producerSession = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
97          producer = producerSession.createProducer(destination);
98      }
99      
100     protected void tearDown() throws Exception{
101         remoteBroker.stop();
102         consumerConnection.close();
103         producerConnection.close();
104     }
105     
106     
107     
108     public void doTest() throws Exception {
109         Thread.sleep(3000);//make sure everything is connected
110         int COUNT = 10;
111         ArrayList list = new ArrayList(COUNT);
112         for (int i =0; i < COUNT; i++){
113             Message msg = producerSession.createTextMessage("test:"+i);
114             producer.send(msg);
115             list.add(msg);
116         }
117         
118         for (int i =0; i < COUNT; i++){
119             Message msg = consumer.receive(1000);
120             assertNotNull("Receive: " + i + " NO message received!",msg);
121             Message sentMsg = (Message)list.get(i);
122             assertTrue("count " + i + " unexpected message: " +msg, msg.getJMSMessageID().equals(sentMsg.getJMSMessageID()));
123         }
124         
125         // No further messages should have been delivered.
126         assertNull(consumer.receiveNoWait());
127     }
128     
129     public void testDurableSendReceive() throws Exception {
130         doDurableTestSetup();
131         doTest();
132         consumerSession.close();
133         producerSession.close();
134         doDurableTestSetup();
135         doTest();
136     }
137     
138     public void xtestTopicSendReceive() throws Exception {
139         doTopicTestSetup();
140         doTest();
141     }
142     
143     public void xtestQueueSendReceive() throws Exception {
144         doQueueTestSetup();
145         doTest();
146     }
147     
148     
149 }