Source code: org/activemq/transport/TransportChannelTestSupport.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activemq.transport;
19
20 import EDU.oswego.cs.dl.util.concurrent.Slot;
21 import junit.framework.TestCase;
22 import org.activemq.io.WireFormat;
23 import org.activemq.io.impl.DefaultWireFormat;
24 import org.activemq.message.ActiveMQMessage;
25 import org.activemq.message.ActiveMQTopic;
26 import org.activemq.message.KeepAlive;
27 import org.activemq.message.Packet;
28 import org.activemq.message.PacketListener;
29 import org.activemq.message.Receipt;
30 import org.activemq.message.ReceiptHolder;
31 import org.activemq.util.IdGenerator;
32
33 import javax.jms.ExceptionListener;
34 import javax.jms.JMSException;
35 import java.net.URI;
36 import java.net.URISyntaxException;
37 import java.util.ArrayList;
38 import java.util.List;
39 import java.util.Vector;
40
41 /**
42 * @version $Revision: 1.1.1.1 $
43 */
44 public class TransportChannelTestSupport extends TestCase implements PacketListener, TransportChannelListener {
45
46 protected int TEST_SIZE = 100;
47 protected Object mutex;
48 protected TransportChannel sender;
49 protected TransportChannel receiver;
50 protected TransportServerChannel server;
51 protected ArrayList packets;
52 protected List exceptions = new Vector();
53 protected boolean sendReceipts = false;
54 private IdGenerator idGenerator = new IdGenerator();
55 protected WireFormat wireFormat = new DefaultWireFormat();
56 private boolean closeReceiver = true;
57
58 public TransportChannelTestSupport() {
59 }
60
61 public TransportChannelTestSupport(String name) {
62 super(name);
63 }
64
65 /*
66 * test for Receipt send(Packet, int)
67 */
68 public void testSendPacket() throws Exception {
69 System.out.println("Sending packets");
70
71 List tmpList = (List) packets.clone();
72 for (int i = 0; i < TEST_SIZE; i++) {
73 Packet packet = (Packet) tmpList.get(i);
74 sender.asyncSend(packet);
75 }
76 System.out.println("Sent: " + TEST_SIZE + " packets");
77
78 for (int i = 0; i < 10; i++) {
79 synchronized (mutex) {
80 if (!packets.isEmpty()) {
81 mutex.wait(500);
82 }
83 }
84 }
85 assertTrue("Packets not consumed, still have: " + packets.size() + " packet(s) unconsumed", packets.isEmpty());
86 assertTrue("Exceptions were thrown: " + exceptions, exceptions.size() == 0);
87 }
88
89 public void testAsyncSendWithReceipt() throws Exception {
90 sendReceipts = true;
91 Packet packet = new KeepAlive();
92 packet.setId((short) 1);
93 ReceiptHolder rh = sender.asyncSendWithReceipt(packet);
94 Receipt result = rh.getReceipt(5000);
95 if (result == null) {
96 fail("Should have gotten receipt");
97 }
98 }
99
100 public void testRpc() throws Exception {
101 sendReceipts = true;
102
103 List tmpList = (List) packets.clone();
104 for (int i = 0; i < TEST_SIZE; i++) {
105 Packet packet = (Packet) tmpList.get(i);
106 Receipt receipt = sender.send(packet, 4000);
107 assertTrue("Receipt should not be null!", receipt != null);
108 System.out.println("Got receipt: " + receipt + " for packet: " + packet);
109 }
110 }
111
112 public void testForceDisconnect() throws Exception {
113 // Exceptions following the disconnect are allowed to be either hidden
114 // (e.g by a reliable transport) or shown to the client. Accept either
115 // and swallow
116 // any exceptions throws by this particular test
117 sender.setExceptionListener(new ExceptionListener() {
118
119 public void onException(JMSException arg0) {
120 ;
121 }
122 });
123
124 if (receiver != null) {
125 receiver.setExceptionListener(new ExceptionListener() {
126 public void onException(JMSException e) {
127 }
128 });
129 }
130
131 // Send some data down the channel.
132 testAsyncSendWithReceipt();
133
134 // Test disconnect. This will be fatal for most channels, but reliable
135 // channels
136 // should be able to recover from it. In any case transportConnected
137 // should be false
138 // immediately after the disconnect
139 final Slot disconnectEvent = new Slot();
140 sender.addTransportStatusEventListener(new TransportStatusEventListener() {
141 public void statusChanged(TransportStatusEvent e) {
142 if (e.getChannelStatus() == TransportStatusEvent.DISCONNECTED) {
143 try {
144 disconnectEvent.offer(e, 1000);
145 }
146 catch (InterruptedException e1) {
147 }
148 }
149 }
150 });
151 sender.forceDisconnect();
152
153 assertNotNull("Should have received state change notification", disconnectEvent.poll(1000 * 30));
154 assertFalse("Should be disconnected", sender.isTransportConnected());
155 //there could ber exceptions thrown - which are valid for a force disconnect
156 //so clear them so tearDown() will pass
157 exceptions.clear();
158 }
159
160 public void consume(Packet packet) {
161 System.out.println("Received packet: " + packet);
162
163 if (sendReceipts) {
164 // lets send a receipt
165 Receipt receipt = new Receipt();
166 receipt.setId(idGenerator.getNextShortSequence());
167 receipt.setCorrelationId(packet.getId());
168 try {
169 receiver.asyncSend(receipt);
170 }
171 catch (JMSException e) {
172 logMessage("Sending receipt: " + receipt + " for packet: " + packet, e);
173 }
174 }
175 else {
176 packets.remove(packet);
177 if (packets.isEmpty()) {
178 synchronized (mutex) {
179 mutex.notify();
180 }
181 }
182 }
183 }
184
185 /**
186 * Assume that sender and receiver are created before we're invoked
187 */
188 protected void setUp() throws Exception {
189 super.setUp();
190
191 assertTrue("sender must be constructed in the TestCase before setUp() is invoked", sender != null);
192 assertTrue("receiver or server must be constructed in the TestCase before setUp() is invoked", receiver != null
193 || server != null);
194
195 mutex = new Object();
196
197 sender.setExceptionListener(new ExceptionListener() {
198
199 public void onException(JMSException ex) {
200 String message = "Sender got an exception:";
201 logMessage(message, ex);
202 }
203 });
204
205 sender.setPacketListener(new PacketListener() {
206
207 public void consume(Packet packet) {
208 System.err.println("Error - sender received a packet: " + packet);
209 exceptions.add(packet);
210 }
211
212 });
213
214 sender.setClientID("sender");
215 sender.start();
216
217 packets = new ArrayList(TEST_SIZE);
218 for (int i = 0; i < TEST_SIZE; i++) {
219 ActiveMQMessage test = new ActiveMQMessage();
220 test.setJMSMessageID("test:" + i);
221 test.setExternalMessageId(true);
222 test.setJMSDestination(new ActiveMQTopic(getName()));
223
224 packets.add(test);
225 }
226 }
227
228 protected void tearDown() throws Exception {
229 //getting exceptions when peers stop is acceptable
230 if (receiver != null) {
231 receiver.setExceptionListener(null);
232 }
233 super.tearDown();
234
235 System.out.println("Stopping sender");
236 sender.stop();
237 if (receiver == null) {
238 System.out.println("No receiver created!");
239 }
240 else {
241 if (closeReceiver) {
242 System.out.println("Stopping receiver");
243 //assertTrue("No receiver created!", receiver != null);
244 receiver.stop();
245 }
246 else {
247 System.out.println("Receiver will be closed by the server");
248 }
249 }
250 if (server != null) {
251 System.out.println("Stopping server");
252 server.stop();
253 }
254 assertTrue("Exceptions were thrown: " + exceptions, exceptions.size() == 0);
255 }
256
257 protected void configureServer() throws JMSException {
258 if (server != null) {
259 server.setTransportChannelListener(this);
260 server.start();
261 System.out.println("Server has started");
262
263 // lets wait a little for the server to startup
264 /*
265 * try { Thread.sleep(500); } catch (InterruptedException e) {
266 * System.out.println("Caught: " + e); e.printStackTrace(); }
267 */
268 }
269 }
270
271 protected void configureReceiver() {
272 receiver.setPacketListener(this);
273
274 receiver.setExceptionListener(new ExceptionListener() {
275
276 public void onException(JMSException ex) {
277 logMessage("Receiver got an exception:", ex);
278 }
279
280 });
281
282 receiver.setClientID("receiver");
283
284 try {
285 receiver.start();
286 }
287 catch (JMSException e) {
288 logMessage("Failure starting receiver: ", e);
289 }
290 System.out.println("Receiver has started");
291 }
292
293
294 protected void createSenderAndReceiver(String string) throws URISyntaxException, JMSException {
295 URI uri = new URI(string);
296
297 receiver = TransportChannelProvider.create(wireFormat, uri);
298 if (receiver != null) {
299 configureReceiver();
300 }
301
302 sender = TransportChannelProvider.create(wireFormat, uri);
303 }
304
305 protected void createSenderAndServer(String subject) throws URISyntaxException, JMSException {
306 URI uri = new URI(subject);
307 server = TransportServerChannelProvider.create(wireFormat, uri);
308 configureServer();
309 sender = TransportChannelProvider.create(wireFormat, uri);
310 }
311
312 protected void logMessage(String message, JMSException ex) {
313 System.err.println(message);
314 ex.printStackTrace();
315 Throwable t = ex.getLinkedException();
316 if (t != null && t != ex) {
317 System.out.println("Reason: " + t);
318 t.printStackTrace();
319 }
320 exceptions.add(ex);
321 }
322
323 public void addClient(TransportChannel channel) {
324 this.receiver = channel;
325 this.closeReceiver = false;
326
327 System.out.println("addClient() with receiver: " + receiver);
328
329 assertTrue("Should have received a receiver by now", receiver != null);
330
331 configureReceiver();
332 }
333
334 public void removeClient(TransportChannel channel) {
335 }
336 }