Save This Page
Home » jboss-5.0.0.CR1-src » org » jboss » jms » asf » [javadoc | source]
    1   /*
    2    * JBoss, the OpenSource J2EE webOS
    3    *
    4    * Distributable under LGPL license.
    5    * See terms of license at gnu.org.
    6    */
    7   package org.jboss.jms.asf;
    8   
    9   import javax.jms.JMSException;
   10   import javax.jms.Message;
   11   import javax.jms.MessageListener;
   12   import javax.jms.ServerSession;
   13   import javax.jms.Session;
   14   import javax.jms.XASession;
   15   import javax.naming.InitialContext;
   16   import javax.transaction.Status;
   17   import javax.transaction.Transaction;
   18   import javax.transaction.TransactionManager;
   19   import javax.transaction.xa.XAResource;
   20   import javax.transaction.xa.Xid;
   21   
   22   import org.jboss.logging.Logger;
   23   import org.jboss.tm.TransactionManagerService;
   24   import org.jboss.tm.XidFactoryMBean;
   25   
   26   /**
   27    * An implementation of ServerSession. <p>
   28    *
   29    * Created: Thu Dec 7 18:25:40 2000
   30    *
   31    * @author    <a href="mailto:peter.antman@tim.se">Peter Antman</a> .
   32    * @author    <a href="mailto:jason@planet57.com">Jason Dillon</a>
   33    * @author    <a href="mailto:hiram.chirino@jboss.org">Hiram Chirino</a> .
   34    * @version   $Revision: 1.14.2.2 $
   35    */
   36   public class StdServerSession
   37      implements Runnable, ServerSession, MessageListener
   38   {
   39      /**
   40       * Instance logger.
   41       */
   42      static Logger log = Logger.getLogger(StdServerSession.class);
   43      
   44      /**
   45       * The server session pool which we belong to.
   46       */
   47      private StdServerSessionPool serverSessionPool;
   48      
   49      /**
   50       * Our session resource.
   51       */
   52      private Session session;
   53      
   54      /**
   55       * Our XA session resource.
   56       */
   57      private XASession xaSession;
   58      
   59      /**
   60       * The transaction manager that we will use for transactions.
   61       */
   62      private TransactionManager tm;
   63      
   64      /**
   65       * Use the session's XAResource directly if we have an JBossMQ XASession.
   66       * this allows us to get around the TX timeout problem when you have
   67       * extensive message processing.
   68       */
   69      private boolean useLocalTX;
   70      
   71      /**
   72       * The listener to delegate calls, to. In our case the container invoker.
   73       */
   74      private MessageListener delegateListener;
   75   
   76      private XidFactoryMBean xidFactory;
   77      
   78      /**
   79       * Create a <tt>StdServerSession</tt> .
   80       *
   81       * @param pool              The server session pool which we belong to.
   82       * @param session           Our session resource.
   83       * @param xaSession         Our XA session resource.
   84       * @param delegateListener  Listener to call when messages arrives.
   85       * @param useLocalTX       Will this session be used in a global TX (we can optimize with 1 phase commit)
   86       * @throws JMSException     Transation manager was not found.
   87       * @exception JMSException  Description of Exception
   88       */
   89      StdServerSession(final StdServerSessionPool pool,
   90                       final Session session,
   91                       final XASession xaSession,
   92                       final MessageListener delegateListener,
   93                       boolean useLocalTX,
   94                       final XidFactoryMBean xidFactory)
   95         throws JMSException
   96      {
   97         // assert pool != null
   98         // assert session != null
   99         
  100         this.serverSessionPool = pool;
  101         this.session = session;
  102         this.xaSession = xaSession;
  103         this.delegateListener = delegateListener;
  104         if( xaSession == null )
  105            useLocalTX = false;
  106         this.useLocalTX = useLocalTX;
  107         this.xidFactory = xidFactory;
  108   
  109         if (log.isDebugEnabled())
  110            log.debug("initializing (pool, session, xaSession, useLocalTX): " +
  111               pool + ", " + session + ", " + xaSession + ", " + useLocalTX);
  112         
  113         // Set out self as message listener
  114         if (xaSession != null)
  115            xaSession.setMessageListener(this);
  116         else
  117            session.setMessageListener(this);
  118         
  119         InitialContext ctx = null;
  120         try
  121         {
  122            ctx = new InitialContext();
  123            tm = (TransactionManager) ctx.lookup(TransactionManagerService.JNDI_NAME);
  124         }
  125         catch (Exception e)
  126         {
  127            throw new JMSException("Transation manager was not found");
  128         }
  129         finally
  130         {
  131            if (ctx != null)
  132            {
  133               try
  134               {
  135                  ctx.close();
  136               }
  137               catch (Exception ignore)
  138               {
  139               }
  140            }
  141         }
  142      }
  143      
  144      // --- Impl of JMS standard API
  145      
  146      /**
  147       * Returns the session. <p>
  148       *
  149       * This simply returns what it has fetched from the connection. It is up to
  150       * the jms provider to typecast it and have a private API to stuff messages
  151       * into it.
  152       *
  153       * @return                  The session.
  154       * @exception JMSException  Description of Exception
  155       */
  156      public Session getSession() throws JMSException
  157      {
  158         if (xaSession != null)
  159            return xaSession;
  160         else
  161            return session;
  162      }
  163      
  164      //--- Protected parts, used by other in the package
  165      
  166      /**
  167       * Runs in an own thread, basically calls the session.run(), it is up to the
  168       * session to have been filled with messages and it will run against the
  169       * listener set in StdServerSessionPool. When it has send all its messages it
  170       * returns.
  171       */
  172      public void run()
  173      {
  174         boolean trace = log.isTraceEnabled();
  175         if (trace)
  176            log.trace("running...");
  177         try
  178         {
  179            if (xaSession != null)
  180               xaSession.run();
  181            else
  182               session.run();
  183         }
  184         finally
  185         {
  186            if (trace)
  187               log.trace("recycling...");
  188           
  189            recycle();
  190   
  191            if (trace)
  192               log.trace("finished run");
  193         }
  194      }
  195   
  196      /**
  197       * Will get called from session for each message stuffed into it.
  198       *
  199       * Starts a transaction with the TransactionManager
  200       * and enlists the XAResource of the JMS XASession if a XASession was
  201       * available. A good JMS implementation should provide the XASession for use
  202       * in the ASF. So we optimize for the case where we have an XASession. So,
  203       * for the case where we do not have an XASession and the bean is not
  204       * transacted, we have the unneeded overhead of creating a Transaction. I'm
  205       * leaving it this way since it keeps the code simpler and that case should
  206       * not be too common (JBossMQ provides XASessions).
  207       */
  208      public void onMessage(Message msg)
  209      {      
  210         boolean trace = log.isTraceEnabled();
  211         if( trace )
  212            log.trace("onMessage running (pool, session, xaSession, useLocalTX): " +
  213            ", " + session + ", " + xaSession + ", " + useLocalTX);
  214   
  215         // Used if run with useLocalTX if true
  216         Xid localXid = null;
  217         boolean localRollbackFlag=false;
  218         // Used if run with useLocalTX if false
  219         Transaction trans = null;
  220         try
  221         {
  222            
  223            if (useLocalTX)
  224            {
  225               // Use JBossMQ One Phase Commit to commit the TX
  226               localXid = xidFactory.newXid();//new XidImpl();
  227               XAResource res = xaSession.getXAResource();
  228               res.start(localXid, XAResource.TMNOFLAGS);
  229               
  230               if( trace )
  231                  log.trace("Using optimized 1p commit to control TX.");
  232            }
  233            else
  234            {
  235               
  236               // Use the TM to control the TX
  237               tm.begin();
  238               trans = tm.getTransaction();
  239               
  240               if (xaSession != null)
  241               {
  242                  XAResource res = xaSession.getXAResource();
  243                  if (!trans.enlistResource(res))
  244   	       {
  245   		  throw new JMSException("could not enlist resource");
  246   	       }
  247                  if( trace )
  248                     log.trace("XAResource '" + res + "' enlisted.");
  249               }
  250            }
  251            //currentTransactionId = connection.spyXAResourceManager.startTx();
  252            
  253            // run the session
  254            //session.run();
  255            // Call delegate listener
  256            delegateListener.onMessage(msg);
  257         }
  258         catch (Exception e)
  259         {
  260            log.error("session failed to run; setting rollback only", e);
  261            
  262            if (useLocalTX)
  263            {
  264               // Use JBossMQ One Phase Commit to commit the TX
  265               localRollbackFlag = true;
  266            }
  267            else
  268            {
  269               // Mark for tollback TX via TM
  270               try
  271               {
  272                  // The transaction will be rolledback in the finally
  273                  if( trace )
  274                     log.trace("Using TM to mark TX for rollback.");
  275                  trans.setRollbackOnly();
  276               }
  277               catch (Exception x)
  278               {
  279                  log.error("failed to set rollback only", x);
  280               }
  281            }
  282            
  283         }
  284         finally
  285         {
  286            try
  287            {
  288               if (useLocalTX)
  289               {
  290                  if( localRollbackFlag == true  )
  291                  {
  292                     if( trace )
  293                        log.trace("Using optimized 1p commit to rollback TX.");
  294                     
  295                     XAResource res = xaSession.getXAResource();
  296                     res.end(localXid, XAResource.TMSUCCESS);
  297                     res.rollback(localXid);
  298                     
  299                  }
  300                  else
  301                  {
  302                     if( trace )
  303                        log.trace("Using optimized 1p commit to commit TX.");
  304                     
  305                     XAResource res = xaSession.getXAResource();
  306                     res.end(localXid, XAResource.TMSUCCESS);
  307                     res.commit(localXid, true);
  308                  }
  309               }
  310               else
  311               {
  312                  // Use the TM to commit the Tx (assert the correct association) 
  313                  Transaction currentTx = tm.getTransaction();
  314                  if (trans.equals(currentTx) == false)
  315                     throw new IllegalStateException("Wrong tx association: expected " + trans + " was " + currentTx);
  316   
  317                  // Marked rollback
  318                  if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
  319                  {
  320                     if( trace )
  321                        log.trace("Rolling back JMS transaction");
  322                     // actually roll it back
  323                     tm.rollback();
  324                     
  325                     // NO XASession? then manually rollback.
  326                     // This is not so good but
  327                     // it's the best we can do if we have no XASession.
  328                     if (xaSession == null && serverSessionPool.isTransacted())
  329                     {
  330                        session.rollback();
  331                     }
  332                  }
  333                  else if (trans.getStatus() == Status.STATUS_ACTIVE)
  334                  {
  335                     // Commit tx
  336                     // This will happen if
  337                     // a) everything goes well
  338                     // b) app. exception was thrown
  339                     if( trace )
  340                        log.trace("Commiting the JMS transaction");
  341                     tm.commit();
  342   
  343                     // NO XASession? then manually commit.  This is not so good but
  344                     // it's the best we can do if we have no XASession.
  345                     if (xaSession == null && serverSessionPool.isTransacted())
  346                     {
  347                        session.commit();
  348                     }
  349                  }
  350               }            
  351            }
  352            catch (Exception e)
  353            {
  354               log.error("failed to commit/rollback", e);
  355            }
  356         }
  357         if( trace )
  358            log.trace("onMessage done");
  359      }
  360   
  361      /**
  362       * Start the session and begin consuming messages.
  363       *
  364       * @throws JMSException  No listener has been specified.
  365       */
  366      public void start() throws JMSException
  367      {
  368         log.trace("starting invokes on server session");
  369         
  370         if (session != null)
  371         {
  372            try
  373            {
  374               serverSessionPool.getExecutor().execute(this);
  375            }
  376            catch (InterruptedException ignore)
  377            {
  378            }
  379         }
  380         else
  381         {
  382            throw new JMSException("No listener has been specified");
  383         }
  384      }
  385   
  386      /**
  387       * Called by the ServerSessionPool when the sessions should be closed.
  388       */
  389      void close()
  390      {
  391         if (session != null)
  392         {
  393            try
  394            {
  395               session.close();
  396            }
  397            catch (Exception ignore)
  398            {
  399            }
  400   
  401            session = null;
  402         }
  403   
  404         if (xaSession != null)
  405         {
  406            try
  407            {
  408               xaSession.close();
  409            }
  410            catch (Exception ignore)
  411            {
  412            }
  413            xaSession = null;
  414         }
  415   
  416         log.debug("closed");
  417      }
  418      
  419      /**
  420       * This method is called by the ServerSessionPool when it is ready to be
  421       * recycled intot the pool
  422       */
  423      void recycle()
  424      {
  425         serverSessionPool.recycle(this);
  426      }
  427      
  428   }
  429   

Save This Page
Home » jboss-5.0.0.CR1-src » org » jboss » jms » asf » [javadoc | source]