Home » apache-activemq-5.1.0-src » org.apache » activemq » transport » tcp » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.transport.tcp;
   18   
   19   import java.io.DataInputStream;
   20   import java.io.DataOutputStream;
   21   import java.io.IOException;
   22   import java.io.InterruptedIOException;
   23   import java.net.InetAddress;
   24   import java.net.InetSocketAddress;
   25   import java.net.Socket;
   26   import java.net.SocketException;
   27   import java.net.SocketTimeoutException;
   28   import java.net.URI;
   29   import java.net.UnknownHostException;
   30   import java.util.HashMap;
   31   import java.util.Map;
   32   import java.util.concurrent.CountDownLatch;
   33   import java.util.concurrent.SynchronousQueue;
   34   import java.util.concurrent.ThreadFactory;
   35   import java.util.concurrent.ThreadPoolExecutor;
   36   import java.util.concurrent.TimeUnit;
   37   import java.util.concurrent.atomic.AtomicReference;
   38   
   39   import javax.net.SocketFactory;
   40   
   41   import org.apache.activemq.Service;
   42   import org.apache.activemq.transport.Transport;
   43   import org.apache.activemq.transport.TransportLoggerFactory;
   44   import org.apache.activemq.transport.TransportThreadSupport;
   45   import org.apache.activemq.util.IntrospectionSupport;
   46   import org.apache.activemq.util.ServiceStopper;
   47   import org.apache.activemq.wireformat.WireFormat;
   48   import org.apache.commons.logging.Log;
   49   import org.apache.commons.logging.LogFactory;
   50   
   51   /**
   52    * An implementation of the {@link Transport} interface using raw tcp/ip
   53    * 
   54    * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
   55    * @version $Revision$
   56    */
   57   public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
   58       private static final Log LOG = LogFactory.getLog(TcpTransport.class);
   59       private static final ThreadPoolExecutor SOCKET_CLOSE;
   60       protected final URI remoteLocation;
   61       protected final URI localLocation;
   62       protected final WireFormat wireFormat;
   63   
   64       protected int connectionTimeout = 30000;
   65       protected int soTimeout;
   66       protected int socketBufferSize = 64 * 1024;
   67       protected int ioBufferSize = 8 * 1024;
   68       protected Socket socket;
   69       protected DataOutputStream dataOut;
   70       protected DataInputStream dataIn;
   71       /**
   72        * trace=true -> the Transport stack where this TcpTransport
   73        * object will be, will have a TransportLogger layer
   74        * trace=false -> the Transport stack where this TcpTransport
   75        * object will be, will NOT have a TransportLogger layer, and therefore
   76        * will never be able to print logging messages.
   77        * This parameter is most probably set in Connection or TransportConnector URIs.
   78        */
   79       protected boolean trace = false;
   80       /**
   81        * Name of the LogWriter implementation to use.
   82        * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
   83        * This parameter is most probably set in Connection or TransportConnector URIs.
   84        */
   85       protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
   86       /**
   87        * Specifies if the TransportLogger will be manageable by JMX or not.
   88        * Also, as long as there is at least 1 TransportLogger which is manageable,
   89        * a TransportLoggerControl MBean will me created.
   90        */
   91       protected boolean dynamicManagement = false;
   92       /**
   93        * startLogging=true -> the TransportLogger object of the Transport stack
   94        * will initially write messages to the log.
   95        * startLogging=false -> the TransportLogger object of the Transport stack
   96        * will initially NOT write messages to the log.
   97        * This parameter only has an effect if trace == true.
   98        * This parameter is most probably set in Connection or TransportConnector URIs.
   99        */
  100       protected boolean startLogging = true;
  101       /**
  102        * Specifies the port that will be used by the JMX server to manage
  103        * the TransportLoggers.
  104        * This should only be set in an URI by a client (producer or consumer) since
  105        * a broker will already create a JMX server.
  106        * It is useful for people who test a broker and clients in the same machine
  107        * and want to control both via JMX; a different port will be needed.
  108        */
  109       protected int jmxPort = 1099;
  110       protected boolean useLocalHost = true;
  111       protected int minmumWireFormatVersion;
  112       protected SocketFactory socketFactory;
  113       protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
  114   
  115       private Map<String, Object> socketOptions;
  116       private Boolean keepAlive;
  117       private Boolean tcpNoDelay;
  118       private Thread runnerThread;
  119   
  120       /**
  121        * Connect to a remote Node - e.g. a Broker
  122        * 
  123        * @param wireFormat
  124        * @param socketFactory
  125        * @param remoteLocation
  126        * @param localLocation - e.g. local InetAddress and local port
  127        * @throws IOException
  128        * @throws UnknownHostException
  129        */
  130       public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
  131                           URI localLocation) throws UnknownHostException, IOException {
  132           this.wireFormat = wireFormat;
  133           this.socketFactory = socketFactory;
  134           try {
  135               this.socket = socketFactory.createSocket();
  136           } catch (SocketException e) {
  137               this.socket = null;
  138           }
  139           this.remoteLocation = remoteLocation;
  140           this.localLocation = localLocation;
  141           setDaemon(false);
  142       }
  143   
  144       /**
  145        * Initialize from a server Socket
  146        * 
  147        * @param wireFormat
  148        * @param socket
  149        * @throws IOException
  150        */
  151       public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
  152           this.wireFormat = wireFormat;
  153           this.socket = socket;
  154           this.remoteLocation = null;
  155           this.localLocation = null;
  156           setDaemon(true);
  157       }
  158   
  159       /**
  160        * A one way asynchronous send
  161        */
  162       public void oneway(Object command) throws IOException {
  163           checkStarted();
  164           wireFormat.marshal(command, dataOut);
  165           dataOut.flush();
  166       }
  167   
  168       /**
  169        * @return pretty print of 'this'
  170        */
  171       public String toString() {
  172           return "tcp://" + socket.getInetAddress() + ":" + socket.getPort();
  173       }
  174   
  175       /**
  176        * reads packets from a Socket
  177        */
  178       public void run() {
  179           LOG.trace("TCP consumer thread starting");
  180           this.runnerThread=Thread.currentThread();
  181           try {
  182               while (!isStopped()) {
  183                   doRun();
  184               }
  185           } catch (IOException e) {
  186               stoppedLatch.get().countDown();
  187               onException(e);
  188           } finally {
  189               stoppedLatch.get().countDown();
  190           }
  191       }
  192   
  193       protected void doRun() throws IOException {
  194           try {
  195               Object command = readCommand();
  196               doConsume(command);
  197           } catch (SocketTimeoutException e) {
  198           } catch (InterruptedIOException e) {
  199           }
  200       }
  201   
  202       protected Object readCommand() throws IOException {
  203           return wireFormat.unmarshal(dataIn);
  204       }
  205   
  206       // Properties
  207       // -------------------------------------------------------------------------
  208   
  209       public boolean isTrace() {
  210           return trace;
  211       }
  212   
  213       public void setTrace(boolean trace) {
  214           this.trace = trace;
  215       }
  216       
  217       public String getLogWriterName() {
  218           return logWriterName;
  219       }
  220   
  221       public void setLogWriterName(String logFormat) {
  222           this.logWriterName = logFormat;
  223       }
  224   
  225       public boolean isDynamicManagement() {
  226           return dynamicManagement;
  227       }
  228   
  229       public void setDynamicManagement(boolean useJmx) {
  230           this.dynamicManagement = useJmx;
  231       }
  232   
  233       public boolean isStartLogging() {
  234           return startLogging;
  235       }
  236   
  237       public void setStartLogging(boolean startLogging) {
  238           this.startLogging = startLogging;
  239       }
  240   
  241       public int getJmxPort() {
  242           return jmxPort;
  243       }
  244   
  245       public void setJmxPort(int jmxPort) {
  246           this.jmxPort = jmxPort;
  247       }
  248       
  249       public int getMinmumWireFormatVersion() {
  250           return minmumWireFormatVersion;
  251       }
  252   
  253       public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
  254           this.minmumWireFormatVersion = minmumWireFormatVersion;
  255       }
  256   
  257       public boolean isUseLocalHost() {
  258           return useLocalHost;
  259       }
  260   
  261       /**
  262        * Sets whether 'localhost' or the actual local host name should be used to
  263        * make local connections. On some operating systems such as Macs its not
  264        * possible to connect as the local host name so localhost is better.
  265        */
  266       public void setUseLocalHost(boolean useLocalHost) {
  267           this.useLocalHost = useLocalHost;
  268       }
  269   
  270       public int getSocketBufferSize() {
  271           return socketBufferSize;
  272       }
  273   
  274       /**
  275        * Sets the buffer size to use on the socket
  276        */
  277       public void setSocketBufferSize(int socketBufferSize) {
  278           this.socketBufferSize = socketBufferSize;
  279       }
  280   
  281       public int getSoTimeout() {
  282           return soTimeout;
  283       }
  284   
  285       /**
  286        * Sets the socket timeout
  287        */
  288       public void setSoTimeout(int soTimeout) {
  289           this.soTimeout = soTimeout;
  290       }
  291   
  292       public int getConnectionTimeout() {
  293           return connectionTimeout;
  294       }
  295   
  296       /**
  297        * Sets the timeout used to connect to the socket
  298        */
  299       public void setConnectionTimeout(int connectionTimeout) {
  300           this.connectionTimeout = connectionTimeout;
  301       }
  302   
  303       public Boolean getKeepAlive() {
  304           return keepAlive;
  305       }
  306   
  307       /**
  308        * Enable/disable TCP KEEP_ALIVE mode
  309        */
  310       public void setKeepAlive(Boolean keepAlive) {
  311           this.keepAlive = keepAlive;
  312       }
  313   
  314       public Boolean getTcpNoDelay() {
  315           return tcpNoDelay;
  316       }
  317   
  318       /**
  319        * Enable/disable the TCP_NODELAY option on the socket
  320        */
  321       public void setTcpNoDelay(Boolean tcpNoDelay) {
  322           this.tcpNoDelay = tcpNoDelay;
  323       }
  324   
  325       /**
  326        * @return the ioBufferSize
  327        */
  328       public int getIoBufferSize() {
  329           return this.ioBufferSize;
  330       }
  331   
  332       /**
  333        * @param ioBufferSize the ioBufferSize to set
  334        */
  335       public void setIoBufferSize(int ioBufferSize) {
  336           this.ioBufferSize = ioBufferSize;
  337       }
  338   
  339       // Implementation methods
  340       // -------------------------------------------------------------------------
  341       protected String resolveHostName(String host) throws UnknownHostException {
  342           String localName = InetAddress.getLocalHost().getHostName();
  343           if (localName != null && isUseLocalHost()) {
  344               if (localName.equals(host)) {
  345                   return "localhost";
  346               }
  347           }
  348           return host;
  349       }
  350   
  351       /**
  352        * Configures the socket for use
  353        * 
  354        * @param sock
  355        * @throws SocketException
  356        */
  357       protected void initialiseSocket(Socket sock) throws SocketException {
  358           if (socketOptions != null) {
  359               IntrospectionSupport.setProperties(socket, socketOptions);
  360           }
  361   
  362           try {
  363               sock.setReceiveBufferSize(socketBufferSize);
  364               sock.setSendBufferSize(socketBufferSize);
  365           } catch (SocketException se) {
  366               LOG.warn("Cannot set socket buffer size = " + socketBufferSize);
  367               LOG.debug("Cannot set socket buffer size. Reason: " + se, se);
  368           }
  369           sock.setSoTimeout(soTimeout);
  370   
  371           if (keepAlive != null) {
  372               sock.setKeepAlive(keepAlive.booleanValue());
  373           }
  374           if (tcpNoDelay != null) {
  375               sock.setTcpNoDelay(tcpNoDelay.booleanValue());
  376           }
  377       }
  378   
  379       protected void doStart() throws Exception {
  380           connect();
  381           stoppedLatch.set(new CountDownLatch(1));
  382           super.doStart();
  383       }
  384   
  385       protected void connect() throws Exception {
  386   
  387           if (socket == null && socketFactory == null) {
  388               throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
  389           }
  390   
  391           InetSocketAddress localAddress = null;
  392           InetSocketAddress remoteAddress = null;
  393   
  394           if (localLocation != null) {
  395               localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()),
  396                                                    localLocation.getPort());
  397           }
  398   
  399           if (remoteLocation != null) {
  400               String host = resolveHostName(remoteLocation.getHost());
  401               remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
  402           }
  403   
  404           if (socket != null) {
  405   
  406               if (localAddress != null) {
  407                   socket.bind(localAddress);
  408               }
  409   
  410               // If it's a server accepted socket.. we don't need to connect it
  411               // to a remote address.
  412               if (remoteAddress != null) {
  413                   if (connectionTimeout >= 0) {
  414                       socket.connect(remoteAddress, connectionTimeout);
  415                   } else {
  416                       socket.connect(remoteAddress);
  417                   }
  418               }
  419   
  420           } else {
  421               // For SSL sockets.. you can't create an unconnected socket :(
  422               // This means the timout option are not supported either.
  423               if (localAddress != null) {
  424                   socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(),
  425                                                       localAddress.getAddress(), localAddress.getPort());
  426               } else {
  427                   socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
  428               }
  429           }
  430   
  431           initialiseSocket(socket);
  432           initializeStreams();
  433       }
  434   
  435       protected void doStop(ServiceStopper stopper) throws Exception {
  436           if (LOG.isDebugEnabled()) {
  437               LOG.debug("Stopping transport " + this);
  438           }
  439   
  440           // Closing the streams flush the sockets before closing.. if the socket
  441           // is hung.. then this hangs the close.
  442           // closeStreams();
  443           if (socket != null) {
  444               //closing the socket can hang also 
  445               final CountDownLatch latch = new CountDownLatch(1);
  446               SOCKET_CLOSE.execute(new Runnable() {
  447   
  448                   public void run() {
  449                       try {
  450                           socket.close();
  451                       } catch (IOException e) {
  452                           LOG.debug("Caught exception closing socket",e);
  453                       }finally {
  454                           latch.countDown();
  455                       }
  456                   }
  457                   
  458               });
  459               latch.await(1,TimeUnit.SECONDS);
  460              
  461           }
  462       }
  463   
  464       /**
  465        * Override so that stop() blocks until the run thread is no longer running.
  466        */
  467       @Override
  468       public void stop() throws Exception {
  469           super.stop();
  470           CountDownLatch countDownLatch = stoppedLatch.get();
  471           if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
  472               countDownLatch.await(1,TimeUnit.SECONDS);
  473           }
  474       }
  475   
  476       protected void initializeStreams() throws Exception {
  477           TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
  478           this.dataIn = new DataInputStream(buffIn);
  479           TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
  480           this.dataOut = new DataOutputStream(buffOut);
  481       }
  482   
  483       protected void closeStreams() throws IOException {
  484           if (dataOut != null) {
  485               dataOut.close();
  486           }
  487           if (dataIn != null) {
  488               dataIn.close();
  489           }
  490       }
  491   
  492       public void setSocketOptions(Map<String, Object> socketOptions) {
  493           this.socketOptions = new HashMap<String, Object>(socketOptions);
  494       }
  495   
  496       public String getRemoteAddress() {
  497           if (socket != null) {
  498               return "" + socket.getRemoteSocketAddress();
  499           }
  500           return null;
  501       }
  502       
  503       @Override
  504       public <T> T narrow(Class<T> target) {
  505           if (target == Socket.class) {
  506               return target.cast(socket);
  507           }
  508           return super.narrow(target);
  509       }
  510   
  511       static {
  512           SOCKET_CLOSE =   new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
  513               public Thread newThread(Runnable runnable) {
  514                   Thread thread = new Thread(runnable, "TcpSocketClose: "+runnable);
  515                   thread.setDaemon(true);
  516                   return thread;
  517               }
  518           });
  519       }
  520   }

Save This Page
Home » apache-activemq-5.1.0-src » org.apache » activemq » transport » tcp » [javadoc | source]