Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/transport/tcp/TcpTransportChannel.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.activemq.transport.tcp;
20  
21  import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
22  import EDU.oswego.cs.dl.util.concurrent.BoundedChannel;
23  import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
24  import EDU.oswego.cs.dl.util.concurrent.Executor;
25  import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
26  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.activemq.io.WireFormat;
30  import org.activemq.io.WireFormatLoader;
31  import org.activemq.message.Packet;
32  import org.activemq.transport.TransportChannelSupport;
33  import org.activemq.transport.TransportStatusEvent;
34  import org.activemq.util.JMSExceptionHelper;
35  
36  import javax.jms.JMSException;
37  import java.io.BufferedInputStream;
38  import java.io.DataInputStream;
39  import java.io.DataOutputStream;
40  import java.io.EOFException;
41  import java.io.IOException;
42  import java.io.InterruptedIOException;
43  import java.net.InetAddress;
44  import java.net.InetSocketAddress;
45  import java.net.Socket;
46  import java.net.SocketAddress;
47  import java.net.SocketException;
48  import java.net.SocketTimeoutException;
49  import java.net.URI;
50  import java.net.UnknownHostException;
51  
52  /**
53   * A tcp implementation of a TransportChannel
54   *
55   * @version $Revision: 1.2 $
56   */
57  public class TcpTransportChannel extends TransportChannelSupport implements Runnable {
58      private static final int DEFAULT_SOCKET_BUFFER_SIZE = 64 * 1024;
59      private static final Log log = LogFactory.getLog(TcpTransportChannel.class);
60      protected Socket socket;
61      protected DataOutputStream dataOut;
62      protected DataInputStream dataIn;
63  
64      private WireFormatLoader wireFormatLoader;
65      private SynchronizedBoolean closed;
66      private SynchronizedBoolean started;
67      private Object outboundLock;
68      private Executor executor;
69      private Thread thread;
70      private boolean useAsyncSend = false;
71      private int soTimeout = 10000;
72      private int socketBufferSize = DEFAULT_SOCKET_BUFFER_SIZE;
73      private BoundedChannel exceptionsList;
74      private TcpTransportServerChannel serverChannel;
75  
76      /**
77       * Construct basic helpers
78       *
79       * @param wireFormat
80       */
81      protected TcpTransportChannel(WireFormat wireFormat) {
82          super(wireFormat);
83          this.wireFormatLoader = new WireFormatLoader(wireFormat);
84          closed = new SynchronizedBoolean(false);
85          started = new SynchronizedBoolean(false);
86          // there's not much point logging all exceptions, lets just keep a few around
87          exceptionsList = new BoundedLinkedQueue(10);
88          outboundLock = new Object();
89          setUseAsyncSend(useAsyncSend);
90          super.setCachingEnabled(true);
91      }
92  
93      /**
94       * Connect to a remote Node - e.g. a Broker
95       *
96       * @param wireFormat
97       * @param remoteLocation
98       * @throws JMSException
99       */
100     public TcpTransportChannel(WireFormat wireFormat, URI remoteLocation) throws JMSException {
101         this(wireFormat);
102         try {
103             this.socket = createSocket(remoteLocation);
104             initializeStreams();
105         }
106         catch (Exception ioe) {
107             throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed. " + "URI was: "
108                     + remoteLocation + " Reason: " + ioe, ioe);
109         }
110     }
111 
112     /**
113      * Connect to a remote Node - e.g. a Broker
114      *
115      * @param wireFormat
116      * @param remoteLocation
117      * @param localLocation  - e.g. local InetAddress and local port
118      * @throws JMSException
119      */
120     public TcpTransportChannel(WireFormat wireFormat, URI remoteLocation, URI localLocation) throws JMSException {
121         this(wireFormat);
122         try {
123             this.socket = createSocket(remoteLocation, localLocation);
124             initializeStreams();
125         }
126         catch (Exception ioe) {
127             throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe);
128         }
129     }
130 
131    /**
132     * Initialize from a ServerSocket
133     * @param serverChannel
134     * @param wireFormat
135     * @param socket
136     * @param executor
137     * @throws JMSException
138     */
139     public TcpTransportChannel(TcpTransportServerChannel serverChannel,WireFormat wireFormat, Socket socket, Executor executor) throws JMSException {
140         this(wireFormat);
141         this.socket = socket;
142         this.executor = executor;
143         this.serverChannel = serverChannel;
144         setServerSide(true);
145         try {
146             initialiseSocket(socket);
147             initializeStreams();
148         }
149         catch (IOException ioe) {
150             throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe);
151         }
152     }
153 
154     public TcpTransportChannel(WireFormat wireFormat, Socket socket, Executor executor) throws JMSException {
155         this(wireFormat);
156         this.socket = socket;
157         this.executor = executor;
158         try {
159             initialiseSocket(socket);
160             initializeStreams();
161         }
162         catch (IOException ioe) {
163             throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe);
164         }
165     }
166 
167     /**
168      * start listeneing for events
169      *
170      * @throws JMSException if an error occurs
171      */
172     public void start() throws JMSException {
173         if (started.commit(false, true)) {
174             thread = new Thread(this, toString());
175             try {
176                 if (isServerSide()) {
177                     thread.setDaemon(true);
178                     readWireFormat();
179                     getWireFormat().registerTransportStreams(dataOut, dataIn);
180                     getWireFormat().initiateServerSideProtocol();
181                 }
182                 else {
183                     getWireFormat().registerTransportStreams(dataOut, dataIn);
184                     thread.setPriority(Thread.NORM_PRIORITY + 2);
185                 }
186                 //enable caching on the wire format
187                 currentWireFormat.setCachingEnabled(isCachingEnabled());
188                 thread.start();
189                 //send the wire format
190                 if (!isServerSide()) {
191                     getWireFormat().initiateClientSideProtocol();
192                 }
193         fireStatusEvent(new TransportStatusEvent(this,TransportStatusEvent.CONNECTED));
194             }
195             catch (EOFException e) {
196                 doClose(e);
197             }
198             catch (IOException e) {
199                 JMSException jmsEx = new JMSException("start failed: " + e.getMessage());
200                 jmsEx.initCause(e);
201                 jmsEx.setLinkedException(e);
202                 throw jmsEx;
203             }
204         }
205     }
206 
207     protected void readWireFormat() throws JMSException, IOException {
208         WireFormat wf = wireFormatLoader.getWireFormat(dataIn);
209         if (wf != null) {
210             setWireFormat(wf);
211         }
212     }
213 
214     /**
215      * close the channel
216      */
217     public void stop() {
218         if (closed.commit(false, true)) {
219             super.stop();
220             try {
221                 if (executor != null) {
222                     stopExecutor(executor);
223                 }
224                 closeStreams();
225                 socket.close();
226             }
227             catch (Exception e) {
228                 log.warn("Caught while closing: " + e + ". Now Closed", e);
229             }
230         }
231         closed.set(true);
232         if (this.serverChannel != null){
233             this.serverChannel.removeClient(this);
234         }
235     }
236 
237     public void forceDisconnect() {
238         log.debug("Forcing disconnect");
239         if (socket != null && socket.isConnected()) {
240             try {
241                 socket.close();
242             }
243             catch (IOException e) {
244                 // Ignore
245             }
246         }
247     }
248 
249     /**
250      * Asynchronously send a Packet
251      *
252      * @param packet
253      * @throws JMSException
254      */
255     public void asyncSend(final Packet packet) throws JMSException {
256         if (executor != null) {
257             try {
258                 executor.execute(new Runnable() {
259                     public void run() {
260                         try {
261                             if (!isClosed()) {
262                                 doAsyncSend(packet);
263                             }
264                         }
265                         catch (JMSException e) {
266                             try {
267                                 exceptionsList.put(e);
268                             }
269                             catch (InterruptedException e1) {
270                                 log.warn("Failed to add element to exception list: " + e1);
271                             }
272                         }
273                     }
274                 });
275             }
276             catch (InterruptedException e) {
277                 log.info("Caught: " + e, e);
278             }
279             try {
280                 JMSException e = (JMSException) exceptionsList.poll(0);
281                 if (e != null) {
282                     throw e;
283                 }
284             }
285             catch (InterruptedException e1) {
286                 log.warn("Failed to remove element to exception list: " + e1);
287             }
288         }
289         else {
290             doAsyncSend(packet);
291         }
292     }
293 
294     /**
295      * @return false
296      */
297     public boolean isMulticast() {
298         return false;
299     }
300 
301     /**
302      * reads packets from a Socket
303      */
304     public void run() {
305         log.trace("TCP consumer thread starting");
306         int count = 0;
307         while (!isClosed()) {
308             if (isServerSide() && ++count > 500) {
309                 count = 0;
310                 Thread.yield();
311             }
312             try {
313                 Packet packet = getWireFormat().readPacket(dataIn);
314                 if (packet != null) {
315                     doConsumePacket(packet);
316                 }
317             }
318             catch (SocketTimeoutException e) {
319                 //onAsyncException(JMSExceptionHelper.newJMSException(e));
320             }
321             catch (InterruptedIOException e) {
322                 // TODO confirm that this really is a bug in the AS/400 JVM
323                 // Patch for AS/400 JVM
324                 // lets ignore these exceptions
325                 // as they typically just indicate the thread was interupted
326                 // while waiting for input, not that the socket is in error
327                 //onAsyncException(JMSExceptionHelper.newJMSException(e));
328             }
329             catch (IOException e) {
330                 doClose(e);
331             }
332         }
333     }
334 
335     public boolean isClosed() {
336         return closed.get();
337     }
338 
339     /**
340      * pretty print for object
341      *
342      * @return String representation of this object
343      */
344     public String toString() {
345         return "TcpTransportChannel: " + socket;
346     }
347 
348     /**
349      * @return the socket used by the TcpTransportChannel
350      */
351     public Socket getSocket() {
352         return socket;
353     }
354 
355     /**
356      * Can this wireformat process packets of this version
357      *
358      * @param version the version number to test
359      * @return true if can accept the version
360      */
361     public boolean canProcessWireFormatVersion(int version) {
362         return getWireFormat().canProcessWireFormatVersion(version);
363     }
364 
365     /**
366      * @return the current version of this wire format
367      */
368     public int getCurrentWireFormatVersion() {
369         return getWireFormat().getCurrentWireFormatVersion();
370     }
371 
372     // Properties
373     //-------------------------------------------------------------------------
374 
375     /**
376      * @return true if packets are enqueued to a separate queue before dispatching
377      */
378     public boolean isUseAsyncSend() {
379         return useAsyncSend;
380     }
381 
382     /**
383      * set the useAsync flag
384      *
385      * @param useAsyncSend
386      */    
387      public void setUseAsyncSend(boolean useAsyncSend) {
388         this.useAsyncSend = useAsyncSend;
389         try {
390             if (useAsyncSend && executor==null ) {
391                 PooledExecutor pe = new PooledExecutor(new BoundedBuffer(10), 1);
392                 pe.waitWhenBlocked();
393                 pe.setKeepAliveTime(1000);
394                 executor = pe;
395             }
396             else if (!useAsyncSend && executor != null) {
397                 stopExecutor(executor);
398             }
399         }
400         catch (Exception e) {
401             log.warn("problem closing executor", e);
402         }
403     }
404 
405     
406 
407     /**
408      * @return the current so timeout used on the socket
409      */
410     public int getSoTimeout() {
411         return soTimeout;
412     }
413 
414     /**
415      * set the socket so timeout
416      *
417      * @param soTimeout
418      * @throws JMSException
419      */
420     public void setSoTimeout(int soTimeout) throws JMSException {
421         this.soTimeout = soTimeout;
422         if (this.socket != null){
423             try {
424                 socket.setSoTimeout(soTimeout);
425             }
426             catch (SocketException e) {
427                 JMSException jmsEx = new JMSException("Failed to set soTimeout: ", e.getMessage());
428                 jmsEx.setLinkedException(e);
429                 throw jmsEx;
430             }
431         }
432     }
433     
434     /**
435      * @param noDelay The noDelay to set.
436      */
437     public void setNoDelay(boolean noDelay) {
438         super.setNoDelay(noDelay);
439         if (socket != null){
440             try {
441                 socket.setTcpNoDelay(noDelay);
442             }
443             catch (SocketException e) {
444                log.warn("failed to set noDelay on the socket");//should never happen
445             }
446         }
447     }
448 
449     /**
450      * @return Returns the socketBufferSize.
451      */
452     public int getSocketBufferSize() {
453         return socketBufferSize;
454     }
455     /**
456      * @param socketBufferSize The socketBufferSize to set.
457      */
458     public void setSocketBufferSize(int socketBufferSize) {
459         this.socketBufferSize = socketBufferSize;
460     }
461     // Implementation methods
462     //-------------------------------------------------------------------------
463     /**
464      * Actually performs the async send of a packet
465      *
466      * @param packet
467      * @return a response or null
468      * @throws JMSException
469      */
470     protected Packet doAsyncSend(Packet packet) throws JMSException {
471         Packet response = null;
472         try {
473             synchronized (outboundLock) {
474                 response = getWireFormat().writePacket(packet, dataOut);
475                 dataOut.flush();
476             }
477         }
478         catch (IOException e) {
479 //            if (closed.get()) {
480 //                log.trace("Caught exception while closed: " + e, e);
481 //            }
482 //            else {
483                 JMSException exception = JMSExceptionHelper.newJMSException("asyncSend failed: " + e, e);
484                 onAsyncException(exception);
485                 throw exception;
486 //            }
487         }
488         catch (JMSException e) {
489             if (isClosed()) {
490                 log.trace("Caught exception while closed: " + e, e);
491             }
492             else {
493                 throw e;
494             }
495         }
496         return response;
497     }
498 
499     protected void doClose(Exception ex) {
500         if (!isClosed()) {
501             if (!pendingStop) {
502                 setPendingStop(true);
503                 setTransportConnected(false);
504                 if (ex instanceof EOFException) {
505                     if (!isServerSide() && !isUsedInternally()){
506                         log.warn("Peer closed connection", ex);
507                     }
508                     fireStatusEvent(new TransportStatusEvent(this,TransportStatusEvent.DISCONNECTED));
509                     onAsyncException(JMSExceptionHelper.newJMSException("Error reading socket: " + ex, ex));
510                 }
511                 else {
512                     fireStatusEvent(new TransportStatusEvent(this,TransportStatusEvent.DISCONNECTED));
513                     onAsyncException(JMSExceptionHelper.newJMSException("Error reading socket: " + ex, ex));
514                 }
515             }
516             stop();
517         }
518     }
519 
520     /**
521      * Configures the socket for use
522      * @param sock
523      * @throws SocketException
524      */
525     protected void initialiseSocket(Socket sock) throws SocketException {
526         try {
527             sock.setReceiveBufferSize(socketBufferSize);
528             sock.setSendBufferSize(socketBufferSize);
529         }
530         catch (SocketException se) {
531             log.debug("Cannot set socket buffer size = " + socketBufferSize, se);
532         }
533         sock.setSoTimeout(soTimeout);
534         sock.setTcpNoDelay(isNoDelay());
535     }
536     
537     protected void initializeStreams() throws IOException{
538         BufferedInputStream buffIn = new BufferedInputStream(socket.getInputStream(),8192);
539         this.dataIn = new DataInputStream(buffIn);
540         TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(),8192);
541         this.dataOut = new DataOutputStream(buffOut);
542     }
543 
544     protected void closeStreams() throws IOException {
545         if (dataOut != null) {
546             dataOut.close();
547         }
548         if (dataIn != null) {
549             dataIn.close();
550         }
551     }
552 
553     /**
554      * Factory method to create a new socket
555      *
556      * @param remoteLocation the URI to connect to
557      * @return the newly created socket
558      * @throws UnknownHostException
559      * @throws IOException
560      */
561     protected Socket createSocket(URI remoteLocation) throws UnknownHostException, IOException {
562         SocketAddress sockAddress = new InetSocketAddress(remoteLocation.getHost(), remoteLocation.getPort());
563         Socket sock = new Socket();
564         initialiseSocket(sock);
565         sock.connect(sockAddress);
566         return sock;
567     }
568 
569     /**
570      * Factory method to create a new socket
571      *
572      * @param remoteLocation
573      * @param localLocation
574      * @return @throws IOException
575      * @throws IOException
576      * @throws UnknownHostException
577      */
578     protected Socket createSocket(URI remoteLocation, URI localLocation) throws IOException, UnknownHostException {
579         SocketAddress sockAddress = new InetSocketAddress(remoteLocation.getHost(), remoteLocation.getPort());
580         SocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
581         Socket sock = new Socket();
582         initialiseSocket(sock);
583         sock.bind(localAddress);
584         sock.connect(sockAddress);
585         return sock;
586     }
587 
588 }