Save This Page
Home » jboss-5.0.0.CR1-src » org » jboss » ejb » plugins » jms » [javadoc | source]
    1   /***************************************
    2    *                                     *
    3    *  JBoss: The OpenSource J2EE WebOS   *
    4    *                                     *
    5    *  Distributable under LGPL license.  *
    6    *  See terms of license at gnu.org.   *
    7    *                                     *
    8    ***************************************/
    9   
   10   package org.jboss.ejb.plugins.jms;
   11   
   12   import java.util.Hashtable;
   13   import java.util.HashMap;
   14   import java.util.Map;
   15   import java.util.Enumeration;
   16   import java.util.Iterator;
   17   
   18   import javax.naming.InitialContext;
   19   import javax.naming.Context;
   20   import javax.jms.Session;
   21   import javax.jms.QueueConnection;
   22   import javax.jms.QueueConnectionFactory;
   23   import javax.jms.QueueSession;
   24   import javax.jms.QueueSender;
   25   import javax.jms.Queue;
   26   import javax.jms.Message;
   27   import javax.jms.JMSException;
   28   import javax.transaction.Status;
   29   import javax.transaction.Synchronization;
   30   import javax.transaction.Transaction;
   31   
   32   import org.w3c.dom.Element;
   33   
   34   import org.jboss.logging.Logger;
   35   import org.jboss.deployment.DeploymentException;
   36   import org.jboss.metadata.MetaData;
   37   import org.jboss.jms.jndi.JMSProviderAdapter;
   38   import org.jboss.system.ServiceMBeanSupport;
   39   
   40   /**
   41    * Places redeliveded messages on a Dead Letter Queue.
   42    *
   43    *<p>
   44    *The Dead Letter Queue handler is used to not set JBoss in an endles loop
   45    * when a message is resent on and on due to transaction rollback for
   46    * message receipt.
   47    *
   48    * <p>
   49    * It sends message to a dead letter queue (configurable, defaults to
   50    * queue/DLQ) when the message has been resent a configurable amount of times,
   51    * defaults to 10.
   52    *
   53    * <p>
   54    * The handler is configured through the element MDBConfig in
   55    * container-invoker-conf.
   56    *
   57    * <p>
   58    * The JMS property JBOSS_ORIG_DESTINATION in the resent message is set
   59    * to the name of the original destination (Destionation.toString()).
   60    *
   61    * <p>
   62    * The JMS property JBOSS_ORIG_MESSAGEID in the resent message is set
   63    * to the id of the original message.
   64    *
   65    * Created: Thu Aug 23 21:17:26 2001
   66    *
   67    * @version <tt>$Revision: 1.11.2.9 $</tt>
   68    * @author ???
   69    * @author <a href="mailto:jason@planet57.com">Jason Dillon</a>
   70    */
   71   public class DLQHandler
   72      extends ServiceMBeanSupport
   73   {
   74      /** JMS property name holding original destination. */
   75      public static final String JBOSS_ORIG_DESTINATION ="JBOSS_ORIG_DESTINATION";
   76      
   77      /** JMS property name holding original JMS message id. */
   78      public static final String JBOSS_ORIG_MESSAGEID="JBOSS_ORIG_MESSAGEID";
   79      
   80      /** Properties copied from org.jboss.mq.SpyMessage */
   81      private static final String JMS_JBOSS_REDELIVERY_COUNT = "JMS_JBOSS_REDELIVERY_COUNT";
   82      private static final String JMS_JBOSS_REDELIVERY_LIMIT = "JMS_JBOSS_REDELIVERY_LIMIT";
   83      
   84      // Configuratable stuff
   85   
   86      /**
   87       * Destination to send dead letters to.
   88       * 
   89       * <p>
   90       * Defaults to <em>queue/DLQ</em>, configurable through
   91       * <tt>DestinationQueue</tt> element.
   92       */
   93      private String destinationJNDI = "queue/DLQ";
   94      
   95      /**
   96       * Maximum times a message is alowed to be resent.
   97       *
   98       * <p>Defaults to <em>10</em>, configurable through
   99       * <tt>MaxTimesRedelivered</tt> element.
  100       */
  101      private int maxResent = 10;
  102      
  103      /**
  104       * Time to live for the message.
  105       *
  106       * <p>
  107       * Defaults to <em>{@link Message#DEFAULT_TIME_TO_LIVE}</em>, 
  108       * configurable through the <tt>TimeToLive</tt> element.
  109       */
  110      private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
  111      
  112      // May become configurable
  113      
  114      /** Delivery mode for message, Message.DEFAULT_DELIVERY_MODE. */
  115      private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
  116   
  117      /** Priority for the message, Message.DEFAULT_PRIORITY */
  118      private int priority = Message.DEFAULT_PRIORITY;
  119   
  120      /** The dlq user for the connection */
  121      private String dlqUser;
  122   
  123      /** The dlq password for the connection */
  124      private String dlqPass;
  125      
  126      // Private stuff
  127      private QueueConnection connection;
  128      private Queue dlq;
  129      private JMSProviderAdapter providerAdapter;
  130      private Hashtable resentBuffer = new Hashtable();
  131   
  132      public DLQHandler(final JMSProviderAdapter providerAdapter)
  133      {
  134         this.providerAdapter = providerAdapter;
  135      }
  136   
  137      //--- Service
  138      
  139      /**
  140       * Initalize the service.
  141       *
  142       * @throws Exception    Service failed to initalize.
  143       */
  144      protected void createService() throws Exception
  145      {
  146         Context ctx = providerAdapter.getInitialContext();
  147         
  148         try {
  149            String factoryName = providerAdapter.getQueueFactoryRef();
  150            QueueConnectionFactory factory = (QueueConnectionFactory)
  151               ctx.lookup(factoryName);
  152            log.debug("Using factory: " + factory);
  153            
  154            if (dlqUser == null)
  155               connection = factory.createQueueConnection();
  156            else
  157               connection = factory.createQueueConnection(dlqUser, dlqPass);
  158            log.debug("Created connection: " + connection);
  159   
  160            dlq = (Queue)ctx.lookup(destinationJNDI);
  161            log.debug("Using Queue: " + dlq);
  162         }
  163         catch (Exception e)
  164         {
  165            if (e instanceof JMSException)
  166               throw e;
  167            else
  168            {
  169               JMSException x = new JMSException("Error creating the dlq connection: " + e.getMessage());
  170               x.setLinkedException(e);
  171               throw x;
  172            }
  173         }
  174         finally {
  175            ctx.close();
  176         }
  177      }
  178      
  179      protected void destroyService() throws Exception
  180      {
  181         // Help the GC
  182         if (connection != null)
  183            connection.close();
  184         connection = null;
  185         dlq = null;
  186         providerAdapter = null;
  187      }
  188      
  189      //--- Logic
  190      
  191      /**
  192       * Check if a message has been redelivered to many times.
  193       *
  194       * If message has been redelivered to many times, send it to the
  195       * dead letter queue (default to queue/DLQ).
  196       *
  197       * @return true if message is handled (i.e resent), false if not.
  198       */
  199      public boolean handleRedeliveredMessage(final Message msg, final Transaction tx)
  200      {
  201         boolean handled = false;
  202         int max = this.maxResent;
  203         String id = null;
  204         boolean jbossmq = true;
  205         int count = 0;
  206         
  207         try
  208         {
  209   
  210            if (msg.propertyExists(JMS_JBOSS_REDELIVERY_LIMIT))
  211               max = msg.getIntProperty(JMS_JBOSS_REDELIVERY_LIMIT); 
  212   
  213            if (msg.propertyExists(JMS_JBOSS_REDELIVERY_COUNT))
  214               count = msg.getIntProperty(JMS_JBOSS_REDELIVERY_COUNT);
  215            else
  216            {
  217               id = msg.getJMSMessageID();
  218               if (id == null)
  219               {
  220                  // if we can't get the id we are basically fucked
  221                  log.error("Message id is null, can't handle message");
  222                  return false;
  223               }
  224               count = incrementResentCount(id);
  225               jbossmq = false;
  226            }
  227            
  228            if (count > max)
  229            {
  230               id = msg.getJMSMessageID();
  231               log.warn("Message resent too many times; sending it to DLQ; message id=" + id);
  232               
  233               sendMessage(msg);
  234               deleteFromBuffer(id);
  235   
  236               handled = true;
  237            }
  238            else if (jbossmq == false && tx != null)
  239            {
  240               // Register a synchronization to remove the buffer entry
  241               // should the transaction commit
  242               DLQSynchronization synch = new DLQSynchronization(id);
  243               try
  244               {
  245                  tx.registerSynchronization(synch);
  246               }
  247               catch (Exception e)
  248               {
  249                  log.warn("Error registering DlQ Synchronization with transaction " + tx, e);
  250               }
  251            }
  252         }
  253         catch (JMSException e)
  254         {
  255            // If we can't send it ahead, we do not dare to just drop it...or?
  256            log.error("Could not send message to Dead Letter Queue", e);
  257         }
  258         
  259         return handled;
  260      }
  261   
  262      /**
  263       * Send message to the configured dead letter queue, defaults to queue/DLQ.
  264       */
  265      protected void sendMessage(Message msg) throws JMSException
  266      {
  267         boolean trace = log.isTraceEnabled();
  268         
  269         QueueSession session = null;
  270         QueueSender sender = null;
  271   
  272         try
  273         {
  274            msg = makeWritable(msg, trace); // Don't know yet if we are gona clone or not
  275            
  276            // Set the properties
  277            msg.setStringProperty(JBOSS_ORIG_MESSAGEID,
  278            msg.getJMSMessageID());
  279            msg.setStringProperty(JBOSS_ORIG_DESTINATION,
  280            msg.getJMSDestination().toString());
  281            
  282            session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
  283            sender = session.createSender(dlq);
  284            if (trace) {
  285               log.trace("Sending message to DLQ; destination=" +
  286                         dlq + ", session=" + session + ", sender=" + sender);
  287            }
  288   
  289            sender.send(msg, deliveryMode, priority, timeToLive);
  290   
  291            if (trace) {
  292               log.trace("Message sent.");
  293            }
  294            
  295         }
  296         finally
  297         {
  298            try
  299            {
  300               if (sender != null) sender.close();
  301               if (session != null) session.close();
  302            }
  303            catch(Exception e)
  304            {
  305               log.warn("Failed to close sender or session; ignoring", e);
  306            }
  307         }
  308      }
  309   
  310      /**
  311       * Increment the counter for the specific JMS message id.
  312       *
  313       * @return the new counter value.
  314       */
  315      protected int incrementResentCount(String id)
  316      {
  317         BufferEntry entry = null;
  318         boolean trace = log.isTraceEnabled();
  319         if(!resentBuffer.containsKey(id))
  320         {
  321            if (trace)
  322            log.trace("Making new entry for id " + id);
  323            entry = new BufferEntry();
  324            entry.id = id;
  325            entry.count = 1;
  326            resentBuffer.put(id,entry);
  327         } else
  328         {
  329            entry = (BufferEntry)resentBuffer.get(id);
  330            entry.count++;
  331            if (trace)
  332            log.trace("Incremented old entry for id " + id + " count " + entry.count);
  333         }
  334         return entry.count;
  335      }
  336      
  337      /**
  338       * Delete the entry in the message counter buffer for specifyed JMS id.
  339       */
  340      protected void deleteFromBuffer(String id)
  341      {
  342         resentBuffer.remove(id);
  343      }
  344      
  345      /**
  346       * Make the Message properties writable.
  347       *
  348       * @return the writable message.
  349       */
  350      protected Message makeWritable(Message msg, boolean trace) throws JMSException
  351      {
  352         HashMap tmp = new HashMap();
  353   
  354         // Save properties
  355         for (Enumeration en=msg.getPropertyNames(); en.hasMoreElements();)
  356         {
  357            String key = (String)en.nextElement();
  358            tmp.put(key, msg.getObjectProperty(key));
  359         }
  360         
  361         // Make them writable
  362         msg.clearProperties();
  363         
  364         Iterator i = tmp.entrySet().iterator();
  365         while (i.hasNext())
  366         {
  367            Map.Entry me = (Map.Entry)i.next();
  368            String key = (String) me.getKey();
  369            try
  370            {
  371               msg.setObjectProperty(key, me.getValue());
  372            }
  373            catch (JMSException ignored)
  374            {
  375               if (trace)
  376                  log.trace("Could not copy message property " + key, ignored);
  377            }
  378         }
  379         
  380         return msg;
  381      }
  382      
  383      /**
  384       * Takes an MDBConfig Element
  385       */
  386      public void importXml(final Element element) throws DeploymentException
  387      {
  388         destinationJNDI = MetaData.getElementContent
  389            (MetaData.getUniqueChild(element, "DestinationQueue"));
  390         
  391         try
  392         {
  393            String mr = MetaData.getElementContent
  394               (MetaData.getUniqueChild(element, "MaxTimesRedelivered"));
  395            maxResent = Integer.parseInt(mr);
  396         }
  397         catch (Exception ignore) {}
  398   
  399         try {
  400            String ttl = MetaData.getElementContent
  401               (MetaData.getUniqueChild(element, "TimeToLive"));
  402            timeToLive = Long.parseLong(ttl);
  403            
  404            if (timeToLive < 0) {
  405               log.warn("Invalid TimeToLive: " + timeToLive + "; using default");
  406               timeToLive = Message.DEFAULT_TIME_TO_LIVE;
  407            }
  408         }
  409         catch (Exception ignore) {}
  410   
  411         dlqUser = MetaData.getElementContent(MetaData.getOptionalChild(element, "DLQUser"));
  412         dlqPass = MetaData.getElementContent(MetaData.getOptionalChild(element, "DLQPassword"));
  413      }
  414      
  415      public String toString()
  416      {
  417         return super.toString() +
  418            "{ destinationJNDI=" +  destinationJNDI +
  419            ", maxResent=" + maxResent +
  420            ", timeToLive=" + timeToLive +
  421            " }";
  422      }
  423      
  424      private static class BufferEntry
  425      {
  426         int count;
  427         String id;
  428      }
  429   
  430      /**
  431       * Remove a redelivered message from the DLQ's buffer when it is acknowledged
  432       */
  433      protected class DLQSynchronization
  434         implements Synchronization
  435      {
  436         /** The message id */
  437         String id;
  438   
  439         public DLQSynchronization(String id)
  440         {
  441            this.id = id;
  442         }
  443   
  444         public void beforeCompletion()
  445         {
  446         }
  447   
  448         /**
  449          * Forget the message when the transaction commits
  450          */
  451         public void afterCompletion(int status)
  452         {
  453            if (status == Status.STATUS_COMMITTED)
  454               deleteFromBuffer(id);
  455         }
  456      }
  457   }

Save This Page
Home » jboss-5.0.0.CR1-src » org » jboss » ejb » plugins » jms » [javadoc | source]