Save This Page
Home » jboss-5.0.0.CR1-src » org.jboss.resource.adapter » jms » inflow » [javadoc | source]
    1   /*
    2    * JBoss, Home of Professional Open Source.
    3    * Copyright 2006, Red Hat Middleware LLC, and individual contributors
    4    * as indicated by the @author tags. See the copyright.txt file in the
    5    * distribution for a full listing of individual contributors.
    6    *
    7    * This is free software; you can redistribute it and/or modify it
    8    * under the terms of the GNU Lesser General Public License as
    9    * published by the Free Software Foundation; either version 2.1 of
   10    * the License, or (at your option) any later version.
   11    *
   12    * This software is distributed in the hope that it will be useful,
   13    * but WITHOUT ANY WARRANTY; without even the implied warranty of
   14    * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
   15    * Lesser General Public License for more details.
   16    *
   17    * You should have received a copy of the GNU Lesser General Public
   18    * License along with this software; if not, write to the Free
   19    * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
   20    * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
   21    */
   22   package org.jboss.resource.adapter.jms.inflow;
   23   
   24   import java.lang.reflect.Method;
   25   import java.util.concurrent.atomic.AtomicBoolean;
   26   
   27   import javax.jms.Connection;
   28   import javax.jms.Destination;
   29   import javax.jms.ExceptionListener;
   30   import javax.jms.JMSException;
   31   import javax.jms.Message;
   32   import javax.jms.MessageListener;
   33   import javax.jms.Queue;
   34   import javax.jms.QueueConnection;
   35   import javax.jms.QueueConnectionFactory;
   36   import javax.jms.Topic;
   37   import javax.jms.TopicConnection;
   38   import javax.jms.TopicConnectionFactory;
   39   import javax.jms.XAQueueConnectionFactory;
   40   import javax.jms.XATopicConnectionFactory;
   41   import javax.management.Notification;
   42   import javax.naming.Context;
   43   import javax.resource.ResourceException;
   44   import javax.resource.spi.endpoint.MessageEndpointFactory;
   45   import javax.resource.spi.work.Work;
   46   import javax.resource.spi.work.WorkManager;
   47   import javax.transaction.TransactionManager;
   48   
   49   import org.jboss.jms.jndi.JMSProviderAdapter;
   50   import org.jboss.logging.Logger;
   51   import org.jboss.mx.util.JBossNotificationBroadcasterSupport;
   52   import org.jboss.resource.adapter.jms.JmsResourceAdapter;
   53   import org.jboss.tm.TransactionManagerLocator;
   54   import org.jboss.util.Strings;
   55   import org.jboss.util.naming.Util;
   56   
   57   /**
   58    * A generic jms Activation.
   59    * 
   60    * @author <a href="adrian@jboss.com">Adrian Brock</a>
   61    * @version $Revision: 71554 $
   62    */
   63   public class JmsActivation implements ExceptionListener
   64   {
   65      /** The log */
   66      private static final Logger log = Logger.getLogger(JmsActivation.class);
   67      
   68      /** Notification sent before connectioning */
   69      private static final String CONNECTING_NOTIFICATION = "org.jboss.ejb.plugins.jms.CONNECTING";
   70   
   71      /** Notification sent after connection */
   72      private static final String CONNECTED_NOTIFICATION = "org.jboss.ejb.plugins.jms.CONNECTED";
   73   
   74      /** Notification sent before disconnection */
   75      private static final String DISCONNECTING_NOTIFICATION = "org.jboss.ejb.plugins.jms.DISCONNECTING";
   76   
   77      /** Notification sent before disconnected */
   78      private static final String DISCONNECTED_NOTIFICATION = "org.jboss.ejb.plugins.jms.DISCONNECTED";
   79   
   80      /** Notification sent at connection failure */
   81      private static final String FAILURE_NOTIFICATION = "org.jboss.ejb.plugins.jms.FAILURE";
   82   
   83      /** The onMessage method */
   84      public static final Method ONMESSAGE; 
   85      
   86      /** The resource adapter */
   87      protected JmsResourceAdapter ra;
   88      
   89      /** The activation spec */
   90      protected JmsActivationSpec spec;
   91   
   92      /** The message endpoint factory */
   93      protected MessageEndpointFactory endpointFactory;
   94      
   95      /** The notification emitter */
   96      protected JBossNotificationBroadcasterSupport emitter;
   97      
   98      /** Whether delivery is active */
   99      protected AtomicBoolean deliveryActive = new AtomicBoolean(false);
  100   
  101      // Whether we are in the failure recovery loop
  102      private AtomicBoolean inFailure = new AtomicBoolean(false);
  103   
  104      /** The jms provider adapter */
  105      protected JMSProviderAdapter adapter;
  106      
  107      /** The destination */
  108      protected Destination destination;
  109      
  110      /** The connection */
  111      protected Connection connection;
  112      
  113      /** The server session pool */
  114      protected JmsServerSessionPool pool;
  115      
  116      /** Is the delivery transacted */
  117      protected boolean isDeliveryTransacted;
  118      
  119      /** The DLQ handler */
  120      protected DLQHandler dlqHandler;
  121      
  122      /** The TransactionManager */
  123      protected TransactionManager tm;
  124      
  125      
  126      static
  127      {
  128         try
  129         {
  130            ONMESSAGE = MessageListener.class.getMethod("onMessage", new Class[] { Message.class });
  131         }
  132         catch (Exception e)
  133         {
  134            throw new RuntimeException(e);
  135         }
  136      }
  137   
  138      public JmsActivation(JmsResourceAdapter ra, MessageEndpointFactory endpointFactory, JmsActivationSpec spec) throws ResourceException
  139      {
  140         this.ra = ra;
  141         this.endpointFactory = endpointFactory;
  142         this.spec = spec;
  143         try
  144         {
  145            this.isDeliveryTransacted = endpointFactory.isDeliveryTransacted(ONMESSAGE);
  146         }
  147         catch (Exception e)
  148         {
  149            throw new ResourceException(e);
  150         }
  151         if (endpointFactory instanceof JBossNotificationBroadcasterSupport)
  152            emitter = (JBossNotificationBroadcasterSupport) endpointFactory;
  153      }
  154   
  155      /**
  156       * @return the activation spec
  157       */
  158      public JmsActivationSpec getActivationSpec()
  159      {
  160         return spec;
  161      }
  162   
  163      /**
  164       * @return the message endpoint factory
  165       */
  166      public MessageEndpointFactory getMessageEndpointFactory()
  167      {
  168         return endpointFactory;
  169      }
  170   
  171      /**
  172       * @return whether delivery is transacted
  173       */
  174      public boolean isDeliveryTransacted()
  175      {
  176         return isDeliveryTransacted;
  177      }
  178   
  179      /**
  180       * @return the work manager
  181       */
  182      public WorkManager getWorkManager()
  183      {
  184         return ra.getWorkManager();
  185      }
  186      
  187      public TransactionManager getTransactionManager()
  188      {
  189         if (tm == null)
  190            tm = TransactionManagerLocator.locateTransactionManager();
  191         return tm;
  192      }
  193   
  194      /**
  195       * @return the connection
  196       */
  197      public Connection getConnection()
  198      {
  199         return connection;
  200      }
  201   
  202      /**
  203       * @return the destination
  204       */
  205      public Destination getDestination()
  206      {
  207         return destination;
  208      }
  209      
  210      /**
  211       * @return the provider adapter 
  212       */
  213      public JMSProviderAdapter getProviderAdapter()
  214      {
  215         return adapter; 
  216      }
  217      
  218      /**
  219       * @return the dlq handler 
  220       */
  221      public DLQHandler getDLQHandler()
  222      {
  223         return dlqHandler; 
  224      }
  225      
  226      /**
  227       * Start the activation
  228       * 
  229       * @throws ResourceException for any error
  230       */
  231      public void start() throws ResourceException
  232      {
  233         deliveryActive.set(true);
  234         ra.getWorkManager().scheduleWork(new SetupActivation());
  235      }
  236   
  237      /**
  238       * Stop the activation
  239       */
  240      public void stop()
  241      {
  242         deliveryActive.set(false);
  243         teardown();
  244      }
  245   
  246      /**
  247       * Handles any failure by trying to reconnect
  248       * 
  249       * @param failure the reason for the failure
  250       */
  251      public void handleFailure(Throwable failure)
  252      {
  253         log.warn("Failure in jms activation " + spec, failure);
  254         int reconnectCount = 0;
  255         
  256         // Only enter the failure loop once
  257         if (inFailure.getAndSet(true))
  258            return;
  259         try
  260         {
  261            while (deliveryActive.get() && reconnectCount < spec.getReconnectAttempts())
  262            {
  263               teardown();
  264   
  265               sendNotification(FAILURE_NOTIFICATION, failure);
  266   
  267               try
  268               {
  269                  Thread.sleep(spec.getReconnectIntervalLong());
  270               }
  271               catch (InterruptedException e)
  272               {
  273                  log.debug("Interrupted trying to reconnect " + spec, e);
  274                  break;
  275               }
  276   
  277               log.info("Attempting to reconnect " + spec);
  278               try
  279               {
  280                  setup();
  281                  log.info("Reconnected with messaging provider.");            
  282                  break;
  283               }
  284               catch (Throwable t)
  285               {
  286                  log.error("Unable to reconnect " + spec, t);
  287               }
  288               ++reconnectCount;
  289            }
  290         }
  291         finally
  292         {
  293            // Leaving failure recovery loop
  294            inFailure.set(false);
  295         }
  296      }
  297   
  298      public void onException(JMSException exception)
  299      {
  300         handleFailure(exception);
  301      }
  302   
  303      public String toString()
  304      {
  305         StringBuffer buffer = new StringBuffer();
  306         buffer.append(Strings.defaultToString(this)).append('(');
  307         buffer.append("spec=").append(Strings.defaultToString(spec));
  308         buffer.append(" mepf=").append(Strings.defaultToString(endpointFactory));
  309         buffer.append(" active=").append(deliveryActive.get());
  310         if (destination != null)
  311            buffer.append(" destination=").append(destination);
  312         if (connection != null)
  313            buffer.append(" connection=").append(connection);
  314         if (pool != null)
  315            buffer.append(" pool=").append(Strings.defaultToString(pool));
  316         if (dlqHandler != null)
  317            buffer.append(" dlq=").append(Strings.defaultToString(dlqHandler));
  318         buffer.append(" transacted=").append(isDeliveryTransacted);
  319         buffer.append(')');
  320         return buffer.toString();
  321      }
  322   
  323      /**
  324       * Setup the activation
  325       * 
  326       * @throws Exception for any error
  327       */
  328      protected void setup() throws Exception
  329      {
  330         log.debug("Setting up " + spec);
  331         
  332         sendNotification(CONNECTING_NOTIFICATION, null);
  333   
  334         setupJMSProviderAdapter();
  335         Context ctx = adapter.getInitialContext();
  336         log.debug("Using context " + ctx.getEnvironment() + " for " + spec);
  337         try
  338         {
  339            setupDLQ(ctx);
  340            setupDestination(ctx);
  341            setupConnection(ctx);
  342         }
  343         finally
  344         {
  345            ctx.close();
  346         }
  347         setupSessionPool();
  348         
  349         log.debug("Setup complete " + this);
  350   
  351         sendNotification(CONNECTED_NOTIFICATION, null);
  352      }
  353      
  354      /**
  355       * Teardown the activation
  356       */
  357      protected void teardown()
  358      {
  359         log.debug("Tearing down " + spec);
  360   
  361         sendNotification(DISCONNECTING_NOTIFICATION, null);
  362   
  363         teardownSessionPool();
  364         teardownConnection();
  365         teardownDestination();
  366         teardownDLQ();
  367   
  368         log.debug("Tearing down complete " + this);
  369         
  370         sendNotification(DISCONNECTED_NOTIFICATION, null);
  371      }
  372   
  373      /**
  374       * Get the jms provider
  375       * 
  376       * @throws Exception for any error
  377       */
  378      protected void setupJMSProviderAdapter() throws Exception
  379      {
  380         String providerAdapterJNDI = spec.getProviderAdapterJNDI();
  381         if (providerAdapterJNDI.startsWith("java:") == false)
  382            providerAdapterJNDI = "java:" + providerAdapterJNDI;
  383   
  384         log.debug("Retrieving the jms provider adapter " + providerAdapterJNDI + " for " + this);
  385         adapter = (JMSProviderAdapter) Util.lookup(providerAdapterJNDI, JMSProviderAdapter.class);
  386         log.debug("Using jms provider adapter " + adapter + " for " + this);
  387      }
  388      
  389      /**
  390       * Setup the DLQ
  391       *
  392       * @param ctx the naming context
  393       * @throws Exception for any error
  394       */
  395      protected void setupDLQ(Context ctx) throws Exception
  396      {
  397         if (spec.isUseDLQ())
  398         {
  399            Class<?> clazz = Thread.currentThread().getContextClassLoader().loadClass(spec.getDLQHandler());
  400            dlqHandler = (DLQHandler) clazz.newInstance();
  401            dlqHandler.setup(this, ctx);
  402         }
  403         
  404         log.debug("Setup DLQ " + this);
  405      }
  406      
  407      /**
  408       * Teardown the DLQ
  409       */
  410      protected void teardownDLQ()
  411      {
  412         log.debug("Removing DLQ " + this);
  413         try
  414         {
  415            if (dlqHandler != null)
  416               dlqHandler.teardown();
  417         }
  418         catch (Throwable t)
  419         {
  420            log.debug("Error tearing down the DLQ " + dlqHandler, t);
  421         }
  422         dlqHandler = null;
  423      }
  424      
  425      /**
  426       * Setup the Destination
  427       *
  428       * @param ctx the naming context
  429       * @throws Exception for any error
  430       */
  431      protected void setupDestination(Context ctx) throws Exception
  432      {
  433         Class<?> destinationType;
  434         if (spec.isTopic())
  435            destinationType = Topic.class;
  436         else
  437            destinationType = Queue.class;
  438   
  439         String destinationName = spec.getDestination();
  440         log.debug("Retrieving destination " + destinationName + " of type " + destinationType.getName());
  441         destination = (Destination) Util.lookup(ctx, destinationName, destinationType);
  442         log.debug("Got destination " + destination + " from " + destinationName);
  443      }
  444      
  445      /**
  446       * Teardown the destination
  447       */
  448      protected void teardownDestination()
  449      {
  450         destination = null;
  451      }
  452      
  453      /**
  454       * Setup the Connection
  455       *
  456       * @param ctx the naming context
  457       * @throws Exception for any error
  458       */
  459      protected void setupConnection(Context ctx) throws Exception
  460      {
  461         log.debug("setup connection " + this);
  462   
  463         String user = spec.getUser();
  464         String pass = spec.getPassword();
  465         String clientID = spec.getClientId();
  466         if (spec.isTopic())
  467            connection = setupTopicConnection(ctx, user, pass, clientID);
  468         else
  469            connection = setupQueueConnection(ctx, user, pass, clientID);
  470         
  471         log.debug("established connection " + this);
  472      }
  473      
  474      /**
  475       * Setup a Queue Connection
  476       *
  477       * @param ctx the naming context
  478       * @param user the user
  479       * @param pass the password
  480       * @param clientID the client id
  481       * @return the connection
  482       * @throws Exception for any error
  483       */
  484      protected QueueConnection setupQueueConnection(Context ctx, String user, String pass, String clientID) throws Exception
  485      {
  486         String queueFactoryRef = adapter.getQueueFactoryRef();
  487         log.debug("Attempting to lookup queue connection factory " + queueFactoryRef);
  488         QueueConnectionFactory qcf = (QueueConnectionFactory) Util.lookup(ctx, queueFactoryRef, QueueConnectionFactory.class);
  489         log.debug("Got queue connection factory " + qcf + " from " + queueFactoryRef);
  490         log.debug("Attempting to create queue connection with user " + user);
  491         QueueConnection result;
  492         if (qcf instanceof XAQueueConnectionFactory && isDeliveryTransacted)
  493         {
  494            XAQueueConnectionFactory xaqcf = (XAQueueConnectionFactory) qcf;
  495            if (user != null)
  496               result = xaqcf.createXAQueueConnection(user, pass);
  497            else
  498               result = xaqcf.createXAQueueConnection();
  499         }
  500         else
  501         {
  502            if (user != null)
  503               result = qcf.createQueueConnection(user, pass);
  504            else
  505               result = qcf.createQueueConnection();
  506         }
  507         try
  508         {
  509            if (clientID != null)
  510               result.setClientID(clientID);
  511            result.setExceptionListener(this);
  512            log.debug("Using queue connection " + result);
  513            return result;
  514         }
  515         catch (Throwable t)
  516         {
  517            try
  518            {
  519               result.close();
  520            }
  521            catch (Exception e)
  522            {
  523               log.trace("Ignored error closing connection", e);
  524            }
  525            if (t instanceof Exception)
  526               throw (Exception) t;
  527            throw new RuntimeException("Error configuring connection", t);
  528         }
  529      }
  530      
  531      /**
  532       * Setup a Topic Connection
  533       *
  534       * @param ctx the naming context
  535       * @param user the user
  536       * @param pass the password
  537       * @param clientID the client id
  538       * @return the connection
  539       * @throws Exception for any error
  540       */
  541      protected TopicConnection setupTopicConnection(Context ctx, String user, String pass, String clientID) throws Exception
  542      {
  543         String topicFactoryRef = adapter.getTopicFactoryRef();
  544         log.debug("Attempting to lookup topic connection factory " + topicFactoryRef);
  545         TopicConnectionFactory tcf = (TopicConnectionFactory) Util.lookup(ctx, topicFactoryRef, TopicConnectionFactory.class);
  546         log.debug("Got topic connection factory " + tcf + " from " + topicFactoryRef);
  547         log.debug("Attempting to create topic connection with user " + user);
  548         TopicConnection result;
  549         if (tcf instanceof XATopicConnectionFactory && isDeliveryTransacted)
  550         {
  551            XATopicConnectionFactory xatcf = (XATopicConnectionFactory) tcf;
  552            if (user != null)
  553               result = xatcf.createXATopicConnection(user, pass);
  554            else
  555               result = xatcf.createXATopicConnection();
  556         }
  557         else
  558         {
  559            if (user != null)
  560               result = tcf.createTopicConnection(user, pass);
  561            else
  562               result = tcf.createTopicConnection();
  563         }
  564         try
  565         {
  566            if (clientID != null)
  567               result.setClientID(clientID);
  568            result.setExceptionListener(this);
  569            log.debug("Using topic connection " + result);
  570            return result;
  571         }
  572         catch (Throwable t)
  573         {
  574            try
  575            {
  576               result.close();
  577            }
  578            catch (Exception e)
  579            {
  580               log.trace("Ignored error closing connection", e);
  581            }
  582            if (t instanceof Exception)
  583               throw (Exception) t;
  584            throw new RuntimeException("Error configuring connection", t);
  585         }
  586      }
  587      
  588      /**
  589       * Teardown the connection
  590       */
  591      protected void teardownConnection()
  592      {
  593         try
  594         {
  595            if (connection != null)
  596            {
  597               log.debug("Closing the " + connection);
  598               connection.close();
  599            }
  600         }
  601         catch (Throwable t)
  602         {
  603            log.debug("Error closing the connection " + connection, t);
  604         }
  605         connection = null;
  606      }
  607      
  608      /**
  609       * Setup the server session pool
  610       * 
  611       * @throws Exception for any error
  612       */
  613      protected void setupSessionPool() throws Exception
  614      {
  615         pool = new JmsServerSessionPool(this);
  616         log.debug("Created session pool " + pool);
  617         
  618         log.debug("Starting session pool " + pool);
  619         pool.start();
  620         log.debug("Started session pool " + pool);
  621         
  622         log.debug("Starting delivery " + connection);
  623         connection.start();
  624         log.debug("Started delivery " + connection);
  625      }
  626      
  627      /**
  628       * Teardown the server session pool
  629       */
  630      protected void teardownSessionPool()
  631      {
  632         try
  633         {
  634            if (connection != null)
  635            {
  636               log.debug("Stopping delivery " + connection);
  637               connection.stop();
  638            }
  639         }
  640         catch (Throwable t)
  641         {
  642            log.debug("Error stopping delivery " + connection, t);
  643         }
  644   
  645         try
  646         {
  647            if (pool != null)
  648            {
  649               log.debug("Stopping the session pool " + pool);
  650               pool.stop();
  651            }
  652         }
  653         catch (Throwable t)
  654         {
  655            log.debug("Error clearing the pool " + pool, t);
  656         }
  657         pool = null;
  658      }
  659   
  660      /**
  661       * Notify of an event
  662       * 
  663       * @param event the event
  664       * @param userData any user data, e.g. the exception on a failure
  665       */
  666      protected void sendNotification(String event, Object userData)
  667      {
  668         if (emitter == null)
  669            return;
  670         
  671         try
  672         {
  673            Notification notif = new Notification(event, spec, emitter.nextNotificationSequenceNumber());
  674            notif.setUserData(userData);
  675            emitter.sendNotification(notif);
  676         }
  677         catch (Throwable t)
  678         {
  679            log.warn("Error sending notification: " + event, t);
  680         }
  681      }
  682   
  683      /**
  684       * Handles the setup
  685       */
  686      private class SetupActivation implements Work
  687      {
  688         public void run()
  689         {
  690            try
  691            {
  692               setup();
  693            }
  694            catch (Throwable t)
  695            {
  696               handleFailure(t);
  697            }
  698         }
  699   
  700         public void release()
  701         {
  702         }
  703      }
  704   }

Save This Page
Home » jboss-5.0.0.CR1-src » org.jboss.resource.adapter » jms » inflow » [javadoc | source]