Save This Page
Home » jboss-5.0.0.CR1-src » org » jboss » mq » il » uil2 » [javadoc | source]
    1   /*
    2   * JBoss, Home of Professional Open Source
    3   * Copyright 2005, JBoss Inc., and individual contributors as indicated
    4   * by the @authors tag. See the copyright.txt in the distribution for a
    5   * full listing of individual contributors.
    6   *
    7   * This is free software; you can redistribute it and/or modify it
    8   * under the terms of the GNU Lesser General Public License as
    9   * published by the Free Software Foundation; either version 2.1 of
   10   * the License, or (at your option) any later version.
   11   *
   12   * This software is distributed in the hope that it will be useful,
   13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
   14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
   15   * Lesser General Public License for more details.
   16   *
   17   * You should have received a copy of the GNU Lesser General Public
   18   * License along with this software; if not, write to the Free
   19   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
   20   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
   21   */
   22   package org.jboss.mq.il.uil2;
   23   
   24   import java.io.Serializable;
   25   import java.io.IOException;
   26   import java.net.InetAddress;
   27   import java.net.ConnectException;
   28   import java.net.Socket;
   29   import javax.jms.Destination;
   30   import javax.jms.JMSException;
   31   import javax.jms.Queue;
   32   import javax.jms.TemporaryQueue;
   33   import javax.jms.TemporaryTopic;
   34   import javax.jms.Topic;
   35   import javax.net.SocketFactory;
   36   import javax.transaction.xa.Xid;
   37   
   38   import org.jboss.logging.Logger;
   39   import org.jboss.mq.AcknowledgementRequest;
   40   import org.jboss.mq.Connection;
   41   import org.jboss.mq.ConnectionToken;
   42   import org.jboss.mq.DurableSubscriptionID;
   43   import org.jboss.mq.Recoverable;
   44   import org.jboss.mq.SpyDestination;
   45   import org.jboss.mq.SpyMessage;
   46   import org.jboss.mq.TransactionRequest;
   47   import org.jboss.mq.il.ServerIL;
   48   import org.jboss.mq.il.uil2.msgs.MsgTypes;
   49   import org.jboss.mq.il.uil2.msgs.ConnectionTokenMsg;
   50   import org.jboss.mq.il.uil2.msgs.EnableConnectionMsg;
   51   import org.jboss.mq.il.uil2.msgs.GetIDMsg;
   52   import org.jboss.mq.il.uil2.msgs.RecoverMsg;
   53   import org.jboss.mq.il.uil2.msgs.TemporaryDestMsg;
   54   import org.jboss.mq.il.uil2.msgs.AcknowledgementRequestMsg;
   55   import org.jboss.mq.il.uil2.msgs.AddMsg;
   56   import org.jboss.mq.il.uil2.msgs.BrowseMsg;
   57   import org.jboss.mq.il.uil2.msgs.CheckIDMsg;
   58   import org.jboss.mq.il.uil2.msgs.CheckUserMsg;
   59   import org.jboss.mq.il.uil2.msgs.CloseMsg;
   60   import org.jboss.mq.il.uil2.msgs.CreateDestMsg;
   61   import org.jboss.mq.il.uil2.msgs.DeleteTemporaryDestMsg;
   62   import org.jboss.mq.il.uil2.msgs.DeleteSubscriptionMsg;
   63   import org.jboss.mq.il.uil2.msgs.PingMsg;
   64   import org.jboss.mq.il.uil2.msgs.ReceiveMsg;
   65   import org.jboss.mq.il.uil2.msgs.SubscribeMsg;
   66   import org.jboss.mq.il.uil2.msgs.TransactMsg;
   67   import org.jboss.mq.il.uil2.msgs.UnsubscribeMsg;
   68   
   69   /** The UILServerIL is created on the server and copied to the client during
   70    * connection factory lookups. It represents the transport interface to the
   71    * JMS server.
   72    * 
   73    * @author Scott.Stark@jboss.org
   74    * @version $Revision: 45317 $
   75    */
   76   public class UILServerIL
   77      implements Cloneable, MsgTypes, Serializable, ServerIL, Recoverable
   78   {
   79      /** @since 1.7, at least jboss-3.2.5, jboss-4.0.0 */
   80      private static final long serialVersionUID = 853594001646066224L;
   81      private static Logger log = Logger.getLogger(UILServerIL.class);
   82   
   83      /** The org.jboss.mq.il.uil2.useServerHost system property allows a client to
   84       * to connect to the host name rather than the ip address
   85       */
   86      private final static String USE_SERVER_HOST = "org.jboss.mq.il.uil2.useServerHost";
   87   
   88      /** The org.jboss.mq.il.uil2.localAddr system property allows a client to
   89       *define the local interface to which its sockets should be bound
   90       */
   91      private final static String LOCAL_ADDR = "org.jboss.mq.il.uil2.localAddr";
   92      /** The org.jboss.mq.il.uil2.localPort system property allows a client to
   93       *define the local port to which its sockets should be bound
   94       */
   95      private final static String LOCAL_PORT = "org.jboss.mq.il.uil2.localPort";
   96      /** The org.jboss.mq.il.uil2.serverAddr system property allows a client to
   97       * override the address to which it attempts to connect to. This is useful
   98       * for networks where NAT is ocurring between the client and jms server.
   99       */
  100      private final static String SERVER_ADDR = "org.jboss.mq.il.uil2.serverAddr";
  101      /** The org.jboss.mq.il.uil2.serverPort system property allows a client to
  102       * override the port to which it attempts to connect. This is useful for
  103       * for networks where port forwarding is ocurring between the client and jms
  104       * server.
  105       */
  106      private final static String SERVER_PORT = "org.jboss.mq.il.uil2.serverPort";
  107      /** The org.jboss.mq.il.uil2.retryCount controls the number of attempts to
  108       * retry connecting to the jms server. Retries are only made for 
  109       * java.net.ConnectException failures. A value <= 0 means no retry atempts
  110       * will be made.
  111       */
  112      private final static String RETRY_COUNT = "org.jboss.mq.il.uil2.retryCount";
  113      /** The org.jboss.mq.il.uil2.retryDelay controls the delay in milliseconds
  114       * between retries due to ConnectException failures.
  115       */
  116      private final static String RETRY_DELAY = "org.jboss.mq.il.uil2.retryDelay";
  117   
  118      /** The server host name/IP to connect to as defined by the jms server.
  119       */
  120      private InetAddress addr;
  121      /** The server port to connect to as defined by the jms server.
  122       */
  123      private int port;
  124      /** The name of the class implementing the javax.net.SocketFactory to
  125       * use for creating the client socket.
  126       */
  127      private String socketFactoryName;
  128   
  129      /**
  130       * If the TcpNoDelay option should be used on the socket.
  131       */
  132      private boolean enableTcpNoDelay = false;
  133   
  134      /**
  135       * The client side read timeout
  136       */
  137      private int soTimeout = 0;
  138   
  139      /**
  140       * The connect address
  141       */
  142      private String connectAddress;
  143   
  144      /**
  145       * The connect port
  146       */
  147      private int connectPort = 0;
  148   
  149      /**
  150       * The buffer size.
  151       */
  152      private int bufferSize;
  153   
  154      /**
  155       * The chunk size.
  156       */
  157      private int chunkSize;
  158   
  159      /** The local interface name/IP to use for the client
  160       */
  161      private transient InetAddress localAddr;
  162      /** The local port to use for the client
  163       */
  164      private transient int localPort;
  165   
  166      /**
  167       * Description of the Field
  168       */
  169      protected transient Socket socket;
  170      /**
  171       * Description of the Field
  172       */
  173      protected transient SocketManager socketMgr;
  174   
  175      public UILServerIL(InetAddress addr, int port, String socketFactoryName,
  176         boolean enableTcpNoDelay, int bufferSize, int chunkSize, int soTimeout, String connectAddress, int connectPort)
  177         throws Exception
  178      {
  179         this.addr = addr;
  180         this.port = port;
  181         this.socketFactoryName = socketFactoryName;
  182         this.enableTcpNoDelay = enableTcpNoDelay;
  183         this.bufferSize = bufferSize;
  184         this.chunkSize = chunkSize;
  185         this.soTimeout = soTimeout;
  186         this.connectAddress = connectAddress;
  187         this.connectPort = connectPort;
  188      }
  189   
  190      public void setConnectionToken(ConnectionToken dest)
  191             throws Exception
  192      {
  193         ConnectionTokenMsg msg = new ConnectionTokenMsg(dest);
  194         getSocketMgr().sendMessage(msg);
  195      }
  196   
  197      public void setEnabled(ConnectionToken dc, boolean enabled)
  198             throws JMSException, Exception
  199      {
  200         EnableConnectionMsg msg = new EnableConnectionMsg(enabled);
  201         getSocketMgr().sendMessage(msg);
  202      }
  203   
  204      public String getID()
  205             throws Exception
  206      {
  207         GetIDMsg msg = new GetIDMsg();
  208         getSocketMgr().sendMessage(msg);
  209         String id = msg.getID();
  210         return id;
  211      }
  212   
  213      public TemporaryQueue getTemporaryQueue(ConnectionToken dc)
  214             throws JMSException, Exception
  215      {
  216         TemporaryDestMsg msg = new TemporaryDestMsg(true);
  217         getSocketMgr().sendMessage(msg);
  218         TemporaryQueue dest = msg.getQueue();
  219         return dest;
  220      }
  221   
  222      public TemporaryTopic getTemporaryTopic(ConnectionToken dc)
  223             throws JMSException, Exception
  224      {
  225         TemporaryDestMsg msg = new TemporaryDestMsg(false);
  226         getSocketMgr().sendMessage(msg);
  227         TemporaryTopic dest = msg.getTopic();
  228         return dest;
  229      }
  230   
  231      public void acknowledge(ConnectionToken dc, AcknowledgementRequest item)
  232             throws JMSException, Exception
  233      {
  234         AcknowledgementRequestMsg msg = new AcknowledgementRequestMsg(item);
  235         if (item.isAck())
  236            getSocketMgr().sendMessage(msg);
  237         else
  238            getSocketMgr().sendOneWay(msg);
  239      }
  240   
  241      public void addMessage(ConnectionToken dc, SpyMessage val)
  242             throws Exception
  243      {
  244         AddMsg msg = new AddMsg(val);
  245         getSocketMgr().sendMessage(msg);
  246      }
  247   
  248      public SpyMessage[] browse(ConnectionToken dc, Destination dest, String selector)
  249             throws JMSException, Exception
  250      {
  251         BrowseMsg msg = new BrowseMsg(dest, selector);
  252         getSocketMgr().sendMessage(msg);
  253         SpyMessage[] msgs = msg.getMessages();
  254         return msgs;
  255      }
  256   
  257      public void checkID(String id)
  258             throws JMSException, Exception
  259      {
  260         CheckIDMsg msg = new CheckIDMsg(id);
  261         getSocketMgr().sendMessage(msg);
  262      }
  263   
  264      public String checkUser(String username, String password)
  265             throws JMSException, Exception
  266      {
  267         CheckUserMsg msg = new CheckUserMsg(username, password, false);
  268         getSocketMgr().sendMessage(msg);
  269         String clientID = msg.getID();
  270         return clientID;
  271      }
  272   
  273      public String authenticate(String username, String password)
  274             throws JMSException, Exception
  275      {
  276         CheckUserMsg msg = new CheckUserMsg(username, password, true);
  277         getSocketMgr().sendMessage(msg);
  278         String sessionID = msg.getID();
  279         return sessionID;
  280      }
  281   
  282      public Object clone()
  283             throws CloneNotSupportedException
  284      {
  285         return super.clone();
  286      }
  287   
  288      public ServerIL cloneServerIL()
  289             throws Exception
  290      {
  291         return (ServerIL)clone();
  292      }
  293   
  294      public void connectionClosing(ConnectionToken dc)
  295             throws JMSException, Exception
  296      {
  297         CloseMsg msg = new CloseMsg();
  298         try
  299         {
  300            getSocketMgr().sendMessage(msg);
  301         }
  302         catch (IOException ignored)
  303         {
  304         }
  305         destroyConnection();
  306      }
  307   
  308      public Queue createQueue(ConnectionToken dc, String destName)
  309             throws JMSException, Exception
  310      {
  311         CreateDestMsg msg = new CreateDestMsg(destName, true);
  312         getSocketMgr().sendMessage(msg);
  313         Queue dest = msg.getQueue();
  314         return dest;
  315      }
  316   
  317      public Topic createTopic(ConnectionToken dc, String destName)
  318             throws JMSException, Exception
  319      {
  320         CreateDestMsg msg = new CreateDestMsg(destName, false);
  321         getSocketMgr().sendMessage(msg);
  322         Topic dest = msg.getTopic();
  323         return dest;
  324      }
  325   
  326      public void deleteTemporaryDestination(ConnectionToken dc, SpyDestination dest)
  327             throws JMSException, Exception
  328      {
  329         DeleteTemporaryDestMsg msg = new DeleteTemporaryDestMsg(dest);
  330         getSocketMgr().sendMessage(msg);
  331      }
  332   
  333      public void destroySubscription(ConnectionToken dc,DurableSubscriptionID id)
  334             throws JMSException, Exception
  335      {
  336         DeleteSubscriptionMsg msg = new DeleteSubscriptionMsg(id);
  337         getSocketMgr().sendMessage(msg);
  338      }
  339   
  340      public void ping(ConnectionToken dc, long clientTime)
  341             throws Exception
  342      {
  343         PingMsg msg = new PingMsg(clientTime, true);
  344         msg.getMsgID();
  345         getSocketMgr().sendReply(msg);
  346      }
  347   
  348      public SpyMessage receive(ConnectionToken dc, int subscriberId, long wait)
  349             throws Exception, Exception
  350      {
  351         ReceiveMsg msg = new ReceiveMsg(subscriberId, wait);
  352         getSocketMgr().sendMessage(msg);
  353         SpyMessage reply = msg.getMessage();
  354         return reply;
  355      }
  356   
  357      public void subscribe(ConnectionToken dc, org.jboss.mq.Subscription s)
  358             throws JMSException, Exception
  359      {
  360         SubscribeMsg msg = new SubscribeMsg(s);
  361         getSocketMgr().sendMessage(msg);
  362      }
  363   
  364      public void transact(ConnectionToken dc, TransactionRequest t)
  365             throws JMSException, Exception
  366      {
  367         TransactMsg msg = new TransactMsg(t);
  368         getSocketMgr().sendMessage(msg);
  369      }
  370   
  371      public Xid[] recover(ConnectionToken dc, int flags) throws Exception
  372      {
  373         RecoverMsg msg = new RecoverMsg(flags);
  374         getSocketMgr().sendMessage(msg);
  375         Xid[] reply = msg.getXids();
  376         return reply;
  377      }
  378   
  379      public void unsubscribe(ConnectionToken dc, int subscriptionID)
  380             throws JMSException, Exception
  381      {
  382         UnsubscribeMsg msg = new UnsubscribeMsg(subscriptionID);
  383         getSocketMgr().sendMessage(msg);
  384      }
  385   
  386      final SocketManager getSocketMgr()
  387         throws Exception
  388      {
  389         if( socketMgr == null )
  390            createConnection();
  391         return socketMgr;
  392      }
  393   
  394      protected void checkConnection()
  395             throws Exception
  396      {
  397         if (socketMgr == null)
  398         {
  399            createConnection();
  400         }
  401      }
  402   
  403      /**
  404       * Used to establish a new connection to the server
  405       *
  406       * @exception Exception  Description of Exception
  407       */
  408      protected void createConnection()
  409             throws Exception
  410      {
  411         boolean tracing = log.isTraceEnabled();
  412   
  413         /** Attempt to load the socket factory and if this fails, use the
  414          * default socket factory impl.
  415          */
  416         SocketFactory socketFactory = null;
  417         if( socketFactoryName != null )
  418         {
  419            try
  420            {
  421               ClassLoader loader = Thread.currentThread().getContextClassLoader();
  422               Class factoryClass = loader.loadClass(socketFactoryName);
  423               socketFactory = (SocketFactory) factoryClass.newInstance();
  424            }
  425            catch(Exception e)
  426            {
  427               log.debug("Failed to load socket factory: "+socketFactoryName, e);
  428            }
  429         }
  430         // Use the default socket factory
  431         if( socketFactory == null )
  432         {
  433            socketFactory = SocketFactory.getDefault();
  434         }
  435   
  436         // Look for a local address and port as properties
  437         String tmp = getProperty(LOCAL_ADDR);
  438         if( tmp != null )
  439            this.localAddr = InetAddress.getByName(tmp);
  440         tmp = getProperty(LOCAL_PORT);
  441         if( tmp != null )
  442            this.localPort = Integer.parseInt(tmp);
  443   
  444         // Look for client side overrides of the server address/port
  445         InetAddress serverAddr = addr;
  446         int serverPort = port;
  447         tmp = getProperty(SERVER_ADDR);
  448         if (tmp == null)
  449            tmp = connectAddress;
  450         if( tmp != null )
  451            serverAddr = InetAddress.getByName(tmp);
  452         tmp = getProperty(SERVER_PORT);
  453         if( tmp != null )
  454            serverPort = Integer.parseInt(tmp);
  455         else if (connectPort != 0)
  456            serverPort = connectPort;
  457         
  458         String useHostNameProp = getProperty(USE_SERVER_HOST);
  459         String serverHost = serverAddr.getHostAddress();
  460         if (Boolean.valueOf(useHostNameProp).booleanValue())
  461            serverHost = serverAddr.getHostName();
  462         
  463         int retries = 0;
  464         // Default to 10 retries, no delay in the absence of user override
  465         int maxRetries = 10;
  466         tmp = getProperty(RETRY_COUNT);
  467         if( tmp != null )
  468            maxRetries = Integer.parseInt(tmp);
  469         long retryDelay = 0;
  470         tmp = getProperty(RETRY_DELAY);
  471         if( tmp != null )
  472         {
  473            retryDelay = Long.parseLong(tmp);
  474            if( retryDelay < 0 )
  475               retryDelay = 0;
  476         }
  477         if( tracing )
  478            log.trace("Begin connect loop, maxRetries="+maxRetries+", delay="+retryDelay);
  479   
  480         while (true)
  481         {
  482            try
  483            {
  484               if( tracing )
  485               {
  486                  log.trace("Connecting with addr="+serverHost+", port="+serverPort
  487                     + ", localAddr="+localAddr+", localPort="+localPort
  488                     + ", socketFactory="+socketFactory
  489                     + ", enableTcpNoDelay="+enableTcpNoDelay
  490                     + ", bufferSize="+bufferSize
  491                     + ", chunkSize="+chunkSize
  492                     );
  493               }
  494               if( localAddr != null )
  495                  socket = socketFactory.createSocket(serverHost, serverPort, localAddr, localPort);
  496               else
  497                  socket = socketFactory.createSocket(serverHost, serverPort);
  498               break;
  499            }
  500            catch (ConnectException e)
  501            {
  502               if (++retries > maxRetries)
  503                  throw e;
  504               if( tracing )
  505                  log.trace("Failed to connect, retries="+retries, e);
  506            }
  507            try
  508            {
  509               Thread.sleep(retryDelay);
  510            }
  511            catch(InterruptedException e)
  512            {
  513               break;
  514            }
  515         }
  516   
  517         socket.setTcpNoDelay(enableTcpNoDelay);
  518         if (soTimeout != 0)
  519            socket.setSoTimeout(soTimeout);
  520         socketMgr = new SocketManager(socket);
  521         socketMgr.setBufferSize(bufferSize);
  522         socketMgr.setChunkSize(chunkSize);
  523         socketMgr.start(Connection.getThreadGroup());
  524      }
  525   
  526      /**
  527       * Used to close the current connection with the server
  528       *
  529       */
  530      protected void destroyConnection()
  531      {
  532         try
  533         {
  534           if( socket != null )
  535           {
  536              try
  537              {
  538                 socketMgr.stop();
  539              }
  540              finally
  541              {
  542                 socket.close();
  543              }
  544           }
  545         }
  546         catch(IOException ignore)
  547         {
  548         }
  549      }
  550   
  551      private String getProperty(String name)
  552      {
  553         String value = null;
  554         try
  555         {
  556            value = System.getProperty(name);
  557         }
  558         catch (Throwable ignored)
  559         {
  560            log.trace("Cannot retrieve system property " + name);
  561         }
  562         return value;
  563      }
  564   }

Save This Page
Home » jboss-5.0.0.CR1-src » org » jboss » mq » il » uil2 » [javadoc | source]