Save This Page
Home » jboss-5.0.0.CR1-src » org » jboss » mq » pm » jdbc2 » [javadoc | source]
    1   /*
    2    * JBoss, Home of Professional Open Source.
    3    * Copyright 2006, Red Hat Middleware LLC, and individual contributors
    4    * as indicated by the @author tags. See the copyright.txt file in the
    5    * distribution for a full listing of individual contributors.
    6    *
    7    * This is free software; you can redistribute it and/or modify it
    8    * under the terms of the GNU Lesser General Public License as
    9    * published by the Free Software Foundation; either version 2.1 of
   10    * the License, or (at your option) any later version.
   11    *
   12    * This software is distributed in the hope that it will be useful,
   13    * but WITHOUT ANY WARRANTY; without even the implied warranty of
   14    * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
   15    * Lesser General Public License for more details.
   16    *
   17    * You should have received a copy of the GNU Lesser General Public
   18    * License along with this software; if not, write to the Free
   19    * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
   20    * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
   21    */
   22   package org.jboss.mq.pm.jdbc2;
   23   
   24   import java.io.ByteArrayInputStream;
   25   import java.io.ByteArrayOutputStream;
   26   import java.io.IOException;
   27   import java.io.ObjectInputStream;
   28   import java.io.ObjectOutputStream;
   29   import java.io.StreamCorruptedException;
   30   import java.sql.Connection;
   31   import java.sql.PreparedStatement;
   32   import java.sql.ResultSet;
   33   import java.sql.SQLException;
   34   import java.util.HashMap;
   35   import java.util.Iterator;
   36   import java.util.Map;
   37   import java.util.Properties;
   38   
   39   import javax.jms.JMSException;
   40   import javax.management.AttributeNotFoundException;
   41   import javax.management.InstanceNotFoundException;
   42   import javax.management.MBeanException;
   43   import javax.management.ObjectName;
   44   import javax.management.ReflectionException;
   45   import javax.naming.InitialContext;
   46   import javax.naming.NamingException;
   47   import javax.sql.DataSource;
   48   import javax.transaction.Transaction;
   49   import javax.transaction.TransactionManager;
   50   import javax.transaction.xa.Xid;
   51   
   52   import org.jboss.mq.SpyDestination;
   53   import org.jboss.mq.SpyJMSException;
   54   import org.jboss.mq.SpyMessage;
   55   import org.jboss.mq.SpyTopic;
   56   import org.jboss.mq.pm.CacheStore;
   57   import org.jboss.mq.pm.PersistenceManagerExt;
   58   import org.jboss.mq.pm.Tx;
   59   import org.jboss.mq.pm.TxManager;
   60   import org.jboss.mq.server.JMSDestination;
   61   import org.jboss.mq.server.MessageCache;
   62   import org.jboss.mq.server.MessageReference;
   63   import org.jboss.system.ServiceMBeanSupport;
   64   import org.jboss.tm.TransactionManagerLocator;
   65   import org.jboss.tm.TransactionTimeoutConfiguration;
   66   import org.jboss.util.UnreachableStatementException;
   67   
   68   import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
   69   
   70   /**
   71    * This class manages all persistence related services for JDBC based
   72    * persistence.
   73    *
   74    * @author Jayesh Parayali (jayeshpk1@yahoo.com)
   75    * @author Hiram Chirino (cojonudo14@hotmail.com)
   76    * @author Adrian Brock (adrian@jboss.com)
   77    * @version $Revision: 73848 $
   78    */
   79   public class PersistenceManager extends ServiceMBeanSupport
   80      implements PersistenceManagerMBean, org.jboss.mq.pm.PersistenceManager, CacheStore, PersistenceManagerExt
   81   {
   82      /** FAQ about concurrency problems */
   83      private static String CONCURRENCY_WARNING = "\nCommon reasons for missing messages are \n1) You have multiple JBossMQs running over the same database.\n2) You are using a replicating database that is not keeping up with replication.";
   84      
   85      /////////////////////////////////////////////////////////////////////////////////
   86      //
   87      // TX state attibutes
   88      //
   89      /////////////////////////////////////////////////////////////////////////////////
   90   
   91      /** The next transaction id */
   92      protected SynchronizedLong nextTransactionId = new SynchronizedLong(0l);
   93      
   94      /** The jta transaction manager */
   95      protected TxManager txManager;
   96      
   97      /** The DataSource */
   98      protected DataSource datasource;
   99      
  100      /** The JBossMQ transaction mananger */
  101      protected TransactionManager tm;
  102      
  103      /** The override recovery timeout */
  104      private int recoveryTimeout = 0;
  105      
  106      /** The recovery retries */
  107      private int recoveryRetries = 0;
  108      
  109      /** The recover messages chunk  */
  110      private int recoverMessagesChunk = 0;
  111   
  112      /** The statement retries */
  113      private int statementRetries = 5;
  114      
  115      /////////////////////////////////////////////////////////////////////////////////
  116      //
  117      // JDBC Access Attributes
  118      //
  119      /////////////////////////////////////////////////////////////////////////////////
  120   
  121      protected String UPDATE_MARKED_MESSAGES = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=?";
  122      protected String UPDATE_MARKED_MESSAGES_XARECOVERY = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? AND TXID NOT IN (SELECT TXID FROM JMS_TRANSACTIONS WHERE XID IS NOT NULL)";
  123      protected String UPDATE_MARKED_MESSAGES_WITH_TX = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?";
  124      protected String DELETE_MARKED_MESSAGES_WITH_TX =
  125         "DELETE FROM JMS_MESSAGES WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTIONS) AND TXOP=?";
  126      protected String DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY =
  127         "DELETE FROM JMS_MESSAGES WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTIONS WHERE XID = NULL) AND TXOP=?";
  128      protected String DELETE_TX = "DELETE FROM JMS_TRANSACTIONS WHERE TXID = ?";
  129      protected String DELETE_MARKED_MESSAGES = "DELETE FROM JMS_MESSAGES WHERE TXID=? AND TXOP=?";
  130      protected String DELETE_TEMPORARY_MESSAGES = "DELETE FROM JMS_MESSAGES WHERE TXOP = 'T'";
  131      protected String INSERT_TX = "INSERT INTO JMS_TRANSACTIONS (TXID) values(?)";
  132      protected String INSERT_TX_XARECOVERY = "INSERT INTO JMS_TRANSACTIONS (TXID, XID) values(?, ?)";
  133      protected String DELETE_ALL_TX = "DELETE FROM JMS_TRANSACTIONS";
  134      protected String DELETE_ALL_TX_XARECOVERY = "DELETE FROM JMS_TRANSACTIONS WHERE XID = NULL";
  135      protected String SELECT_MAX_TX = "SELECT MAX(TXID) FROM (SELECT MAX(TXID) FROM JMS_TRANSACTIONS UNION SELECT MAX(TXID) FROM JMS_MESSAGES)";
  136      protected String SELECT_ALL_TX_XARECOVERY = "SELECT TXID, XID FROM JMS_TRANSACTIONS";
  137      protected String SELECT_MESSAGES_IN_DEST = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE DESTINATION=?";
  138      protected String SELECT_MESSAGES_IN_DEST_XARECOVERY = "SELECT MESSAGEID, MESSAGEBLOB, TXID, TXOP FROM JMS_MESSAGES WHERE DESTINATION=?";
  139      protected String SELECT_MESSAGE_KEYS_IN_DEST = "SELECT MESSAGEID FROM JMS_MESSAGES WHERE DESTINATION=?";
  140      protected String SELECT_MESSAGE = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?";
  141      protected String SELECT_MESSAGE_XARECOVERY = "SELECT MESSAGEID, MESSAGEBLOB, TXID, TXOP FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?";
  142      protected String INSERT_MESSAGE =
  143         "INSERT INTO JMS_MESSAGES (MESSAGEID, DESTINATION, MESSAGEBLOB, TXID, TXOP) VALUES(?,?,?,?,?)";
  144      protected String MARK_MESSAGE = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=?";
  145      protected String DELETE_MESSAGE = "DELETE FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?";
  146      protected String UPDATE_MESSAGE = "UPDATE JMS_MESSAGES SET MESSAGEBLOB=? WHERE MESSAGEID=? AND DESTINATION=?";
  147      protected String CREATE_MESSAGE_TABLE =
  148         "CREATE TABLE JMS_MESSAGES ( MESSAGEID INTEGER NOT NULL, "
  149            + "DESTINATION VARCHAR(32) NOT NULL, TXID INTEGER, TXOP CHAR(1),"
  150            + "MESSAGEBLOB OBJECT, PRIMARY KEY (MESSAGEID, DESTINATION) )";
  151      protected String CREATE_IDX_MESSAGE_TXOP_TXID = "CREATE INDEX JMS_MESSAGES_TXOP_TXID ON JMS_MESSAGES (TXOP, TXID)";
  152      protected String CREATE_IDX_MESSAGE_DESTINATION = "CREATE INDEX JMS_MESSAGES_DESTINATION ON JMS_MESSAGES (DESTINATION)";
  153      protected String CREATE_TX_TABLE = "CREATE TABLE JMS_TRANSACTIONS ( TXID INTEGER, PRIMARY KEY (TXID) )";
  154      protected String CREATE_TX_TABLE_XARECOVERY = "CREATE TABLE JMS_TRANSACTIONS ( TXID INTEGER, XID OBJECT, PRIMARY KEY (TXID) )";
  155   
  156      protected static final int OBJECT_BLOB = 0;
  157      protected static final int BYTES_BLOB = 1;
  158      protected static final int BINARYSTREAM_BLOB = 2;
  159      protected static final int BLOB_BLOB = 3;
  160   
  161      protected int blobType = OBJECT_BLOB;
  162      protected boolean createTables;
  163   
  164      protected  int connectionRetryAttempts = 5;
  165   
  166      protected boolean xaRecovery = false;
  167      
  168      /////////////////////////////////////////////////////////////////////////////////
  169      //
  170      // Constructor.
  171      //
  172      /////////////////////////////////////////////////////////////////////////////////
  173      public PersistenceManager() throws javax.jms.JMSException
  174      {
  175         txManager = new TxManager(this);
  176      }
  177   
  178      /**
  179       * This inner class helps handle the tx management of the jdbc connections.
  180       * 
  181       */
  182      protected class TransactionManagerStrategy
  183      {
  184         boolean rollback = false;
  185   
  186         Transaction threadTx;
  187   
  188         void startTX() throws JMSException
  189         {
  190            try
  191            {
  192               // Thread arriving must be clean (jboss doesn't set the thread
  193               // previously). However optimized calls come with associated
  194               // thread for example. We suspend the thread association here, and
  195               // resume in the finally block of the following try.
  196               threadTx = tm.suspend();
  197   
  198               // Always begin a transaction
  199               tm.begin();
  200            }
  201            catch (Exception e)
  202            {
  203               try
  204               {
  205                  if (threadTx != null)
  206                     tm.resume(threadTx);
  207               }
  208               catch (Exception ignore)
  209               {
  210               }
  211               throw new SpyJMSException("Could not start a transaction with the transaction manager.", e);
  212            }
  213         }
  214   
  215         void setRollbackOnly() throws JMSException
  216         {
  217            rollback = true;
  218         }
  219   
  220         void endTX() throws JMSException
  221         {
  222            try
  223            {
  224               if (rollback)
  225               {
  226                  tm.rollback();
  227               }
  228               else
  229               {
  230                  tm.commit();
  231               }
  232            }
  233            catch (Exception e)
  234            {
  235               throw new SpyJMSException("Could not start a transaction with the transaction manager.", e);
  236            }
  237            finally
  238            {
  239               try
  240               {
  241                  if (threadTx != null)
  242                     tm.resume(threadTx);
  243               }
  244               catch (Exception ignore)
  245               {
  246               }
  247            }
  248         }
  249      }
  250   
  251      /////////////////////////////////////////////////////////////////////////////////
  252      //
  253      // TX Resolution.
  254      //
  255      /////////////////////////////////////////////////////////////////////////////////
  256   
  257      synchronized protected void createSchema() throws JMSException
  258      {
  259         TransactionManagerStrategy tms = new TransactionManagerStrategy();
  260         tms.startTX();
  261         Connection c = null;
  262         PreparedStatement stmt = null;
  263         boolean threadWasInterrupted = Thread.interrupted();
  264         try
  265         {
  266            if (createTables)
  267            {
  268               c = this.getConnection();
  269   
  270               boolean createdMessageTable = false;
  271               try
  272               {
  273                  stmt = c.prepareStatement(CREATE_MESSAGE_TABLE);
  274                  stmt.executeUpdate();
  275                  createdMessageTable = true;
  276               }
  277               catch (SQLException e)
  278               {
  279                  log.debug("Could not create table with SQL: " + CREATE_MESSAGE_TABLE, e);
  280               }
  281               finally
  282               {
  283                  try
  284                  {
  285                     if (stmt != null)
  286                        stmt.close();
  287                  }
  288                  catch (Throwable ignored)
  289                  {
  290                     log.trace("Ignored: " + ignored);
  291                  }
  292                  stmt = null;
  293               }
  294   
  295               if (createdMessageTable)
  296               {
  297                  try
  298                  {
  299                     stmt = c.prepareStatement(CREATE_IDX_MESSAGE_TXOP_TXID);
  300                     stmt.executeUpdate();
  301                  }
  302                  catch (SQLException e)
  303                  {
  304                     log.debug("Could not create index with SQL: " + CREATE_IDX_MESSAGE_TXOP_TXID, e);
  305                  }
  306                  finally
  307                  {
  308                     try
  309                     {
  310                        if (stmt != null)
  311                           stmt.close();
  312                     }
  313                     catch (Throwable ignored)
  314                     {
  315                        log.trace("Ignored: " + ignored);
  316                     }
  317                     stmt = null;
  318                  }
  319                  try
  320                  {
  321                     stmt = c.prepareStatement(CREATE_IDX_MESSAGE_DESTINATION);
  322                     stmt.executeUpdate();
  323                  }
  324                  catch (SQLException e)
  325                  {
  326                     log.debug("Could not create index with SQL: " + CREATE_IDX_MESSAGE_DESTINATION, e);
  327                  }
  328                  finally
  329                  {
  330                     try
  331                     {
  332                        if (stmt != null)
  333                           stmt.close();
  334                     }
  335                     catch (Throwable ignored)
  336                     {
  337                        log.trace("Ignored: " + ignored);
  338                     }
  339                     stmt = null;
  340                  }
  341               }
  342   
  343               String createTxTable = CREATE_TX_TABLE;
  344               if (xaRecovery)
  345                  createTxTable = CREATE_TX_TABLE_XARECOVERY;
  346               try
  347               {
  348                  stmt = c.prepareStatement(createTxTable);
  349                  stmt.executeUpdate();
  350               }
  351               catch (SQLException e)
  352               {
  353                  log.debug("Could not create table with SQL: " + createTxTable, e);
  354               }
  355               finally
  356               {
  357                  try
  358                  {
  359                     if (stmt != null)
  360                        stmt.close();
  361                  }
  362                  catch (Throwable ignored)
  363                  {
  364                     log.trace("Ignored: " + ignored);
  365                  }
  366                  stmt = null;
  367               }
  368            }
  369         }
  370         catch (SQLException e)
  371         {
  372            tms.setRollbackOnly();
  373            throw new SpyJMSException("Could not get a connection for jdbc2 table construction ", e);
  374         }
  375         finally
  376         {
  377            try
  378            {
  379               if (stmt != null)
  380                  stmt.close();
  381            }
  382            catch (Throwable ignore)
  383            {
  384            }
  385            stmt = null;
  386            try
  387            {
  388               if (c != null)
  389                  c.close();
  390            }
  391            catch (Throwable ignore)
  392            {
  393            }
  394            c = null;
  395            tms.endTX();
  396   
  397            // Restore the interrupted state of the thread
  398            if (threadWasInterrupted)
  399               Thread.currentThread().interrupt();
  400         }
  401      }
  402   
  403      synchronized protected void resolveAllUncommitedTXs() throws JMSException
  404      {
  405         // We perform recovery in a different thread to the table creation
  406         // Postgres doesn't like create table failing in the same transaction
  407         // as other operations
  408   
  409         TransactionManagerStrategy tms = new TransactionManagerStrategy();
  410         tms.startTX();
  411         Connection c = null;
  412         PreparedStatement stmt = null;
  413         ResultSet rs = null;
  414         boolean threadWasInterrupted = Thread.interrupted();
  415         try
  416         {
  417            c = this.getConnection();
  418   
  419            // Find out what the next TXID should be
  420            stmt = c.prepareStatement(SELECT_MAX_TX);
  421            rs = stmt.executeQuery();
  422            if (rs.next())
  423               nextTransactionId.set(rs.getLong(1) + 1);
  424            rs.close();
  425            rs = null;
  426            stmt.close();
  427            stmt = null;
  428   
  429            // Delete all the temporary messages.
  430            stmt = c.prepareStatement(DELETE_TEMPORARY_MESSAGES);
  431            stmt.executeUpdate();
  432            stmt.close();
  433            stmt = null;
  434   
  435            // Delete all the messages that were added but thier tx's were not commited.
  436            String deleteMarkedMessagesWithTx = DELETE_MARKED_MESSAGES_WITH_TX;
  437            if (xaRecovery)
  438               deleteMarkedMessagesWithTx = DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY;
  439            stmt = c.prepareStatement(deleteMarkedMessagesWithTx);
  440            stmt.setString(1, "A");
  441            stmt.executeUpdate();
  442            stmt.close();
  443            stmt = null;
  444   
  445            // Restore all the messages that were removed but their tx's were not commited.
  446            String updateMarkedMessages = UPDATE_MARKED_MESSAGES;
  447            if (xaRecovery)
  448               updateMarkedMessages = UPDATE_MARKED_MESSAGES_XARECOVERY;
  449            stmt = c.prepareStatement(updateMarkedMessages);
  450            stmt.setNull(1, java.sql.Types.BIGINT);
  451            stmt.setString(2, "A");
  452            stmt.setString(3, "D");
  453            stmt.executeUpdate();
  454            stmt.close();
  455            stmt = null;
  456   
  457            // Now recovery is complete, clear the transaction table.
  458            String deleteAllTx = DELETE_ALL_TX;
  459            if (xaRecovery)
  460               deleteAllTx = DELETE_ALL_TX_XARECOVERY;
  461            stmt = c.prepareStatement(deleteAllTx);
  462            stmt.execute();
  463            stmt.close();
  464            stmt = null;
  465   
  466            // If we are doing XARecovery restore the prepared transactions
  467            if (xaRecovery)
  468            {
  469               stmt = c.prepareStatement(SELECT_ALL_TX_XARECOVERY);
  470               rs = stmt.executeQuery();
  471               while (rs.next())
  472               {
  473                  long txid = rs.getLong(1);
  474                  Xid xid = extractXid(rs, 2);
  475                  Tx tx = new Tx(txid);
  476                  tx.setXid(xid);
  477                  tx.checkPersisted();
  478                  txManager.restoreTx(tx);
  479               }
  480               rs.close();
  481               rs = null;
  482               stmt.close();
  483               stmt = null;
  484            }
  485         }
  486         catch (Exception e)
  487         {
  488            tms.setRollbackOnly();
  489            throw new SpyJMSException("Could not resolve uncommited transactions.  Message recovery may not be accurate", e);
  490         }
  491         finally
  492         {
  493            try
  494            {
  495               if (rs != null)
  496                  rs.close();
  497            }
  498            catch (Throwable ignore)
  499            {
  500            }
  501            try
  502            {
  503               if (stmt != null)
  504                  stmt.close();
  505            }
  506            catch (Throwable ignore)
  507            {
  508            }
  509            try
  510            {
  511               if (c != null)
  512                  c.close();
  513            }
  514            catch (Throwable ignore)
  515            {
  516            }
  517            tms.endTX();
  518   
  519            // Restore the interrupted state of the thread
  520            if (threadWasInterrupted)
  521               Thread.currentThread().interrupt();
  522         }
  523      }
  524   
  525      /////////////////////////////////////////////////////////////////////////////////
  526      //
  527      // Message Recovery
  528      //
  529      /////////////////////////////////////////////////////////////////////////////////
  530      
  531      synchronized public void restoreQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException
  532      {
  533         if (jmsDest == null)
  534            throw new IllegalArgumentException("Must supply non null JMSDestination to restoreQueue");
  535         if (dest == null)
  536            throw new IllegalArgumentException("Must supply non null SpyDestination to restoreQueue");
  537   
  538         boolean canOverrideTimeout = (tm instanceof TransactionTimeoutConfiguration);
  539         int previousTimeout = 0;
  540         try
  541         {
  542            // Set our timeout
  543            if (recoveryTimeout != 0)
  544            {
  545               if (canOverrideTimeout)
  546               {
  547                  previousTimeout = ((TransactionTimeoutConfiguration) tm).getTransactionTimeout();
  548                  tm.setTransactionTimeout(recoveryTimeout);
  549               }
  550               else
  551               {
  552                  log.debug("Cannot override recovery timeout, TransactionManager does implement " + TransactionTimeoutConfiguration.class.getName());
  553               }
  554            }
  555            
  556            // restore the queue
  557            try
  558            {
  559               internalRestoreQueue(jmsDest, dest);
  560            }
  561            finally
  562            {
  563               // restore the transaction timeout
  564               if (recoveryTimeout != 0 && canOverrideTimeout)
  565                  tm.setTransactionTimeout(previousTimeout);
  566            }
  567         }
  568         catch (Exception e)
  569         {
  570            SpyJMSException.rethrowAsJMSException("Unexpected error in recovery", e);
  571         }
  572      }
  573      
  574      synchronized protected void internalRestoreQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException
  575      {
  576         // Work out the prepared transactions
  577         Map prepared = null;
  578         if (xaRecovery)
  579         {
  580            prepared = new HashMap();
  581            Map map = txManager.getPreparedTransactions();
  582            for (Iterator i = map.values().iterator(); i.hasNext();)
  583            {
  584               TxManager.PreparedInfo info = (TxManager.PreparedInfo) i.next();
  585               for (Iterator j = info.getTxids().iterator(); j.hasNext();)
  586               {
  587                  Tx tx = (Tx) j.next();
  588                  prepared.put(new Long(tx.longValue()), tx);
  589               }
  590            }
  591         }
  592         
  593         TransactionManagerStrategy tms = new TransactionManagerStrategy();
  594         tms.startTX();
  595         Connection c = null;
  596         PreparedStatement stmt = null;
  597         PreparedStatement stmt2 = null; 
  598         ResultSet rs = null;
  599         boolean threadWasInterrupted = Thread.interrupted();
  600         try
  601         {
  602            String selectMessagesInDest = SELECT_MESSAGES_IN_DEST;
  603            String selectMessage = SELECT_MESSAGE;
  604            if (xaRecovery)
  605            {
  606               selectMessagesInDest = SELECT_MESSAGES_IN_DEST_XARECOVERY;
  607               selectMessage = SELECT_MESSAGE_XARECOVERY;
  608            }
  609            c = this.getConnection();
  610            if (recoverMessagesChunk == 0)
  611               stmt = c.prepareStatement(selectMessagesInDest);
  612            else
  613            {
  614               stmt = c.prepareStatement(SELECT_MESSAGE_KEYS_IN_DEST);
  615               stmt2 = c.prepareStatement(selectMessage);
  616            }
  617            stmt.setString(1, dest.toString());
  618   
  619            long txid = 0;
  620            String txop = null;
  621            rs = stmt.executeQuery();
  622            int counter = 0;
  623            int recovery = 0;
  624            while (rs.next())
  625            {
  626               long msgid = rs.getLong(1);
  627               SpyMessage message = null;
  628               if (recoverMessagesChunk == 0)
  629               {
  630                  message = extractMessage(rs);
  631                  if (xaRecovery)
  632                  {
  633                     txid = rs.getLong(3);
  634                     txop = rs.getString(4);
  635                  }
  636               }
  637               else
  638               {
  639                  ResultSet rs2 = null;
  640                  try
  641                  {
  642                     stmt2.setLong(1, msgid);
  643                     stmt2.setString(2, dest.toString());
  644                     rs2 = stmt2.executeQuery();
  645                     if (rs2.next())
  646                     {
  647                        message = extractMessage(rs2);
  648                        if (xaRecovery)
  649                        {
  650                           txid = rs2.getLong(3);
  651                           txop = rs2.getString(4);
  652                        }
  653                     }
  654                     else
  655                        log.warn("Failed to find message msgid=" + msgid +" dest=" + dest);
  656                  }
  657                  finally
  658                  {
  659                     if (rs2 != null)
  660                     {
  661                        try
  662                        {
  663                           rs2.close();
  664                        }
  665                        catch (Exception ignored)
  666                        {
  667                        }
  668                     }
  669                  }
  670               }
  671               // The durable subscription is not serialized
  672               if (dest instanceof SpyTopic)
  673                  message.header.durableSubscriberID = ((SpyTopic) dest).getDurableSubscriptionID();
  674   
  675               if (xaRecovery == false || txid == 0 || txop == null)
  676                  jmsDest.restoreMessage(message);
  677               else
  678               {
  679                  Tx tx = (Tx) prepared.get(new Long(txid));
  680                  if (tx == null)
  681                     jmsDest.restoreMessage(message);
  682                  else if ("A".equals(txop))
  683                  {
  684                     jmsDest.restoreMessage(message, tx, Tx.ADD);
  685                     recovery++;
  686                  }
  687                  else if ("D".equals(txop))
  688                  {
  689                     jmsDest.restoreMessage(message, tx, Tx.REMOVE);
  690                     recovery++;
  691                  }
  692                  else
  693                     throw new IllegalStateException("Unknown txop=" + txop + " for msg=" + msgid + " dest=" + dest);
  694               }
  695               counter++;
  696            }
  697   
  698            log.debug("Restored " + counter + " message(s) to: " + dest + " " + recovery + " need recovery.");
  699         }
  700         catch (IOException e)
  701         {
  702            tms.setRollbackOnly();
  703            throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e);
  704         }
  705         catch (SQLException e)
  706         {
  707            tms.setRollbackOnly();
  708            throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e);
  709         }
  710         finally
  711         {
  712            try
  713            {
  714               if (rs != null)
  715                  rs.close();
  716            }
  717            catch (Throwable ignore)
  718            {
  719            }
  720            try
  721            {
  722               if (stmt != null)
  723                  stmt.close();
  724            }
  725            catch (Throwable ignore)
  726            {
  727            }
  728            try
  729            {
  730               if (c != null)
  731                  c.close();
  732            }
  733            catch (Throwable ignore)
  734            {
  735            }
  736            tms.endTX();
  737   
  738            // Restore the interrupted state of the thread
  739            if (threadWasInterrupted)
  740               Thread.currentThread().interrupt();
  741         }
  742   
  743      }
  744   
  745      SpyMessage extractMessage(ResultSet rs) throws SQLException, IOException
  746      {
  747         try
  748         {
  749            long messageid = rs.getLong(1);
  750   
  751            SpyMessage message = null;
  752   
  753            if (blobType == OBJECT_BLOB)
  754            {
  755   
  756               message = (SpyMessage) rs.getObject(2);
  757   
  758            }
  759            else if (blobType == BYTES_BLOB)
  760            {
  761   
  762               byte[] st = rs.getBytes(2);
  763               ByteArrayInputStream baip = new ByteArrayInputStream(st);
  764               ObjectInputStream ois = new ObjectInputStream(baip);
  765               message = SpyMessage.readMessage(ois);
  766   
  767            }
  768            else if (blobType == BINARYSTREAM_BLOB)
  769            {
  770   
  771               ObjectInputStream ois = new ObjectInputStream(rs.getBinaryStream(2));
  772               message = SpyMessage.readMessage(ois);
  773   
  774            }
  775            else if (blobType == BLOB_BLOB)
  776            {
  777   
  778               ObjectInputStream ois = new ObjectInputStream(rs.getBlob(2).getBinaryStream());
  779               message = SpyMessage.readMessage(ois);
  780            }
  781   
  782            message.header.messageId = messageid;
  783            return message;
  784         }
  785         catch (StreamCorruptedException e)
  786         {
  787            throw new IOException("Could not load the message: " + e);
  788         }
  789      }
  790   
  791      Xid extractXid(ResultSet rs, int column) throws SQLException, IOException, ClassNotFoundException
  792      {
  793         try
  794         {
  795            Xid xid = null;
  796   
  797            if (blobType == OBJECT_BLOB)
  798            {
  799               xid = (Xid) rs.getObject(column);
  800            }
  801            else if (blobType == BYTES_BLOB)
  802            {
  803               byte[] st = rs.getBytes(column);
  804               ByteArrayInputStream baip = new ByteArrayInputStream(st);
  805               ObjectInputStream ois = new ObjectInputStream(baip);
  806               xid = (Xid) ois.readObject();
  807            }
  808            else if (blobType == BINARYSTREAM_BLOB)
  809            {
  810               ObjectInputStream ois = new ObjectInputStream(rs.getBinaryStream(column));
  811               xid = (Xid) ois.readObject();
  812            }
  813            else if (blobType == BLOB_BLOB)
  814            {
  815               ObjectInputStream ois = new ObjectInputStream(rs.getBlob(column).getBinaryStream());
  816               xid = (Xid) ois.readObject();
  817            }
  818   
  819            return xid;
  820         }
  821         catch (StreamCorruptedException e)
  822         {
  823            throw new IOException("Could not load the message: " + e);
  824         }
  825      }
  826      
  827      public void forcePersistentTx(Tx txId) throws javax.jms.JMSException
  828      {
  829         // No need if already done or not doing xa recovery
  830         if (txId.wasPersisted() || xaRecovery == false)
  831            return;
  832         
  833         TransactionManagerStrategy tms = new TransactionManagerStrategy();
  834         tms.startTX();
  835         Connection c = null;
  836         boolean threadWasInterrupted = Thread.interrupted();
  837         try
  838         {
  839   
  840            c = this.getConnection();
  841            insertPersistentTx(tms, c, txId);
  842         }
  843         catch (SQLException e)
  844         {
  845            tms.setRollbackOnly();
  846            throw new SpyJMSException("Could not commit tx: " + txId, e);
  847         }
  848         finally
  849         {
  850            try
  851            {
  852               if (c != null)
  853                  c.close();
  854            }
  855            catch (Throwable ignore)
  856            {
  857            }
  858            tms.endTX();
  859   
  860            // Restore the interrupted state of the thread
  861            if (threadWasInterrupted)
  862               Thread.currentThread().interrupt();
  863         }
  864      }
  865   
  866      /////////////////////////////////////////////////////////////////////////////////
  867      //
  868      // TX Commit
  869      //
  870      /////////////////////////////////////////////////////////////////////////////////
  871      public void commitPersistentTx(Tx txId) throws javax.jms.JMSException
  872      {
  873         if (txId.wasPersisted() == false)
  874            return;
  875         
  876         TransactionManagerStrategy tms = new TransactionManagerStrategy();
  877         tms.startTX();
  878         Connection c = null;
  879         boolean threadWasInterrupted = Thread.interrupted();
  880         try
  881         {
  882   
  883            c = this.getConnection();
  884            removeMarkedMessages(c, txId, "D");
  885            removeTXRecord(c, txId.longValue());
  886   
  887         }
  888         catch (SQLException e)
  889         {
  890            tms.setRollbackOnly();
  891            throw new SpyJMSException("Could not commit tx: " + txId, e);
  892         }
  893         finally
  894         {
  895            try
  896            {
  897               if (c != null)
  898                  c.close();
  899            }
  900            catch (Throwable ignore)
  901            {
  902            }
  903            tms.endTX();
  904   
  905            // Restore the interrupted state of the thread
  906            if (threadWasInterrupted)
  907               Thread.currentThread().interrupt();
  908         }
  909      }
  910   
  911      public void removeMarkedMessages(Connection c, Tx txid, String mark) throws SQLException
  912      {
  913         PreparedStatement stmt = null;
  914         try
  915         {
  916            stmt = c.prepareStatement(DELETE_MARKED_MESSAGES);
  917            stmt.setLong(1, txid.longValue());
  918            stmt.setString(2, mark);
  919            stmt.executeUpdate();
  920         }
  921         finally
  922         {
  923            try
  924            {
  925               if (stmt != null)
  926                  stmt.close();
  927            }
  928            catch (Throwable e)
  929            {
  930            }
  931         }
  932      }
  933   
  934      public void addTXRecord(Connection c, Tx txid) throws SQLException, IOException
  935      {
  936         PreparedStatement stmt = null;
  937         try
  938         {
  939            String insertTx = INSERT_TX;
  940            if (xaRecovery)
  941               insertTx = INSERT_TX_XARECOVERY;
  942            stmt = c.prepareStatement(insertTx);
  943            stmt.setLong(1, txid.longValue());
  944            if (xaRecovery)
  945            {
  946               Xid xid = txid.getXid();
  947               if (xid != null)
  948                  setBlob(stmt, 2, xid);
  949               else
  950                  stmt.setNull(2, java.sql.Types.BLOB);
  951            }
  952            stmt.executeUpdate();
  953         }
  954         finally
  955         {
  956            try
  957            {
  958               if (stmt != null)
  959                  stmt.close();
  960            }
  961            catch (Throwable e)
  962            {
  963            }
  964         }
  965      }
  966   
  967      public void removeTXRecord(Connection c, long txid) throws SQLException
  968      {
  969         PreparedStatement stmt = null;
  970         try
  971         {
  972            stmt = c.prepareStatement(DELETE_TX);
  973            stmt.setLong(1, txid);
  974            stmt.executeUpdate();
  975         }
  976         finally
  977         {
  978            try
  979            {
  980               if (stmt != null)
  981                  stmt.close();
  982            }
  983            catch (Throwable e)
  984            {
  985            }
  986         }
  987      }
  988   
  989      /////////////////////////////////////////////////////////////////////////////////
  990      //
  991      // TX Rollback
  992      //
  993      /////////////////////////////////////////////////////////////////////////////////
  994      public void rollbackPersistentTx(Tx txId) throws JMSException
  995      {
  996         if (txId.wasPersisted() == false)
  997            return;
  998   
  999         TransactionManagerStrategy tms = new TransactionManagerStrategy();
 1000         tms.startTX();
 1001         Connection c = null;
 1002         PreparedStatement stmt = null;
 1003         boolean threadWasInterrupted = Thread.interrupted();
 1004         try
 1005         {
 1006   
 1007            c = this.getConnection();
 1008            removeMarkedMessages(c, txId, "A");
 1009            removeTXRecord(c, txId.longValue());
 1010   
 1011            // Restore all the messages that were logically removed.
 1012            stmt = c.prepareStatement(UPDATE_MARKED_MESSAGES_WITH_TX);
 1013            stmt.setNull(1, java.sql.Types.BIGINT);
 1014            stmt.setString(2, "A");
 1015            stmt.setString(3, "D");
 1016            stmt.setLong(4, txId.longValue());
 1017            stmt.executeUpdate();
 1018            stmt.close();
 1019            stmt = null;
 1020         }
 1021         catch (SQLException e)
 1022         {
 1023            tms.setRollbackOnly();
 1024            throw new SpyJMSException("Could not rollback tx: " + txId, e);
 1025         }
 1026         finally
 1027         {
 1028            try
 1029            {
 1030               if (stmt != null)
 1031                  stmt.close();
 1032            }
 1033            catch (Throwable ignore)
 1034            {
 1035            }
 1036            try
 1037            {
 1038               if (c != null)
 1039                  c.close();
 1040            }
 1041            catch (Throwable ignore)
 1042            {
 1043            }
 1044            tms.endTX();
 1045   
 1046            // Restore the interrupted state of the thread
 1047            if (threadWasInterrupted)
 1048               Thread.currentThread().interrupt();
 1049         }
 1050   
 1051      }
 1052   
 1053      /////////////////////////////////////////////////////////////////////////////////
 1054      //
 1055      // TX Creation
 1056      //
 1057      /////////////////////////////////////////////////////////////////////////////////
 1058      public Tx createPersistentTx() throws JMSException
 1059      {
 1060         Tx id = new Tx(nextTransactionId.increment());
 1061         return id;
 1062      }
 1063   
 1064      public void insertPersistentTx(TransactionManagerStrategy tms, Connection c, Tx tx) throws JMSException
 1065      {
 1066         try
 1067         {
 1068            if (tx != null && tx.checkPersisted() == false)
 1069               addTXRecord(c, tx);
 1070         }
 1071         catch (Exception e)
 1072         {
 1073            tms.setRollbackOnly();
 1074            throw new SpyJMSException("Could not create tx: " + tx.longValue(), e);
 1075         }
 1076      }
 1077   
 1078      /////////////////////////////////////////////////////////////////////////////////
 1079      //
 1080      // Adding a message
 1081      //
 1082      /////////////////////////////////////////////////////////////////////////////////
 1083      public void add(MessageReference messageRef, Tx txId) throws javax.jms.JMSException
 1084      {
 1085         boolean trace = log.isTraceEnabled();
 1086         if (trace)
 1087            log.trace("About to add message " + messageRef + " transaction=" + txId);
 1088   
 1089         TransactionManagerStrategy tms = new TransactionManagerStrategy();
 1090         tms.startTX();
 1091         Connection c = null;
 1092         boolean threadWasInterrupted = Thread.interrupted();
 1093         try
 1094         {
 1095            c = this.getConnection();
 1096   
 1097            // Lazily write the peristent transaction
 1098            insertPersistentTx(tms, c, txId);
 1099            
 1100            // Synchronize on the message to avoid a race with the softener
 1101            synchronized (messageRef)
 1102            {
 1103               SpyMessage message = messageRef.getMessage();
 1104   
 1105               // has it allready been stored by the message cache interface??
 1106               if (messageRef.stored == MessageReference.STORED)
 1107               {
 1108                  if (trace)
 1109                     log.trace("Updating message " + messageRef + " transaction=" + txId);
 1110   
 1111                  markMessage(c, messageRef.messageId, messageRef.getPersistentKey(), txId, "A");
 1112               }
 1113               else
 1114               {
 1115                  if (trace)
 1116                     log.trace("Inserting message " + messageRef + " transaction=" + txId);
 1117   
 1118                  add(c, messageRef.getPersistentKey(), message, txId, "A");
 1119                  messageRef.setStored(MessageReference.STORED);
 1120               }
 1121               if (trace)
 1122                  log.trace("Added message " + messageRef + " transaction=" + txId);
 1123            }
 1124         }
 1125         catch (IOException e)
 1126         {
 1127            tms.setRollbackOnly();
 1128            throw new SpyJMSException("Could not store message: " + messageRef, e);
 1129         }
 1130         catch (SQLException e)
 1131         {
 1132            tms.setRollbackOnly();
 1133            throw new SpyJMSException("Could not store message: " + messageRef, e);
 1134         }
 1135         finally
 1136         {
 1137            try
 1138            {
 1139               if (c != null)
 1140                  c.close();
 1141            }
 1142            catch (Throwable ignore)
 1143            {
 1144            }
 1145            tms.endTX();
 1146   
 1147            // Restore the interrupted state of the thread
 1148            if (threadWasInterrupted)
 1149               Thread.currentThread().interrupt();
 1150         }
 1151      }
 1152   
 1153      protected void add(Connection c, String queue, SpyMessage message, Tx txId, String mark)
 1154         throws SQLException, IOException
 1155      {
 1156         PreparedStatement stmt = null;
 1157         try
 1158         {
 1159   
 1160            stmt = c.prepareStatement(INSERT_MESSAGE);
 1161   
 1162            stmt.setLong(1, message.header.messageId);
 1163            stmt.setString(2, queue);
 1164            setBlob(stmt, 3, message);
 1165   
 1166            if (txId != null)
 1167               stmt.setLong(4, txId.longValue());
 1168            else
 1169               stmt.setNull(4, java.sql.Types.BIGINT);
 1170            stmt.setString(5, mark);
 1171   
 1172            stmt.executeUpdate();
 1173         }
 1174         finally
 1175         {
 1176            try
 1177            {
 1178               if (stmt != null)
 1179                  stmt.close();
 1180            }
 1181            catch (Throwable ignore)
 1182            {
 1183            }
 1184         }
 1185      }
 1186   
 1187      public void markMessage(Connection c, long messageid, String destination, Tx txId, String mark)
 1188         throws SQLException
 1189      {
 1190         PreparedStatement stmt = null;
 1191         try
 1192         {
 1193   
 1194            stmt = c.prepareStatement(MARK_MESSAGE);
 1195            if (txId == null)
 1196            {
 1197               stmt.setNull(1, java.sql.Types.BIGINT);
 1198            }
 1199            else
 1200            {
 1201               stmt.setLong(1, txId.longValue());
 1202            }
 1203            stmt.setString(2, mark);
 1204            stmt.setLong(3, messageid);
 1205            stmt.setString(4, destination);
 1206            stmt.executeUpdate();
 1207         }
 1208         finally
 1209         {
 1210            try
 1211            {
 1212               if (stmt != null)
 1213                  stmt.close();
 1214            }
 1215            catch (Throwable ignore)
 1216            {
 1217            }
 1218         }
 1219   
 1220      }
 1221   
 1222      public void setBlob(PreparedStatement stmt, int column, SpyMessage message) throws IOException, SQLException
 1223      {
 1224         if (blobType == OBJECT_BLOB)
 1225         {
 1226            stmt.setObject(column, message);
 1227         }
 1228         else if (blobType == BYTES_BLOB)
 1229         {
 1230            ByteArrayOutputStream baos = new ByteArrayOutputStream();
 1231            ObjectOutputStream oos = new ObjectOutputStream(baos);
 1232            SpyMessage.writeMessage(message, oos);
 1233            oos.flush();
 1234            byte[] messageAsBytes = baos.toByteArray();
 1235            stmt.setBytes(column, messageAsBytes);
 1236         }
 1237         else if (blobType == BINARYSTREAM_BLOB)
 1238         {
 1239            ByteArrayOutputStream baos = new ByteArrayOutputStream();
 1240            ObjectOutputStream oos = new ObjectOutputStream(baos);
 1241            SpyMessage.writeMessage(message, oos);
 1242            oos.flush();
 1243            byte[] messageAsBytes = baos.toByteArray();
 1244            ByteArrayInputStream bais = new ByteArrayInputStream(messageAsBytes);
 1245            stmt.setBinaryStream(column, bais, messageAsBytes.length);
 1246         }
 1247         else if (blobType == BLOB_BLOB)
 1248         {
 1249   
 1250            throw new RuntimeException("BLOB_TYPE: BLOB_BLOB is not yet implemented.");
 1251            /** TODO:
 1252            ByteArrayOutputStream baos= new ByteArrayOutputStream();
 1253            ObjectOutputStream oos= new ObjectOutputStream(baos);
 1254            oos.writeObject(message);
 1255            byte[] messageAsBytes= baos.toByteArray();
 1256            ByteArrayInputStream bais= new ByteArrayInputStream(messageAsBytes);
 1257            stmt.setBsetBinaryStream(column, bais, messageAsBytes.length);
 1258            */
 1259         }
 1260      }
 1261   
 1262      public void setBlob(PreparedStatement stmt, int column, Xid xid) throws IOException, SQLException
 1263      {
 1264         if (blobType == OBJECT_BLOB)
 1265         {
 1266            stmt.setObject(column, xid);
 1267         }
 1268         else if (blobType == BYTES_BLOB)
 1269         {
 1270            ByteArrayOutputStream baos = new ByteArrayOutputStream();
 1271            ObjectOutputStream oos = new ObjectOutputStream(baos);
 1272            oos.writeObject(xid);
 1273            oos.flush();
 1274            byte[] messageAsBytes = baos.toByteArray();
 1275            stmt.setBytes(column, messageAsBytes);
 1276         }
 1277         else if (blobType == BINARYSTREAM_BLOB)
 1278         {
 1279            ByteArrayOutputStream baos = new ByteArrayOutputStream();
 1280            ObjectOutputStream oos = new ObjectOutputStream(baos);
 1281            oos.writeObject(xid);
 1282            oos.flush();
 1283            byte[] messageAsBytes = baos.toByteArray();
 1284            ByteArrayInputStream bais = new ByteArrayInputStream(messageAsBytes);
 1285            stmt.setBinaryStream(column, bais, messageAsBytes.length);
 1286         }
 1287         else if (blobType == BLOB_BLOB)
 1288         {
 1289   
 1290            throw new RuntimeException("BLOB_TYPE: BLOB_BLOB is not yet implemented.");
 1291            /** TODO:
 1292            ByteArrayOutputStream baos= new ByteArrayOutputStream();
 1293            ObjectOutputStream oos= new ObjectOutputStream(baos);
 1294            oos.writeObject(xid);
 1295            byte[] messageAsBytes= baos.toByteArray();
 1296            ByteArrayInputStream bais= new ByteArrayInputStream(messageAsBytes);
 1297            stmt.setBsetBinaryStream(column, bais, messageAsBytes.length);
 1298            */
 1299         }
 1300      }
 1301   
 1302      /////////////////////////////////////////////////////////////////////////////////
 1303      //
 1304      // Updating a message
 1305      //
 1306      /////////////////////////////////////////////////////////////////////////////////
 1307      public void update(MessageReference messageRef, Tx txId) throws javax.jms.JMSException
 1308      {
 1309         boolean trace = log.isTraceEnabled();
 1310         if (trace)
 1311            log.trace("Updating message " + messageRef + " transaction=" + txId);
 1312   
 1313         TransactionManagerStrategy tms = new TransactionManagerStrategy();
 1314         tms.startTX();
 1315         Connection c = null;
 1316         PreparedStatement stmt = null;
 1317         boolean threadWasInterrupted = Thread.interrupted();
 1318         try
 1319         {
 1320   
 1321            c = this.getConnection();
 1322            if (txId == null)
 1323            {
 1324   
 1325               stmt = c.prepareStatement(UPDATE_MESSAGE);
 1326               setBlob(stmt, 1, messageRef.getMessage());
 1327               stmt.setLong(2, messageRef.messageId);
 1328               stmt.setString(3, messageRef.getPersistentKey());
 1329               int rc = stmt.executeUpdate();
 1330               if (rc != 1)
 1331                  throw new SpyJMSException(
 1332                     "Could not update the message in the database: update affected " + rc + " rows");
 1333            }
 1334            else
 1335            {
 1336               throw new SpyJMSException("NYI: Updating a message in a transaction is not currently used");
 1337            }
 1338            if (trace)
 1339               log.trace("Updated message " + messageRef + " transaction=" + txId);
 1340   
 1341         }
 1342         catch (IOException e)
 1343         {
 1344            tms.setRollbackOnly();
 1345            throw new SpyJMSException("Could not update message: " + messageRef, e);
 1346         }
 1347         catch (SQLException e)
 1348         {
 1349            tms.setRollbackOnly();
 1350            throw new SpyJMSException("Could not update message: " + messageRef, e);
 1351         }
 1352         finally
 1353         {
 1354            try
 1355            {
 1356               if (stmt != null)
 1357                  stmt.close();
 1358            }
 1359            catch (Throwable ignore)
 1360            {
 1361            }
 1362            try
 1363            {
 1364               if (c != null)
 1365                  c.close();
 1366            }
 1367            catch (Throwable ignore)
 1368            {
 1369            }
 1370            tms.endTX();
 1371   
 1372            // Restore the interrupted state of the thread
 1373            if (threadWasInterrupted)
 1374               Thread.currentThread().interrupt();
 1375         }
 1376   
 1377      }
 1378   
 1379      /////////////////////////////////////////////////////////////////////////////////
 1380      //
 1381      // Removing a message
 1382      //
 1383      /////////////////////////////////////////////////////////////////////////////////
 1384      public void remove(MessageReference messageRef, Tx txId) throws javax.jms.JMSException
 1385      {
 1386         boolean trace = log.isTraceEnabled();
 1387         if (trace)
 1388            log.trace("Removing message " + messageRef + " transaction=" + txId);
 1389   
 1390         TransactionManagerStrategy tms = new TransactionManagerStrategy();
 1391         tms.startTX();
 1392         Connection c = null;
 1393         PreparedStatement stmt = null;
 1394         boolean threadWasInterrupted = Thread.interrupted();
 1395         try
 1396         {
 1397            c = this.getConnection();
 1398   
 1399            // Lazily write the peristent transaction
 1400            insertPersistentTx(tms, c, txId);
 1401            
 1402            // Synchronize on the message to avoid a race with the softener
 1403            synchronized (messageRef)
 1404            {
 1405               if (txId == null)
 1406               {
 1407                  stmt = c.prepareStatement(DELETE_MESSAGE);
 1408                  stmt.setLong(1, messageRef.messageId);
 1409                  stmt.setString(2, messageRef.getPersistentKey());
 1410   
 1411                  // Adrian Brock:
 1412                  // Remove the message from the cache, but don't 
 1413                  // return it to the pool just yet. The queue still holds
 1414                  // a reference to the message and will return it
 1415                  // to the pool once it gets enough time slice.
 1416                  // The alternative is to remove the validation
 1417                  // for double removal from the cache, 
 1418                  // which I don't want to do because it is useful
 1419                  // for spotting errors
 1420                  messageRef.setStored(MessageReference.NOT_STORED);
 1421                  messageRef.removeDelayed();
 1422               }
 1423               else
 1424               {
 1425                  stmt = c.prepareStatement(MARK_MESSAGE);
 1426                  stmt.setLong(1, txId.longValue());
 1427                  stmt.setString(2, "D");
 1428                  stmt.setLong(3, messageRef.messageId);
 1429                  stmt.setString(4, messageRef.getPersistentKey());
 1430               }
 1431   
 1432                int tries = 0;
 1433                while (true)
 1434                {
 1435                   try
 1436                   {
 1437                      int rc = stmt.executeUpdate();
 1438   
 1439                      if (tries > 0)
 1440                      {
 1441                         if (rc != 1)
 1442                            throw new SpyJMSException(
 1443                              "Could not mark the message as deleted in the database: update affected " + rc + " rows." + CONCURRENCY_WARNING);
 1444   
 1445                         log.warn("Remove operation worked after " +tries +" retries");
 1446                      }
 1447                      break;
 1448                   }
 1449                   catch (SQLException e)
 1450                   {
 1451                      log.warn("SQLException caught - assuming deadlock detected, try:" + (tries + 1), e);
 1452                      tries++;
 1453                      if (tries >= statementRetries)
 1454                      {
 1455                         log.error("Retried " + tries + " times, now giving up");
 1456                         throw new IllegalStateException("Could not remove message after " +tries + "attempts");
 1457                      }
 1458                      log.warn("Trying again after a pause");
 1459                      //Now we wait for a random amount of time to minimise risk of deadlock
 1460                      Thread.sleep((long)(Math.random() * 500));
 1461                   }
 1462                }
 1463   
 1464               if (trace)
 1465                  log.trace("Removed message " + messageRef + " transaction=" + txId);
 1466            }
 1467         }
 1468         catch (Exception e)
 1469         {
 1470            tms.setRollbackOnly();
 1471            throw new SpyJMSException("Could not remove message: " + messageRef, e);
 1472         }
 1473         finally
 1474         {
 1475            try
 1476            {
 1477               if (stmt != null)
 1478                  stmt.close();
 1479            }
 1480            catch (Throwable ignore)
 1481            {
 1482            }
 1483            try
 1484            {
 1485               if (c != null)
 1486                  c.close();
 1487            }
 1488            catch (Throwable ignore)
 1489            {
 1490            }
 1491            tms.endTX();
 1492   
 1493            // Restore the interrupted state of the thread
 1494            if (threadWasInterrupted)
 1495               Thread.currentThread().interrupt();
 1496         }
 1497   
 1498      }
 1499   
 1500      /////////////////////////////////////////////////////////////////////////////////
 1501      //
 1502      // Misc. PM functions
 1503      //
 1504      /////////////////////////////////////////////////////////////////////////////////
 1505   
 1506      public TxManager getTxManager()
 1507      {
 1508         return txManager;
 1509      }
 1510   
 1511      public void closeQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException
 1512      {
 1513         // Nothing to clean up, all the state is in the db.
 1514      }
 1515   
 1516      public SpyMessage loadFromStorage(MessageReference messageRef) throws JMSException
 1517      {
 1518         if (log.isTraceEnabled())
 1519            log.trace("Loading message from storage " + messageRef);
 1520   
 1521         TransactionManagerStrategy tms = new TransactionManagerStrategy();
 1522         tms.startTX();
 1523         Connection c = null;
 1524         PreparedStatement stmt = null;
 1525         ResultSet rs = null;
 1526         boolean threadWasInterrupted = Thread.interrupted();
 1527         try
 1528         {
 1529   
 1530            c = this.getConnection();
 1531            stmt = c.prepareStatement(SELECT_MESSAGE);
 1532            stmt.setLong(1, messageRef.messageId);
 1533            stmt.setString(2, messageRef.getPersistentKey());
 1534   
 1535            rs = stmt.executeQuery();
 1536            if (rs.next())
 1537               return extractMessage(rs);
 1538            else
 1539               throw new SpyJMSException("Could not load message from storage: " + messageRef + " " + CONCURRENCY_WARNING);
 1540   
 1541         }
 1542         catch (Exception e)
 1543         {
 1544            tms.setRollbackOnly();
 1545            SpyJMSException.rethrowAsJMSException("Could not load message : " + messageRef, e);
 1546            throw new UnreachableStatementException();
 1547         }
 1548         finally
 1549         {
 1550            try
 1551            {
 1552               if (rs != null)
 1553                  rs.close();
 1554            }
 1555            catch (Throwable ignore)
 1556            {
 1557            }
 1558            try
 1559            {
 1560               if (stmt != null)
 1561                  stmt.close();
 1562            }
 1563            catch (Throwable ignore)
 1564            {
 1565            }
 1566            try
 1567            {
 1568               if (c != null)
 1569                  c.close();
 1570            }
 1571            catch (Throwable ignore)
 1572            {
 1573            }
 1574            tms.endTX();
 1575   
 1576            // Restore the interrupted state of the thread
 1577            if (threadWasInterrupted)
 1578               Thread.currentThread().interrupt();
 1579         }
 1580      }
 1581   
 1582      /////////////////////////////////////////////////////////////////////////////////
 1583      //
 1584      // CacheStore Functions
 1585      //
 1586      /////////////////////////////////////////////////////////////////////////////////   
 1587      public void removeFromStorage(MessageReference messageRef) throws JMSException
 1588      {
 1589         // We don't remove persistent messages sent to persistent queues
 1590         if (messageRef.isPersistent())
 1591            return;
 1592   
 1593         boolean trace = log.isTraceEnabled();
 1594         if (trace)
 1595            log.trace("Removing message from storage " + messageRef);
 1596   
 1597         TransactionManagerStrategy tms = new TransactionManagerStrategy();
 1598         tms.startTX();
 1599         Connection c = null;
 1600         PreparedStatement stmt = null;
 1601         boolean threadWasInterrupted = Thread.interrupted();
 1602         try
 1603         {
 1604            c = this.getConnection();
 1605            stmt = c.prepareStatement(DELETE_MESSAGE);
 1606            stmt.setLong(1, messageRef.messageId);
 1607            stmt.setString(2, messageRef.getPersistentKey());
 1608            stmt.executeUpdate();
 1609            messageRef.setStored(MessageReference.NOT_STORED);
 1610   
 1611            if (trace)
 1612               log.trace("Removed message from storage " + messageRef);
 1613         }
 1614         catch (SQLException e)
 1615         {
 1616            tms.setRollbackOnly();
 1617            throw new SpyJMSException("Could not remove message: " + messageRef, e);
 1618         }
 1619         finally
 1620         {
 1621            try
 1622            {
 1623               if (stmt != null)
 1624                  stmt.close();
 1625            }
 1626            catch (Throwable ignore)
 1627            {
 1628            }
 1629            try
 1630            {
 1631               if (c != null)
 1632                  c.close();
 1633            }
 1634            catch (Throwable ignore)
 1635            {
 1636            }
 1637            tms.endTX();
 1638   
 1639            // Restore the interrupted state of the thread
 1640            if (threadWasInterrupted)
 1641               Thread.currentThread().interrupt();
 1642         }
 1643      }
 1644   
 1645      public void saveToStorage(MessageReference messageRef, SpyMessage message) throws JMSException
 1646      {
 1647         // Ignore save operations for persistent messages sent to persistent queues
 1648         // The queues handle the persistence
 1649         if (messageRef.isPersistent())
 1650            return;
 1651   
 1652         boolean trace = log.isTraceEnabled();
 1653         if (trace)
 1654            log.trace("Saving message to storage " + messageRef);
 1655   
 1656         TransactionManagerStrategy tms = new TransactionManagerStrategy();
 1657         tms.startTX();
 1658         Connection c = null;
 1659         boolean threadWasInterrupted = Thread.interrupted();
 1660         try
 1661         {
 1662   
 1663            c = this.getConnection();
 1664            add(c, messageRef.getPersistentKey(), message, null, "T");
 1665            messageRef.setStored(MessageReference.STORED);
 1666   
 1667            if (trace)
 1668               log.trace("Saved message to storage " + messageRef);
 1669         }
 1670         catch (IOException e)
 1671         {
 1672            tms.setRollbackOnly();
 1673            throw new SpyJMSException("Could not store message: " + messageRef, e);
 1674         }
 1675         catch (SQLException e)
 1676         {
 1677            tms.setRollbackOnly();
 1678            throw new SpyJMSException("Could not store message: " + messageRef, e);
 1679         }
 1680         finally
 1681         {
 1682            try
 1683            {
 1684               if (c != null)
 1685                  c.close();
 1686            }
 1687            catch (Throwable ignore)
 1688            {
 1689            }
 1690            tms.endTX();
 1691   
 1692            // Restore the interrupted state of the thread
 1693            if (threadWasInterrupted)
 1694               Thread.currentThread().interrupt();
 1695         }
 1696      }
 1697   
 1698      /**
 1699       * Gets a connection from the datasource, retrying as needed.  This was
 1700       * implemented because in some minimal configurations (i.e. little logging
 1701       * and few services) the database wasn't ready when we tried to get a
 1702       * connection.  We, therefore, implement a retry loop wich is controled
 1703       * by the ConnectionRetryAttempts attribute.  Submitted by terry@amicas.com
 1704       *
 1705       * @return the connection
 1706       * @exception SQLException if an error occurs.
 1707       */
 1708      protected Connection getConnection() throws SQLException
 1709      {
 1710         int attempts = this.connectionRetryAttempts;
 1711         int attemptCount = 0;
 1712         SQLException sqlException = null;
 1713         while (attempts-- > 0)
 1714         {
 1715            if (++attemptCount > 1)
 1716            {
 1717               log.debug("Retrying connection: attempt # " + attemptCount);
 1718            }
 1719            try
 1720            {
 1721               sqlException = null;
 1722               return datasource.getConnection();
 1723            }
 1724            catch (SQLException exception)
 1725            {
 1726               log.debug("Connection attempt # " + attemptCount + " failed with SQLException", exception);
 1727               sqlException = exception;
 1728            }
 1729            finally
 1730            {
 1731               if (sqlException == null && attemptCount > 1)
 1732               {
 1733                  log.debug("Connection succeeded on attempt # " + attemptCount);
 1734               }
 1735            }
 1736   
 1737            if (attempts > 0)
 1738            {
 1739               try
 1740               {
 1741                  Thread.sleep(1500);
 1742               }
 1743               catch (InterruptedException interruptedException)
 1744               {
 1745                  break;
 1746               }
 1747            }
 1748         }
 1749         if (sqlException != null)
 1750         {
 1751            throw sqlException;
 1752         }
 1753         throw new SQLException("connection attempt interrupted");
 1754      }
 1755   
 1756      /////////////////////////////////////////////////////////////////////////////////
 1757      //
 1758      // JMX Interface 
 1759      //
 1760      /////////////////////////////////////////////////////////////////////////////////
 1761      
   /**
    * The object name of the connection manager MBean. Its "BindName"
    * attribute is read by initializeFields() to look up the DataSource.
    */
   protected ObjectName connectionManagerName;
   
   /**
    * The SQL properties. startService() reads the SQL statements and the
    * CREATE_TABLES_ON_STARTUP and BLOB_TYPE settings from here.
    */
   protected Properties sqlProperties = new Properties();
 1767   
 1768      public void startService() throws Exception
 1769      {
 1770         UPDATE_MARKED_MESSAGES = sqlProperties.getProperty("UPDATE_MARKED_MESSAGES", UPDATE_MARKED_MESSAGES);
 1771         UPDATE_MARKED_MESSAGES_XARECOVERY = 
 1772            sqlProperties.getProperty("UPDATE_MARKED_MESSAGES_XARECOVERY", UPDATE_MARKED_MESSAGES_XARECOVERY);
 1773         UPDATE_MARKED_MESSAGES_WITH_TX =
 1774            sqlProperties.getProperty("UPDATE_MARKED_MESSAGES_WITH_TX", UPDATE_MARKED_MESSAGES_WITH_TX);
 1775         DELETE_MARKED_MESSAGES_WITH_TX =
 1776            sqlProperties.getProperty("DELETE_MARKED_MESSAGES_WITH_TX", DELETE_MARKED_MESSAGES_WITH_TX);
 1777         DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY =
 1778            sqlProperties.getProperty("DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY", DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY);
 1779         DELETE_TX = sqlProperties.getProperty("DELETE_TX", DELETE_TX);
 1780         DELETE_MARKED_MESSAGES = sqlProperties.getProperty("DELETE_MARKED_MESSAGES", DELETE_MARKED_MESSAGES);
 1781         DELETE_TEMPORARY_MESSAGES = sqlProperties.getProperty("DELETE_TEMPORARY_MESSAGES", DELETE_TEMPORARY_MESSAGES);
 1782         INSERT_TX = sqlProperties.getProperty("INSERT_TX", INSERT_TX);
 1783         INSERT_TX_XARECOVERY = sqlProperties.getProperty("INSERT_TX_XARECOVERY", INSERT_TX_XARECOVERY);
 1784         DELETE_ALL_TX = sqlProperties.getProperty("DELETE_ALL_TX", DELETE_ALL_TX);
 1785         DELETE_ALL_TX_XARECOVERY = sqlProperties.getProperty("DELETE_ALL_TX_XARECOVERY", DELETE_ALL_TX_XARECOVERY);
 1786         SELECT_ALL_TX_XARECOVERY = sqlProperties.getProperty("SELECT_ALL_TX_XARECOVERY", SELECT_ALL_TX_XARECOVERY);
 1787         SELECT_MAX_TX = sqlProperties.getProperty("SELECT_MAX_TX", SELECT_MAX_TX);
 1788         SELECT_MESSAGES_IN_DEST = sqlProperties.getProperty("SELECT_MESSAGES_IN_DEST", SELECT_MESSAGES_IN_DEST);
 1789         SELECT_MESSAGES_IN_DEST_XARECOVERY = sqlProperties.getProperty("SELECT_MESSAGES_IN_DEST_XARECOVERY", SELECT_MESSAGES_IN_DEST_XARECOVERY);
 1790         SELECT_MESSAGE_KEYS_IN_DEST = sqlProperties.getProperty("SELECT_MESSAGE_KEYS_IN_DEST", SELECT_MESSAGE_KEYS_IN_DEST);
 1791         SELECT_MESSAGE = sqlProperties.getProperty("SELECT_MESSAGE", SELECT_MESSAGE);
 1792         SELECT_MESSAGE_XARECOVERY = sqlProperties.getProperty("SELECT_MESSAGE_XARECOVERY", SELECT_MESSAGE_XARECOVERY);
 1793         INSERT_MESSAGE = sqlProperties.getProperty("INSERT_MESSAGE", INSERT_MESSAGE);
 1794         MARK_MESSAGE = sqlProperties.getProperty("MARK_MESSAGE", MARK_MESSAGE);
 1795         DELETE_MESSAGE = sqlProperties.getProperty("DELETE_MESSAGE", DELETE_MESSAGE);
 1796         UPDATE_MESSAGE = sqlProperties.getProperty("UPDATE_MESSAGE", UPDATE_MESSAGE);
 1797         CREATE_MESSAGE_TABLE = sqlProperties.getProperty("CREATE_MESSAGE_TABLE", CREATE_MESSAGE_TABLE);
 1798         CREATE_IDX_MESSAGE_TXOP_TXID = sqlProperties.getProperty("CREATE_IDX_MESSAGE_TXOP_TXID", CREATE_IDX_MESSAGE_TXOP_TXID);
 1799         CREATE_IDX_MESSAGE_DESTINATION = sqlProperties.getProperty("CREATE_IDX_MESSAGE_DESTINATION", CREATE_IDX_MESSAGE_DESTINATION);
 1800         CREATE_TX_TABLE = sqlProperties.getProperty("CREATE_TX_TABLE", CREATE_TX_TABLE);
 1801         CREATE_TX_TABLE_XARECOVERY = sqlProperties.getProperty("CREATE_TX_TABLE_XARECOVERY", CREATE_TX_TABLE_XARECOVERY);
 1802         createTables = sqlProperties.getProperty("CREATE_TABLES_ON_STARTUP", "true").equalsIgnoreCase("true");
 1803         String s = sqlProperties.getProperty("BLOB_TYPE", "OBJECT_BLOB");
 1804   
 1805         if (s.equals("OBJECT_BLOB"))
 1806         {
 1807            blobType = OBJECT_BLOB;
 1808         }
 1809         else if (s.equals("BYTES_BLOB"))
 1810         {
 1811            blobType = BYTES_BLOB;
 1812         }
 1813         else if (s.equals("BINARYSTREAM_BLOB"))
 1814         {
 1815            blobType = BINARYSTREAM_BLOB;
 1816         }
 1817         else if (s.equals("BLOB_BLOB"))
 1818         {
 1819            blobType = BLOB_BLOB;
 1820         }
 1821   
 1822   
 1823         // initialize tm and datasource
 1824         initializeFields();
 1825   
 1826         log.debug("Creating Schema");
 1827         try
 1828         {
 1829            createSchema();
 1830         }
 1831         catch (Exception e)
 1832         {
 1833            log.warn("Error creating schema", e);
 1834         }
 1835         
 1836         log.debug("Resolving uncommited TXS");
 1837         Throwable error = null;
 1838         for (int i = 0; i <= recoveryRetries; ++i)
 1839         {
 1840            try
 1841            {
 1842               resolveAllUncommitedTXs();
 1843               
 1844               // done
 1845               break;
 1846            }
 1847            catch (Throwable t)
 1848            {
 1849               if (i < recoveryRetries)
 1850                  log.warn("Error resolving transactions retries=" + i + " of " + recoveryRetries, t);
 1851               else
 1852                  error = t;
 1853            }
 1854         }
 1855         
 1856         if (error != null)
 1857            SpyJMSException.rethrowAsJMSException("Unable to resolve transactions retries=" + recoveryRetries, error);
 1858      }
 1859   
 1860      protected void initializeFields()
 1861              throws MBeanException, AttributeNotFoundException, InstanceNotFoundException, ReflectionException, NamingException
 1862      {
 1863         //Find the ConnectionFactoryLoader MBean so we can find the datasource
 1864         String dsName = (String) getServer().getAttribute(connectionManagerName, "BindName");
 1865         //Get an InitialContext
 1866   
 1867         InitialContext ctx = new InitialContext();
 1868         datasource = (DataSource) ctx.lookup(dsName);
 1869   
 1870         //Get the Transaction Manager so we can control the jdbc tx
 1871         tm = TransactionManagerLocator.locateTransactionManager();
 1872      }
 1873   
 1874      public Object getInstance()
 1875      {
 1876         return this;
 1877      }
 1878   
 1879      public ObjectName getMessageCache()
 1880      {
 1881         throw new UnsupportedOperationException("This is now set on the destination manager");
 1882      }
 1883   
 1884      public void setMessageCache(ObjectName messageCache)
 1885      {
 1886         throw new UnsupportedOperationException("This is now set on the destination manager");
 1887      }
 1888   
 1889      public ObjectName getConnectionManager()
 1890      {
 1891         return connectionManagerName;
 1892      }
 1893   
 1894      public void setConnectionManager(ObjectName connectionManagerName)
 1895      {
 1896         this.connectionManagerName = connectionManagerName;
 1897      }
 1898   
 1899      public MessageCache getMessageCacheInstance()
 1900      {
 1901         throw new UnsupportedOperationException("This is now set on the destination manager");
 1902      }
 1903   
 1904      public String getSqlProperties()
 1905      {
 1906         try
 1907         {
 1908            ByteArrayOutputStream boa = new ByteArrayOutputStream();
 1909            sqlProperties.store(boa, "");
 1910            return new String(boa.toByteArray());
 1911         }
 1912         catch (IOException shouldnothappen)
 1913         {
 1914            return "";
 1915         }
 1916      }
 1917   
 1918      public void setSqlProperties(String value)
 1919      {
 1920         try
 1921         {
 1922            ByteArrayInputStream is = new ByteArrayInputStream(value.getBytes());
 1923            sqlProperties = new Properties();
 1924            sqlProperties.load(is);
 1925         }
 1926         catch (IOException shouldnothappen)
 1927         {
 1928         }
 1929      }
 1930   
 1931      public void setConnectionRetryAttempts(int value)
 1932      {
 1933         this.connectionRetryAttempts = value;
 1934      }
 1935   
 1936      public int getConnectionRetryAttempts()
 1937      {
 1938         return this.connectionRetryAttempts;
 1939      }
 1940     
 1941      public int getRecoveryTimeout()
 1942      {
 1943         return recoveryTimeout;
 1944      }
 1945   
 1946      public void setRecoveryTimeout(int timeout)
 1947      {
 1948         this.recoveryTimeout = timeout;
 1949      }
 1950      
 1951      public int getRecoveryRetries()
 1952      {
 1953         return recoveryRetries;
 1954      }
 1955   
 1956      public void setRecoveryRetries(int retries)
 1957      {
 1958         this.recoveryRetries = retries;
 1959      }
 1960   
 1961      public int getRecoverMessagesChunk()
 1962      {
 1963         return recoverMessagesChunk;
 1964      }
 1965   
 1966      public void setRecoverMessagesChunk(int recoverMessagesChunk)
 1967      {
 1968         if (recoverMessagesChunk != 0 && recoverMessagesChunk != 1)
 1969         {
 1970            log.warn("Only the values 0 and 1 are currently support for chunk size, using chunk size=1");
 1971            recoverMessagesChunk = 1;
 1972         }
 1973         this.recoverMessagesChunk = recoverMessagesChunk;
 1974      }
 1975   
 1976      public boolean isXARecovery()
 1977      {
 1978         return xaRecovery;
 1979      }
 1980   
 1981      public void setXARecovery(boolean xaRecovery)
 1982      {
 1983         this.xaRecovery = xaRecovery;
 1984      }
 1985   
 1986      public int getStatementRetries()
 1987      {
 1988         return statementRetries;
 1989      }
 1990   
 1991      public void setStatementRetries(int statementRetries)
 1992      {
 1993         if (statementRetries < 0)
 1994            statementRetries = 0;
 1995         this.statementRetries = statementRetries;
 1996      }
 1997   }

Save This Page
Home » jboss-5.0.0.CR1-src » org » jboss » mq » pm » jdbc2 » [javadoc | source]