1 /*
2 * JBoss, Home of Professional Open Source.
3 * Copyright 2006, Red Hat Middleware LLC, and individual contributors
4 * as indicated by the @author tags. See the copyright.txt file in the
5 * distribution for a full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */
22 package org.jboss.mq;
23
24 import java.util.LinkedList;
25
26 import javax.jms.Destination;
27 import javax.jms.IllegalStateException;
28 import javax.jms.InvalidSelectorException;
29 import javax.jms.JMSException;
30 import javax.jms.Message;
31 import javax.jms.MessageConsumer;
32 import javax.jms.MessageListener;
33 import javax.jms.Session;
34
35 import org.jboss.logging.Logger;
36 import org.jboss.util.UnreachableStatementException;
37
38 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
39
40 /**
41 * This class implements <tt>javax.jms.MessageConsumer</tt>.
42 *
43 * @author Norbert Lataille (Norbert.Lataille@m4x.org)
44 * @author Hiram Chirino (Cojonudo14@hotmail.com)
45 * @author David Maplesden (David.Maplesden@orion.co.nz)
46 * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
47 * @version $Revision: 74988 $
48 */
49 public class SpyMessageConsumer implements MessageConsumer, SpyConsumer, Runnable
50 {
51 /** The log */
52 static Logger log = Logger.getLogger(SpyMessageConsumer.class);
53
54 /** Is trace enabled */
55 static boolean trace = log.isTraceEnabled();
56
57 /** Delivered once */
58 static final Integer ONCE = new Integer(1);
59
60 /** Link to my session */
61 public SpySession session;
/** The subscription structure; it should be filled out by the descendant */
63 public Subscription subscription = new Subscription();
64 /** Are we closed ? */
65 private SynchronizedBoolean closed = new SynchronizedBoolean(false);
66 /** The state lock */
67 protected Object stateLock = new Object();
68 /** Are we receiving a message */
69 protected boolean receiving = false;
70 /** Are we waiting for a message */
71 protected boolean waitingForMessage = false;
72 /** Are we listening */
73 protected boolean listening = false;
74 /** The listener thread */
75 protected Thread listenerThread = null;
76 /** My message listener (null if none) */
77 MessageListener messageListener;
78 /** List of Pending messages (not yet delivered) */
79 LinkedList messages;
80 /** Is this a session consumer? */
81 boolean sessionConsumer;
82
/**
 * Builds a consumer attached to the given session. The subscription keeps
 * its freshly constructed state; nothing is written to it here.
 *
 * @param s the session this consumer belongs to
 * @param sessionConsumer true for a session consumer, false otherwise
 */
SpyMessageConsumer(SpySession s, boolean sessionConsumer)
{
   // Re-read the log level each time a consumer is created
   trace = log.isTraceEnabled();

   this.session = s;
   this.sessionConsumer = sessionConsumer;
   this.messageListener = null;
   this.messages = new LinkedList();

   if (trace)
   {
      log.trace("New message consumer " + this);
   }
}
101
102 /**
103 * Create a new SpyMessageConsumer
104 *
105 * @param s the session
106 * @param sessionConsumer true for a session consumer, false otherwise
107 * @param destination the destination
108 * @param selector the selector
109 * @param noLocal true for noLocal, false otherwise
110 */
111 SpyMessageConsumer(SpySession s, boolean sessionConsumer, SpyDestination destination, String selector, boolean noLocal) throws InvalidSelectorException
112 {
113 trace = log.isTraceEnabled();
114
115 session = s;
116 this.sessionConsumer = sessionConsumer;
117 subscription.destination = destination;
118 subscription.messageSelector = selector;
119 subscription.noLocal = noLocal;
120
121 // If the selector is set, try to build it, throws an
122 // InvalidSelectorException
123 // if it is not valid.
124 if (subscription.messageSelector != null)
125 subscription.getSelector();
126
127 messageListener = null;
128 messages = new LinkedList();
129
130 if (trace)
131 log.trace("New message consumer " + this);
132 }
133
134 /**
135 * Get the subscription
136 *
137 * @return the subscription
138 */
139 public Subscription getSubscription()
140 {
141 return subscription;
142 }
143
144 /**
145 * Add a message
146 *
147 * @param message the message to add
148 * @throws JMSException for any error
149 */
150 public void addMessage(SpyMessage message) throws JMSException
151 {
152 if (isClosed())
153 {
154 if (trace)
155 log.trace("WARNING: NACK issued. message=" + message.header.jmsMessageID +
156 " The message consumer was closed. " + this);
157 session.connection.send(message.getAcknowledgementRequest(false));
158 return;
159 }
160
161 //Add a message to the queue
162
163 // Consider removing this test (subscription.accepts). I don't think it
164 // will ever fail
165 // because the test is also done by the server before message is even
166 // sent.
167 if (subscription.accepts(message.header))
168 {
169 if (sessionConsumer)
170 sessionConsumerProcessMessage(message);
171 else
172 {
173 synchronized (messages)
174 {
175 if (waitingForMessage)
176 {
177 if (trace)
178 log.trace("Adding message=" + message.header.jmsMessageID + " " + this);
179 messages.addLast(message);
180 messages.notifyAll();
181 }
182 else
183 {
184 //unwanted message (due to consumer receive timing out) Nack
185 // it.
186 if (trace)
187 log.trace("WARNING: NACK issued. message=" + message.header.jmsMessageID +
188 " The message consumer was not waiting for a message. " + this);
189 session.connection.send(message.getAcknowledgementRequest(false));
190 }
191 }
192 }
193 }
194 else
195 {
196 if (trace)
197 log.trace("WARNING: NACK issued. message=" + message.header.jmsMessageID +
198 " The subscription did not accept the message. " + this);
199 session.connection.send(message.getAcknowledgementRequest(false));
200 }
201 }
202
203 /**
204 * Restarts the processing of the messages in case of a recovery
205 */
206 public void restartProcessing()
207 {
208 synchronized (messages)
209 {
210 if (trace)
211 log.trace("Restarting processing " + this);
212 messages.notifyAll();
213 }
214 }
215
216 public void setMessageListener(MessageListener listener) throws JMSException
217 {
218 checkClosed();
219
220 synchronized (stateLock)
221 {
222 if (receiving)
223 throw new JMSException("Another thread is already in receive.");
224
225 if (trace)
226 log.trace("Set message listener=" + listener + " old listener=" + messageListener + " " + this);
227
228 boolean oldListening = listening;
229 listening = (listener != null);
230 messageListener = listener;
231
232 if (!sessionConsumer && listening && !oldListening)
233 {
234 //Start listener thread (if one is not already running)
235 if (listenerThread == null)
236 {
237 listenerThread = new Thread(this, "MessageListenerThread - " + subscription.destination.getName());
238 listenerThread.start();
239 }
240 }
241 }
242 }
243
244 public String getMessageSelector() throws JMSException
245 {
246 checkClosed();
247 return subscription.messageSelector;
248 }
249
250 public MessageListener getMessageListener() throws JMSException
251 {
252 checkClosed();
253 return messageListener;
254 }
255
256 public Message receive() throws JMSException
257 {
258 checkClosed();
259 synchronized (stateLock)
260 {
261 if (receiving)
262 throw new JMSException("Another thread is already in receive.");
263 if (listening)
264 throw new JMSException("A message listener is already registered");
265 receiving = true;
266
267 if (trace)
268 log.trace("receive() " + this);
269 }
270
271 try
272 {
273 synchronized (messages)
274 {
275 //see if we have any undelivered messages before we go to the JMS
276 //server to look.
277 Message message = getMessage();
278 if (message != null)
279 {
280 if (trace)
281 log.trace("receive() message in list " + message.getJMSMessageID() + " " + this);
282 return message;
283 }
284
285 // Loop through expired messages
286 while (true)
287 {
288 SpyMessage msg = session.connection.receive(subscription, 0);
289 if (msg != null)
290 {
291 Message mes = preProcessMessage(msg);
292 if (mes != null)
293 {
294 if (trace)
295 log.trace("receive() message from server " + mes.getJMSMessageID() + " " + this);
296 return mes;
297 }
298 }
299 else
300 break;
301 }
302
303 if (trace)
304 log.trace("No message in receive(), waiting " + this);
305
306 try
307 {
308 waitingForMessage = true;
309 while (true)
310 {
311 if (isClosed())
312 {
313 if (trace)
314 log.trace("Consumer closed in receive() " + this);
315 return null;
316 }
317 Message mes = getMessage();
318 if (mes != null)
319 {
320 if (trace)
321 log.trace("receive() message from list after wait " + this);
322 return mes;
323 }
324 messages.wait();
325 }
326 }
327 catch (Throwable t)
328 {
329 SpyJMSException.rethrowAsJMSException("Receive interupted", t);
330 throw new UnreachableStatementException();
331 }
332 finally
333 {
334 waitingForMessage = false;
335 }
336 }
337 }
338 finally
339 {
340 synchronized (stateLock)
341 {
342 receiving = false;
343 }
344 }
345 }
346
347 public Message receive(long timeOut) throws JMSException
348 {
349 if (timeOut == 0)
350 {
351 if (trace)
352 log.trace("Timeout is zero in receive(long) using receive() " + this);
353 return receive();
354 }
355
356 checkClosed();
357 synchronized (stateLock)
358 {
359 if (receiving)
360 throw new JMSException("Another thread is already in receive.");
361 if (listening)
362 throw new JMSException("A message listener is already registered");
363 receiving = true;
364
365 if (trace)
366 log.trace("receive(long) " + this);
367 }
368
369 long endTime = System.currentTimeMillis() + timeOut;
370
371 if (trace)
372 log.trace("receive(long) endTime=" + endTime + " " + this);
373
374 try
375 {
376 synchronized (messages)
377 {
378 //see if we have any undelivered messages before we go to the JMS
379 //server to look.
380 Message message = getMessage();
381 if (message != null)
382 {
383 if (trace)
384 log.trace("receive(long) message in list " + message.getJMSMessageID() + " " + this);
385 return message;
386 }
387 // Loop through expired messages
388 while (true)
389 {
390 SpyMessage msg = session.connection.receive(subscription, timeOut);
391 if (msg != null)
392 {
393 Message mes = preProcessMessage(msg);
394 if (mes != null)
395 {
396 if (trace)
397 log.trace("receive(long) message from server " + mes.getJMSMessageID() + " " + this);
398 return mes;
399 }
400 }
401 else
402 break;
403 }
404
405 if (trace)
406 log.trace("No message in receive(), waiting " + this);
407
408 try
409 {
410 waitingForMessage = true;
411 while (true)
412 {
413 if (isClosed())
414 {
415 if (trace)
416 log.trace("Consumer closed in receive(long) " + this);
417 return null;
418 }
419
420 Message mes = getMessage();
421 if (mes != null)
422 {
423 if (trace)
424 log.trace("receive(long) message from list after wait " + this);
425 return mes;
426 }
427
428 long att = endTime - System.currentTimeMillis();
429 if (att <= 0)
430 {
431 if (trace)
432 log.trace("receive(long) timed out endTime=" + endTime + " " + this);
433 return null;
434 }
435
436 messages.wait(att);
437 }
438 }
439 catch (Throwable t)
440 {
441 SpyJMSException.rethrowAsJMSException("Receive interupted", t);
442 throw new UnreachableStatementException();
443 }
444 finally
445 {
446 waitingForMessage = false;
447 }
448 }
449 }
450 finally
451 {
452 synchronized (stateLock)
453 {
454 receiving = false;
455 }
456 }
457 }
458
459 public Message receiveNoWait() throws JMSException
460 {
461 checkClosed();
462 synchronized (stateLock)
463 {
464 if (receiving)
465 throw new JMSException("Another thread is already in receive.");
466 if (listening)
467 throw new JMSException("A message listener is already registered");
468 receiving = true;
469
470 if (trace)
471 log.trace("receiveNoWait() " + this);
472 }
473
474 try
475 {
476 //see if we have any undelivered messages before we go to the JMS
477 //server to look.
478 synchronized (messages)
479 {
480 Message mes = getMessage();
481 if (mes != null)
482 {
483 if (trace)
484 log.trace("receiveNoWait() message in list " + mes.getJMSMessageID() + " " + this);
485 return mes;
486 }
487 }
488 // Loop through expired messages
489 while (true)
490 {
491 SpyMessage msg = session.connection.receive(subscription, -1);
492 if (msg != null)
493 {
494 Message mes = preProcessMessage(msg);
495 if (mes != null)
496 {
497 if (trace)
498 log.trace("receiveNoWait() message from server " + mes.getJMSMessageID() + " " + this);
499 return mes;
500 }
501 }
502 else
503 {
504 if (trace)
505 log.trace("receiveNoWait() no message " + this);
506 return null;
507 }
508 }
509 }
510 finally
511 {
512 synchronized (stateLock)
513 {
514 receiving = false;
515 }
516 }
517 }
518
519 public void close() throws JMSException
520 {
521 synchronized (messages)
522 {
523 if (closed.set(true))
524 return;
525
526 if (trace)
527 log.trace("Message consumer closing. " + this);
528
529 while (messages.isEmpty() == false)
530 {
531 SpyMessage mes = (SpyMessage) messages.removeFirst();
532 if (trace)
533 log.trace("close() nacking undelivered message mes=" + mes.getJMSMessageID() + " " + this);
534 try
535 {
536 session.connection.send(mes.getAcknowledgementRequest(false));
537 }
538 catch (Exception e)
539 {
540 log.debug("Error nacking message: " + mes.getJMSMessageID(), e);
541 }
542 }
543 messages.notifyAll();
544 }
545
546 // Notification to break out of delivery lock loop
547 session.interruptDeliveryLockWaiters();
548
549 if (listenerThread != null && !Thread.currentThread().equals(listenerThread))
550 {
551 try
552 {
553 if (trace)
554 log.trace("Joining listener thread. " + this);
555 listenerThread.join();
556 }
557 catch (InterruptedException e)
558 {
559 }
560 }
561
562 if (!sessionConsumer)
563 {
564 session.removeConsumer(this);
565 }
566
567 if (trace)
568 log.trace("Closed. " + this);
569 }
570
571 public void run()
572 {
573 SpyMessage mes = null;
574 try
575 {
576 outer : while (true)
577 {
578 //get Message
579 while (mes == null)
580 {
581 synchronized (messages)
582 {
583 if (isClosed())
584 {
585 waitingForMessage = false;
586 if (trace)
587 log.trace("Consumer closed in run() " + this);
588 break outer;
589 }
590 if (messages.isEmpty())
591 mes = session.connection.receive(subscription, 0);
592 if (mes == null)
593 {
594 waitingForMessage = true;
595 if (trace)
596 log.trace("waiting in run() " + this);
597 while ((messages.isEmpty() && isClosed() == false) || (!session.running))
598 {
599 try
600 {
601 messages.wait();
602 }
603 catch (InterruptedException e)
604 {
605 log.trace("Ignored interruption waiting for messages");
606 }
607 }
608 if (isClosed())
609 {
610 waitingForMessage = false;
611 if (trace)
612 log.trace("Consumer closed while waiting in run() " + this);
613 break outer;
614 }
615 mes = (SpyMessage) messages.removeFirst();
616 waitingForMessage = false;
617 }
618 else
619 {
620 if (trace)
621 log.trace("run() message from server mes=" + mes.getJMSMessageID() + " " + this);
622 }
623 }
624 mes.session = session;
625 }
626
627 MessageListener thisListener;
628 synchronized (stateLock)
629 {
630 if (!isListening())
631 {
632 //send NACK cause we have closed listener
633 if (mes != null)
634 {
635 if (trace)
636 log.trace("run() nacking not listening message mes=" + mes.getJMSMessageID() + " " + this);
637 session.connection.send(mes.getAcknowledgementRequest(false));
638 }
639 //this thread is about to die, so we will need a new one if
640 // a new listener is added
641 listenerThread = null;
642 mes = null;
643 break;
644 }
645 thisListener = messageListener;
646 }
647 Message message = mes;
648 if (mes instanceof SpyEncapsulatedMessage)
649 message = ((SpyEncapsulatedMessage) mes).getMessage();
650
651 // Try to obtain the session delivery lock
652 // This avoids concurrent delivery to message listeners in the same session as per spec
653 boolean gotDeliveryLock = false;
654 while (gotDeliveryLock == false)
655 {
656 gotDeliveryLock = session.tryDeliveryLock();
657 // We didn't get the lock, check whether we are closing
658 if (gotDeliveryLock == false)
659 {
660 synchronized (messages)
661 {
662 if (isClosed())
663 break;
664 }
665 }
666 }
667 if (gotDeliveryLock == false)
668 {
669 if (trace)
670 log.trace("run() nacking didn't get delivery lock mes=" + mes.getJMSMessageID() + " " + this);
671 session.connection.send(mes.getAcknowledgementRequest(false));
672 }
673 else
674 {
675 //Handle runtime exceptions. These are handled as per the spec if
676 // you assume
677 //the number of times erroneous messages are redelivered in
678 // auto_acknowledge mode
679 //is 0. :)
680 try
681 {
682 if (session.transacted)
683 {
684 // REVIEW: for an XASession without a transaction this will ack the message
685 // before it has been processed. Plain message listeners
686 // are not supported in a j2ee environment, but what if somebody is trying
687 // to be clever?
688 if (trace)
689 log.trace("run() acknowledging message in tx mes=" + mes.getJMSMessageID() + " " + this);
690 session.connection.spyXAResourceManager.ackMessage(session.getCurrentTransactionId(), mes);
691 }
692
693 try
694 {
695 prepareDelivery((SpyMessage) message);
696 session.addUnacknowlegedMessage((SpyMessage) message);
697 thisListener.onMessage(message);
698 }
699 catch (Throwable t)
700 {
701 log.warn("Message listener " + thisListener + " threw a throwable.", t);
702 }
703 }
704 finally
705 {
706 session.releaseDeliveryLock();
707 }
708
709 if (!session.transacted
710 && (session.acknowledgeMode == Session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE))
711 {
712 // Only acknowledge the message if the message wasn't recovered
713 boolean recovered;
714 synchronized (messages)
715 {
716 recovered = messages.contains(message);
717 }
718 if (recovered == false)
719 mes.doAcknowledge();
720 }
721 mes = null;
722 }
723 }
724 }
725 catch (Throwable t)
726 {
727 log.warn("Message consumer closing due to error in listening thread.", t);
728 try
729 {
730 close();
731 }
732 catch (Throwable ignore)
733 {
734 }
735 session.asynchFailure("Message consumer closing due to error in listening thread.", t);
736 }
737 }
738
739 public String toString()
740 {
741 StringBuffer buffer = new StringBuffer(100);
742 buffer.append("SpyMessageConsumer@").append(System.identityHashCode(this));
743 buffer.append("[sub=").append(subscription);
744 if (isClosed())
745 buffer.append(" CLOSED");
746 buffer.append(" listening=").append(listening);
747 buffer.append(" receiving=").append(receiving);
748 buffer.append(" sessionConsumer=").append(sessionConsumer);
749 buffer.append(" waitingForMessage=").append(waitingForMessage);
750 buffer.append(" messages=").append(messages.size());
751 if (listenerThread != null)
752 buffer.append(" thread=").append(listenerThread);
753 if (messageListener != null)
754 buffer.append(" listener=").append(messageListener);
755 buffer.append(" session=").append(session);
756 buffer.append(']');
757 return buffer.toString();
758 }
759
760 Message getMessage()
761 {
762 synchronized (messages)
763 {
764 if (trace)
765 log.trace("Getting message from list " + this);
766 while (true)
767 {
768 try
769 {
770 if (messages.size() == 0)
771 return null;
772
773 SpyMessage mes = (SpyMessage) messages.removeFirst();
774
775 Message rc = preProcessMessage(mes);
776 // could happen if the message has expired.
777 if (rc == null)
778 continue;
779
780 return rc;
781 }
782 catch (Throwable t)
783 {
784 log.error("Ignoring error", t);
785 }
786 }
787 }
788 }
789
790 Message preProcessMessage(SpyMessage message) throws JMSException
791 {
792 message.session = session;
793 session.addUnacknowlegedMessage(message);
794
795 prepareDelivery(message);
796
797 // Should we try to ack before the message is processed?
798 if (!isListening())
799 {
800 if (session.transacted)
801 {
802 if (trace)
803 log.trace("preprocess() acking message in tx message=" + message.getJMSMessageID() + " " + this);
804 session.connection.spyXAResourceManager.ackMessage(session.getCurrentTransactionId(), message);
805 }
806 else if (session.acknowledgeMode == Session.AUTO_ACKNOWLEDGE
807 || session.acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)
808 {
809 message.doAcknowledge();
810 }
811
812 if (message instanceof SpyEncapsulatedMessage)
813 {
814 return ((SpyEncapsulatedMessage) message).getMessage();
815 }
816 return message;
817 }
818 else
819 {
820 return message;
821 }
822 }
823
824 /**
825 * Prepare the message for delivery
826 *
827 * @param message the message
828 * @throws JMSException for any error
829 */
830 void prepareDelivery(SpyMessage message) throws JMSException
831 {
832 Integer delivery = ONCE;
833 Integer redelivery = (Integer) message.header.jmsProperties.get(SpyMessage.PROPERTY_REDELIVERY_COUNT);
834 if (redelivery != null)
835 {
836 int value = redelivery.intValue();
837 if (value != 0)
838 delivery = new Integer(value + 1);
839 }
840 message.header.jmsProperties.put(SpyMessage.PROPERTY_DELIVERY_COUNT, delivery);
841 }
842
843 protected Destination getDestination() throws JMSException
844 {
845 checkClosed();
846 return subscription.destination;
847 }
848
849 protected boolean getNoLocal() throws JMSException
850 {
851 checkClosed();
852 return subscription.noLocal;
853 }
854
855 /**
856 * Are we listening
857 *
858 * @return true when listening, false otherwise
859 */
860 protected boolean isListening()
861 {
862 synchronized (stateLock)
863 {
864 return listening;
865 }
866 }
867
868 protected void sessionConsumerProcessMessage(SpyMessage message) throws JMSException
869 {
870 message.session = session;
871 //simply pass on to messageListener (if there is one)
872 MessageListener thisListener;
873 synchronized (stateLock)
874 {
875 thisListener = messageListener;
876 }
877
878 // Add the message to XAResource manager before we call onMessages since
879 // the
880 // resource may get elisted IN the onMessage method.
881 // This gives onMessage a chance to roll the message back.
882 Object anonymousTXID = null;
883 if (session.transacted)
884 {
885 // Only happens with XA transactions
886 if (session.getCurrentTransactionId() == null)
887 {
888 anonymousTXID = session.connection.spyXAResourceManager.startTx();
889 session.setCurrentTransactionId(anonymousTXID);
890 }
891 if (trace)
892 log.trace("consumer() acking message in tx message=" + message.getJMSMessageID() + " " + this);
893 session.connection.spyXAResourceManager.ackMessage(session.getCurrentTransactionId(), message);
894 }
895
896 if (thisListener != null)
897 {
898 Message mes = message;
899 if (message instanceof SpyEncapsulatedMessage)
900 {
901 mes = ((SpyEncapsulatedMessage) message).getMessage();
902 }
903 session.addUnacknowlegedMessage((SpyMessage) mes);
904 if (trace)
905 log.trace("consumer() before onMessage=" + message.getJMSMessageID() + " " + this);
906 thisListener.onMessage(mes);
907 if (trace)
908 log.trace("consumer() after onMessage=" + message.getJMSMessageID() + " " + this);
909 }
910
911 if (session.transacted)
912 {
913 // If we started an anonymous tx
914 if (anonymousTXID != null)
915 {
916 if (session.getCurrentTransactionId() == anonymousTXID)
917 {
918 // We never got enlisted, so just commit the transaction
919 try
920 {
921 if (trace)
922 log.trace("XASession was not enlisted - Committing work using anonymous xid: " + anonymousTXID);
923 session.connection.spyXAResourceManager.endTx(anonymousTXID, true);
924 session.connection.spyXAResourceManager.commit(anonymousTXID, true);
925 }
926 catch (Throwable t)
927 {
928 log.error("Could not commit", t);
929 }
930 finally
931 {
932 session.unsetCurrentTransactionId(anonymousTXID);
933 }
934 }
935 }
936 }
937 else
938 {
939 // Should we Auto-ack the message since the message has now been
940 // processesed
941 if (session.acknowledgeMode == Session.AUTO_ACKNOWLEDGE
942 || session.acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)
943 {
944 message.doAcknowledge();
945 }
946 }
947 }
948
949 /**
950 * Check whether we are closed
951 *
952 * @return true when closed
953 */
954 private boolean isClosed()
955 {
956 return closed.get();
957 }
958
959 /**
960 * Check whether we are closed
961 *
962 * @throws IllegalStateException when the session is closed
963 */
964 private void checkClosed() throws IllegalStateException
965 {
966 if (closed.get())
967 throw new IllegalStateException("The consumer is closed");
968 }
969 }