Save This Page
Home » jboss-5.0.0.CR1-src » org.jboss.resource.adapter » jms » inflow » [javadoc | source]
    1   /*
    2    * JBoss, Home of Professional Open Source.
    3    * Copyright 2006, Red Hat Middleware LLC, and individual contributors
    4    * as indicated by the @author tags. See the copyright.txt file in the
    5    * distribution for a full listing of individual contributors.
    6    *
    7    * This is free software; you can redistribute it and/or modify it
    8    * under the terms of the GNU Lesser General Public License as
    9    * published by the Free Software Foundation; either version 2.1 of
   10    * the License, or (at your option) any later version.
   11    *
   12    * This software is distributed in the hope that it will be useful,
   13    * but WITHOUT ANY WARRANTY; without even the implied warranty of
   14    * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
   15    * Lesser General Public License for more details.
   16    *
   17    * You should have received a copy of the GNU Lesser General Public
   18    * License along with this software; if not, write to the Free
   19    * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
   20    * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
   21    */
   22   package org.jboss.resource.adapter.jms.inflow;
   23   
   24   import java.util.ArrayList;
   25   
   26   import javax.jms.Connection;
   27   import javax.jms.ConnectionConsumer;
   28   import javax.jms.JMSException;
   29   import javax.jms.Queue;
   30   import javax.jms.ServerSession;
   31   import javax.jms.ServerSessionPool;
   32   import javax.jms.Topic;
   33   
   34   import org.jboss.logging.Logger;
   35   
   36   /**
   37    * A generic jms session pool.
   38    * 
   39    * @author <a href="adrian@jboss.com">Adrian Brock</a>
   40    * @version $Revision: 72163 $
   41    */
   42   public class JmsServerSessionPool implements ServerSessionPool
   43   {
   44      /** The logger */
   45      private static final Logger log = Logger.getLogger(JmsServerSessionPool.class);
   46         
   47      /** The activation */
   48      JmsActivation activation;
   49   
   50      /** The consumer */
   51      ConnectionConsumer consumer;
   52   
   53      /** The server sessions */
   54      ArrayList serverSessions = new ArrayList();
   55      
   56      /** Whether the pool is stopped */
   57      boolean stopped = false;
   58      
   59      /** The number of sessions */
   60      int sessionCount = 0;
   61      
   62      
   63      /**
   64       * Create a new session pool
   65       * 
   66       * @param activation the jms activation
   67       */
   68      public JmsServerSessionPool(JmsActivation activation)
   69      {
   70         this.activation = activation;
   71      }
   72   
   73      /**
   74       * @return the activation
   75       */
   76      public JmsActivation getActivation()
   77      {
   78         return activation;
   79      }
   80      
   81      /**
   82       * Start the server session pool
   83       * 
   84       * @throws Exeption for any error
   85       */
   86      public void start() throws Exception
   87      {
   88         setupSessions();
   89         setupConsumer();
   90      }
   91   
   92      /**
   93       * Stop the server session pool
   94       */
   95      public void stop()
   96      {
   97         teardownConsumer();
   98         teardownSessions();
   99      }
  100      
  101      public ServerSession getServerSession() throws JMSException
  102      {
  103         boolean trace = log.isTraceEnabled();
  104         if (trace)
  105            log.trace("getServerSession");
  106   
  107         ServerSession result = null;
  108         
  109         try
  110         {
  111            synchronized (serverSessions)
  112            {
  113               while (true)
  114               {
  115                  int sessionsSize = serverSessions.size();
  116                  
  117                  if (stopped)
  118                     throw new Exception("Cannot get a server session after the pool is stopped");
  119                  
  120                  else if (sessionsSize > 0)
  121                  {
  122                     result = (ServerSession) serverSessions.remove(sessionsSize-1);
  123                     break;
  124                  }
  125                  
  126                  else
  127                  {
  128                     try
  129                     {
  130                        serverSessions.wait();
  131                     }
  132                     catch (InterruptedException ignored)
  133                     {
  134                     }
  135                  }
  136               }
  137            }
  138         }
  139         catch (Throwable t)
  140         {
  141            throw new JMSException("Unable to get a server session " + t);
  142         }
  143         
  144         if (trace)
  145            log.trace("Returning server session " + result);
  146         
  147         return result;
  148      }
  149   
  150      /**
  151       * Return the server session
  152       * 
  153       * @param session the session
  154       */
  155      protected void returnServerSession(JmsServerSession session)
  156      {
  157         synchronized (serverSessions)
  158         {
  159            if (stopped)
  160            {
  161               session.teardown();
  162               --sessionCount;
  163            }
  164            else
  165               serverSessions.add(session);
  166            serverSessions.notifyAll();
  167         }
  168      }
  169      
  170      /**
  171       * Setup the sessions
  172       * 
  173       * @throws Exeption for any error
  174       */
  175      protected void setupSessions() throws Exception
  176      {
  177         JmsActivationSpec spec = activation.getActivationSpec();
  178         ArrayList clonedSessions = null;
  179         
  180         // Create the sessions
  181         synchronized (serverSessions)
  182         {
  183            for (int i = 0; i < spec.getMaxSessionInt(); ++i)
  184            {
  185               JmsServerSession session = new JmsServerSession(this);
  186               serverSessions.add(session);
  187            }
  188            sessionCount = serverSessions.size();
  189            clonedSessions = (ArrayList) serverSessions.clone();
  190   
  191         }
  192         
  193         // Start the sessions
  194         for (int i = 0; i < clonedSessions.size(); ++ i)
  195         {
  196            JmsServerSession session = (JmsServerSession) clonedSessions.get(i);
  197            session.setup();
  198         }
  199      }
  200   
  201      /**
  202       * Stop the sessions
  203       */
  204      protected void teardownSessions()
  205      {
  206         synchronized (serverSessions)
  207         {
  208            // Disallow any new sessions
  209            stopped = true;
  210            serverSessions.notifyAll();
  211            
  212            // Stop inactive sessions
  213            for (int i = 0; i < serverSessions.size(); ++i)
  214            {
  215               JmsServerSession session = (JmsServerSession) serverSessions.get(i);
  216               session.teardown();
  217               --sessionCount;
  218            }
  219   
  220            serverSessions.clear();
  221   
  222            if (activation.getActivationSpec().isForceClearOnShutdown())
  223            {        
  224               int attempts = 0;
  225               int forceClearAttempts = activation.getActivationSpec().getForceClearAttempts();
  226               long forceClearInterval = activation.getActivationSpec().getForceClearOnShutdownInterval();
  227               
  228               log.trace(this + " force clear behavior in effect. Waiting for " + forceClearInterval + " milliseconds for " + forceClearAttempts + " attempts.");
  229              
  230               while((sessionCount > 0) && (attempts < forceClearAttempts))
  231               {
  232                  try
  233                  {
  234                     int currentSessions = sessionCount;
  235                     serverSessions.wait(forceClearInterval);
  236                     // Number of session didn't change
  237                     if (sessionCount == currentSessions)
  238                     {
  239                        ++attempts;
  240                        log.trace(this + " clear attempt failed " + attempts); 
  241                     }
  242                  }
  243                  catch(InterruptedException ignore)
  244                  {
  245                  }
  246               
  247               }
  248            }
  249            else
  250            {
  251               // Wait for inuse sessions
  252               while (sessionCount > 0)
  253               {
  254                  try
  255                  {
  256                     serverSessions.wait();
  257                  }
  258                  catch (InterruptedException ignore)
  259                  {
  260                  }
  261               }
  262            }
  263         }
  264      }
  265      
  266      /**
  267       * Setup the connection consumer
  268       * 
  269       * @throws Exeption for any error
  270       */
  271      protected void setupConsumer() throws Exception
  272      {
  273         Connection connection = activation.getConnection();
  274         JmsActivationSpec spec = activation.getActivationSpec();
  275         String selector = spec.getMessageSelector();
  276         int maxMessages = spec.getMaxMessagesInt();
  277         if (spec.isTopic())
  278         {
  279            Topic topic = (Topic) activation.getDestination();
  280            String subscriptionName = spec.getSubscriptionName();
  281            if (spec.isDurable())
  282               consumer = connection.createDurableConnectionConsumer(topic, subscriptionName, selector, this, maxMessages);
  283            else
  284               consumer = connection.createConnectionConsumer(topic, selector, this, maxMessages);
  285         }
  286         else
  287         {
  288            Queue queue = (Queue) activation.getDestination();
  289            consumer = connection.createConnectionConsumer(queue, selector, this, maxMessages);
  290         }
  291         log.debug("Created consumer " + consumer);
  292      }
  293   
  294      /**
  295       * Stop the connection consumer
  296       */
  297      protected void teardownConsumer()
  298      {
  299         try
  300         {
  301            if (consumer != null)
  302            {
  303               log.debug("Closing the " + consumer);
  304               consumer.close();
  305            }
  306         }
  307         catch (Throwable t)
  308         {
  309            log.debug("Error closing the consumer " + consumer, t);
  310         }
  311      }
  312   
  313   }

Save This Page
Home » jboss-5.0.0.CR1-src » org.jboss.resource.adapter » jms » inflow » [javadoc | source]