Save This Page
Home » JBoss-5.1.0 » org » jboss » mq » [javadoc | source]
    1   /*
    2    * JBoss, Home of Professional Open Source.
    3    * Copyright 2006, Red Hat Middleware LLC, and individual contributors
    4    * as indicated by the @author tags. See the copyright.txt file in the
    5    * distribution for a full listing of individual contributors.
    6    *
    7    * This is free software; you can redistribute it and/or modify it
    8    * under the terms of the GNU Lesser General Public License as
    9    * published by the Free Software Foundation; either version 2.1 of
   10    * the License, or (at your option) any later version.
   11    *
   12    * This software is distributed in the hope that it will be useful,
   13    * but WITHOUT ANY WARRANTY; without even the implied warranty of
   14    * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
   15    * Lesser General Public License for more details.
   16    *
   17    * You should have received a copy of the GNU Lesser General Public
   18    * License along with this software; if not, write to the Free
   19    * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
   20    * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
   21    */
   22   package org.jboss.mq;
   23   
   24   import java.util.LinkedList;
   25   
   26   import javax.jms.Destination;
   27   import javax.jms.IllegalStateException;
   28   import javax.jms.InvalidSelectorException;
   29   import javax.jms.JMSException;
   30   import javax.jms.Message;
   31   import javax.jms.MessageConsumer;
   32   import javax.jms.MessageListener;
   33   import javax.jms.Session;
   34   
   35   import org.jboss.logging.Logger;
   36   import org.jboss.util.UnreachableStatementException;
   37   
   38   import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
   39   
   40   /**
   41    * This class implements <tt>javax.jms.MessageConsumer</tt>.
   42    * 
   43    * @author Norbert Lataille (Norbert.Lataille@m4x.org)
   44    * @author Hiram Chirino (Cojonudo14@hotmail.com)
   45    * @author David Maplesden (David.Maplesden@orion.co.nz)
   46    * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
   47    * @version $Revision: 74988 $
   48    */
   49   public class SpyMessageConsumer implements MessageConsumer, SpyConsumer, Runnable
   50   {
   51      /** The log */
   52      static Logger log = Logger.getLogger(SpyMessageConsumer.class);
   53   
   54      /** Is trace enabled */
   55      static boolean trace = log.isTraceEnabled();
   56   
   57      /** Delivered once */
   58      static final Integer ONCE = new Integer(1);
   59      
   60      /** Link to my session */
   61      public SpySession session;
   62      /** The subscription structure should be fill out by the descendent */
   63      public Subscription subscription = new Subscription();
   64      /** Are we closed ? */
   65      private SynchronizedBoolean closed = new SynchronizedBoolean(false);
   66      /** The state lock */
   67      protected Object stateLock = new Object();
   68      /** Are we receiving a message */
   69      protected boolean receiving = false;
   70      /** Are we waiting for a message */
   71      protected boolean waitingForMessage = false;
   72      /** Are we listening */
   73      protected boolean listening = false;
   74      /** The listener thread */
   75      protected Thread listenerThread = null;
   76      /** My message listener (null if none) */
   77      MessageListener messageListener;
   78      /** List of Pending messages (not yet delivered) */
   79      LinkedList messages;
   80      /** Is this a session consumer? */
   81      boolean sessionConsumer;
   82   
   83      /**
   84   	 * Create a new SpyMessageConsumer
   85   	 * 
   86   	 * @param s the session
   87   	 * @param sessionConsumer true for a session consumer, false otherwise
   88   	 */
   89      SpyMessageConsumer(SpySession s, boolean sessionConsumer)
   90      {
   91         trace = log.isTraceEnabled();
   92   
   93         session = s;
   94         this.sessionConsumer = sessionConsumer;
   95         messageListener = null;
   96         messages = new LinkedList();
   97   
   98         if (trace)
   99            log.trace("New message consumer " + this);
  100      }
  101   
  102      /**
  103       * Create a new SpyMessageConsumer
  104       * 
  105       * @param s the session
  106       * @param sessionConsumer true for a session consumer, false otherwise
  107       * @param destination the destination
  108       * @param selector the selector
  109       * @param noLocal true for noLocal, false otherwise
  110       */
  111      SpyMessageConsumer(SpySession s, boolean sessionConsumer, SpyDestination destination, String selector, boolean noLocal) throws InvalidSelectorException
  112      {
  113         trace = log.isTraceEnabled();
  114   
  115         session = s;
  116         this.sessionConsumer = sessionConsumer;
  117         subscription.destination = destination;
  118         subscription.messageSelector = selector;
  119         subscription.noLocal = noLocal;
  120   
  121         // If the selector is set, try to build it, throws an
  122         // InvalidSelectorException
  123         // if it is not valid.
  124         if (subscription.messageSelector != null)
  125            subscription.getSelector();
  126         
  127         messageListener = null;
  128         messages = new LinkedList();
  129   
  130         if (trace)
  131            log.trace("New message consumer " + this);
  132      }
  133   
  134      /**
  135       * Get the subscription
  136       * 
  137       * @return the subscription
  138       */
  139      public Subscription getSubscription()
  140      {
  141         return subscription;
  142      }
  143   
  144      /**
  145       * Add a message 
  146       * 
  147       * @param message the message to add
  148       * @throws JMSException for any error
  149       */
  150      public void addMessage(SpyMessage message) throws JMSException
  151      {
  152         if (isClosed())
  153         {
  154            if (trace)
  155               log.trace("WARNING: NACK issued. message=" + message.header.jmsMessageID + 
  156                         " The message consumer was closed. " + this);
  157            session.connection.send(message.getAcknowledgementRequest(false));
  158            return;
  159         }
  160   
  161         //Add a message to the queue
  162   
  163         //  Consider removing this test (subscription.accepts). I don't think it
  164         // will ever fail
  165         //  because the test is also done by the server before message is even
  166         // sent.
  167         if (subscription.accepts(message.header))
  168         {
  169            if (sessionConsumer)
  170               sessionConsumerProcessMessage(message);
  171            else
  172            {
  173               synchronized (messages)
  174               {
  175                  if (waitingForMessage)
  176                  {
  177                     if (trace)
  178                        log.trace("Adding message=" + message.header.jmsMessageID + " " + this);
  179                     messages.addLast(message);
  180                     messages.notifyAll();
  181                  }
  182                  else
  183                  {
  184                     //unwanted message (due to consumer receive timing out) Nack
  185                     // it.
  186                     if (trace)
  187                        log.trace("WARNING: NACK issued. message=" + message.header.jmsMessageID + 
  188                              " The message consumer was not waiting for a message. " + this);
  189                     session.connection.send(message.getAcknowledgementRequest(false));
  190                  }
  191               }
  192            }
  193         }
  194         else
  195         {
  196            if (trace)
  197               log.trace("WARNING: NACK issued. message=" + message.header.jmsMessageID + 
  198                     " The subscription did not accept the message. " + this);
  199            session.connection.send(message.getAcknowledgementRequest(false));
  200         }
  201      }
  202   
  203      /**
  204   	 * Restarts the processing of the messages in case of a recovery
  205   	 */
  206      public void restartProcessing()
  207      {
  208         synchronized (messages)
  209         {
  210            if (trace)
  211               log.trace("Restarting processing " + this);
  212            messages.notifyAll();
  213         }
  214      }
  215   
  216      public void setMessageListener(MessageListener listener) throws JMSException
  217      {
  218         checkClosed();
  219   
  220         synchronized (stateLock)
  221         {
  222            if (receiving)
  223               throw new JMSException("Another thread is already in receive.");
  224   
  225            if (trace)
  226               log.trace("Set message listener=" + listener + " old listener=" + messageListener + " " + this);
  227   
  228            boolean oldListening = listening;
  229            listening = (listener != null);
  230            messageListener = listener;
  231   
  232            if (!sessionConsumer && listening && !oldListening)
  233            {
  234               //Start listener thread (if one is not already running)
  235               if (listenerThread == null)
  236               {
  237                  listenerThread = new Thread(this, "MessageListenerThread - " + subscription.destination.getName());
  238                  listenerThread.start();
  239               }
  240            }
  241         }
  242      }
  243   
  244      public String getMessageSelector() throws JMSException
  245      {
  246         checkClosed();
  247         return subscription.messageSelector;
  248      }
  249   
  250      public MessageListener getMessageListener() throws JMSException
  251      {
  252         checkClosed();
  253         return messageListener;
  254      }
  255   
  256      public Message receive() throws JMSException
  257      {
  258         checkClosed();
  259         synchronized (stateLock)
  260         {
  261            if (receiving)
  262               throw new JMSException("Another thread is already in receive.");
  263            if (listening)
  264               throw new JMSException("A message listener is already registered");
  265            receiving = true;
  266            
  267            if (trace)
  268               log.trace("receive() " + this);
  269         }
  270   
  271         try
  272         {
  273            synchronized (messages)
  274            {
  275               //see if we have any undelivered messages before we go to the JMS
  276               //server to look.
  277               Message message = getMessage();
  278               if (message != null)
  279               {
  280                  if (trace)
  281                     log.trace("receive() message in list " + message.getJMSMessageID() + " " + this);
  282                  return message;
  283               }
  284               
  285               // Loop through expired messages
  286               while (true)
  287               {
  288                  SpyMessage msg = session.connection.receive(subscription, 0);
  289                  if (msg != null)
  290                  {
  291                     Message mes = preProcessMessage(msg);
  292                     if (mes != null)
  293                     {
  294                        if (trace)
  295                           log.trace("receive() message from server " + mes.getJMSMessageID() + " " + this);
  296                        return mes;
  297                     }
  298                  }
  299                  else
  300                     break;
  301               }
  302   
  303               if (trace)
  304                  log.trace("No message in receive(), waiting " + this);
  305               
  306               try
  307               {
  308                  waitingForMessage = true;
  309                  while (true)
  310                  {
  311                     if (isClosed())
  312                     {
  313                        if (trace)
  314                           log.trace("Consumer closed in receive() " + this);
  315                        return null;
  316                     }
  317                     Message mes = getMessage();
  318                     if (mes != null)
  319                     {
  320                        if (trace)
  321                           log.trace("receive() message from list after wait " + this);
  322                        return mes;
  323                     }
  324                     messages.wait();
  325                  }
  326               }
  327               catch (Throwable t)
  328               {
  329                  SpyJMSException.rethrowAsJMSException("Receive interupted", t);
  330                  throw new UnreachableStatementException();
  331               }
  332               finally
  333               {
  334                  waitingForMessage = false;
  335               }
  336            }
  337         }
  338         finally
  339         {
  340            synchronized (stateLock)
  341            {
  342               receiving = false;
  343            }
  344         }
  345      }
  346   
  347      public Message receive(long timeOut) throws JMSException
  348      {
  349         if (timeOut == 0)
  350         {
  351            if (trace)
  352               log.trace("Timeout is zero in receive(long) using receive() " + this);
  353            return receive();
  354         }
  355   
  356         checkClosed();
  357         synchronized (stateLock)
  358         {
  359            if (receiving)
  360               throw new JMSException("Another thread is already in receive.");
  361            if (listening)
  362               throw new JMSException("A message listener is already registered");
  363            receiving = true;
  364            
  365            if (trace)
  366               log.trace("receive(long) " + this);
  367         }
  368   
  369         long endTime = System.currentTimeMillis() + timeOut;
  370         
  371         if (trace)
  372            log.trace("receive(long) endTime=" + endTime + " " + this);
  373         
  374         try
  375         {
  376            synchronized (messages)
  377            {
  378               //see if we have any undelivered messages before we go to the JMS
  379               //server to look.
  380               Message message = getMessage();
  381               if (message != null)
  382               {
  383                  if (trace)
  384                     log.trace("receive(long) message in list " + message.getJMSMessageID() + " " + this);
  385                  return message;
  386               }
  387               // Loop through expired messages
  388               while (true)
  389               {
  390                  SpyMessage msg = session.connection.receive(subscription, timeOut);
  391                  if (msg != null)
  392                  {
  393                     Message mes = preProcessMessage(msg);
  394                     if (mes != null)
  395                     {
  396                        if (trace)
  397                           log.trace("receive(long) message from server " + mes.getJMSMessageID() + " " + this);
  398                        return mes;
  399                     }
  400                  }
  401                  else
  402                     break;
  403               }
  404   
  405               if (trace)
  406                  log.trace("No message in receive(), waiting " + this);
  407               
  408               try
  409               {
  410                  waitingForMessage = true;
  411                  while (true)
  412                  {
  413                     if (isClosed())
  414                     {
  415                        if (trace)
  416                           log.trace("Consumer closed in receive(long) " + this);
  417                        return null;
  418                     }
  419   
  420                     Message mes = getMessage();
  421                     if (mes != null)
  422                     {
  423                        if (trace)
  424                           log.trace("receive(long) message from list after wait " + this);
  425                        return mes;
  426                     }
  427   
  428                     long att = endTime - System.currentTimeMillis();
  429                     if (att <= 0)
  430                     {
  431                        if (trace)
  432                           log.trace("receive(long) timed out endTime=" + endTime + " " + this);
  433                        return null;
  434                     }
  435   
  436                     messages.wait(att);
  437                  }
  438               }
  439               catch (Throwable t)
  440               {
  441                  SpyJMSException.rethrowAsJMSException("Receive interupted", t);
  442                  throw new UnreachableStatementException();
  443               }
  444               finally
  445               {
  446                  waitingForMessage = false;
  447               }
  448            }
  449         }
  450         finally
  451         {
  452            synchronized (stateLock)
  453            {
  454               receiving = false;
  455            }
  456         }
  457      }
  458   
  459      public Message receiveNoWait() throws JMSException
  460      {
  461         checkClosed();
  462         synchronized (stateLock)
  463         {
  464            if (receiving)
  465               throw new JMSException("Another thread is already in receive.");
  466            if (listening)
  467               throw new JMSException("A message listener is already registered");
  468            receiving = true;
  469            
  470            if (trace)
  471               log.trace("receiveNoWait() " + this);
  472         }
  473   
  474         try
  475         {
  476            //see if we have any undelivered messages before we go to the JMS
  477            //server to look.
  478            synchronized (messages)
  479            {
  480               Message mes = getMessage();
  481               if (mes != null)
  482               {
  483                  if (trace)
  484                     log.trace("receiveNoWait() message in list " + mes.getJMSMessageID() + " " + this);
  485                  return mes;
  486               }
  487            }
  488            // Loop through expired messages
  489            while (true)
  490            {
  491               SpyMessage msg = session.connection.receive(subscription, -1);
  492               if (msg != null)
  493               {
  494                  Message mes = preProcessMessage(msg);
  495                  if (mes != null)
  496                  {
  497                     if (trace)
  498                        log.trace("receiveNoWait() message from server " + mes.getJMSMessageID() + " " + this);
  499                     return mes;
  500                  }
  501               }
  502               else
  503               {
  504                  if (trace)
  505                     log.trace("receiveNoWait() no message " + this);
  506                  return null;
  507               }
  508            }
  509         }
  510         finally
  511         {
  512            synchronized (stateLock)
  513            {
  514               receiving = false;
  515            }
  516         }
  517      }
  518   
  519      public void close() throws JMSException
  520      {
  521         synchronized (messages)
  522         {
  523            if (closed.set(true))
  524               return;
  525   
  526            if (trace)      
  527               log.trace("Message consumer closing. " + this);
  528            
  529            while (messages.isEmpty() == false)
  530            {
  531               SpyMessage mes = (SpyMessage) messages.removeFirst();
  532               if (trace)
  533                  log.trace("close() nacking undelivered message mes=" + mes.getJMSMessageID() + " " + this);
  534               try
  535               {
  536                  session.connection.send(mes.getAcknowledgementRequest(false));
  537               }
  538               catch (Exception e)
  539               {
  540                  log.debug("Error nacking message: " + mes.getJMSMessageID(), e);
  541               }
  542            }
  543            messages.notifyAll();
  544         }
  545         
  546         // Notification to break out of delivery lock loop
  547         session.interruptDeliveryLockWaiters();
  548   
  549         if (listenerThread != null && !Thread.currentThread().equals(listenerThread))
  550         {
  551            try
  552            {
  553               if (trace)      
  554                  log.trace("Joining listener thread. " + this);
  555               listenerThread.join();
  556            }
  557            catch (InterruptedException e)
  558            {
  559            }
  560         }
  561   
  562         if (!sessionConsumer)
  563         {
  564            session.removeConsumer(this);
  565         }
  566   
  567         if (trace)      
  568            log.trace("Closed. " + this);
  569      }
  570   
  571      public void run()
  572      {
  573         SpyMessage mes = null;
  574         try
  575         {
  576            outer : while (true)
  577            {
  578               //get Message
  579               while (mes == null)
  580               {
  581                  synchronized (messages)
  582                  {
  583                     if (isClosed())
  584                     {
  585                        waitingForMessage = false;
  586                        if (trace)
  587                           log.trace("Consumer closed in run() " + this);
  588                        break outer;
  589                     }
  590                     if (messages.isEmpty())
  591                        mes = session.connection.receive(subscription, 0);
  592                     if (mes == null)
  593                     {
  594                        waitingForMessage = true;
  595                        if (trace)
  596                           log.trace("waiting in run() " + this);
  597                        while ((messages.isEmpty() && isClosed() == false) || (!session.running))
  598                        {
  599                           try
  600                           {
  601                              messages.wait();
  602                           }
  603                           catch (InterruptedException e)
  604                           {
  605                              log.trace("Ignored interruption waiting for messages");
  606                           }
  607                        }
  608                        if (isClosed())
  609                        {
  610                           waitingForMessage = false;
  611                           if (trace)
  612                              log.trace("Consumer closed while waiting in run() " + this);
  613                           break outer;
  614                        }
  615                        mes = (SpyMessage) messages.removeFirst();
  616                        waitingForMessage = false;
  617                     }
  618                     else
  619                     {
  620                        if (trace)
  621                           log.trace("run() message from server mes=" + mes.getJMSMessageID() + " " + this); 
  622                     }
  623                  }
  624                  mes.session = session;
  625               }
  626   
  627               MessageListener thisListener;
  628               synchronized (stateLock)
  629               {
  630                  if (!isListening())
  631                  {
  632                     //send NACK cause we have closed listener
  633                     if (mes != null)
  634                     {
  635                        if (trace)
  636                           log.trace("run() nacking not listening message mes=" + mes.getJMSMessageID() + " " + this); 
  637                        session.connection.send(mes.getAcknowledgementRequest(false));
  638                     }
  639                     //this thread is about to die, so we will need a new one if
  640                     // a new listener is added
  641                     listenerThread = null;
  642                     mes = null;
  643                     break;
  644                  }
  645                  thisListener = messageListener;
  646               }
  647               Message message = mes;
  648               if (mes instanceof SpyEncapsulatedMessage)
  649                  message = ((SpyEncapsulatedMessage) mes).getMessage();
  650   
  651               // Try to obtain the session delivery lock
  652               // This avoids concurrent delivery to message listeners in the same session as per spec
  653               boolean gotDeliveryLock = false;
  654               while (gotDeliveryLock == false)
  655               {
  656                  gotDeliveryLock = session.tryDeliveryLock();
  657                  // We didn't get the lock, check whether we are closing
  658                  if (gotDeliveryLock == false)
  659                  {
  660                     synchronized (messages)
  661                     {
  662                        if (isClosed())
  663                           break;
  664                     }
  665                  }
  666               }
  667               if (gotDeliveryLock == false)
  668               {
  669                  if (trace)
  670                     log.trace("run() nacking didn't get delivery lock mes=" + mes.getJMSMessageID() + " " + this); 
  671                  session.connection.send(mes.getAcknowledgementRequest(false));
  672               }
  673               else
  674               {
  675                  //Handle runtime exceptions. These are handled as per the spec if
  676                  // you assume
  677                  //the number of times erroneous messages are redelivered in
  678                  // auto_acknowledge mode
  679                  //is 0. :)
  680                  try
  681                  {
  682                     if (session.transacted)
  683                     {
  684                        // REVIEW: for an XASession without a transaction this will ack the message
  685                        // before it has been processed. Plain message listeners
  686                        // are not supported in a j2ee environment, but what if somebody is trying 
  687                        // to be clever?
  688                        if (trace)
  689                           log.trace("run() acknowledging message in tx mes=" + mes.getJMSMessageID() + " " + this); 
  690                        session.connection.spyXAResourceManager.ackMessage(session.getCurrentTransactionId(), mes);
  691                     }
  692   
  693                     try
  694                     {
  695                        prepareDelivery((SpyMessage) message);
  696                        session.addUnacknowlegedMessage((SpyMessage) message);
  697                        thisListener.onMessage(message);
  698                     }
  699                     catch (Throwable t)
  700                     {
  701                        log.warn("Message listener " + thisListener + " threw a throwable.", t);
  702                     }
  703                  }
  704                  finally
  705                  {
  706                     session.releaseDeliveryLock();
  707                  }
  708   
  709                  if (!session.transacted
  710                        && (session.acknowledgeMode == Session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE))
  711                  {
  712                     // Only acknowledge the message if the message wasn't recovered
  713                     boolean recovered;
  714                     synchronized (messages)
  715                     {
  716                        recovered = messages.contains(message);
  717                     }
  718                     if (recovered == false)
  719                        mes.doAcknowledge();
  720                  }
  721                  mes = null;
  722               }
  723            }
  724         }
  725         catch (Throwable t)
  726         {
  727            log.warn("Message consumer closing due to error in listening thread.", t);
  728            try
  729            {
  730               close();
  731            }
  732            catch (Throwable ignore)
  733            {
  734            }
  735            session.asynchFailure("Message consumer closing due to error in listening thread.", t);
  736         }
  737      }
  738   
  739      public String toString()
  740      {
  741         StringBuffer buffer = new StringBuffer(100);
  742         buffer.append("SpyMessageConsumer@").append(System.identityHashCode(this));
  743         buffer.append("[sub=").append(subscription);
  744         if (isClosed())
  745            buffer.append(" CLOSED");
  746         buffer.append(" listening=").append(listening);
  747         buffer.append(" receiving=").append(receiving);
  748         buffer.append(" sessionConsumer=").append(sessionConsumer);
  749         buffer.append(" waitingForMessage=").append(waitingForMessage);
  750         buffer.append(" messages=").append(messages.size());
  751         if (listenerThread != null)
  752            buffer.append(" thread=").append(listenerThread);
  753         if (messageListener != null)
  754            buffer.append(" listener=").append(messageListener);
  755         buffer.append(" session=").append(session);
  756         buffer.append(']');
  757         return buffer.toString();
  758      }
  759   
  760      Message getMessage()
  761      {
  762         synchronized (messages)
  763         {
  764            if (trace)
  765               log.trace("Getting message from list " + this);
  766            while (true)
  767            {
  768               try
  769               {
  770                  if (messages.size() == 0)
  771                     return null;
  772   
  773                  SpyMessage mes = (SpyMessage) messages.removeFirst();
  774   
  775                  Message rc = preProcessMessage(mes);
  776                  // could happen if the message has expired.
  777                  if (rc == null)
  778                     continue;
  779   
  780                  return rc;
  781               }
  782               catch (Throwable t)
  783               {
  784                  log.error("Ignoring error", t);
  785               }
  786            }
  787         }
  788      }
  789   
  790      Message preProcessMessage(SpyMessage message) throws JMSException
  791      {
  792         message.session = session;
  793         session.addUnacknowlegedMessage(message);
  794   
  795         prepareDelivery(message);
  796         
  797         // Should we try to ack before the message is processed?
  798         if (!isListening())
  799         {
  800            if (session.transacted)
  801            {
  802               if (trace)
  803                  log.trace("preprocess() acking message in tx message=" + message.getJMSMessageID() + " " + this);
  804               session.connection.spyXAResourceManager.ackMessage(session.getCurrentTransactionId(), message);
  805            }
  806            else if (session.acknowledgeMode == Session.AUTO_ACKNOWLEDGE
  807                  || session.acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)
  808            {
  809               message.doAcknowledge();
  810            }
  811   
  812            if (message instanceof SpyEncapsulatedMessage)
  813            {
  814               return ((SpyEncapsulatedMessage) message).getMessage();
  815            }
  816            return message;
  817         }
  818         else
  819         {
  820            return message;
  821         }
  822      }
  823   
  824      /**
  825       * Prepare the message for delivery
  826       * 
  827       * @param message the message
  828       * @throws JMSException for any error
  829       */
  830      void prepareDelivery(SpyMessage message) throws JMSException
  831      {
  832         Integer delivery = ONCE;
  833         Integer redelivery = (Integer) message.header.jmsProperties.get(SpyMessage.PROPERTY_REDELIVERY_COUNT);
  834         if (redelivery != null)
  835         {
  836            int value = redelivery.intValue();
  837            if (value != 0)
  838               delivery = new Integer(value + 1);
  839         }
  840         message.header.jmsProperties.put(SpyMessage.PROPERTY_DELIVERY_COUNT, delivery);
  841      }
  842      
  843      protected Destination getDestination() throws JMSException
  844      {
  845         checkClosed();
  846         return subscription.destination;
  847      }
  848   
  849      protected boolean getNoLocal() throws JMSException
  850      {
  851         checkClosed();
  852         return subscription.noLocal;
  853      }
  854   
  855      /**
  856   	 * Are we listening
  857   	 * 
  858   	 * @return true when listening, false otherwise
  859   	 */
  860      protected boolean isListening()
  861      {
  862         synchronized (stateLock)
  863         {
  864            return listening;
  865         }
  866      }
  867   
  868      protected void sessionConsumerProcessMessage(SpyMessage message) throws JMSException
  869      {
  870         message.session = session;
  871         //simply pass on to messageListener (if there is one)
  872         MessageListener thisListener;
  873         synchronized (stateLock)
  874         {
  875            thisListener = messageListener;
  876         }
  877   
  878         // Add the message to XAResource manager before we call onMessages since
  879         // the
  880         // resource may get elisted IN the onMessage method.
  881         // This gives onMessage a chance to roll the message back.
  882         Object anonymousTXID = null;
  883         if (session.transacted)
  884         {
  885            // Only happens with XA transactions
  886            if (session.getCurrentTransactionId() == null)
  887            {
  888               anonymousTXID = session.connection.spyXAResourceManager.startTx();
  889               session.setCurrentTransactionId(anonymousTXID);
  890            }
  891            if (trace)
  892               log.trace("consumer() acking message in tx message=" + message.getJMSMessageID() + " " + this);
  893            session.connection.spyXAResourceManager.ackMessage(session.getCurrentTransactionId(), message);
  894         }
  895   
  896         if (thisListener != null)
  897         {
  898            Message mes = message;
  899            if (message instanceof SpyEncapsulatedMessage)
  900            {
  901               mes = ((SpyEncapsulatedMessage) message).getMessage();
  902            }
  903            session.addUnacknowlegedMessage((SpyMessage) mes);
  904            if (trace)
  905               log.trace("consumer() before onMessage=" + message.getJMSMessageID() + " " + this);
  906            thisListener.onMessage(mes);
  907            if (trace)
  908               log.trace("consumer() after onMessage=" + message.getJMSMessageID() + " " + this);
  909         }
  910   
  911         if (session.transacted)
  912         {
  913            // If we started an anonymous tx
  914            if (anonymousTXID != null)
  915            {
  916               if (session.getCurrentTransactionId() == anonymousTXID)
  917               {
  918                  // We never got enlisted, so just commit the transaction
  919                  try
  920                  {
  921                     if (trace)
  922                        log.trace("XASession was not enlisted - Committing work using anonymous xid: " + anonymousTXID);
  923                     session.connection.spyXAResourceManager.endTx(anonymousTXID, true);
  924                     session.connection.spyXAResourceManager.commit(anonymousTXID, true);
  925                  }
  926                  catch (Throwable t)
  927                  {
  928                     log.error("Could not commit", t);
  929                  }
  930                  finally
  931                  {
  932                     session.unsetCurrentTransactionId(anonymousTXID);
  933                  }
  934               }
  935            }
  936         }
  937         else
  938         {
  939            // Should we Auto-ack the message since the message has now been
  940            // processesed
  941            if (session.acknowledgeMode == Session.AUTO_ACKNOWLEDGE
  942                  || session.acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)
  943            {
  944               message.doAcknowledge();
  945            }
  946         }
  947      }
  948      
  949      /**
  950       * Check whether we are closed
  951       * 
  952       * @return true when closed
  953       */
  954      private boolean isClosed()
  955      {
  956         return closed.get();
  957      }
  958      
  959      /**
  960       * Check whether we are closed
  961       * 
  962       * @throws IllegalStateException when the session is closed
  963       */
  964      private void checkClosed() throws IllegalStateException
  965      {
  966         if (closed.get())
  967            throw new IllegalStateException("The consumer is closed");
  968      }
  969   }

Save This Page
Home » JBoss-5.1.0 » org » jboss » mq » [javadoc | source]