Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/transport/TransportChannelSupport.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.activemq.transport;
20  import java.net.URI;
21  import java.util.HashMap;
22  import java.util.Iterator;
23  import java.util.Map;
24  
25  import javax.jms.ExceptionListener;
26  import javax.jms.JMSException;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.activemq.UnsupportedWireFormatException;
31  import org.activemq.broker.BrokerConnector;
32  import org.activemq.io.WireFormat;
33  import org.activemq.message.Packet;
34  import org.activemq.message.PacketListener;
35  import org.activemq.message.Receipt;
36  import org.activemq.message.ReceiptHolder;
37  import org.activemq.message.WireFormatInfo;
38  import org.activemq.util.ExecutorHelper;
39  import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
40  import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
41  import EDU.oswego.cs.dl.util.concurrent.Executor;
42  
43  /**
44   * Some basic functionality, common across most transport implementations of channels
45   * 
46   * @version $Revision: 1.1.1.1 $
47   */
48  public abstract class TransportChannelSupport implements TransportChannel {
49      private static final Log log = LogFactory.getLog(TransportChannelSupport.class);
50      private CopyOnWriteArrayList listeners = new CopyOnWriteArrayList();
51      private ConcurrentHashMap requestMap = new ConcurrentHashMap();
52      private PacketListener packetListener;
53      private ExceptionListener exceptionListener;
54      private String clientID;
55      private TransportChannelListener transportChannelListener;
56      private long lastReceiptTimstamp = 0;
57      private boolean serverSide;
58      protected boolean pendingStop = false;
59      protected boolean transportConnected = true;
60      protected WireFormat currentWireFormat;
61      protected boolean cachingEnabled = false;
62      protected boolean noDelay = false;
63      protected boolean usedInternally = false; //denotes if transport is used by an internal Connection
64      
65      
66      
67      protected TransportChannelSupport(){
68      }
69      
70      protected TransportChannelSupport(WireFormat wf){
71          this.currentWireFormat = wf;
72      }
73  
74      /**
75       * Give the TransportChannel a hint it's about to stop
76       * 
77       * @param pendingStop
78       */
79      public void setPendingStop(boolean pendingStop) {
80          this.pendingStop = pendingStop;
81      }
82  
83      /**
84       * @return true if the channel is about to stop
85       */
86      public boolean isPendingStop() {
87          return pendingStop;
88      }
89      
90      /**
91      * set the wire format to be used by this channel
92      * @param wireformat
93      */
94     public void setWireFormat(WireFormat wireformat){
95         currentWireFormat = wireformat;
96     }
97     
98     /**
99      * Get the current wireformat used by this channel
100     * @return the current wire format - or null if not set
101     */
102    public WireFormat getWireFormat(){
103        return currentWireFormat;
104    }
105 
106     /**
107      * close the channel
108      */
109     public void stop() {
110         transportConnected = false;
111         Map map = new HashMap(this.requestMap);
112         for (Iterator i = map.values().iterator();i.hasNext();) {
113             ReceiptHolder rh = (ReceiptHolder) i.next();
114             rh.close();
115         }
116         map.clear();
117         requestMap.clear();
118         if (transportChannelListener != null) {
119             transportChannelListener.removeClient(this);
120         }
121         exceptionListener = null;
122         packetListener = null;
123     }
124 
125     /**
126      * synchronously send a Packet
127      * 
128      * @param packet
129      * @return a Receipt
130      * @throws JMSException
131      */
132     public Receipt send(Packet packet) throws JMSException {
133         return send(packet, 0);
134     }
135 
136     /**
137      * Synchronously send a Packet
138      * 
139      * @param packet packet to send
140      * @param timeout amount of time to wait for a receipt
141      * @return the Receipt
142      * @throws JMSException
143      */
144     public Receipt send(Packet packet, int timeout) throws JMSException {
145         ReceiptHolder rh = asyncSendWithReceipt(packet);
146         Receipt result = rh.getReceipt(timeout);
147         return result;
148     }
149     
150     /**
151      * Asynchronously send a Packet with receipt.
152      * 
153      * @param packet the packet to send
154      * @return a ReceiptHolder for the packet
155      * @throws JMSException
156      */
157   public ReceiptHolder asyncSendWithReceipt(Packet packet) throws JMSException {
158         ReceiptHolder rh = new ReceiptHolder();
159         requestMap.put(new Short(packet.getId()), rh);
160         Packet response = doAsyncSend(packet);
161         if (response != null && response instanceof Receipt){
162             rh.setReceipt((Receipt)response);
163         }
164         return rh;
165   }
166 
167     // Properties
168     //-------------------------------------------------------------------------
169     /**
170      * @return the transportChannelListener
171      */
172     public TransportChannelListener getTransportChannelListener() {
173         return transportChannelListener;
174     }
175 
176     /**
177      * @param transportChannelListener
178      */
179     public void setTransportChannelListener(TransportChannelListener transportChannelListener) {
180         this.transportChannelListener = transportChannelListener;
181     }
182 
183     /**
184      * Add a listener for changes in a channels status
185      * 
186      * @param listener
187      */
188     public void addTransportStatusEventListener(TransportStatusEventListener listener) {
189         listeners.add(listener);
190     }
191 
192     /**
193      * Remove a listener for changes in a channels status
194      * 
195      * @param listener
196      */
197     public void removeTransportStatusEventListener(TransportStatusEventListener listener) {
198         listeners.remove(listener);
199     }
200 
201     /**
202      * @return the clientID
203      */
204     public String getClientID() {
205         return clientID;
206     }
207 
208     /**
209      * @param clientID set the clientID
210      */
211     public void setClientID(String clientID) {
212         this.clientID = clientID;
213     }
214 
215     /**
216      * @return the exception listener
217      */
218     public ExceptionListener getExceptionListener() {
219         return exceptionListener;
220     }
221 
222     /**
223      * @return the packet listener
224      */
225     public PacketListener getPacketListener() {
226         return packetListener;
227     }
228 
229     /**
230      * Set a listener for Packets
231      * 
232      * @param l
233      */
234     public void setPacketListener(PacketListener l) {
235         this.packetListener = l;
236     }
237 
238     /**
239      * Set an exception listener to listen for asynchronously generated exceptions
240      * 
241      * @param listener
242      */
243     public void setExceptionListener(ExceptionListener listener) {
244         this.exceptionListener = listener;
245     }
246 
247     /**
248      * @return true if server side
249      */
250     public boolean isServerSide() {
251         return serverSide;
252     }
253 
254     /**
255      * @param serverSide
256      */
257     public void setServerSide(boolean serverSide) {
258         this.serverSide = serverSide;
259     }
260     
261     /**
262      * @return true if the transport channel is active,
263      * this value will be false through reconnecting
264      */
265     public boolean isTransportConnected(){
266         return transportConnected;
267     }
268     
269     protected void setTransportConnected(boolean value){
270         transportConnected = value;
271     }
272     
273     /**
274      * Some transports rely on an embedded broker (beer based protocols)
275      * @return true if an embedded broker required
276      */
277     public boolean requiresEmbeddedBroker(){
278         return false;
279     }
280     
281     /**
282      * Some transports that rely on an embedded broker need to
283      * create the connector used by the broker
284      * @return the BrokerConnector or null if not applicable
285      * @throws JMSException
286      */
287     public BrokerConnector getEmbeddedBrokerConnector() throws JMSException{
288         return null;
289     }
290     
291     
292     /**
293      * @return true if this transport is multicast based (i.e. broadcasts to multiple nodes)
294      */
295     public boolean isMulticast(){
296         return false;
297     }
298     
299     /**
300      * Can this wireformat process packets of this version
301      * @param version the version number to test
302      * @return true if can accept the version
303      */
304     public boolean canProcessWireFormatVersion(int version){
305         return true;
306     }
307     
308   public long getLastReceiptTimestamp() {
309     return lastReceiptTimstamp;
310   }
311     
312     /**
313      * @return Returns the usedInternally.
314      */
315     public boolean isUsedInternally() {
316         return usedInternally;
317     }
318     /**
319      * @param usedInternally The usedInternally to set.
320      */
321     public void setUsedInternally(boolean usedInternally) {
322         this.usedInternally = usedInternally;
323     }
324     
325     /**
326      * Does the transport support wire format version info
327      * @return
328      */
329     public boolean doesSupportWireFormatVersioning(){
330         return true;
331     }
332     
333     /**
334      * @return the current version of this wire format
335      */
336     public int getCurrentWireFormatVersion(){
337         return -1;
338     }
339     
340 
341        
342     /**
343      * some transports/wire formats will implement their own fragementation
344      * @return true unless a transport/wire format supports it's own fragmentation
345      */
346     public boolean doesSupportMessageFragmentation(){
347         return getWireFormat() != null && getWireFormat().doesSupportMessageFragmentation();
348     }
349     
350     
351     /**
352      * Some transports/wireformats will not be able to understand compressed messages
353      * @return true unless a transport/wire format cannot understand compression
354      */
355     public boolean doesSupportMessageCompression(){
356         return getWireFormat() != null && getWireFormat().doesSupportMessageCompression();
357     }
358     
359     // Implementation methods
360     //-------------------------------------------------------------------------
361     /**
362      * consume a packet from the channel
363      * 
364      * @param packet
365      * @throws UnsupportedWireFormatException
366      */
367     protected void doConsumePacket(Packet packet) {
368         doConsumePacket(packet, packetListener);
369     }
370 
371     protected void doConsumePacket(Packet packet, PacketListener listener) {
372         if (!doHandleReceipt(packet) && !doHandleWireFormat(packet)) {
373             if (listener != null) {
374                 listener.consume(packet);
375             }
376             else {
377                 log.warn("No packet listener set to receive packets");
378             }
379         }
380     }
381 
382     protected boolean doHandleReceipt(Packet packet) {
383         boolean result = false;
384         if (packet != null) {
385             if (packet.isReceipt()) {
386               lastReceiptTimstamp = System.currentTimeMillis();
387                 result = true;
388                 Receipt receipt = (Receipt) packet;
389                 ReceiptHolder rh = (ReceiptHolder) requestMap.remove(new Short(receipt.getCorrelationId()));
390                 if (rh != null) {
391                     rh.setReceipt(receipt);
392                 }
393                 else {
394                     log.warn("No Packet found to match Receipt correlationId: " + receipt.getCorrelationId());
395                 }
396             }
397         }
398         return result;
399     }
400 
401     protected boolean doHandleWireFormat(Packet packet) {
402         boolean handled = false;
403         if (packet.getPacketType() == Packet.WIRE_FORMAT_INFO) {
404             handled = true;
405             WireFormatInfo info = (WireFormatInfo) packet;
406             if (!canProcessWireFormatVersion(info.getVersion())) {
407                 setPendingStop(true);
408                 String errorStr = "Cannot process wire format of version: " + info.getVersion();
409                 TransportStatusEvent event = new TransportStatusEvent();
410                 event.setChannelStatus(TransportStatusEvent.FAILED);
411                 fireStatusEvent(event);
412                 onAsyncException(new UnsupportedWireFormatException(errorStr));
413                 stop();
414             }
415             else {
416                 if (log.isDebugEnabled()) {
417                     log.debug(this + " using wire format version: " + info.getVersion());
418                 }
419             }
420         }
421         return handled;
422     }
423 
424     /**
425      * send a Packet to the raw underlying transport This method is here to allow specific implementations to override
426      * this method
427      * 
428      * @param packet
429      * @return a response or null
430      * @throws JMSException
431      */
432     protected Packet doAsyncSend(Packet packet) throws JMSException {
433         asyncSend(packet);
434         return null;
435     }
436 
437     /**
438      * Handles an exception thrown while performing async dispatch of messages
439      * 
440      * @param e
441      */
442     protected void onAsyncException(JMSException e) {
443         if (exceptionListener != null) {
444             transportConnected = false;
445             exceptionListener.onException(e);
446         }
447         else {
448             log.warn("Caught exception dispatching message and no ExceptionListener registered: " + e, e);
449         }
450     }
451 
452     /**
453      * Fire status event to any status event listeners
454      * 
455      * @param remoteURI
456      * @param status
457      */
458     protected void fireStatusEvent(URI remoteURI, int status) {
459         TransportStatusEvent event = new TransportStatusEvent();
460         event.setChannelStatus(status);
461         event.setRemoteURI(remoteURI);
462         fireStatusEvent(event);
463     }
464 
465     /**
466      * Fire status event to any status event listeners
467      * 
468      * @param event
469      */
470     protected void fireStatusEvent(TransportStatusEvent event) {
471         if (event != null) {
472             for (Iterator i = listeners.iterator();i.hasNext();) {
473                 TransportStatusEventListener l = (TransportStatusEventListener) i.next();
474                 l.statusChanged(event);
475             }
476         }
477     }
478 
479     /**
480      * A helper method to stop the execution of an executor
481      * 
482      * @param executor the executor or null if one is not created yet
483      * @throws InterruptedException
484      * @throws JMSException
485      */
486     protected void stopExecutor(Executor executor) throws InterruptedException, JMSException {
487         ExecutorHelper.stopExecutor(executor);
488     }
489     /**
490      * @return Returns the cachingEnabled.
491      */
492     public boolean isCachingEnabled() {
493         return cachingEnabled;
494     }
495     /**
496      * @param cachingEnabled The cachingEnabled to set.
497      */
498     public void setCachingEnabled(boolean cachingEnabled) {
499         this.cachingEnabled = cachingEnabled;
500     }
501     
502     /**
503      * Inform Transport to send messages as quickly
504      * as possible - for Tcp - this means disabling Nagles,
505      * which on OSX may provide better performance for sync
506      * sends
507      * @return Returns the noDelay.
508      */
509     public boolean isNoDelay() {
510         return noDelay;
511     }
512     /**
513      * @param noDelay The noDelay to set.
514      */
515     public void setNoDelay(boolean noDelay) {
516         this.noDelay = noDelay;
517     }
518 }