Save This Page
Home » jboss-5.0.0.CR1-src » org » jboss » resource » connectionmanager » [javadoc | source]
    1   /*
    2    * JBoss, Home of Professional Open Source.
    3    * Copyright 2006, Red Hat Middleware LLC, and individual contributors
    4    * as indicated by the @author tags. See the copyright.txt file in the
    5    * distribution for a full listing of individual contributors.
    6    *
    7    * This is free software; you can redistribute it and/or modify it
    8    * under the terms of the GNU Lesser General Public License as
    9    * published by the Free Software Foundation; either version 2.1 of
   10    * the License, or (at your option) any later version.
   11    *
   12    * This software is distributed in the hope that it will be useful,
   13    * but WITHOUT ANY WARRANTY; without even the implied warranty of
   14    * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
   15    * Lesser General Public License for more details.
   16    *
   17    * You should have received a copy of the GNU Lesser General Public
   18    * License along with this software; if not, write to the Free
   19    * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
   20    * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
   21    */
   22   package org.jboss.resource.connectionmanager;
   23   
   24   import java.util.ArrayList;
   25   import java.util.Collection;
   26   import java.util.HashSet;
   27   import java.util.Iterator;
   28   import java.util.Set;
   29   
   30   import javax.management.ObjectName;
   31   import javax.naming.InitialContext;
   32   import javax.resource.ResourceException;
   33   import javax.resource.spi.ConnectionEvent;
   34   import javax.resource.spi.ConnectionRequestInfo;
   35   import javax.resource.spi.LocalTransaction;
   36   import javax.resource.spi.ManagedConnection;
   37   import javax.security.auth.Subject;
   38   import javax.transaction.RollbackException;
   39   import javax.transaction.Status;
   40   import javax.transaction.Synchronization;
   41   import javax.transaction.SystemException;
   42   import javax.transaction.Transaction;
   43   import javax.transaction.TransactionManager;
   44   import javax.transaction.xa.XAException;
   45   import javax.transaction.xa.XAResource;
   46   import javax.transaction.xa.Xid;
   47   
   48   import org.jboss.logging.Logger;
   49   import org.jboss.resource.JBossResourceException;
   50   import org.jboss.resource.connectionmanager.xa.XAResourceWrapper;
   51   import org.jboss.tm.LastResource;
   52   import org.jboss.tm.TransactionTimeoutConfiguration;
   53   import org.jboss.tm.TxUtils;
   54   import org.jboss.util.NestedRuntimeException;
   55   
   56   import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
   57   
   58   /**
   59    * The TxConnectionManager is a JBoss ConnectionManager
   60    * implementation for jca adapters implementing LocalTransaction and XAResource support.
   61    * 
   62    * It implements a ConnectionEventListener that implements XAResource to
   63    * manage transactions through the Transaction Manager. To assure that all
   64    * work in a local transaction occurs over the same ManagedConnection, it
   65    * includes a xid to ManagedConnection map.  When a Connection is requested
   66    * or a transaction started with a connection handle in use, it checks to
   67    * see if a ManagedConnection already exists enrolled in the global
   68    * transaction and uses it if found. Otherwise a free ManagedConnection
   69    * has its LocalTransaction started and is used.  From the
   70    * BaseConnectionManager2, it includes functionality to obtain managed
   71    * connections from
   72    * a ManagedConnectionPool mbean, find the Subject from a SubjectSecurityDomain,
   73    * and interact with the CachedConnectionManager for connections held over
   74    * transaction and method boundaries.  Important mbean references are to a
   75    * ManagedConnectionPool supplier (typically a JBossManagedConnectionPool), and a
   76    * RARDeployment representing the ManagedConnectionFactory.
   77    *
   78    * This connection manager has to perform the following operations:
   79    *
   80    * 1. When an application component requests a new ConnectionHandle,
   81    *    it must find a ManagedConnection, and make sure a
   82    *    ConnectionEventListener is registered. It must inform the
   83    *    CachedConnectionManager that a connection handle has been given
   84    *    out. It needs to count the number of handles for each
   85    *    ManagedConnection.  If there is a current transaction, it must
   86    *    enlist the ManagedConnection's LocalTransaction in the transaction
   87    *    using the ConnectionEventListener's XAResource implementation.
   88    * Entry point: ConnectionManager.allocateConnection.
   89    * written.
   90    *
   91    * 2. When a ConnectionClosed event is received from the
   92    *    ConnectionEventListener, it must reduce the handle count.  If
   93    *    the handle count is zero, the XAResource should be delisted from
   94    *    the Transaction, if any. The CachedConnectionManager must be
   95    *    notified that the connection is closed.
   96    * Entry point: ConnectionEventListener.ConnectionClosed.
   97    * written
   98    *
   99    *3. When a transaction begun notification is received from the
  100    * UserTransaction (via the CachedConnectionManager), all
  101    * managedConnections associated with the current object must be
  102    * enlisted in the transaction.
  103    *  Entry point: (from
  104    * CachedConnectionManager)
  105    * ConnectionCacheListener.transactionStarted(Transaction,
  106    * Collection). The collection is of ConnectionRecord objects.
  107    * written.
  108    *
  109    * 5. When an "entering object" notification is received from the
  110    * CachedConnectionInterceptor, all the connections for the current
  111    * object must be associated with a ManagedConnection.  if there is a
  112    * Transaction, the XAResource must be enlisted with it.
  113    *  Entry point: ConnectionCacheListener.reconnect(Collection conns) The Collection
  114    * is of ConnectionRecord objects.
  115    * written.
  116    *
  117    * 6. When a "leaving object" notification is received from the
  118    * CachedConnectionInterceptor, all the managedConnections for the
  119    * current object must have their XAResources delisted from the
  120    * current Transaction, if any, and cleanup called on each
  121    * ManagedConnection.
  122    * Entry point: ConnectionCacheListener.disconnect(Collection conns).
  123    * written.
  124    *
  125    * @author <a href="mailto:d_jencks@users.sourceforge.net">David Jencks</a>
  126    * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
  127    * @author <a href="mailto:weston.price@jboss.com">Weston Price</a>
  128    * @version $Revision: 71783 $
  129    */
  130   public class TxConnectionManager extends BaseConnectionManager2 implements TxConnectionManagerMBean
  131   {
  132      private static final Throwable FAILED_TO_ENLIST = new Throwable("Unabled to enlist resource, see the previous warnings."); 
  133   
  134      private ObjectName transactionManagerService;
  135      
  136      //use the object name, please
  137      private String tmName;
  138   
  139      private TransactionManager tm;
  140   
  141      private boolean trackConnectionByTx = false;
  142   
  143      private boolean localTransactions;
  144      
  145      private int xaResourceTimeout = 0;
  146      
  147      private boolean padXid;
  148      
  149      private boolean wrapXAResource;
  150   
  151      private Boolean isSameRMOverrideValue;
  152      
  153      protected static void rethrowAsSystemException(String context, Transaction tx, Throwable t)
  154         throws SystemException
  155      {
  156         if (t instanceof SystemException)
  157            throw (SystemException) t;
  158         if (t instanceof RuntimeException)
  159            throw (RuntimeException) t;
  160         if (t instanceof Error)
  161            throw (Error) t;
  162         if (t instanceof RollbackException)
  163            throw new IllegalStateException(context + " tx=" + tx + " marked for rollback.");
  164         throw new NestedRuntimeException(context + " tx=" + tx + " got unexpected error ", t);
  165      }
  166      
  167      /**
  168       * Default managed TxConnectionManager constructor for mbean instances.
  169       */
  170      public TxConnectionManager()
  171      {
  172      }
  173   
  174      /**
  175       * Creates a new <code>TxConnectionManager</code> instance.
  176       * for TESTING ONLY!!! not a managed constructor!!
  177       * 
  178       * @param ccm a <code>CachedConnectionManager</code> value
  179       * @param poolingStrategy a <code>ManagedConnectionPool</code> value
  180       * @param tm a <code>TransactionManager</code> value
  181       */
  182      public TxConnectionManager (final CachedConnectionManager ccm,
  183                                  final ManagedConnectionPool poolingStrategy,
  184                                  final TransactionManager tm)
  185      {
  186         super(ccm, poolingStrategy);
  187         this.tm = tm;
  188      }
  189   
  190      public ObjectName getTransactionManagerService()
  191      {
  192         return transactionManagerService;
  193      }
  194   
  195      public void setTransactionManagerService(ObjectName transactionManagerService)
  196      {
  197         this.transactionManagerService = transactionManagerService;
  198      }
  199   
  200      /**
  201       * @deprecated
  202       */
  203      public void setTransactionManager(final String tmName)
  204      {
  205         this.tmName = tmName;
  206      }
  207   
  208      /**
  209       * @deprecated
  210       */
  211      public String getTransactionManager()
  212      {
  213         return this.tmName;
  214      }
  215   
  216      public TransactionManager getTransactionManagerInstance()
  217      {
  218         return tm;
  219      }
  220   
  221      public void setTransactionManagerInstance(TransactionManager tm)
  222      {
  223         this.tm = tm;
  224      }
  225   
  226      public boolean isTrackConnectionByTx()
  227      {
  228         return trackConnectionByTx;
  229      }
  230   
  231      public void setTrackConnectionByTx(boolean trackConnectionByTx)
  232      {
  233         this.trackConnectionByTx = trackConnectionByTx;
  234      }
  235   
  236      public boolean isLocalTransactions()
  237      {
  238         return localTransactions;
  239      }
  240   
  241      public void setLocalTransactions(boolean localTransactions)
  242      {
  243         this.localTransactions = localTransactions;
  244         if (localTransactions)
  245            setTrackConnectionByTx(true);
  246      }
  247   
  248      public int getXAResourceTransactionTimeout()
  249      {
  250         return xaResourceTimeout;
  251      }
  252      
  253      public void setXAResourceTransactionTimeout(int timeout)
  254      {
  255         this.xaResourceTimeout = timeout;
  256      }
  257      
  258      /**
  259       * Get the IsSameRMOverrideValue value.
  260       * 
  261       * @return the IsSameRMOverrideValue value.
  262       */
  263      public Boolean getIsSameRMOverrideValue()
  264      {
  265         return isSameRMOverrideValue;
  266      }
  267      
  268      public boolean getWrapXAResource()
  269      {      
  270         return wrapXAResource;      
  271      }
  272      
  273      public void setWrapXAResource(boolean useXAWrapper)
  274      {
  275         this.wrapXAResource = useXAWrapper;
  276         
  277      }
  278      
  279      public boolean getPadXid()
  280      {
  281         return this.padXid;
  282         
  283      }
  284      
  285      public void setPadXid(boolean padXid)
  286      {
  287         this.padXid = padXid;
  288      }
  289      /**
  290       * Set the IsSameRMOverrideValue value.
  291       * 
  292       * @param isSameRMOverrideValue The new IsSameRMOverrideValue value.
  293       */
  294      public void setIsSameRMOverrideValue(Boolean isSameRMOverrideValue)
  295      {
  296         this.isSameRMOverrideValue = isSameRMOverrideValue;
  297      }
  298      
  299      public long getTimeLeftBeforeTransactionTimeout(boolean errorRollback) throws RollbackException
  300      {
  301         if (tm == null)
  302            throw new IllegalStateException("No transaction manager: " + ccmName);
  303         if (tm instanceof TransactionTimeoutConfiguration)
  304            return ((TransactionTimeoutConfiguration) tm).getTimeLeftBeforeTransactionTimeout(errorRollback);
  305         return -1;
  306      }
  307   
  308      @Override
  309      public void checkTransactionActive() throws RollbackException, SystemException
  310      {
  311         if (tm == null)
  312            throw new IllegalStateException("No transaction manager: " + ccmName);
  313         Transaction tx = tm.getTransaction();
  314         if (tx != null)
  315         {
  316            int status = tx.getStatus();
  317            // Only allow states that will actually succeed
  318            if (status != Status.STATUS_ACTIVE && status != Status.STATUS_PREPARING && status != Status.STATUS_PREPARED && status != Status.STATUS_COMMITTING)
  319               throw new RollbackException("Transaction " + tx + " cannot proceed " + TxUtils.getStatusAsString(status));
  320         }
  321      }
  322   
  323      protected void startService() throws Exception
  324      {
  325         if (transactionManagerService != null)
  326            tm = (TransactionManager)getServer().getAttribute(transactionManagerService, "TransactionManager");
  327         else
  328         {
  329            log.warn("----------------------------------------------------------");
  330            log.warn("----------------------------------------------------------");
  331            log.warn("Please change your datasource setup to use <depends optional-attribute-name\"TransactionManagerService\">jboss:service=TransactionManager</depends>");
  332            log.warn("instead of <attribute name=\"TransactionManager\">java:/TransactionManager</attribute>");
  333            log.warn("Better still, use a *-ds.xml file");
  334            log.warn("----------------------------------------------------------");
  335            log.warn("----------------------------------------------------------");
  336            tm = (TransactionManager)new InitialContext().lookup(tmName);
  337         }
  338         
  339         
  340         super.startService();
  341      }
  342   
  343      protected void stopService() throws Exception
  344      {
  345         this.tm = null;
  346         super.stopService();
  347      }
  348   
  349      public ConnectionListener getManagedConnection(Subject subject, ConnectionRequestInfo cri)
  350         throws ResourceException
  351      {
  352         Transaction trackByTransaction = null;
  353         try
  354         {
  355            Transaction tx = tm.getTransaction();
  356            if (tx != null && TxUtils.isActive(tx) == false)
  357               throw new ResourceException("Transaction is not active: tx=" + tx);
  358            if (trackConnectionByTx)
  359               trackByTransaction = tx;
  360         }
  361         catch (Throwable t)
  362         {
  363            JBossResourceException.rethrowAsResourceException("Error checking for a transaction.", t);
  364         }
  365   
  366         if (trace)
  367            log.trace("getManagedConnection trackByTx=" + trackConnectionByTx + " tx=" + trackByTransaction);
  368         return super.getManagedConnection(trackByTransaction, subject, cri);
  369      }
  370   
  371      public void transactionStarted(Collection crs) throws SystemException
  372      {
  373         Set cls = new HashSet();
  374         for (Iterator i = crs.iterator(); i.hasNext(); )
  375         {
  376            ConnectionRecord cr = (ConnectionRecord)i.next();
  377            ConnectionListener cl = cr.cl;
  378            if (!cls.contains(cl))
  379            {
  380               cls.add(cl);
  381               cl.enlist();
  382            }
  383         }
  384      }
  385   
  386      protected void managedConnectionReconnected(ConnectionListener cl) throws ResourceException
  387      {
  388         try
  389         {
  390            cl.enlist();
  391         }
  392         catch (Throwable t)
  393         {
  394            if (trace)
  395               log.trace("Could not enlist in transaction on entering meta-aware object! " + cl, t);
  396            throw new JBossResourceException("Could not enlist in transaction on entering meta-aware object!", t);
  397         }
  398      }
  399   
  400      protected void managedConnectionDisconnected(ConnectionListener cl) throws ResourceException
  401      {
  402         Throwable throwable = null;
  403         try
  404         {
  405            cl.delist();
  406         }
  407         catch (Throwable t)
  408         {
  409            throwable = t;
  410         }
  411   
  412         //if there are no more handles and tx is complete, we can return to pool.
  413         boolean isFree = cl.isManagedConnectionFree();
  414         if (trace)
  415            log.trace("Disconnected isManagedConnectionFree=" + isFree + " cl=" + cl);
  416         if (isFree)
  417            returnManagedConnection(cl, false);
  418   
  419         // Rethrow the error
  420         if (throwable != null)
  421            JBossResourceException.rethrowAsResourceException("Could not delist resource, probably a transaction rollback? ", throwable);      
  422      }
  423   
  424      public ConnectionListener createConnectionListener(ManagedConnection mc, Object context)
  425         throws ResourceException
  426      {
  427         XAResource xaResource = null;
  428         
  429         if (localTransactions)
  430         {
  431            xaResource = new LocalXAResource(log);
  432       
  433            if (xaResourceTimeout != 0)
  434               log.debug("XAResource transaction timeout cannot be set for local transactions: " + getJndiName());
  435         }
  436         
  437         else
  438         {
  439            
  440            if(wrapXAResource)
  441            {
  442               log.trace("Generating XAResourceWrapper for TxConnectionManager" + this);
  443               xaResource = new XAResourceWrapper(isSameRMOverrideValue, padXid, mc.getXAResource());
  444               
  445            }
  446            
  447            else
  448            {
  449               log.trace("Not wrapping XAResource.");
  450               xaResource = mc.getXAResource();
  451               
  452            }
  453                                   
  454            if (xaResourceTimeout != 0)
  455            {
  456               try
  457               {
  458                  if (xaResource.setTransactionTimeout(xaResourceTimeout) == false)
  459                     log.debug("XAResource does not support transaction timeout configuration: " + getJndiName());
  460               }
  461               catch (XAException e)
  462               {
  463                  throw new JBossResourceException("Unable to set XAResource transaction timeout: " + getJndiName(), e);
  464               }
  465            }
  466         }
  467   
  468         ConnectionListener cli = new TxConnectionEventListener(mc, poolingStrategy, context, log, xaResource);
  469         mc.addConnectionEventListener(cli);
  470         return cli;
  471      }
  472   
  473      public boolean isTransactional()
  474      {
  475         return TxUtils.isCompleted(tm) == false;
  476      }
  477   
  478      // implementation of javax.resource.spi.ConnectionEventListener interface
  479      //there is one of these for each ManagedConnection instance.  It lives as long as the ManagedConnection.
  480      protected class TxConnectionEventListener
  481         extends BaseConnectionManager2.BaseConnectionEventListener
  482      {
  483         /** Use our own logger to prevent MNFE caused by compiler bug with nested classes. */
  484         protected Logger log;
  485   
              /** The synchronization from the last enlist(); reset to null by delist() (when not tracking by tx) and by connectionErrorOccurred(). */
  486         protected TransactionSynchronization transactionSynchronization;
  487         
              /** The XAResource handed to the constructor; returned by getXAResource(). */
  488         private final XAResource xaResource;
  489   
  490         /** Whether there is a local transaction */
  491         private SynchronizedBoolean localTransaction = new SynchronizedBoolean(false);
  492         
  493         public TxConnectionEventListener(final ManagedConnection mc, final ManagedConnectionPool mcp, final Object context, Logger log, final XAResource xaResource) throws ResourceException
  494         {
  495            super(mc, mcp, context, log);
  496            this.log = log;
  497            this.xaResource = xaResource;
  498            
  499            if (xaResource instanceof LocalXAResource)
  500               ((LocalXAResource) xaResource).setConnectionListener(this);
  501         }
  502   
              /**
               * Enlists this listener in the transaction associated with the thread.
               *
               * Returns without doing anything when the thread has no transaction.
               * Otherwise, while holding the TransactionSynchronizer lock for the
               * transaction, it obtains the transaction's registered synchronizer and,
               * on the first call, creates a TransactionSynchronization and adds it to
               * the synchronizer's unenlisted list. After releasing the lock it works
               * through the unenlisted snapshot returned by the synchronizer and
               * finally checks the outcome of its own synchronization.
               *
               * @throws SystemException from the transaction manager or when the
               *         synchronizer cannot be obtained
               * @throws IllegalStateException when there is no thread transaction but
               *         this listener still has one, when the thread transaction is
               *         not active, when the listener is not tracked by transaction
               *         and already has a synchronization, or when the thread
               *         transaction differs from the one already recorded
               */
  503         public void enlist() throws SystemException
  504         {
  505            // This method is a bit convoluted, but it has to be such because
  506            // there is a race condition in the transaction manager where it
  507            // unlocks during the enlist of the XAResource. It does this 
  508            // to avoid distributed deadlocks and to ensure the transaction
  509            // timeout can fail a badly behaving resource during the enlist.
  510            //
  511            // When two threads in the same transaction are trying to enlist 
  512            // connections they could be from the same resource manager 
  513            // or even the same connection when tracking the connection by transaction.
  514            //
  515            // For the same connection, we only want to do the real enlist once.
  516            // For two connections from the same resource manager we don't
  517            // want the join before the initial start request.
  518            //
  519            // The solution is to build up a list of unenlisted resources
  520            // in the TransactionSynchronizer and then choose one of the
  521            // threads that is contending in the transaction to enlist them
  522            // in order. The actual order doesn't really matter as it is the
  523            // transaction manager that calculates the enlist flags and determines
  524            // whether the XAResource was already enlisted.
  525            //
  526            // Once there are no unenlisted resources the threads are released
  527            // to return the result of the enlistments.
  528            //
  529            // In practice, a thread just takes a snapshot to try to avoid one
  530            // thread having to do all the work. If it did not do them all
  531            // the next waiting thread will do the next snapshot until there
  532            // is either no snapshot or no waiting threads.
  533            //
  534            // A downside to this design is a thread could have its resource enlisted by
  535            // an earlier thread while it enlists some later thread's resource.
  536            // Since they are all a part of the same transaction, this is probably
  537            // not a real issue.
  538            
  539            // No transaction associated with the thread
  540            int status = tm.getStatus();
  541            if (status == Status.STATUS_NO_TRANSACTION)
  542            {
                    // A synchronization that still references a transaction means the
                    // connection is being used outside the transaction it belongs to
  543               if (transactionSynchronization != null && transactionSynchronization.currentTx != null)
  544               {
  545                  String error = "Attempt to use connection outside a transaction when already a tx!";
  546                  if (trace)
  547                     log.trace(error + " " + this);
  548                  throw new IllegalStateException(error);
  549               }
  550               if (trace)
  551                  log.trace("No transaction, no need to enlist: " + this);
  552               return;
  553            }
  554            
  555            // Inactive transaction
  556            Transaction threadTx = tm.getTransaction();
  557            if (threadTx == null || status != Status.STATUS_ACTIVE)
  558            {
  559               String error = "Transaction " + threadTx + " is not active " + TxUtils.getStatusAsString(status);
  560               if (trace)
  561                  log.trace(error + " cl=" + this);
  562               throw new IllegalStateException(error);
  563            }
  564   
  565            if (trace)
  566               log.trace("Pre-enlist: " + this + " threadTx=" + threadTx);
  567            
  568            // Our synchronization 
  569            TransactionSynchronization ourSynchronization = null;
  570   
  571            // Serializes enlistment when two different threads are enlisting 
  572            // different connections in the same transaction concurrently
  573            TransactionSynchronizer synchronizer = null;
  574   
  575            TransactionSynchronizer.lock(threadTx);
  576            try
  577            {
  578               // Interleaving should have an unenlisted transaction
  579               // TODO: We should be able to do some sharing shouldn't we?
  580               if (isTrackByTx() == false && transactionSynchronization != null)
  581               {
  582                  String error = "Can't enlist - already a tx!";
  583                  if (trace)
  584                     log.trace(error + " " + this);
  585                  throw new IllegalStateException(error);
  586               }
  587               
  588               // Check for different transaction
  589               if (transactionSynchronization != null && transactionSynchronization.currentTx.equals(threadTx) == false)
  590               {
  591                  String error = "Trying to change transaction " + threadTx + " in enlist!";
  592                  if (trace)
  593                     log.trace(error +" " + this);
  594                  throw new IllegalStateException(error);
  595               }
  596   
  597               // Get the synchronizer
  598               try
  599               {
  600                  if (trace)
  601                     log.trace("Get synchronizer " + this + " threadTx=" + threadTx);
  602                  synchronizer = TransactionSynchronizer.getRegisteredSynchronizer(threadTx);
  603               }
  604               catch (Throwable t)
  605               {
                       // Tracking by transaction is switched off before the failure is propagated
  606                  setTrackByTx(false);
  607                  rethrowAsSystemException("Cannot register synchronization", threadTx, t);
  608               }
  609   
  610               // First time through, create a transaction synchronization
  611               if (transactionSynchronization == null)
  612               {
  613                  TransactionSynchronization synchronization = new TransactionSynchronization(threadTx, isTrackByTx());
  614                  synchronizer.addUnenlisted(synchronization);
  615                  transactionSynchronization = synchronization;
  616               }
  617               ourSynchronization = transactionSynchronization;
  618            }
  619            finally
  620            {
  621               TransactionSynchronizer.unlock(threadTx);
  622            }
  623   
  624            // Perform the enlistment(s)
  625            ArrayList unenlisted = synchronizer.getUnenlisted();
  626            if (unenlisted != null)
  627            {
  628               try
  629               {
  630                  for (int i = 0; i < unenlisted.size(); ++i)
  631                  {
  632                     TransactionSynchronization sync = (TransactionSynchronization) unenlisted.get(i);
  633                     if (sync.enlist())
  634                        synchronizer.addEnlisted(sync);
  635                  }
  636               }
  637               finally
  638               {
                       // Always tell the synchronizer this pass is over, even when an enlist threw
  639                  synchronizer.enlisted();
  640               }
  641            }
  642            
  643            // What was the result of our enlistment?
  644            if (trace)
  645               log.trace("Check enlisted " + this + " threadTx=" + threadTx);
  646            ourSynchronization.checkEnlisted();
  647         }
  648   
  649         public void delist() throws ResourceException
  650         {
  651            if (trace)
  652               log.trace("delisting " + this);
  653   
  654            try
  655            {
  656               if (isTrackByTx() == false && transactionSynchronization != null)
  657               {
  658                  Transaction tx = transactionSynchronization.currentTx;
  659                  TransactionSynchronization synchronization = transactionSynchronization;
  660                  transactionSynchronization = null;
  661                  if (TxUtils.isUncommitted(tx))
  662                  {
  663                     TransactionSynchronizer synchronizer = TransactionSynchronizer.getRegisteredSynchronizer(tx);
  664                     if (synchronization.enlisted)
  665                        synchronizer.removeEnlisted(synchronization);
  666                     if (tx.delistResource(getXAResource(), XAResource.TMSUSPEND) == false)
  667                        throw new ResourceException("Failure to delist resource: " + this);
  668                  }
  669               }
  670            }
  671            catch (Throwable t)
  672            {
  673               JBossResourceException.rethrowAsResourceException("Error in delist!", t);
  674            }
  675         }
  676   
  677         //local will return this, xa will return one from mc.
  678         protected XAResource getXAResource()
  679         {
  680            return xaResource;
  681         }
  682   
  683         public void connectionClosed(ConnectionEvent ce)
  684         {
  685            if (trace)
  686               log.trace("connectionClosed called mc=" + this.getManagedConnection());
  687            if (this.getManagedConnection() != (ManagedConnection)ce.getSource())
  688               throw new IllegalArgumentException("ConnectionClosed event received from wrong ManagedConnection! Expected: " + this.getManagedConnection() + ", actual: " + ce.getSource());
  689            try
  690            {
  691               getCcm().unregisterConnection(TxConnectionManager.this, ce.getConnectionHandle());
  692            }
  693            catch (Throwable t)
  694            {
  695               log.info("throwable from unregister connection", t);
  696            }
  697   
  698            try
  699            {
  700               unregisterAssociation(this, ce.getConnectionHandle());
  701               boolean isFree = isManagedConnectionFree();
  702               if (trace)
  703                  log.trace("isManagedConnectionFree=" + isFree + " mc=" + this.getManagedConnection());
  704               //no more handles
  705               if (isFree)
  706               {
  707                  delist();
  708                  returnManagedConnection(this, false);
  709               }
  710            }
  711            catch (Throwable t)
  712            {
  713               log.error("Error while closing connection handle!", t);
  714               returnManagedConnection(this, true);
  715            }
  716         }
  717   
  718         public void localTransactionStarted(ConnectionEvent ce)
  719         {
  720            localTransaction.set(true);
  721         }
  722   
  723         public void localTransactionCommitted(ConnectionEvent ce)
  724         {
  725            localTransaction.set(false);
  726         }
  727   
  728         public void localTransactionRolledback(ConnectionEvent ce)
  729         {
  730            localTransaction.set(false);
  731         }
  732   
  733         public void tidyup() throws ResourceException
  734         {
  735            // We have a hanging transaction
  736            if (localTransaction.get())
  737            {
  738               LocalTransaction local = null;
  739               ManagedConnection mc = getManagedConnection();
  740               try
  741               {
  742                  local = mc.getLocalTransaction();
  743               }
  744               catch (Throwable t)
  745               {
  746                  JBossResourceException.rethrowAsResourceException("Unfinished local transaction - error getting local transaction from " + this, t);
  747               }
  748               if (local == null)
  749                  throw new ResourceException("Unfinished local transaction but managed connection does not provide a local transaction. " + this);
  750               else
  751               {
  752                  local.rollback();
  753                  log.debug("Unfinished local transaction was rolled back." + this);
  754               }
  755            }
  756         }
  757   
  758         public void connectionErrorOccurred(ConnectionEvent ce)
  759         {
  760            transactionSynchronization = null;
  761            super.connectionErrorOccurred(ce);
  762         }
  763   
  764         //Important method!!
  765         public boolean isManagedConnectionFree()
  766         {
  767            if (isTrackByTx() && transactionSynchronization != null)
  768               return false;
  769            return super.isManagedConnectionFree();
  770         }
  771   
  772         private class TransactionSynchronization implements Synchronization
  773         {
  774            /** Transaction */
  775            private Transaction currentTx;
  776            
  777            /** This is the status when we were registered */
  778            private boolean wasTrackByTx;
  779   
  780            /** Whether we are enlisted */
  781            private boolean enlisted = false;
  782            
  783            /** Any error during enlistment */
  784            private Throwable enlistError;
  785            
  786            /**
  787             * Create a new TransactionSynchronization.
  788             * 
  789             * @param trackByTx whether this is track by connection
  790             */
  791            public TransactionSynchronization(Transaction tx, boolean trackByTx)
  792            {
  793               this.currentTx = tx;
  794               wasTrackByTx = trackByTx;
  795            }
  796            
  797            /**
  798             * Get the result of the enlistment
  799             * 
  800             * @throws SystemExeption for any error
  801             */
  802            public void checkEnlisted() throws SystemException
  803            {
  804               if (enlistError != null)
  805               {
  806                  String error = "Error enlisting resource in transaction=" + currentTx;
  807                  if (trace)
  808                     log.trace(error + " " + TxConnectionEventListener.this);
  809   
  810                  // Wrap the error to give a reasonable stacktrace since the resource
  811                  // could have been enlisted by a different thread
  812                  if (enlistError == FAILED_TO_ENLIST)
  813                     throw new SystemException(FAILED_TO_ENLIST + " tx=" + currentTx);
  814                  else
  815                  {
  816                     SystemException e = new SystemException(error);
  817                     e.initCause(enlistError);
  818                     throw e;
  819                  }
  820               }
  821               if (enlisted == false)
  822               {
  823                  String error = "Resource is not enlisted in transaction=" + currentTx;
  824                  if (trace)
  825                     log.trace(error + " " + TxConnectionEventListener.this);
  826                  throw new IllegalStateException("Resource was not enlisted.");
  827               }
  828            }
  829            
  830            /**
  831             * Enlist the resource
  832             * 
  833             * @return true when enlisted, false otherwise
  834             */
  835            public boolean enlist()
  836            {
  837               if (trace)
  838                  log.trace("Enlisting resource " + TxConnectionEventListener.this);
  839               try
  840               {
  841                  XAResource resource = getXAResource();
  842                  if (false == currentTx.enlistResource(resource))
  843                     enlistError = FAILED_TO_ENLIST;
  844               }
  845               catch (Throwable t)
  846               {
  847                  enlistError = t;
  848               }
  849   
  850               synchronized (this)
  851               {
  852                  if (enlistError != null)
  853                  {
  854                     if (trace)
  855                        log.trace("Failed to enlist resource " + TxConnectionEventListener.this, enlistError);
  856                     setTrackByTx(false);
  857                     transactionSynchronization = null;
  858                     return false;
  859                  }
  860                  
  861                  if (trace)
  862                     log.trace("Enlisted resource " + TxConnectionEventListener.this);
  863                  enlisted = true;
  864                  return true;
  865               }
  866            }
  867            
  868            public void beforeCompletion()
  869            {
  870            }
  871   
  872            public void afterCompletion(int status)
  873            {
  874               // The connection got destroyed during the transaction
  875               if (getState() == DESTROYED)
  876                  return;
  877               
  878               // Are we still in the original transaction?
  879               if (this.equals(transactionSynchronization) == false)
  880               {
  881                  // If we are interleaving transactions we have nothing to do
  882                  if (wasTrackByTx == false)
  883                     return;
  884                  else
  885                  {
  886                     // There is something wrong with the pooling
  887                     String message = "afterCompletion called with wrong tx! Expected: " + this + ", actual: " + transactionSynchronization;
  888                     IllegalStateException e = new IllegalStateException(message);
  889                     log.error("There is something wrong with the pooling?", e);
  890                  }
  891               }
  892               // "Delist"
  893               transactionSynchronization = null;
  894               // This is where we close when doing track by transaction
  895               if (wasTrackByTx)
  896               {
  897                  setTrackByTx(false);
  898                  if (isManagedConnectionFree())
  899                     returnManagedConnection(TxConnectionEventListener.this, false);
  900               }
  901            }
  902            
  903            public String toString()
  904            {
  905               StringBuffer buffer = new StringBuffer();
  906               buffer.append("TxSync").append(System.identityHashCode(this));
  907               buffer.append("{tx=").append(currentTx);
  908               buffer.append(" wasTrackByTx=").append(wasTrackByTx);
  909               buffer.append(" enlisted=").append(enlisted);
  910               buffer.append("}");
  911               return buffer.toString();
  912            }
  913         }
  914         
  915         // For debugging
  916         protected void toString(StringBuffer buffer)
  917         {
  918            buffer.append(" xaResource=").append(xaResource);
  919            buffer.append(" txSync=").append(transactionSynchronization);
  920         }
  921      }
  922   
  923      private class LocalXAResource implements XAResource, LastResource
  924      {
  925         protected Logger log;
  926   
  927         private ConnectionListener cl;
  928   
  929         /**
  930          * <code>warned</code> is set after one warning about a local participant
  931          * in a multi-branch jta transaction is logged.
  932          *
  933          */
  934         private boolean warned = false;
  935   
  936         private Xid currentXid;
  937   
  938         public LocalXAResource(final Logger log)
  939         {
  940            this.log = log;
  941         }
  942   
  943         void setConnectionListener(ConnectionListener cl)
  944         {
  945            this.cl = cl;
  946         }
  947   
  948         // implementation of javax.transaction.xa.XAResource interface
  949   
  950         public void start(Xid xid, int flags) throws XAException
  951         {
  952            if (trace)
  953               log.trace("start, xid: " + xid + ", flags: " + flags);
  954            if (currentXid  != null && flags == XAResource.TMNOFLAGS)
  955               throw new JBossLocalXAException("Trying to start a new tx when old is not complete! old: " + currentXid  + ", new " + xid + ", flags " + flags, XAException.XAER_PROTO);
  956            if (currentXid  == null && flags != XAResource.TMNOFLAGS)
  957               throw new JBossLocalXAException("Trying to start a new tx with wrong flags!  new " + xid + ", flags " + flags, XAException.XAER_PROTO);
  958            if (currentXid == null)
  959            {
  960               try
  961               {
  962                  cl.getManagedConnection().getLocalTransaction().begin();
  963               }
  964               catch (ResourceException re)
  965               {
  966                  throw new JBossLocalXAException("Error trying to start local tx: ", XAException.XAER_RMERR, re);
  967               }
  968               catch (Throwable t)
  969               {
  970                  throw new JBossLocalXAException("Throwable trying to start local transaction!", XAException.XAER_RMERR, t);
  971               }
  972   
  973               currentXid = xid;
  974            }
  975         }
  976   
  977         public void end(Xid xid, int flags) throws XAException
  978         {
  979            if (trace)
  980               log.trace("end on xid: " + xid + " called with flags " + flags);
  981         }
  982   
  983         public void commit(Xid xid, boolean onePhase) throws XAException
  984         {
  985            if (xid.equals(currentXid) == false)
  986               throw new JBossLocalXAException("wrong xid in commit: expected: " + currentXid + ", got: " + xid, XAException.XAER_PROTO);
  987            currentXid = null;
  988            try
  989            {
  990               cl.getManagedConnection().getLocalTransaction().commit();
  991            }
  992            catch (ResourceException re)
  993            {
  994               returnManagedConnection(cl, true);
  995               if (trace)
  996                  log.trace("commit problem: ", re);
  997               throw new JBossLocalXAException("could not commit local tx", XAException.XA_RBROLLBACK, re);
  998            }
  999         }
 1000   
 1001         public void forget(Xid xid) throws XAException
 1002         {
 1003            throw new JBossLocalXAException("forget not supported in local tx", XAException.XAER_RMERR);
 1004         }
 1005   
 1006         public int getTransactionTimeout() throws XAException
 1007         {
 1008            // TODO: implement this javax.transaction.xa.XAResource method
 1009            return 0;
 1010         }
 1011   
 1012         public boolean isSameRM(XAResource xaResource) throws XAException
 1013         {
 1014            return xaResource == this;
 1015         }
 1016   
 1017         public int prepare(Xid xid) throws XAException
 1018         {
 1019            if (!warned)
 1020               log.warn("Prepare called on a local tx. Use of local transactions on a jta transaction with more than one branch may result in inconsistent data in some cases of failure.");
 1021            warned = true;
 1022            return XAResource.XA_OK;
 1023         }
 1024   
 1025         public Xid[] recover(int flag) throws XAException
 1026         {
 1027            throw new JBossLocalXAException("no recover with local-tx only resource managers", XAException.XAER_RMERR);
 1028         }
 1029   
 1030         public void rollback(Xid xid) throws XAException
 1031         {
 1032            if (xid.equals(currentXid) == false)
 1033               throw new JBossLocalXAException("wrong xid in rollback: expected: " + currentXid + ", got: " + xid, XAException.XAER_PROTO);
 1034            currentXid = null;
 1035            try
 1036            {
 1037               cl.getManagedConnection().getLocalTransaction().rollback();
 1038            }
 1039            catch (ResourceException re)
 1040            {
 1041               returnManagedConnection(cl, true);
 1042               if (trace)
 1043                  log.trace("rollback problem: ", re);
 1044               throw new JBossLocalXAException("could not rollback local tx", XAException.XAER_RMERR, re);
 1045            }
 1046         }
 1047   
 1048         public boolean setTransactionTimeout(int seconds) throws XAException
 1049         {
 1050            // TODO: implement this javax.transaction.xa.XAResource method
 1051            return false;
 1052         }
 1053      }
 1054   }

Save This Page
Home » jboss-5.0.0.CR1-src » org » jboss » resource » connectionmanager » [javadoc | source]