Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/transport/TransportChannelTestSupport.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activemq.transport;
19  
20  import EDU.oswego.cs.dl.util.concurrent.Slot;
21  import junit.framework.TestCase;
22  import org.activemq.io.WireFormat;
23  import org.activemq.io.impl.DefaultWireFormat;
24  import org.activemq.message.ActiveMQMessage;
25  import org.activemq.message.ActiveMQTopic;
26  import org.activemq.message.KeepAlive;
27  import org.activemq.message.Packet;
28  import org.activemq.message.PacketListener;
29  import org.activemq.message.Receipt;
30  import org.activemq.message.ReceiptHolder;
31  import org.activemq.util.IdGenerator;
32  
33  import javax.jms.ExceptionListener;
34  import javax.jms.JMSException;
35  import java.net.URI;
36  import java.net.URISyntaxException;
37  import java.util.ArrayList;
38  import java.util.List;
39  import java.util.Vector;
40  
41  /**
42   * @version $Revision: 1.1.1.1 $
43   */
44  public class TransportChannelTestSupport extends TestCase implements PacketListener, TransportChannelListener {
45  
46      protected int TEST_SIZE = 100;
47      protected Object mutex;
48      protected TransportChannel sender;
49      protected TransportChannel receiver;
50      protected TransportServerChannel server;
51      protected ArrayList packets;
52      protected List exceptions = new Vector();
53      protected boolean sendReceipts = false;
54      private IdGenerator idGenerator = new IdGenerator();
55      protected WireFormat wireFormat = new DefaultWireFormat();
56      private boolean closeReceiver = true;
57  
58      public TransportChannelTestSupport() {
59      }
60  
61      public TransportChannelTestSupport(String name) {
62          super(name);
63      }
64  
65      /*
66       * test for Receipt send(Packet, int)
67       */
68      public void testSendPacket() throws Exception {
69          System.out.println("Sending packets");
70  
71          List tmpList = (List) packets.clone();
72          for (int i = 0; i < TEST_SIZE; i++) {
73              Packet packet = (Packet) tmpList.get(i);
74              sender.asyncSend(packet);
75          }
76          System.out.println("Sent: " + TEST_SIZE + " packets");
77  
78          for (int i = 0; i < 10; i++) {
79              synchronized (mutex) {
80                  if (!packets.isEmpty()) {
81                      mutex.wait(500);
82                  }
83              }
84          }
85          assertTrue("Packets not consumed, still have: " + packets.size() + " packet(s) unconsumed", packets.isEmpty());
86          assertTrue("Exceptions were thrown: " + exceptions, exceptions.size() == 0);
87      }
88  
89      public void testAsyncSendWithReceipt() throws Exception {
90          sendReceipts = true;
91          Packet packet = new KeepAlive();
92          packet.setId((short) 1);
93          ReceiptHolder rh = sender.asyncSendWithReceipt(packet);
94          Receipt result = rh.getReceipt(5000);
95          if (result == null) {
96              fail("Should have gotten receipt");
97          }
98      }
99  
100     public void testRpc() throws Exception {
101         sendReceipts = true;
102 
103         List tmpList = (List) packets.clone();
104         for (int i = 0; i < TEST_SIZE; i++) {
105             Packet packet = (Packet) tmpList.get(i);
106             Receipt receipt = sender.send(packet, 4000);
107             assertTrue("Receipt should not be null!", receipt != null);
108             System.out.println("Got receipt: " + receipt + " for packet: " + packet);
109         }
110     }
111 
112     public void testForceDisconnect() throws Exception {
113 // Exceptions following the disconnect are allowed to be either hidden
114 // (e.g by a reliable transport) or shown to the client. Accept either
115 // and swallow
116 // any exceptions throws by this particular test
117         sender.setExceptionListener(new ExceptionListener() {
118 
119             public void onException(JMSException arg0) {
120                 ;
121             }
122         });
123 
124         if (receiver != null) {
125             receiver.setExceptionListener(new ExceptionListener() {
126                 public void onException(JMSException e) {
127                 }
128             });
129         }
130 
131 // Send some data down the channel.
132         testAsyncSendWithReceipt();
133 
134 // Test disconnect. This will be fatal for most channels, but reliable
135 // channels
136 // should be able to recover from it. In any case transportConnected
137 // should be false
138 // immediately after the disconnect
139         final Slot disconnectEvent = new Slot();
140         sender.addTransportStatusEventListener(new TransportStatusEventListener() {
141             public void statusChanged(TransportStatusEvent e) {
142                 if (e.getChannelStatus() == TransportStatusEvent.DISCONNECTED) {
143                     try {
144                         disconnectEvent.offer(e, 1000);
145                     }
146                     catch (InterruptedException e1) {
147                     }
148                 }
149             }
150         });
151         sender.forceDisconnect();
152 
153         assertNotNull("Should have received state change notification", disconnectEvent.poll(1000 * 30));
154         assertFalse("Should be disconnected", sender.isTransportConnected());
155 //there could ber exceptions thrown - which are valid for a force disconnect
156 //so clear them so tearDown() will pass
157         exceptions.clear();
158     }
159 
160     public void consume(Packet packet) {
161         System.out.println("Received packet: " + packet);
162 
163         if (sendReceipts) {
164             // lets send a receipt
165             Receipt receipt = new Receipt();
166             receipt.setId(idGenerator.getNextShortSequence());
167             receipt.setCorrelationId(packet.getId());
168             try {
169                 receiver.asyncSend(receipt);
170             }
171             catch (JMSException e) {
172                 logMessage("Sending receipt: " + receipt + " for packet: " + packet, e);
173             }
174         }
175         else {
176             packets.remove(packet);
177             if (packets.isEmpty()) {
178                 synchronized (mutex) {
179                     mutex.notify();
180                 }
181             }
182         }
183     }
184 
185     /**
186      * Assume that sender and receiver are created before we're invoked
187      */
188     protected void setUp() throws Exception {
189         super.setUp();
190 
191         assertTrue("sender must be constructed in the TestCase before setUp() is invoked", sender != null);
192         assertTrue("receiver or server must be constructed in the TestCase before setUp() is invoked", receiver != null
193                 || server != null);
194 
195         mutex = new Object();
196 
197         sender.setExceptionListener(new ExceptionListener() {
198 
199             public void onException(JMSException ex) {
200                 String message = "Sender got an exception:";
201                 logMessage(message, ex);
202             }
203         });
204 
205         sender.setPacketListener(new PacketListener() {
206 
207             public void consume(Packet packet) {
208                 System.err.println("Error - sender received a packet: " + packet);
209                 exceptions.add(packet);
210             }
211 
212         });
213 
214         sender.setClientID("sender");
215         sender.start();
216 
217         packets = new ArrayList(TEST_SIZE);
218         for (int i = 0; i < TEST_SIZE; i++) {
219             ActiveMQMessage test = new ActiveMQMessage();
220             test.setJMSMessageID("test:" + i);
221             test.setExternalMessageId(true);
222             test.setJMSDestination(new ActiveMQTopic(getName()));
223 
224             packets.add(test);
225         }
226     }
227 
228     protected void tearDown() throws Exception {
229         //getting exceptions when peers stop is acceptable
230         if (receiver != null) {
231             receiver.setExceptionListener(null);
232         }
233         super.tearDown();
234 
235         System.out.println("Stopping sender");
236         sender.stop();
237         if (receiver == null) {
238             System.out.println("No receiver created!");
239         }
240         else {
241             if (closeReceiver) {
242                 System.out.println("Stopping receiver");
243                 //assertTrue("No receiver created!", receiver != null);
244                 receiver.stop();
245             }
246             else {
247                 System.out.println("Receiver will be closed by the server");
248             }
249         }
250         if (server != null) {
251             System.out.println("Stopping server");
252             server.stop();
253         }
254         assertTrue("Exceptions were thrown: " + exceptions, exceptions.size() == 0);
255     }
256 
257     protected void configureServer() throws JMSException {
258         if (server != null) {
259             server.setTransportChannelListener(this);
260             server.start();
261             System.out.println("Server has started");
262 
263             // lets wait a little for the server to startup
264             /*
265              * try { Thread.sleep(500); } catch (InterruptedException e) {
266              * System.out.println("Caught: " + e); e.printStackTrace(); }
267              */
268         }
269     }
270 
271     protected void configureReceiver() {
272         receiver.setPacketListener(this);
273 
274         receiver.setExceptionListener(new ExceptionListener() {
275 
276             public void onException(JMSException ex) {
277                 logMessage("Receiver got an exception:", ex);
278             }
279 
280         });
281 
282         receiver.setClientID("receiver");
283 
284         try {
285             receiver.start();
286         }
287         catch (JMSException e) {
288             logMessage("Failure starting receiver: ", e);
289         }
290         System.out.println("Receiver has started");
291     }
292 
293 
294     protected void createSenderAndReceiver(String string) throws URISyntaxException, JMSException {
295         URI uri = new URI(string);
296 
297         receiver = TransportChannelProvider.create(wireFormat, uri);
298         if (receiver != null) {
299             configureReceiver();
300         }
301 
302         sender = TransportChannelProvider.create(wireFormat, uri);
303     }
304 
305     protected void createSenderAndServer(String subject) throws URISyntaxException, JMSException {
306         URI uri = new URI(subject);
307         server = TransportServerChannelProvider.create(wireFormat, uri);
308         configureServer();
309         sender = TransportChannelProvider.create(wireFormat, uri);
310     }
311 
312     protected void logMessage(String message, JMSException ex) {
313         System.err.println(message);
314         ex.printStackTrace();
315         Throwable t = ex.getLinkedException();
316         if (t != null && t != ex) {
317             System.out.println("Reason: " + t);
318             t.printStackTrace();
319         }
320         exceptions.add(ex);
321     }
322 
323     public void addClient(TransportChannel channel) {
324         this.receiver = channel;
325         this.closeReceiver = false;
326 
327         System.out.println("addClient() with receiver: " + receiver);
328 
329         assertTrue("Should have received a receiver by now", receiver != null);
330 
331         configureReceiver();
332     }
333 
334     public void removeClient(TransportChannel channel) {
335     }
336 }