Source code: org/activemq/transport/remote/RemoteTransportTest.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.activemq.transport.remote;
20 import java.util.ArrayList;
21
22 import javax.jms.Connection;
23 import javax.jms.DeliveryMode;
24 import javax.jms.Message;
25 import javax.jms.MessageConsumer;
26 import javax.jms.MessageProducer;
27 import javax.jms.Queue;
28 import javax.jms.Session;
29 import javax.jms.Topic;
30
31 import junit.framework.TestCase;
32
33 import org.activemq.ActiveMQConnection;
34 import org.activemq.ActiveMQConnectionFactory;
35 import org.activemq.broker.BrokerContainer;
36 import org.activemq.broker.impl.BrokerContainerImpl;
37
38 /**
39 * @version $Revision: 1.1.1.1 $
40 */
41 public class RemoteTransportTest extends TestCase{
42 protected BrokerContainer remoteBroker;
43 protected Connection producerConnection;
44 protected Connection consumerConnection;
45 protected MessageConsumer consumer;
46 protected MessageProducer producer;
47 protected Session producerSession;
48 private Session consumerSession;
49
50
51 protected void setUp() throws Exception{
52 String URL = "reliable://" + ActiveMQConnection.DEFAULT_BROKER_URL;
53
54
55 remoteBroker = new BrokerContainerImpl("remoteBroker");
56 remoteBroker.addConnector(ActiveMQConnection.DEFAULT_BROKER_URL);
57 remoteBroker.start();
58
59 //ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("remote://" + URL + "?brokerName=receiver");
60 ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("remote://" + URL );
61 consumerConnection = fac.createConnection();
62 consumerConnection.setClientID("receiver");
63 consumerConnection.start();
64
65 fac = new ActiveMQConnectionFactory("remote://" + URL + "?brokerName=sender");
66 producerConnection = fac.createConnection();
67 producerConnection.start();
68
69 }
70
71 private void doDurableTestSetup() throws Exception{
72 consumerSession = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
73 Topic destination = consumerSession.createTopic(getName());
74 consumer = consumerSession.createDurableSubscriber(destination, destination.toString());
75
76 producerSession = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
77 producer = producerSession.createProducer(destination);
78
79 }
80
81 private void doTopicTestSetup() throws Exception{
82 consumerSession = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
83 Topic destination = consumerSession.createTopic(getName());
84 consumer = consumerSession.createConsumer(destination);
85
86 producerSession = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
87 producer = producerSession.createProducer(destination);
88 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
89 }
90
91 private void doQueueTestSetup() throws Exception{
92 consumerSession = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
93 Queue destination = consumerSession.createQueue(getName());
94 consumer = consumerSession.createConsumer(destination);
95
96 producerSession = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
97 producer = producerSession.createProducer(destination);
98 }
99
100 protected void tearDown() throws Exception{
101 remoteBroker.stop();
102 consumerConnection.close();
103 producerConnection.close();
104 }
105
106
107
108 public void doTest() throws Exception {
109 Thread.sleep(3000);//make sure everything is connected
110 int COUNT = 10;
111 ArrayList list = new ArrayList(COUNT);
112 for (int i =0; i < COUNT; i++){
113 Message msg = producerSession.createTextMessage("test:"+i);
114 producer.send(msg);
115 list.add(msg);
116 }
117
118 for (int i =0; i < COUNT; i++){
119 Message msg = consumer.receive(1000);
120 assertNotNull("Receive: " + i + " NO message received!",msg);
121 Message sentMsg = (Message)list.get(i);
122 assertTrue("count " + i + " unexpected message: " +msg, msg.getJMSMessageID().equals(sentMsg.getJMSMessageID()));
123 }
124
125 // No further messages should have been delivered.
126 assertNull(consumer.receiveNoWait());
127 }
128
129 public void testDurableSendReceive() throws Exception {
130 doDurableTestSetup();
131 doTest();
132 consumerSession.close();
133 producerSession.close();
134 doDurableTestSetup();
135 doTest();
136 }
137
138 public void xtestTopicSendReceive() throws Exception {
139 doTopicTestSetup();
140 doTest();
141 }
142
143 public void xtestQueueSendReceive() throws Exception {
144 doQueueTestSetup();
145 doTest();
146 }
147
148
149 }