Save This Page
Home » jboss-5.0.0.CR1-src » org » jboss » mq » [javadoc | source]
    1   /*
    2    * JBoss, Home of Professional Open Source.
    3    * Copyright 2006, Red Hat Middleware LLC, and individual contributors
    4    * as indicated by the @author tags. See the copyright.txt file in the
    5    * distribution for a full listing of individual contributors.
    6    *
    7    * This is free software; you can redistribute it and/or modify it
    8    * under the terms of the GNU Lesser General Public License as
    9    * published by the Free Software Foundation; either version 2.1 of
   10    * the License, or (at your option) any later version.
   11    *
   12    * This software is distributed in the hope that it will be useful,
   13    * but WITHOUT ANY WARRANTY; without even the implied warranty of
   14    * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
   15    * Lesser General Public License for more details.
   16    *
   17    * You should have received a copy of the GNU Lesser General Public
   18    * License along with this software; if not, write to the Free
   19    * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
   20    * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
   21    */
   22   package org.jboss.mq;
   23   
   24   import java.io.IOException;
   25   import java.io.Serializable;
   26   import java.util.Arrays;
   27   import java.util.HashMap;
   28   import java.util.HashSet;
   29   import java.util.LinkedList;
   30   
   31   import javax.jms.ConnectionMetaData;
   32   import javax.jms.Destination;
   33   import javax.jms.ExceptionListener;
   34   import javax.jms.IllegalStateException;
   35   import javax.jms.JMSException;
   36   import javax.jms.JMSSecurityException;
   37   import javax.jms.Queue;
   38   import javax.jms.TemporaryQueue;
   39   import javax.jms.TemporaryTopic;
   40   import javax.transaction.xa.Xid;
   41   
   42   import org.jboss.logging.Logger;
   43   import org.jboss.mq.il.ClientILService;
   44   import org.jboss.mq.il.ServerIL;
   45   import org.jboss.util.UnreachableStatementException;
   46   
   47   import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
   48   import EDU.oswego.cs.dl.util.concurrent.Semaphore;
   49   import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
   50   import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
   51   
   52   /**
   53    * This class implements javax.jms.Connection.
   54    * 
   55    * <p>
   56    * It is also the gateway through which all calls to the JMS server are made.
   57    * To do its work it needs a ServerIL to invoke (see
   58    * {@link org.jboss.mq.il.ServerIL}).
   59    * </p>
   60    * 
   61    * <p>
   62    * The logic for clientID (new as of February 2002) is the following: if logging
   63    * in with a user and password, a preconfigured clientID may be automatically
   64    * delivered from the server.
   65    * </p>
   66    * 
   67    * <p>
   68    * If the client wants to set its own clientID it must do so on a connection
   69    * which does not have a preconfigured clientID, and it must do so before it
   70    * calls any other methods on the connection (even getClientID()). It is not
   71    * allowed to use a clientID that looks like a JBossMQ internal one
   72    * (beginning with ID), a clientID that is already in use by someone, or a
   73    * clientID that is already preconfigured in the server.
   74    * </p>
   75    * 
   76    * <p>
   77    * If a preconfigured ID is not obtained, and a valid one is not set, the server
   78    * will set an internal ID. This ID can NEVER be used for durable
   79    * subscriptions. Whether a preconfigured ID or a manually set one can be used
   80    * to create a durable subscription is governed by the security configuration
   81    * of JBossMQ. In the default setup, only preconfigured clientIDs can be
   82    * used. If using a SecurityManager, permission to create a durable
   83    * subscription is the result of a combination of the following:
   84    * </p>
   85    * <p>- The clientID is not one of JBossMQ's internal.
   86    * </p>
   87    * <p>- The user is authenticated and has a role that has create set to true
   88    * in the security config of the destination.
   89    * </p>
   90    * 
   91    * <p>
   92    * Notes for JBossMQ developers: all calls that can be made on a connection,
   93    * except close(), must call checkClientID()
   94    * </p>
   95    * 
   96    * @author Norbert Lataille (Norbert.Lataille@m4x.org)
   97    * @author Hiram Chirino (Cojonudo14@hotmail.com)
   98    * @author <a href="pra@tim.se">Peter Antman</a>
   99    * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
  100    * @version $Revision: 61741 $
  101    */
  102   public abstract class Connection implements Serializable, javax.jms.Connection
  103   {
   /** The serialVersionUID */
   private static final long serialVersionUID = 87938199839407082L;

   /** The thread group returned by getThreadGroup(), which replaces it once it has been destroyed */
   private static ThreadGroup threadGroup = new ThreadGroup("JBossMQ Client Threads");

   /** The log */
   static Logger log = Logger.getLogger(Connection.class);

   /** Whether trace is enabled; refreshed from the logger in pingServer() */
   static boolean trace = log.isTraceEnabled();

   /** Manages the thread that pings the connection to see if it is 'alive' */
   static protected ClockDaemon clockDaemon = new ClockDaemon();

   /** Maps a destination to a LinkedList of consumers; accessed while holding the subscriptions monitor */
   public HashMap destinationSubscriptions = new HashMap();

   /** Maps a subscription id (Integer) to its consumer */
   public HashMap subscriptions = new HashMap();

   /** Is the connection stopped ? */
   public boolean modeStop;

   /** This is our connection to the JMS server */
   protected ServerIL serverIL;

   /** This is the clientID */
   protected String clientID;

   /** The connection token is used to identify our connection to the server. */
   protected ConnectionToken connectionToken;

   /** The object that sets up the client IL */
   protected ClientILService clientILService;

   /** How often to ping the connection (presumably milliseconds, i.e. one minute - TODO confirm) */
   protected long pingPeriod = 1000 * 60;

   /** This field is reset when a ping is sent, set when ponged. */
   protected boolean ponged = true;

   /** This is used to know when the PingTask is running */
   Semaphore pingTaskSemaphore = new Semaphore(1);

   /** Identifies the PingTask in the ClockDaemon */
   Object pingTaskId;

   /** Set as soon as close() is called on the connection. */
   private SynchronizedBoolean closing = new SynchronizedBoolean(false);

   /** Whether setClientId is Allowed */
   private volatile boolean setClientIdAllowed = true;

   /** Set of all sessions created by this connection; accessed while holding its own monitor */
   HashSet createdSessions;

   /** Numbers subscriptions */
   int subscriptionCounter = Integer.MIN_VALUE;

   /** The lock for subscriptionCounter */
   Object subCountLock = new Object();

   /** Is the connection closed */
   private SynchronizedBoolean closed = new SynchronizedBoolean(false);

   /** Used to control transactions */
   SpyXAResourceManager spyXAResourceManager;

   /** The class that created this connection */
   GenericConnectionFactory genericConnectionFactory;

   /** Last message ID returned */
   private int lastMessageID;

   /** the exceptionListener */
   private ExceptionListener exceptionListener;

   /** The exception listener lock */
   private Object elLock = new Object();
   
   /** The exception listener invocation thread; non-null while a notification is considered in progress */
   private Thread elThread;
   
   /** Used in message id generation; also serves as the lock for that generation */
   private StringBuffer sb = new StringBuffer();

   /** Used in message id generation */
   private char[] charStack = new char[22];

   /** The next session id */
   String sessionId;

   /** Temporary destinations created by this connection */
   protected HashSet temps = new HashSet();
  199      
  200      static
  201      {
  202         log.debug("Setting the clockDaemon's thread factory");
  203         clockDaemon.setThreadFactory(new ThreadFactory()
  204         {
  205            public Thread newThread(Runnable r)
  206            {
  207               Thread t = new Thread(getThreadGroup(), r, "Connection Monitor Thread");
  208               t.setDaemon(true);
  209               return t;
  210            }
  211         });
  212      }
  213   
  214      public static ThreadGroup getThreadGroup()
  215      {
  216         if (threadGroup.isDestroyed())
  217            threadGroup = new ThreadGroup("JBossMQ Client Threads");
  218         return threadGroup;
  219      }
  220   
   /**
    * Create a new Connection.
    * <p>
    * Construction happens in two guarded phases. First the client is
    * registered with the server: authenticate, ask for a client id when a
    * user name was given, and start the client IL service. Then the XA
    * resource manager is created and the ping thread is started. A failure in
    * either phase tells the server that the connection is closing and is
    * rethrown as a JMSException.
    *
    * @param userName the username, may be null
    * @param password the password
    * @param genericConnectionFactory the constructing class
    * @throws JMSException for any error
    */
   Connection(String userName, String password, GenericConnectionFactory genericConnectionFactory) throws JMSException
   {
      //Set the attributes
      createdSessions = new HashSet();
      connectionToken = null;
      lastMessageID = 0;
      // A new connection starts out stopped until start() is called
      modeStop = true;

      if (trace)
         log.trace("Connection Initializing userName=" + userName + " " + this);
      this.genericConnectionFactory = genericConnectionFactory;
      genericConnectionFactory.initialise(this);

      // Connect to the server
      if (trace)
         log.trace("Getting the serverIL " + this);
      serverIL = genericConnectionFactory.createServerIL();
      if (trace)
         log.trace("serverIL=" + serverIL + " " + this);

      // Register ourselves as a client
      try
      {
         authenticate(userName, password);

         // A client id is only requested from the server for named users
         if (userName != null)
            askForAnID(userName, password);

         startILService();
      }
      catch (Throwable t)
      {
         // Client registration failed, close the connection.
         // NOTE(review): a null token is passed here even if authenticate()
         // already succeeded - confirm this is what the server expects.
         try
         {
            serverIL.connectionClosing(null);
         }
         catch (Throwable t2)
         {
            log.debug("Error closing the connection", t2);
         }

         SpyJMSException.rethrowAsJMSException("Failed to create connection", t);
      }

      // Finish constructing the connection
      try
      {
         if (trace)
            log.trace("Creating XAResourceManager " + this);

         // Setup the XA Resource manager,
         spyXAResourceManager = new SpyXAResourceManager(this);

         if (trace)
            log.trace("Starting the ping thread " + this);
         startPingThread();

         if (trace)
            log.trace("Connection establishment successful " + this);
      }
      catch (Throwable t)
      {
         // Could not complete the connection, tidy up
         // the server and client ILs. Each step is attempted
         // independently so one failure does not skip the other.
         try
         {
            serverIL.connectionClosing(connectionToken);
         }
         catch (Throwable t2)
         {
            log.debug("Error closing the connection", t2);
         }
         try
         {
            stopILService();
         }
         catch (Throwable t2)
         {
            log.debug("Error stopping the client IL", t2);
         }

         SpyJMSException.rethrowAsJMSException("Failed to create connection", t);
      }
   }
  314   
   /**
    * Create a new Connection without a user name or password.
    * <p>
    * Delegates to the three argument constructor with null credentials.
    *
    * @param genericConnectionFactory the constructing class
    * @throws JMSException for any error
    */
   Connection(GenericConnectionFactory genericConnectionFactory) throws JMSException
   {
      this(null, null, genericConnectionFactory);
   }
  325   
   /**
    * Gets the ServerIL attribute of the Connection object, that is the
    * invocation layer obtained from the connection factory at construction.
    *
    * @return The ServerIL value
    */
   public ServerIL getServerIL()
   {
      return serverIL;
   }
  335   
  336      /**
  337   	 * Notification from the server that the connection is closed
  338   	 */
  339      public void asynchClose()
  340      {
  341         // If we receive a close and we did not initiate it, then fire the exception listener
  342         if (closing.get() == false)
  343            asynchFailure("Asynchronous close from server.", new IOException("Close request from the server or transport layer."));
  344      }
  345   
  346      /**
  347   	 * Called by a TemporaryDestination which is going to be deleted()
  348   	 * 
  349   	 * @param dest the temporary destination
  350   	 */
  351      public void asynchDeleteTemporaryDestination(SpyDestination dest)
  352      {
  353         if (trace)
  354            log.trace("Deleting temporary destination " + dest);
  355         try
  356         {
  357            deleteTemporaryDestination(dest);
  358         }
  359         catch (Throwable t)
  360         {
  361            asynchFailure("Error deleting temporary destination " + dest, t);
  362         }
  363      }
  364   
  365      /**
  366   	 * Gets the first consumer that is listening to a destination.
  367   	 * 
  368   	 * @param requests the receive requests
  369   	 */
  370      public void asynchDeliver(ReceiveRequest requests[])
  371      {
  372         // If we are closing the connection, the server will nack the messages
  373         if (closing.get())
  374            return;
  375   
  376         if (trace)
  377            log.trace("Async deliver requests=" + Arrays.asList(requests) + " " + this);
  378         
  379         try
  380         {
  381            for (int i = 0; i < requests.length; i++)
  382            {
  383               ReceiveRequest r = requests[i];
  384               if (trace)
  385                  log.trace("Processing request=" + r + " " + this);
  386               
  387               SpyConsumer consumer = (SpyConsumer) subscriptions.get(r.subscriptionId);
  388               r.message.createAcknowledgementRequest(r.subscriptionId.intValue());
  389   
  390               if (consumer == null)
  391               {
  392                  send(r.message.getAcknowledgementRequest(false));
  393                  log.debug("WARNING: NACK issued due to non existent subscription " + r.message.header.messageId);
  394                  continue;
  395               }
  396   
  397               if (trace)
  398                  log.trace("Delivering messageid=" + r.message.header.messageId + " to consumer=" + consumer);
  399               
  400               consumer.addMessage(r.message);
  401            }
  402         }
  403         catch (Throwable t)
  404         {
  405            asynchFailure("Error during async delivery", t);
  406         }
  407      }
  408      /**
  409   	 * Notification of a failure on this connection
  410   	 * 
  411   	 * @param reason the reason for the failure
  412   	 * @param t the throwable
  413   	 */
  414      public void asynchFailure(String reason, Throwable t)
  415      {
  416         if (trace)
  417            log.trace("Notified of failure reason=" + reason + " " + this, t);
  418   
  419         // Exceptions due to closing will be ignored.
  420         if (closing.get())
  421            return;
  422   
  423         JMSException excep = SpyJMSException.getAsJMSException(reason, t);
  424   
  425         synchronized (elLock)
  426         {
  427            ExceptionListener el = exceptionListener;
  428            if (el != null && elThread == null)
  429            {
  430               try
  431               {
  432                  Runnable run = new ExceptionListenerRunnable(el, excep);
  433                  elThread = new Thread(getThreadGroup(), run, "ExceptionListener " + this);
  434                  elThread.setDaemon(false);
  435                  elThread.start();
  436               }
  437               catch (Throwable t1)
  438               {
  439                  log.warn("Connection failure: ", excep);
  440                  log.warn("Unable to start exception listener thread: ", t1);
  441               }
  442            }
  443            else if (elThread != null)
  444               log.warn("Connection failure, already in the exception listener", excep);
  445            else
  446               log.warn("Connection failure, use javax.jms.Connection.setExceptionListener() to handle this error and reconnect", excep);
  447         }
  448      }
  449   
   /**
    * Invoked when the server pongs us, i.e. answers a ping.
    * <p>
    * Sets the ponged flag; the server time is only used for trace logging.
    *
    * @param serverTime the server time
    */
   public void asynchPong(long serverTime)
   {
      if (trace)
         log.trace("PONG serverTime=" + serverTime + " " + this);
      ponged = true;
   }
  461   
  462      /**
  463   	 * Called by a TemporaryDestination which is going to be deleted
  464   	 * 
  465   	 * @param dest the temporary destination
  466   	 * @exception JMSException for any error
  467   	 */
  468      public void deleteTemporaryDestination(SpyDestination dest) throws JMSException
  469      {
  470         checkClosed();
  471         if (trace)
  472            log.trace("DeleteDestination dest=" + dest + " " + this);
  473         try
  474         {
  475            //Ask the broker to delete() this TemporaryDestination
  476            serverIL.deleteTemporaryDestination(connectionToken, dest);
  477   
  478            //Remove it from the destinations list
  479            synchronized (subscriptions)
  480            {
  481               destinationSubscriptions.remove(dest);
  482            }
  483   
  484            // Remove it from the temps list
  485            synchronized (temps)
  486            {
  487               temps.remove(dest);
  488            }
  489         }
  490         catch (Throwable t)
  491         {
  492            
  493            SpyJMSException.rethrowAsJMSException("Cannot delete the TemporaryDestination", t);
  494         }
  495      }
  496   
  497      public void setClientID(String cID) throws JMSException
  498      {
  499         checkClosed();
  500         if (clientID != null)
  501            throw new IllegalStateException("The connection has already a clientID");
  502         if (setClientIdAllowed == false)
  503            throw new IllegalStateException("SetClientID was not called emediately after creation of connection");
  504   
  505         if (trace)
  506            log.trace("SetClientID clientID=" + clientID + " " + this);
  507   
  508         try
  509         {
  510            serverIL.checkID(cID);
  511         }
  512         catch (Throwable t)
  513         {
  514            SpyJMSException.rethrowAsJMSException("Cannot connect to the JMSServer", t);
  515         }
  516   
  517         clientID = cID;
  518         connectionToken.setClientID(clientID);
  519      }
  520   
   /**
    * Get the client id of this connection.
    * <p>
    * Unlike most other calls this one only checks that the connection is not
    * closed; it does not call checkClientID().
    *
    * @return the client id (presumably null while none has been assigned - TODO confirm)
    * @throws JMSException if the connection is closed
    */
   public String getClientID() throws JMSException
   {
      checkClosed();
      return clientID;
   }
  526   
   /**
    * Get the exception listener registered on this connection.
    *
    * @return the listener, or null when none is set
    * @throws JMSException if the closed or client id checks fail
    */
   public ExceptionListener getExceptionListener() throws JMSException
   {
      checkClosed();
      checkClientID();
      return exceptionListener;
   }
  533   
   /**
    * Register the listener that asynchFailure() notifies of connection failures.
    *
    * @param listener the listener, null removes the current one
    * @throws JMSException if the closed or client id checks fail
    */
   public void setExceptionListener(ExceptionListener listener) throws JMSException
   {
      checkClosed();
      checkClientID();

      // NOTE(review): written without holding elLock, which asynchFailure() holds when reading it
      exceptionListener = listener;
   }
  541   
   /**
    * Get the meta data for this connection.
    *
    * @return a new SpyConnectionMetaData on every call
    * @throws JMSException if the closed or client id checks fail
    */
   public ConnectionMetaData getMetaData() throws JMSException
   {
      checkClosed();
      checkClientID();

      return new SpyConnectionMetaData();
   }
  549   
   /**
    * Close the connection.
    * <p>
    * Closing an already closed connection does nothing. Otherwise the
    * connection is stopped, its sessions are closed, the server is told that
    * the connection is closing, and the ping thread and client IL service are
    * stopped. Every step is attempted even if an earlier one fails. Only a
    * failure to stop the ping thread is thrown to the caller, after all steps
    * have run; the other failures are logged at trace level.
    *
    * @throws JMSException if the ping thread could not be stopped
    */
   public synchronized void close() throws JMSException
   {
      if (closed.get())
         return;
      if (trace)
         log.trace("Closing connection " + this);
      
      // From here on asynchronous deliveries and failures are ignored
      closing.set(true);

      // We don't want to notify the exception listener
      exceptionListener = null;

      // The first exception
      JMSException exception = null;

      try
      {
         doStop();
      }
      catch (Throwable t)
      {
         log.trace("Error during stop", t);
      }
      
      if (trace)
         log.trace("Closing sessions " + this);
      // Work on a snapshot taken under the lock, so sessions can be
      // closed without holding the createdSessions monitor
      Object[] vect = null;
      synchronized (createdSessions)
      {
         vect = createdSessions.toArray();
      }
      for (int i = 0; i < vect.length; i++)
      {
         SpySession session = (SpySession) vect[i];
         try
         {
            session.close();
         }
         catch (Throwable t)
         {
            if (trace)
               log.trace("Error closing session " + session, t);
         }
      }
      if (trace)
         log.trace("Closed sessions " + this);

      if (trace)
         log.trace("Notifying the server of close " + this);
      try
      {
         serverIL.connectionClosing(connectionToken);
      }
      catch (Throwable t)
      {
         log.trace("Cannot close properly the connection", t);
      }

      if (trace)
         log.trace("Stopping ping thread " + this);
      try
      {
         stopPingThread();
      }
      catch (Throwable t)
      {
         // This is the only failure that is remembered and thrown at the end
         if (exception == null)
            exception = SpyJMSException.getAsJMSException("Cannot stop the ping thread", t);
      }

      if (trace)
         log.trace("Stopping the ClientIL service " + this);
      try
      {
         stopILService();
      }
      catch (Throwable t)
      {
         log.trace("Cannot stop the client il service", t);
      }

      // Only set the closed flag after all the objects that depend
      // on this connection have been closed.
      closed.set(true);

      if (trace)
         log.trace("Disconnected from server " + this);

      // Throw the first exception
      if (exception != null)
         throw exception;
   }
  642   
  643      public void start() throws JMSException
  644      {
  645         checkClosed();
  646         checkClientID();
  647   
  648         if (modeStop == false)
  649            return;
  650         modeStop = false;
  651   
  652         if (trace)
  653            log.trace("Starting connection " + this);
  654   
  655         try
  656         {
  657            serverIL.setEnabled(connectionToken, true);
  658         }
  659         catch (Throwable t)
  660         {
  661            SpyJMSException.rethrowAsJMSException("Cannot enable the connection with the JMS server", t);
  662         }
  663      }
  664   
   /**
    * Stop delivery on this connection.
    * <p>
    * Performs the closed and client id checks and then delegates to doStop().
    *
    * @throws JMSException if a check fails or the stop fails
    */
   public void stop() throws JMSException
   {
      checkClosed();
      checkClientID();
      doStop();
   }
  671   
  672      public String toString()
  673      {
  674         StringBuffer buffer = new StringBuffer();
  675         buffer.append("Connection@").append(System.identityHashCode(this));
  676         buffer.append('[');
  677         if (connectionToken != null)
  678            buffer.append("token=").append(connectionToken);
  679         else
  680            buffer.append("clientID=").append(clientID);
  681         if (closed.get())
  682            buffer.append(" CLOSED");
  683         else if (closing.get())
  684            buffer.append(" CLOSING");
  685         buffer.append(" rcvstate=");
  686         if (modeStop)
  687            buffer.append("STOPPED");
  688         else
  689            buffer.append("STARTED");
  690         buffer.append(']');
  691         return buffer.toString();
  692      }
  693   
  694      /**
  695   	 * Get the next message id
  696   	 * <p>
  697   	 * 
  698   	 * All longs are less than 22 digits long
  699   	 * <p>
  700   	 * 
  701   	 * Note that in this routine we assume that System.currentTimeMillis() is
  702   	 * non-negative always be non-negative (so don't set lastMessageID to a
  703   	 * positive for a start).
  704   	 * 
  705   	 * @return the next message id
  706   	 * @throws JMSException for any error
  707   	 */
  708      String getNewMessageID() throws JMSException
  709      {
  710         checkClosed();
  711         synchronized (sb)
  712         {
  713            sb.setLength(0);
  714            sb.append(clientID);
  715            sb.append('-');
  716            long time = System.currentTimeMillis();
  717            int count = 0;
  718            do
  719            {
  720               charStack[count] = (char) ('0' + (time % 10));
  721               time = time / 10;
  722               ++count;
  723            }
  724            while (time != 0);
  725            --count;
  726            for (; count >= 0; --count)
  727            {
  728               sb.append(charStack[count]);
  729            }
  730            ++lastMessageID;
  731            //avoid having to deal with negative numbers.
  732            if (lastMessageID < 0)
  733            {
  734               lastMessageID = 0;
  735            }
  736            int id = lastMessageID;
  737            count = 0;
  738            do
  739            {
  740               charStack[count] = (char) ('0' + (id % 10));
  741               id = id / 10;
  742               ++count;
  743            }
  744            while (id != 0);
  745            --count;
  746            for (; count >= 0; --count)
  747            {
  748               sb.append(charStack[count]);
  749            }
  750            return sb.toString();
  751         }
  752      }
  753   
  754      /**
  755   	 * A new Consumer has been created.
  756   	 * <p>
  757   	 * We have to handle security issues, a consumer may actually not be allowed
  758   	 * to be created
  759   	 * 
  760   	 * @param consumer the consumer added
  761   	 * @throws JMSException for any error
  762   	 */
  763      void addConsumer(SpyConsumer consumer) throws JMSException
  764      {
  765         checkClosed();
  766         Subscription req = consumer.getSubscription();
  767         synchronized (subCountLock)
  768         {
  769            req.subscriptionId = subscriptionCounter++;
  770         }
  771         req.connectionToken = connectionToken;
  772         if (trace)
  773            log.trace("addConsumer sub=" + req);
  774   
  775         try
  776         {
  777            synchronized (subscriptions)
  778            {
  779               subscriptions.put(new Integer(req.subscriptionId), consumer);
  780   
  781               LinkedList ll = (LinkedList) destinationSubscriptions.get(req.destination);
  782               if (ll == null)
  783               {
  784                  ll = new LinkedList();
  785                  destinationSubscriptions.put(req.destination, ll);
  786               }
  787   
  788               ll.add(consumer);
  789            }
  790   
  791            serverIL.subscribe(connectionToken, req);
  792         }
  793         catch (JMSSecurityException ex)
  794         {
  795            removeConsumerInternal(consumer);
  796            throw ex;
  797         }
  798         catch (Throwable t)
  799         {
  800            SpyJMSException.rethrowAsJMSException("Cannot subscribe to this Destination: ", t);
  801         }
  802      }
  803   
  804      /**
  805   	 * Browse a queue
  806   	 * 
  807   	 * @param queue the queue
  808   	 * @param selector the selector
  809   	 * @return an array of messages
  810   	 * @exception JMSException for any error
  811   	 */
  812      SpyMessage[] browse(Queue queue, String selector) throws JMSException
  813      {
  814         checkClosed();
  815         if (trace)
  816            log.trace("Browsing queue=" + queue + " selector=" + selector + " " + this);
  817   
  818         try
  819         {
  820            return serverIL.browse(connectionToken, queue, selector);
  821         }
  822         catch (Throwable t)
  823         {
  824            SpyJMSException.rethrowAsJMSException("Cannot browse the Queue", t);
  825            throw new UnreachableStatementException();
  826         }
  827      }
  828   
   /**
    * Ping the server.
    * <p>
    * As a side effect the static trace flag is refreshed from the logger on
    * every call.
    *
    * @param clientTime the start of the ping
    * @throws JMSException for any error, including a closed connection
    */
   void pingServer(long clientTime) throws JMSException
   {
      checkClosed();
      // Pick up changes to the log level made since the flag was last read
      trace = log.isTraceEnabled();
      if (trace)
         log.trace("PING " + clientTime + " " + this);

      try
      {
         serverIL.ping(connectionToken, clientTime);
      }
      catch (Throwable t)
      {
         SpyJMSException.rethrowAsJMSException("Cannot ping the JMS server", t);
      }
   }
  851   
  852      /**
  853   	 * Receive a message
  854   	 * 
  855   	 * @param sub the subscription
  856   	 * @param wait the wait time
  857   	 * @return the message or null if there isn't one
  858   	 * @throws JMSException for any error
  859   	 */
  860      SpyMessage receive(Subscription sub, long wait) throws JMSException
  861      {
  862         checkClosed();
  863         if (trace)
  864            log.trace("Receive subscription=" + sub + " wait=" + wait);
  865   
  866         try
  867         {
  868            SpyMessage message = serverIL.receive(connectionToken, sub.subscriptionId, wait);
  869            if (message != null)
  870               message.createAcknowledgementRequest(sub.subscriptionId);
  871            return message;
  872         }
  873         catch (Throwable t)
  874         {
  875            SpyJMSException.rethrowAsJMSException("Cannot receive ", t);
  876            throw new UnreachableStatementException();
  877         }
  878      }
  879   
  880      /**
  881       * Remove a consumer
  882       *
  883       * @param consumer the consumer
  884       * @throws JMSException for any error
  885       */
  886      void removeConsumer(SpyConsumer consumer) throws JMSException
  887      {
  888         checkClosed();
  889         Subscription req = consumer.getSubscription();
  890         if (trace)
  891            log.trace("removeConsumer req=" + req);
  892   
  893         try
  894         {
  895            serverIL.unsubscribe(connectionToken, req.subscriptionId);
  896   
  897            removeConsumerInternal(consumer);
  898         }
  899         catch (Throwable t)
  900         {
  901            SpyJMSException.rethrowAsJMSException("Cannot unsubscribe to this destination", t);
  902         }
  903   
  904      }
  905   
  906      /**
  907       * Send a message to the server
  908       *
  909       * @param mes the message
  910       * @throws JMSException for any error
  911       */
  912      void sendToServer(SpyMessage mes) throws JMSException
  913      {
  914         checkClosed();
  915         if (trace)
  916            log.trace("SendToServer message=" + mes.header.jmsMessageID + " " + this);
  917         
  918         try
  919         {
  920            serverIL.addMessage(connectionToken, mes);
  921         }
  922         catch (Throwable t)
  923         {
  924            SpyJMSException.rethrowAsJMSException("Cannot send a message to the JMS server", t);
  925         }
  926      }
  927   
  928      /**
  929       * Closing a session
  930       *
  931       * @param who the session
  932       */
  933      void sessionClosing(SpySession who)
  934      {
  935         if (trace)
  936            log.trace("Closing session " + who);
  937         
  938         synchronized (createdSessions)
  939         {
  940            createdSessions.remove(who);
  941         }
  942   
  943         //This session should not be in the "destinations" object anymore.
  944         //We could check this, though
  945      }
  946   
  947      void unsubscribe(DurableSubscriptionID id) throws JMSException
  948      {
  949         if (trace)
  950            log.trace("Unsubscribe id=" + id + " " + this);
  951         
  952         try
  953         {
  954            serverIL.destroySubscription(connectionToken, id);
  955         }
  956         catch (Throwable t)
  957         {
  958            SpyJMSException.rethrowAsJMSException("Cannot destroy durable subscription " + id, t);
  959         }
  960      }
  961   
  962      /**
  963       * Check a tempoary destination
  964       *
  965       * @param destination the destination
  966       */
  967      void checkTemporary(Destination destination) throws JMSException
  968      {
  969         if (destination instanceof TemporaryQueue || destination instanceof TemporaryTopic)
  970         {
  971            synchronized (temps)
  972            {
  973               if (temps.contains(destination) == false)
  974                  throw new JMSException("Cannot create a consumer for a temporary destination from a different session. " + destination);
  975            }
  976         }
  977      }
  978   
  979      /**
  980   	 * Check that a clientID exists. If not get one from server.
  981   	 * 
  982   	 * Also sets the setClientIdAllowed to false.
  983   	 * 
  984   	 * Check clientId, must be called by all public methods on the
  985   	 * jacax.jmx.Connection interface and its children.
  986   	 * 
  987   	 * @exception JMSException if clientID is null as post condition
  988   	 */
  989      synchronized protected void checkClientID() throws JMSException
  990      {
  991         if (setClientIdAllowed == false)
  992            return;
  993   
  994         setClientIdAllowed = false;
  995         if (trace)
  996            log.trace("Checking clientID=" + clientID + " " + this);
  997         if (clientID == null)
  998         {
  999            askForAnID();//Request a random one
 1000            if (clientID == null)
 1001               throw new JMSException("Could not get a clientID");
 1002            connectionToken.setClientID(clientID);
 1003   
 1004            if (trace)
 1005               log.trace("ClientID established " + this);
 1006         }
 1007      }
 1008   
 1009      /**
 1010   	 * Ask the server for an id
 1011   	 * 
 1012   	 * @exception JMSException for any error
 1013   	 */
 1014      protected void askForAnID() throws JMSException
 1015      {
 1016         if (trace)
 1017            log.trace("Ask for an id " + this);
 1018         
 1019         try
 1020         {
 1021            if (clientID == null)
 1022               clientID = serverIL.getID();
 1023         }
 1024         catch (Throwable t)
 1025         {
 1026            SpyJMSException.rethrowAsJMSException("Cannot get a client ID", t);
 1027         }
 1028      }
 1029   
 1030      /**
 1031   	 * Ask the server for an id
 1032   	 * 
 1033   	 * @param userName the user
 1034   	 * @param password the password
 1035   	 * @exception JMSException for any error
 1036   	 */
 1037      protected void askForAnID(String userName, String password) throws JMSException
 1038      {
 1039         if (trace)
 1040            log.trace("Ask for an id user=" +  userName + " " + this);
 1041   
 1042         try
 1043         {
 1044            String configuredClientID = serverIL.checkUser(userName, password);
 1045            if (configuredClientID != null)
 1046               clientID = configuredClientID;
 1047         }
 1048         catch (Throwable t)
 1049         {
 1050            SpyJMSException.rethrowAsJMSException("Cannot get a client ID", t);
 1051         }
 1052      }
 1053   
 1054      /**
 1055       * Authenticate a user
 1056       *
 1057       * @param userName the user
 1058       * @param password the password
 1059       * @throws JMSException for any error
 1060       */
 1061      protected void authenticate(String userName, String password) throws JMSException
 1062      {
 1063         if (trace)
 1064            log.trace("Authenticating user " + userName + " " + this);
 1065         try
 1066         {
 1067            sessionId = serverIL.authenticate(userName, password);
 1068         }
 1069         catch (Throwable t)
 1070         {
 1071            SpyJMSException.rethrowAsJMSException("Cannot authenticate user", t);
 1072         }
 1073      }
 1074   
 1075      // used to acknowledge a message
 1076      /**
 1077   	 * Acknowledge/Nack a message
 1078   	 * 
 1079   	 * @param item the acknowledgement
 1080   	 * @exception JMSException for any error
 1081   	 */
 1082      protected void send(AcknowledgementRequest item) throws JMSException
 1083      {
 1084         checkClosed();
 1085         if (trace)
 1086            log.trace("Acknowledge item=" + item + " " + this);
 1087   
 1088         try
 1089         {
 1090            serverIL.acknowledge(connectionToken, item);
 1091         }
 1092         catch (Throwable t)
 1093         {
 1094            SpyJMSException.rethrowAsJMSException("Cannot acknowlege a message", t);
 1095         }
 1096      }
 1097   
 1098      /**
 1099   	 * Commit/rollback
 1100   	 * 
 1101   	 * @param transaction the transaction request
 1102   	 * @exception JMSException for any error
 1103   	 */
 1104      protected void send(TransactionRequest transaction) throws JMSException
 1105      {
 1106         checkClosed();
 1107         if (trace)
 1108            log.trace("Transact request=" + transaction + " " + this);
 1109   
 1110         try
 1111         {
 1112            serverIL.transact(connectionToken, transaction);
 1113         }
 1114         catch (Throwable t)
 1115         {
 1116            SpyJMSException.rethrowAsJMSException("Cannot process a transaction", t);
 1117         }
 1118      }
 1119   
 1120      /**
 1121       * Recover
 1122       * 
 1123       * @param flags the flags
 1124       * @throws JMSException for any error
 1125       */
 1126      protected Xid[] recover(int flags) throws JMSException
 1127      {
 1128         checkClosed();
 1129         if (trace)
 1130            log.trace("Recover flags=" + flags + " " + this);
 1131   
 1132         try
 1133         {
 1134            if (serverIL instanceof Recoverable)
 1135            {
 1136               Recoverable recoverableIL = (Recoverable) serverIL;
 1137               return recoverableIL.recover(connectionToken, flags);
 1138            }
 1139         }
 1140         catch (Throwable t)
 1141         {
 1142            SpyJMSException.rethrowAsJMSException("Cannot recover", t);
 1143         }
 1144         
 1145         log.warn(serverIL + " does not implement " + Recoverable.class.getName());
 1146         return new Xid[0];
 1147      }
 1148   
 1149      /**
 1150   	 * Start the il
 1151   	 * 
 1152   	 * @exception JMSException for any error
 1153   	 */
 1154      protected void startILService() throws JMSException
 1155      {
 1156         if (trace)
 1157            log.trace("Starting the client il " + this);
 1158         try
 1159         {
 1160            clientILService = genericConnectionFactory.createClientILService(this);
 1161            clientILService.start();
 1162            if (trace)
 1163               log.trace("Using client id " + clientILService + " " + this);
 1164            connectionToken = new ConnectionToken(clientID, clientILService.getClientIL(), sessionId);
 1165            serverIL.setConnectionToken(connectionToken);
 1166         }
 1167         catch (Throwable t)
 1168         {
 1169            SpyJMSException.rethrowAsJMSException("Cannot start a the client IL service", t);
 1170         }
 1171      }
 1172   
 1173      /**
 1174   	 * Stop the il
 1175   	 * 
 1176   	 * @exception JMSException for any error
 1177   	 */
 1178      protected void stopILService() throws JMSException
 1179      {
 1180         try
 1181         {
 1182            clientILService.stop();
 1183         }
 1184         catch (Throwable t)
 1185         {
 1186            SpyJMSException.rethrowAsJMSException("Cannot stop a the client IL service", t);
 1187         }
 1188      }
 1189      
 1190      /**
 1191       * Stop delivery
 1192       *
 1193       * @param consumer the consumer
 1194       */
 1195      public void doStop() throws JMSException
 1196      {
 1197         if (modeStop)
 1198            return;
 1199         modeStop = true;
 1200   
 1201         if (trace)
 1202            log.trace("Stopping connection " + this);
 1203   
 1204         try
 1205         {
 1206            serverIL.setEnabled(connectionToken, false);
 1207         }
 1208         catch (Throwable t)
 1209         {
 1210            SpyJMSException.rethrowAsJMSException("Cannot disable the connection with the JMS server", t);
 1211         }
 1212      }
 1213      
 1214      /**
 1215       * Remove a consumer
 1216       *
 1217       * @param consumer the consumer
 1218       */
 1219      private void removeConsumerInternal(SpyConsumer consumer)
 1220      {
 1221         synchronized (subscriptions)
 1222         {
 1223            Subscription req = consumer.getSubscription();
 1224            subscriptions.remove(new Integer(req.subscriptionId));
 1225   
 1226            LinkedList ll = (LinkedList) destinationSubscriptions.get(req.destination);
 1227            if (ll != null)
 1228            {
 1229               ll.remove(consumer);
 1230               if (ll.size() == 0)
 1231               {
 1232                  destinationSubscriptions.remove(req.destination);
 1233               }
 1234            }
 1235         }
 1236      }
 1237      
 1238      /**
 1239       * Check whether we are closed
 1240       * 
 1241       * @throws IllegalStateException when the session is closed
 1242       */
 1243      protected void checkClosed() throws IllegalStateException
 1244      {
 1245         if (closed.get())
 1246            throw new IllegalStateException("The connection is closed");
 1247      }
 1248   
 1249      /**
 1250       * Start the ping thread
 1251       */
 1252      private void startPingThread()
 1253      {
 1254         // Ping thread does not need to be running if the ping period is 0.
 1255         if (pingPeriod == 0)
 1256            return;
 1257         pingTaskId = clockDaemon.executePeriodically(pingPeriod, new PingTask(), true);
 1258      }
 1259   
 1260      /**
 1261       * Stop the ping thread
 1262       */
 1263      private void stopPingThread()
 1264      {
 1265         // Ping thread was not running if ping period is 0.
 1266         if (pingPeriod == 0)
 1267            return;
 1268   
 1269         ClockDaemon.cancel(pingTaskId);
 1270   
 1271         //Aquire the Semaphore to make sure the ping task is not running.
 1272         try
 1273         {
 1274            pingTaskSemaphore.attempt(1000 * 10);
 1275         }
 1276         catch (InterruptedException e)
 1277         {
 1278            Thread.currentThread().interrupt();
 1279         }
 1280      }
 1281   
 1282      /**
 1283   	 * The ping task
 1284   	 */
 1285      class PingTask implements Runnable
 1286      {
 1287         public void run()
 1288         {
 1289            // Don't bother if we are closing
 1290            if (closing.get())
 1291               return;
 1292            
 1293            try
 1294            {
 1295               // If we can't aquire the semaphore then it
 1296               // almost certainly means the close has got it
 1297               // Try for 10 seconds to make sure the problem
 1298               // is not just a long garbage collection that has suspended threads
 1299               if (pingTaskSemaphore.attempt(1000 * 10) == false)
 1300                  return;
 1301            }
 1302            catch (InterruptedException e)
 1303            {
 1304               log.debug("Interrupted requesting ping semaphore");
 1305               return;
 1306            }
 1307            try
 1308            {
 1309               if (ponged == false)
 1310               {
 1311                  // Server did not pong use with in the timeout
 1312                  // period.. Assuming the connection is dead.
 1313                  throw new SpyJMSException("No pong received", new IOException("ping timeout."));
 1314               }
 1315   
 1316               ponged = false;
 1317               pingServer(System.currentTimeMillis());
 1318            }
 1319            catch (Throwable t)
 1320            {
 1321               asynchFailure("Unexpected ping failure", t);
 1322            }
 1323            finally
 1324            {
 1325               pingTaskSemaphore.release();
 1326            }
 1327         }
 1328      }
 1329      
 1330      /**
 1331       * The Exception listener runnable
 1332       */
 1333      class ExceptionListenerRunnable implements Runnable
 1334      {
 1335         ExceptionListener el;
 1336         JMSException excep;
 1337         
 1338         /**
 1339          * Create a new ExceptionListener runnable
 1340          * 
 1341          * @param el the exception exception
 1342          * @param excep the jms exception
 1343          */
 1344         public ExceptionListenerRunnable(ExceptionListener el, JMSException excep)
 1345         {
 1346            this.el = el;
 1347            this.excep = excep;
 1348         }
 1349         
 1350         public void run()
 1351         {
 1352            try
 1353            {
 1354               synchronized (elLock)
 1355               {
 1356                  el.onException(excep);
 1357               }
 1358            }
 1359            catch (Throwable t)
 1360            {
 1361               log.warn("Connection failure: ", excep);
 1362               log.warn("Exception listener ended abnormally: ", t);
 1363            }
 1364            
 1365            synchronized (elLock)
 1366            {
 1367               elThread = null;
 1368            }
 1369         }
 1370      }
 1371   }

Save This Page
Home » jboss-5.0.0.CR1-src » org » jboss » mq » [javadoc | source]