Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/ActiveMQSession.java


1   /**
2    *
3    * Copyright 2004 Protique Ltd
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  
19  package org.activemq;
20  import java.io.IOException;
21  import java.io.Serializable;
22  import java.util.Iterator;
23  import java.util.LinkedList;
24  import java.util.List;
25  import java.util.ListIterator;
26  
27  import javax.jms.BytesMessage;
28  import javax.jms.DeliveryMode;
29  import javax.jms.Destination;
30  import javax.jms.IllegalStateException;
31  import javax.jms.InvalidDestinationException;
32  import javax.jms.InvalidSelectorException;
33  import javax.jms.JMSException;
34  import javax.jms.MapMessage;
35  import javax.jms.Message;
36  import javax.jms.MessageConsumer;
37  import javax.jms.MessageListener;
38  import javax.jms.MessageProducer;
39  import javax.jms.ObjectMessage;
40  import javax.jms.Queue;
41  import javax.jms.QueueBrowser;
42  import javax.jms.QueueReceiver;
43  import javax.jms.QueueSender;
44  import javax.jms.QueueSession;
45  import javax.jms.Session;
46  import javax.jms.StreamMessage;
47  import javax.jms.TemporaryQueue;
48  import javax.jms.TemporaryTopic;
49  import javax.jms.TextMessage;
50  import javax.jms.Topic;
51  import javax.jms.TopicPublisher;
52  import javax.jms.TopicSession;
53  import javax.jms.TopicSubscriber;
54  import javax.jms.TransactionRolledBackException;
55  
56  import org.activemq.io.util.ByteArray;
57  import org.activemq.io.util.ByteArrayCompression;
58  import org.activemq.io.util.ByteArrayFragmentation;
59  import org.activemq.management.JMSSessionStatsImpl;
60  import org.activemq.management.StatsCapable;
61  import org.activemq.management.StatsImpl;
62  import org.activemq.message.ActiveMQBytesMessage;
63  import org.activemq.message.ActiveMQDestination;
64  import org.activemq.message.ActiveMQMapMessage;
65  import org.activemq.message.ActiveMQMessage;
66  import org.activemq.message.ActiveMQObjectMessage;
67  import org.activemq.message.ActiveMQQueue;
68  import org.activemq.message.ActiveMQStreamMessage;
69  import org.activemq.message.ActiveMQTemporaryQueue;
70  import org.activemq.message.ActiveMQTemporaryTopic;
71  import org.activemq.message.ActiveMQTextMessage;
72  import org.activemq.message.ActiveMQTopic;
73  import org.activemq.message.ConsumerInfo;
74  import org.activemq.message.DurableUnsubscribe;
75  import org.activemq.message.MessageAck;
76  import org.activemq.message.MessageAcknowledge;
77  import org.activemq.message.ProducerInfo;
78  import org.activemq.service.impl.DefaultQueueList;
79  import org.activemq.util.IdGenerator;
80  import org.apache.commons.logging.Log;
81  import org.apache.commons.logging.LogFactory;
82  
83  import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
84  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
85  
86  /**
87   * <P>
88   * A <CODE>Session</CODE> object is a single-threaded context for producing and consuming messages. Although it may
89   * allocate provider resources outside the Java virtual machine (JVM), it is considered a lightweight JMS object.
90   * <P>
91   * A session serves several purposes:
92   * <UL>
93   * <LI>It is a factory for its message producers and consumers.
94   * <LI>It supplies provider-optimized message factories.
95   * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and <CODE>TemporaryQueues</CODE>.
96   * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE> objects for those clients that need to
97   * dynamically manipulate provider-specific destination names.
98   * <LI>It supports a single series of transactions that combine work spanning its producers and consumers into atomic
99   * units.
100  * <LI>It defines a serial order for the messages it consumes and the messages it produces.
101  * <LI>It retains messages it consumes until they have been acknowledged.
102  * <LI>It serializes execution of message listeners registered with its message consumers.
103  * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
104  * </UL>
105  * <P>
106  * A session can create and service multiple message producers and consumers.
107  * <P>
108  * One typical use is to have a thread block on a synchronous <CODE>MessageConsumer</CODE> until a message arrives.
109  * The thread may then use one or more of the <CODE>Session</CODE>'s <CODE>MessageProducer</CODE>s.
110  * <P>
111  * If a client desires to have one thread produce messages while others consume them, the client should use a separate
112  * session for its producing thread.
113  * <P>
114  * Once a connection has been started, any session with one or more registered message listeners is dedicated to the
115  * thread of control that delivers messages to it. It is erroneous for client code to use this session or any of its
116  * constituent objects from another thread of control. The only exception to this rule is the use of the session or
117  * connection <CODE>close</CODE> method.
118  * <P>
119  * It should be easy for most clients to partition their work naturally into sessions. This model allows clients to
120  * start simply and incrementally add message processing complexity as their need for concurrency grows.
121  * <P>
122  * The <CODE>close</CODE> method is the only session method that can be called while some other session method is
123  * being executed in another thread.
124  * <P>
125  * A session may be specified as transacted. Each transacted session supports a single series of transactions. Each
126  * transaction groups a set of message sends and a set of message receives into an atomic unit of work. In effect,
127  * transactions organize a session's input message stream and output message stream into series of atomic units. When a
128  * transaction commits, its atomic unit of input is acknowledged and its associated atomic unit of output is sent. If a
129  * transaction rollback is done, the transaction's sent messages are destroyed and the session's input is automatically
130  * recovered.
131  * <P>
132  * The content of a transaction's input and output units is simply those messages that have been produced and consumed
133  * within the session's current transaction.
134  * <P>
135  * A transaction is completed using either its session's <CODE>commit</CODE> method or its session's <CODE>rollback
136  * </CODE> method. The completion of a session's current transaction automatically begins the next. The result is that a
137  * transacted session always has a current transaction within which its work is done.
138  * <P>
139  * The Java Transaction Service (JTS) or some other transaction monitor may be used to combine a session's transaction
140  * with transactions on other resources (databases, other JMS sessions, etc.). Since Java distributed transactions are
141  * controlled via the Java Transaction API (JTA), use of the session's <CODE>commit</CODE> and <CODE>rollback</CODE>
142  * methods in this context is prohibited.
143  * <P>
144  * The JMS API does not require support for JTA; however, it does define how a provider supplies this support.
145  * <P>
146  * Although it is also possible for a JMS client to handle distributed transactions directly, it is unlikely that many
147  * JMS clients will do this. Support for JTA in the JMS API is targeted at systems vendors who will be integrating the
148  * JMS API into their application server products.
149  * 
150  * @version $Revision: 1.1.1.1 $
151  * @see javax.jms.Session
152  * @see javax.jms.QueueSession
153  * @see javax.jms.TopicSession
154  * @see javax.jms.XASession
155  */
156 public class ActiveMQSession
157         implements
158             Session,
159             QueueSession,
160             TopicSession,
161             ActiveMQMessageDispatcher,
162             MessageAcknowledge,
163             StatsCapable {
164     
165     public static interface DeliveryListener {
166         public void beforeDelivery(ActiveMQSession session, Message msg);
167         public void afterDelivery(ActiveMQSession session, Message msg);
168     }
169     
170     protected static final int CONSUMER_DISPATCH_UNSET = 1;
171     protected static final int CONSUMER_DISPATCH_ASYNC = 2;
172     protected static final int CONSUMER_DISPATCH_SYNC = 3;
173     private static final Log log = LogFactory.getLog(ActiveMQSession.class);
174     protected ActiveMQConnection connection;
175     protected int acknowledgeMode;
176     protected CopyOnWriteArrayList consumers;
177     protected CopyOnWriteArrayList producers;
178     private IdGenerator temporaryDestinationGenerator;
179     private MessageListener messageListener;
180     protected boolean closed;
181     private SynchronizedBoolean started;
182     private short sessionId;
183     private long startTime;
184     private DefaultQueueList deliveredMessages;
185     private ActiveMQSessionExecutor messageExecutor;
186     private JMSSessionStatsImpl stats;
187     private int consumerDispatchState;
188     private ByteArrayCompression compression;
189     private TransactionContext transactionContext;
190     private boolean internalSession;
191     private DeliveryListener deliveryListener;
192     
/**
 * Creates a session that takes its optimized-dispatch setting from the connection.
 * Delegates to the three-argument constructor.
 *
 * @param theConnection the connection this session belongs to
 * @param theAcknowledgeMode the acknowledge mode; for a transacted session this is
 * <CODE>Session.SESSION_TRANSACTED</CODE>
 * @throws JMSException on internal error
 */
protected ActiveMQSession(ActiveMQConnection theConnection, int theAcknowledgeMode) throws JMSException {
    this(theConnection,
            theAcknowledgeMode,
            theConnection.isOptimizedMessageDispatch());
}
203 
/**
 * Creates the session, builds its executor, statistics and compression settings, and finally
 * registers it with the connection.
 * <P>
 * Registration with the connection is the last step so that the connection never holds a
 * session whose statistics, dispatch state or compression settings are still unassigned, and
 * so that a failure during set-up does not leave a half-built session registered.
 * Note that <CODE>this</CODE> is still handed to the transaction context and to the session
 * executor earlier in the constructor.
 *
 * @param theConnection the connection this session belongs to
 * @param theAcknowledgeMode the acknowledge mode; for a transacted session this is
 * <CODE>Session.SESSION_TRANSACTED</CODE>
 * @param optimizedDispatch passed to the session executor's optimized message dispatch setting
 * @throws JMSException on internal error
 */
protected ActiveMQSession(ActiveMQConnection theConnection, int theAcknowledgeMode, boolean optimizedDispatch) throws JMSException {
    this.connection = theConnection;
    this.acknowledgeMode = theAcknowledgeMode;
    setTransactionContext(new TransactionContext(theConnection));
    this.consumers = new CopyOnWriteArrayList();
    this.producers = new CopyOnWriteArrayList();
    this.temporaryDestinationGenerator = new IdGenerator();
    this.started = new SynchronizedBoolean(false);
    this.sessionId = connection.generateSessionId();
    this.startTime = System.currentTimeMillis();
    this.deliveredMessages = new DefaultQueueList();
    this.messageExecutor = new ActiveMQSessionExecutor(this, connection.getMemoryBoundedQueue("Session("
            + sessionId + ")"));
    this.messageExecutor.setOptimizedMessageDispatch(optimizedDispatch);
    this.stats = new JMSSessionStatsImpl(producers, consumers);
    this.consumerDispatchState = CONSUMER_DISPATCH_UNSET;

    // Compression behaviour is copied from the connection's configuration.
    this.compression = new ByteArrayCompression();
    this.compression.setCompressionLevel(theConnection.getMessageCompressionLevel());
    this.compression.setCompressionStrategy(theConnection.getMessageCompressionStrategy());
    this.compression.setCompressionLimit(theConnection.getMessageCompressionLimit());

    this.internalSession = theConnection.isInternalConnection();

    // Publish the session to the connection only once every field above has been assigned.
    connection.addSession(this);
}
236 
237     public void setTransactionContext(TransactionContext transactionContext) {
238         if( this.transactionContext!=null ) {
239             this.transactionContext.removeSession(this);
240         }        
241         this.transactionContext = transactionContext;
242         this.transactionContext.addSession(this);
243     }
244     
245     public TransactionContext getTransactionContext() {
246         return transactionContext;
247     }
248 
249     public StatsImpl getStats() {
250         return stats;
251     }
252 
253     public JMSSessionStatsImpl getSessionStats() {
254         return stats;
255     }
256 
257     /**
258      * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE> object is used to send a message
259      * containing a stream of uninterpreted bytes.
260      * 
261      * @return the an ActiveMQBytesMessage
262      * @throws JMSException if the JMS provider fails to create this message due to some internal error.
263      */
264     public BytesMessage createBytesMessage() throws JMSException {
265         checkClosed();
266         return new ActiveMQBytesMessage();
267     }
268 
269     /**
270      * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE> object is used to send a self-defining
271      * set of name-value pairs, where names are <CODE>String</CODE> objects and values are primitive values in the
272      * Java programming language.
273      * 
274      * @return an ActiveMQMapMessage
275      * @throws JMSException if the JMS provider fails to create this message due to some internal error.
276      */
277     public MapMessage createMapMessage() throws JMSException {
278         checkClosed();
279         return new ActiveMQMapMessage();
280     }
281 
282     /**
283      * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE> interface is the root interface of all JMS
284      * messages. A <CODE>Message</CODE> object holds all the standard message header information. It can be sent when
285      * a message containing only header information is sufficient.
286      * 
287      * @return an ActiveMQMessage
288      * @throws JMSException if the JMS provider fails to create this message due to some internal error.
289      */
290     public Message createMessage() throws JMSException {
291         checkClosed();
292         return new ActiveMQMessage();
293     }
294 
295     /**
296      * Creates an <CODE>ObjectMessage</CODE> object. An <CODE>ObjectMessage</CODE> object is used to send a message
297      * that contains a serializable Java object.
298      * 
299      * @return an ActiveMQObjectMessage
300      * @throws JMSException if the JMS provider fails to create this message due to some internal error.
301      */
302     public ObjectMessage createObjectMessage() throws JMSException {
303         checkClosed();
304         return new ActiveMQObjectMessage();
305     }
306 
307     /**
308      * Creates an initialized <CODE>ObjectMessage</CODE> object. An <CODE>ObjectMessage</CODE> object is used to
309      * send a message that contains a serializable Java object.
310      * 
311      * @param object the object to use to initialize this message
312      * @return an ActiveMQObjectMessage
313      * @throws JMSException if the JMS provider fails to create this message due to some internal error.
314      */
315     public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
316         checkClosed();
317         ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
318         msg.setObject(object);
319         return msg;
320     }
321 
322     /**
323      * Creates a <CODE>StreamMessage</CODE> object. A <CODE>StreamMessage</CODE> object is used to send a
324      * self-defining stream of primitive values in the Java programming language.
325      * 
326      * @return an ActiveMQStreamMessage
327      * @throws JMSException if the JMS provider fails to create this message due to some internal error.
328      */
329     public StreamMessage createStreamMessage() throws JMSException {
330         checkClosed();
331         return new ActiveMQStreamMessage();
332     }
333 
334     /**
335      * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> object is used to send a message
336      * containing a <CODE>String</CODE> object.
337      * 
338      * @return an ActiveMQTextMessage
339      * @throws JMSException if the JMS provider fails to create this message due to some internal error.
340      */
341     public TextMessage createTextMessage() throws JMSException {
342         checkClosed();
343         return new ActiveMQTextMessage();
344     }
345 
346     /**
347      * Creates an initialized <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> object is used to send a
348      * message containing a <CODE>String</CODE>.
349      * 
350      * @param text the string used to initialize this message
351      * @return an ActiveMQTextMessage
352      * @throws JMSException if the JMS provider fails to create this message due to some internal error.
353      */
354     public TextMessage createTextMessage(String text) throws JMSException {
355         checkClosed();
356         ActiveMQTextMessage msg = new ActiveMQTextMessage();
357         msg.setText(text);
358         return msg;
359     }
360 
361     /**
362      * Indicates whether the session is in transacted mode.
363      * 
364      * @return true if the session is in transacted mode
365      * @throws JMSException if there is some internal error.
366      */
367     public boolean getTransacted() throws JMSException {
368         checkClosed();
369         return this.acknowledgeMode == Session.SESSION_TRANSACTED || transactionContext.isInXATransaction();
370     }
371 
372     /**
373      * Returns the acknowledgement mode of the session. The acknowledgement mode is set at the time that the session is
374      * created. If the session is transacted, the acknowledgement mode is ignored.
375      * 
376      * @return If the session is not transacted, returns the current acknowledgement mode for the session. If the
377      * session is transacted, returns SESSION_TRANSACTED.
378      * @throws JMSException
379      * @see javax.jms.Connection#createSession(boolean,int)
380      * @since 1.1 exception JMSException if there is some internal error.
381      */
382     public int getAcknowledgeMode() throws JMSException {
383         checkClosed();
384         return this.acknowledgeMode;
385     }
386 
387     /**
388      * Commits all messages done in this transaction and releases any locks currently held.
389      * 
390      * @throws JMSException if the JMS provider fails to commit the transaction due to some internal error.
391      * @throws TransactionRolledBackException if the transaction is rolled back due to some internal error during
392      * commit.
393      * @throws javax.jms.IllegalStateException if the method is not called by a transacted session.
394      */
395     public void commit() throws JMSException {
396         checkClosed();
397         if (!getTransacted()) {
398             throw new javax.jms.IllegalStateException("Not a transacted session");
399         }
400         transactionContext.commit();
401     }
402 
403     /**
404      * Rolls back any messages done in this transaction and releases any locks currently held.
405      * 
406      * @throws JMSException if the JMS provider fails to roll back the transaction due to some internal error.
407      * @throws javax.jms.IllegalStateException if the method is not called by a transacted session.
408      */
409     public void rollback() throws JMSException {
410         checkClosed();
411         if (!getTransacted()) {
412             throw new javax.jms.IllegalStateException("Not a transacted session");
413         }
414         transactionContext.rollback();
415     }
416 
417     public void clearDeliveredMessages() {
418         deliveredMessages.clear();        
419     }
420     
421     /**
422      * Closes the session.
423      * <P>
424      * Since a provider may allocate some resources on behalf of a session outside the JVM, clients should close the
425      * resources when they are not needed. Relying on garbage collection to eventually reclaim these resources may not
426      * be timely enough.
427      * <P>
428      * There is no need to close the producers and consumers of a closed session.
429      * <P>
430      * This call will block until a <CODE>receive</CODE> call or message listener in progress has completed. A blocked
431      * message consumer <CODE>receive</CODE> call returns <CODE>null</CODE> when this session is closed.
432      * <P>
433      * Closing a transacted session must roll back the transaction in progress.
434      * <P>
435      * This method is the only <CODE>Session</CODE> method that can be called concurrently.
436      * <P>
437      * Invoking any other <CODE>Session</CODE> method on a closed session must throw a <CODE>
438      * JMSException.IllegalStateException</CODE>. Closing a closed session must <I>not </I> throw an exception.
439      * 
440      * @throws JMSException if the JMS provider fails to close the session due to some internal error.
441      */
442     public void close() throws JMSException {
443         if (!this.closed) {
444             if (getTransactionContext().isInLocalTransaction()) {
445                 rollback();
446             }
447             doClose();
448             closed = true;
449         }
450     }
451 
452     protected void doClose() throws JMSException {
453         doAcknowledge(true);
454         deliveredMessages.clear();
455         for (Iterator i = consumers.iterator();i.hasNext();) {
456             ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
457             consumer.close();
458         }
459         for (Iterator i = producers.iterator();i.hasNext();) {
460             ActiveMQMessageProducer producer = (ActiveMQMessageProducer) i.next();
461             producer.close();
462         }
463         consumers.clear();
464         producers.clear();
465         this.connection.removeSession(this);
466         this.transactionContext.removeSession(this);
467         messageExecutor.close();
468     }
469 
470     /**
471      * @throws IllegalStateException if the Session is closed
472      */
473     protected void checkClosed() throws IllegalStateException {
474         if (this.closed) {
475             throw new IllegalStateException("The Session is closed");
476         }
477     }
478 
479     /**
480      * Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message.
481      * <P>
482      * All consumers deliver messages in a serial order. Acknowledging a received message automatically acknowledges all
483      * messages that have been delivered to the client.
484      * <P>
485      * Restarting a session causes it to take the following actions:
486      * <UL>
487      * <LI>Stop message delivery
488      * <LI>Mark all messages that might have been delivered but not acknowledged as "redelivered"
489      * <LI>Restart the delivery sequence including all unacknowledged messages that had been previously delivered.
490      * Redelivered messages do not have to be delivered in exactly their original delivery order.
491      * </UL>
492      * 
493      * @throws JMSException if the JMS provider fails to stop and restart message delivery due to some internal error.
494      * @throws IllegalStateException if the method is called by a transacted session.
495      */
496     public void recover() throws JMSException {
497         checkClosed();
498         if (getTransacted()) {
499             throw new IllegalStateException("This session is transacted");
500         }
501         redeliverUnacknowledgedMessages();
502     }
503 
504     /**
505      * Returns the session's distinguished message listener (optional).
506      * 
507      * @return the message listener associated with this session
508      * @throws JMSException if the JMS provider fails to get the message listener due to an internal error.
509      * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
510      * @see javax.jms.ServerSessionPool
511      * @see javax.jms.ServerSession
512      */
513     public MessageListener getMessageListener() throws JMSException {
514         checkClosed();
515         return this.messageListener;
516     }
517 
518     /**
519      * Sets the session's distinguished message listener (optional).
520      * <P>
521      * When the distinguished message listener is set, no other form of message receipt in the session can be used;
522      * however, all forms of sending messages are still supported.
523      * <P>
524      * This is an expert facility not used by regular JMS clients.
525      * 
526      * @param listener the message listener to associate with this session
527      * @throws JMSException if the JMS provider fails to set the message listener due to an internal error.
528      * @see javax.jms.Session#getMessageListener()
529      * @see javax.jms.ServerSessionPool
530      * @see javax.jms.ServerSession
531      */
532     public void setMessageListener(MessageListener listener) throws JMSException {
533         checkClosed();
534         this.messageListener = listener;
535         if (listener != null) {
536             messageExecutor.setDispatchedBySessionPool(true);
537         }
538     }
539 
540     /**
541      * Optional operation, intended to be used only by Application Servers, not by ordinary JMS clients.
542      * 
543      * @see javax.jms.ServerSession
544      */
545     public void run() {
546         ActiveMQMessage message;
547         while ((message = messageExecutor.dequeueNoWait()) != null) {
548             if( deliveryListener!=null )
549                 deliveryListener.beforeDelivery(this, message);
550             beforeMessageDelivered(message);
551             deliver(message);
552             if( deliveryListener!=null )
553                 deliveryListener.afterDelivery(this, message);
554         }
555     }
556 
557     /**
558      * Delivers a message to the messageListern
559      * @param message The message to deliver
560      */ 
561     private void deliver(ActiveMQMessage message) {
562         if (!message.isExpired() && this.messageListener != null) {
563             try {
564                 
565                 if( log.isDebugEnabled() ) {
566                     log.debug("Message delivered to session message listener: "+message);
567                 }
568                 
569                 this.messageListener.onMessage(message);
570                 this.afterMessageDelivered(true, message, true, false, true);
571             }
572             catch (Throwable t) {
573                 log.info("Caught :" + t, t);
574                 this.afterMessageDelivered(true, message, false, false, true);
575             }
576         }
577         else {
578             this.afterMessageDelivered(true, message, false, message.isExpired(), true);
579         }
580     }
581 
582     /**
583      * Creates a <CODE>MessageProducer</CODE> to send messages to the specified destination.
584      * <P>
585      * A client uses a <CODE>MessageProducer</CODE> object to send messages to a destination. Since <CODE>Queue
586      * </CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the
587      * destination parameter to create a <CODE>MessageProducer</CODE> object.
588      * 
589      * @param destination the <CODE>Destination</CODE> to send to, or null if this is a producer which does not have a
590      * specified destination.
591      * @return the MessageProducer
592      * @throws JMSException if the session fails to create a MessageProducer due to some internal error.
593      * @throws InvalidDestinationException if an invalid destination is specified.
594      * @since 1.1
595      */
596     public MessageProducer createProducer(Destination destination) throws JMSException {
597         checkClosed();
598         return new ActiveMQMessageProducer(this, ActiveMQMessageTransformation.transformDestination(destination));
599     }
600 
601     /**
602      * Creates a <CODE>MessageConsumer</CODE> for the specified destination. Since <CODE>Queue</CODE> and <CODE>
603      * Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the destination parameter to
604      * create a <CODE>MessageConsumer</CODE>.
605      * 
606      * @param destination the <CODE>Destination</CODE> to access.
607      * @return the MessageConsumer
608      * @throws JMSException if the session fails to create a consumer due to some internal error.
609      * @throws InvalidDestinationException if an invalid destination is specified.
610      * @since 1.1
611      */
612     public MessageConsumer createConsumer(Destination destination) throws JMSException {
613         checkClosed();
614         int prefetch = destination instanceof Topic ? connection.getPrefetchPolicy().getTopicPrefetch() : connection
615                 .getPrefetchPolicy().getQueuePrefetch();
616         return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
617                 "", this.connection.getNextConsumerNumber(), prefetch, false, false);
618     }
619 
620     /**
621      * Creates a <CODE>MessageConsumer</CODE> for the specified destination, using a message selector. Since <CODE>
622      * Queue</CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the
623      * destination parameter to create a <CODE>MessageConsumer</CODE>.
624      * <P>
625      * A client uses a <CODE>MessageConsumer</CODE> object to receive messages that have been sent to a destination.
626      * 
627      * @param destination the <CODE>Destination</CODE> to access
628      * @param messageSelector only messages with properties matching the message selector expression are delivered. A
629      * value of null or an empty string indicates that there is no message selector for the message consumer.
630      * @return the MessageConsumer
631      * @throws JMSException if the session fails to create a MessageConsumer due to some internal error.
632      * @throws InvalidDestinationException if an invalid destination is specified.
633      * @throws InvalidSelectorException if the message selector is invalid.
634      * @since 1.1
635      */
636     public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
637         checkClosed();
638         int prefetch = destination instanceof Topic ? connection.getPrefetchPolicy().getTopicPrefetch() : connection
639                 .getPrefetchPolicy().getQueuePrefetch();
640         return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
641                 messageSelector, this.connection.getNextConsumerNumber(), prefetch, false, false);
642     }
643 
644     /**
645      * Creates <CODE>MessageConsumer</CODE> for the specified destination, using a message selector. This method can
646      * specify whether messages published by its own connection should be delivered to it, if the destination is a
647      * topic.
648      * <P>
649      * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be
650      * used in the destination parameter to create a <CODE>MessageConsumer</CODE>.
651      * <P>
652      * A client uses a <CODE>MessageConsumer</CODE> object to receive messages that have been published to a
653      * destination.
654      * <P>
655      * In some cases, a connection may both publish and subscribe to a topic. The consumer <CODE>NoLocal</CODE>
656      * attribute allows a consumer to inhibit the delivery of messages published by its own connection. The default
657      * value for this attribute is False. The <CODE>noLocal</CODE> value must be supported by destinations that are
658      * topics.
659      * 
660      * @param destination the <CODE>Destination</CODE> to access
661      * @param messageSelector only messages with properties matching the message selector expression are delivered. A
662      * value of null or an empty string indicates that there is no message selector for the message consumer.
663      * @param NoLocal - if true, and the destination is a topic, inhibits the delivery of messages published by its own
664      * connection. The behavior for <CODE>NoLocal</CODE> is not specified if the destination is a queue.
665      * @return the MessageConsumer
666      * @throws JMSException if the session fails to create a MessageConsumer due to some internal error.
667      * @throws InvalidDestinationException if an invalid destination is specified.
668      * @throws InvalidSelectorException if the message selector is invalid.
669      * @since 1.1
670      */
671     public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal)
672             throws JMSException {
673         checkClosed();
674         int prefetch = connection.getPrefetchPolicy().getTopicPrefetch();
675         return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
676                 messageSelector, this.connection.getNextConsumerNumber(), prefetch, NoLocal, false);
677     }
678 
679     /**
680      * Creates a queue identity given a <CODE>Queue</CODE> name.
681      * <P>
682      * This facility is provided for the rare cases where clients need to dynamically manipulate queue identity. It
683      * allows the creation of a queue identity with a provider-specific name. Clients that depend on this ability are
684      * not portable.
685      * <P>
686      * Note that this method is not for creating the physical queue. The physical creation of queues is an
687      * administrative task and is not to be initiated by the JMS API. The one exception is the creation of temporary
688      * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE> method.
689      * 
690      * @param queueName the name of this <CODE>Queue</CODE>
691      * @return a <CODE>Queue</CODE> with the given name
692      * @throws JMSException if the session fails to create a queue due to some internal error.
693      * @since 1.1
694      */
695     public Queue createQueue(String queueName) throws JMSException {
696         checkClosed();
697         return new ActiveMQQueue(queueName);
698     }
699 
700     /**
701      * Creates a topic identity given a <CODE>Topic</CODE> name.
702      * <P>
703      * This facility is provided for the rare cases where clients need to dynamically manipulate topic identity. This
704      * allows the creation of a topic identity with a provider-specific name. Clients that depend on this ability are
705      * not portable.
706      * <P>
707      * Note that this method is not for creating the physical topic. The physical creation of topics is an
708      * administrative task and is not to be initiated by the JMS API. The one exception is the creation of temporary
709      * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE> method.
710      * 
711      * @param topicName the name of this <CODE>Topic</CODE>
712      * @return a <CODE>Topic</CODE> with the given name
713      * @throws JMSException if the session fails to create a topic due to some internal error.
714      * @since 1.1
715      */
716     public Topic createTopic(String topicName) throws JMSException {
717         checkClosed();
718         return new ActiveMQTopic(topicName);
719     }
720 
721     /**
722      * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue.
723      * 
724      * @param queue the <CODE>queue</CODE> to access
725      * @exception InvalidDestinationException if an invalid destination is specified
726      * @since 1.1
727      */
728     /**
729      * Creates a durable subscriber to the specified topic.
730      * <P>
731      * If a client needs to receive all the messages published on a topic, including the ones published while the
732      * subscriber is inactive, it uses a durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a record of
733      * this durable subscription and insures that all messages from the topic's publishers are retained until they are
734      * acknowledged by this durable subscriber or they have expired.
735      * <P>
736      * Sessions with durable subscribers must always provide the same client identifier. In addition, each client must
737      * specify a name that uniquely identifies (within client identifier) each durable subscription it creates. Only one
738      * session at a time can have a <CODE>TopicSubscriber</CODE> for a particular durable subscription.
739      * <P>
740      * A client can change an existing durable subscription by creating a durable <CODE>TopicSubscriber</CODE> with
741      * the same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to
742      * unsubscribing (deleting) the old one and creating a new one.
743      * <P>
744      * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
745      * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
746      * value for this attribute is false.
747      * 
748      * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
749      * @param name the name used to identify this subscription
750      * @return the TopicSubscriber
751      * @throws JMSException if the session fails to create a subscriber due to some internal error.
752      * @throws InvalidDestinationException if an invalid topic is specified.
753      * @since 1.1
754      */
755     public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
756         checkClosed();
757         return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), name, "",
758                 this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy().getDurableTopicPrefetch(),
759                 false, false);
760     }
761 
762     /**
763      * Creates a durable subscriber to the specified topic, using a message selector and specifying whether messages
764      * published by its own connection should be delivered to it.
765      * <P>
766      * If a client needs to receive all the messages published on a topic, including the ones published while the
767      * subscriber is inactive, it uses a durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a record of
768      * this durable subscription and insures that all messages from the topic's publishers are retained until they are
769      * acknowledged by this durable subscriber or they have expired.
770      * <P>
771      * Sessions with durable subscribers must always provide the same client identifier. In addition, each client must
772      * specify a name which uniquely identifies (within client identifier) each durable subscription it creates. Only
773      * one session at a time can have a <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
774      * inactive durable subscriber is one that exists but does not currently have a message consumer associated with it.
775      * <P>
776      * A client can change an existing durable subscription by creating a durable <CODE>TopicSubscriber</CODE> with
777      * the same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to
778      * unsubscribing (deleting) the old one and creating a new one.
779      * 
780      * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
781      * @param name the name used to identify this subscription
782      * @param messageSelector only messages with properties matching the message selector expression are delivered. A
783      * value of null or an empty string indicates that there is no message selector for the message consumer.
784      * @param noLocal if set, inhibits the delivery of messages published by its own connection
785      * @return the TopicSubscriber
786      * @throws JMSException if the session fails to create a subscriber due to some internal error.
787      * @throws InvalidDestinationException if an invalid topic is specified.
788      * @throws InvalidSelectorException if the message selector is invalid.
789      * @since 1.1
790      */
791     public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
792             throws JMSException {
793         checkClosed();
794         return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), name,
795                 messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
796                         .getDurableTopicPrefetch(), noLocal, false);
797     }
798 
799     /**
800      * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue.
801      * 
802      * @param queue the <CODE>queue</CODE> to access
803      * @return the Queue Browser
804      * @throws JMSException if the session fails to create a browser due to some internal error.
805      * @throws InvalidDestinationException if an invalid destination is specified
806      * @since 1.1
807      */
808     public QueueBrowser createBrowser(Queue queue) throws JMSException {
809         checkClosed();
810         return new ActiveMQQueueBrowser(this, (ActiveMQQueue) ActiveMQMessageTransformation.transformDestination(queue), "",
811                 this.connection.getNextConsumerNumber());
812     }
813 
814     /**
815      * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue using a message
816      * selector.
817      * 
818      * @param queue the <CODE>queue</CODE> to access
819      * @param messageSelector only messages with properties matching the message selector expression are delivered. A
820      * value of null or an empty string indicates that there is no message selector for the message consumer.
821      * @return the Queue Browser
822      * @throws JMSException if the session fails to create a browser due to some internal error.
823      * @throws InvalidDestinationException if an invalid destination is specified
824      * @throws InvalidSelectorException if the message selector is invalid.
825      * @since 1.1
826      */
827     public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
828         checkClosed();
829         return new ActiveMQQueueBrowser(this, (ActiveMQQueue) ActiveMQMessageTransformation.transformDestination(queue),
830                 messageSelector, this.connection.getNextConsumerNumber());
831     }
832 
833     /**
834      * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that of the <CODE>Connection</CODE> unless
835      * it is deleted earlier.
836      * 
837      * @return a temporary queue identity
838      * @throws JMSException if the session fails to create a temporary queue due to some internal error.
839      * @since 1.1
840      */
841     public TemporaryQueue createTemporaryQueue() throws JMSException {
842         checkClosed();
843         String tempQueueName = "TemporaryQueue-"
844                 + ActiveMQDestination.createTemporaryName(this.connection.getInitializedClientID());
845         tempQueueName += this.temporaryDestinationGenerator.generateId();
846         ActiveMQTemporaryQueue tempQueue =  new ActiveMQTemporaryQueue(tempQueueName);
847        tempQueue.setSessionCreatedBy(this);
848        this.connection.startTemporaryDestination(tempQueue);
849        return tempQueue;
850     }
851 
852     /**
853      * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that of the <CODE>Connection</CODE> unless
854      * it is deleted earlier.
855      * 
856      * @return a temporary topic identity
857      * @throws JMSException if the session fails to create a temporary topic due to some internal error.
858      * @since 1.1
859      */
860     public TemporaryTopic createTemporaryTopic() throws JMSException {
861         checkClosed();
862         String tempTopicName = "TemporaryTopic-"
863                 + ActiveMQDestination.createTemporaryName(this.connection.getInitializedClientID());
864         tempTopicName += this.temporaryDestinationGenerator.generateId();
865         ActiveMQTemporaryTopic tempTopic =  new ActiveMQTemporaryTopic(tempTopicName);
866         tempTopic.setSessionCreatedBy(this);
867         this.connection.startTemporaryDestination(tempTopic);
868         return tempTopic;
869     }
870 
871     /**
872      * Creates a <CODE>QueueReceiver</CODE> object to receive messages from the specified queue.
873      * 
874      * @param queue the <CODE>Queue</CODE> to access
875      * @return QueueReceiver
876      * @throws JMSException if the session fails to create a receiver due to some internal error.
877      * @throws InvalidDestinationException if an invalid queue is specified.
878      */
879     public QueueReceiver createReceiver(Queue queue) throws JMSException {
880         checkClosed();
881         return new ActiveMQQueueReceiver(this, ActiveMQDestination.transformDestination(queue), "", this.connection
882                 .getNextConsumerNumber(), this.connection.getPrefetchPolicy().getQueuePrefetch());
883     }
884 
885     /**
886      * Creates a <CODE>QueueReceiver</CODE> object to receive messages from the specified queue using a message
887      * selector.
888      * 
889      * @param queue the <CODE>Queue</CODE> to access
890      * @param messageSelector only messages with properties matching the message selector expression are delivered. A
891      * value of null or an empty string indicates that there is no message selector for the message consumer.
892      * @return QueueReceiver
893      * @throws JMSException if the session fails to create a receiver due to some internal error.
894      * @throws InvalidDestinationException if an invalid queue is specified.
895      * @throws InvalidSelectorException if the message selector is invalid.
896      */
897     public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
898         checkClosed();
899         return new ActiveMQQueueReceiver(this, ActiveMQMessageTransformation.transformDestination(queue),
900                 messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
901                         .getQueuePrefetch());
902     }
903 
904     /**
905      * Creates a <CODE>QueueSender</CODE> object to send messages to the specified queue.
906      * 
907      * @param queue the <CODE>Queue</CODE> to access, or null if this is an unidentified producer
908      * @return QueueSender
909      * @throws JMSException if the session fails to create a sender due to some internal error.
910      * @throws InvalidDestinationException if an invalid queue is specified.
911      */
912     public QueueSender createSender(Queue queue) throws JMSException {
913         checkClosed();
914         return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue));
915     }
916 
917     /**
918      * Creates a nondurable subscriber to the specified topic. <p/>
919      * <P>
920      * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages that have been published to a topic.
921      * <p/>
922      * <P>
923      * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They receive only messages that are published
924      * while they are active. <p/>
925      * <P>
926      * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
927      * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
928      * value for this attribute is false.
929      * 
930      * @param topic the <CODE>Topic</CODE> to subscribe to
931      * @return TopicSubscriber
932      * @throws JMSException if the session fails to create a subscriber due to some internal error.
933      * @throws InvalidDestinationException if an invalid topic is specified.
934      */
935     public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
936         checkClosed();
937         return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), null, null,
938                 this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy().getTopicPrefetch(), false,
939                 false);
940     }
941 
942     /**
943      * Creates a nondurable subscriber to the specified topic, using a message selector or specifying whether messages
944      * published by its own connection should be delivered to it. <p/>
945      * <P>
946      * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages that have been published to a topic.
947      * <p/>
948      * <P>
949      * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They receive only messages that are published
950      * while they are active. <p/>
951      * <P>
952      * Messages filtered out by a subscriber's message selector will never be delivered to the subscriber. From the
953      * subscriber's perspective, they do not exist. <p/>
954      * <P>
955      * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
956      * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
957      * value for this attribute is false.
958      * 
959      * @param topic the <CODE>Topic</CODE> to subscribe to
960      * @param messageSelector only messages with properties matching the message selector expression are delivered. A
961      * value of null or an empty string indicates that there is no message selector for the message consumer.
962      * @param noLocal if set, inhibits the delivery of messages published by its own connection
963      * @return TopicSubscriber
964      * @throws JMSException if the session fails to create a subscriber due to some internal error.
965      * @throws InvalidDestinationException if an invalid topic is specified.
966      * @throws InvalidSelectorException if the message selector is invalid.
967      */
968     public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
969         checkClosed();
970         return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), null,
971                 messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
972                         .getTopicPrefetch(), noLocal, false);
973     }
974 
975     /**
976      * Creates a publisher for the specified topic. <p/>
977      * <P>
978      * A client uses a <CODE>TopicPublisher</CODE> object to publish messages on a topic. Each time a client creates a
979      * <CODE>TopicPublisher</CODE> on a topic, it defines a new sequence of messages that have no ordering
980      * relationship with the messages it has previously sent.
981      * 
982      * @param topic the <CODE>Topic</CODE> to publish to, or null if this is an unidentified producer
983      * @return TopicPublisher
984      * @throws JMSException if the session fails to create a publisher due to some internal error.
985      * @throws InvalidDestinationException if an invalid topic is specified.
986      */
987     public TopicPublisher createPublisher(Topic topic) throws JMSException {
988         checkClosed();
989         return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic));
990     }
991 
992     /**
993      * Unsubscribes a durable subscription that has been created by a client.
994      * <P>
995      * This method deletes the state being maintained on behalf of the subscriber by its provider.
996      * <P>
997      * It is erroneous for a client to delete a durable subscription while there is an active <CODE>MessageConsumer
998      * </CODE> or <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed message is part of a pending
999      * transaction or has not been acknowledged in the session.
1000     * 
1001     * @param name the name used to identify this subscription
1002     * @throws JMSException if the session fails to unsubscribe to the durable subscription due to some internal error.
1003     * @throws InvalidDestinationException if an invalid subscription name is specified.
1004     * @since 1.1
1005     */
1006    public void unsubscribe(String name) throws JMSException {
1007        checkClosed();
1008        DurableUnsubscribe ds = new DurableUnsubscribe();
1009        ds.setClientId(this.connection.getClientID());
1010        ds.setSubscriberName(name);
1011        this.connection.syncSendPacket(ds);
1012    }
1013
1014    /**
1015     * Tests to see if the Message Dispatcher is a target for this message
1016     * 
1017     * @param message the message to test
1018     * @return true if the Message Dispatcher can dispatch the message
1019     */
1020    public boolean isTarget(ActiveMQMessage message) {
1021        for (Iterator i = this.consumers.iterator();i.hasNext();) {
1022            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
1023            if (message.isConsumerTarget(consumer.getConsumerNumber())) {
1024                return true;
1025            }
1026        }
1027        return false;
1028    }
1029
1030    /**
1031     * Dispatch an ActiveMQMessage
1032     * 
1033     * @param message
1034     */
1035    public void dispatch(ActiveMQMessage message) {
1036        message.setMessageAcknowledge(this);
1037        messageExecutor.execute(message);
1038    }
1039
1040    /**
1041     * Acknowledges all consumed messages of the session of this consumed message.
1042     * <P>
1043     * All consumed JMS messages support the <CODE>acknowledge</CODE> method for use when a client has specified that
1044     * its JMS session's consumed messages are to be explicitly acknowledged. By invoking <CODE>acknowledge</CODE> on
1045     * a consumed message, a client acknowledges all messages consumed by the session that the message was delivered to.
1046     * <P>
1047     * Calls to <CODE>acknowledge</CODE> are ignored for both transacted sessions and sessions specified to use
1048     * implicit acknowledgement modes.
1049     * <P>
1050     * A client may individually acknowledge each message as it is consumed, or it may choose to acknowledge messages as
1051     * an application-defined group (which is done by calling acknowledge on the last received message of the group,
1052     * thereby acknowledging all messages consumed by the session.)
1053     * <P>
1054     * Messages that have been received but not acknowledged may be redelivered.
1055     * @param caller - the message calling acknowledge on the session
1056     * 
1057     * @throws JMSException if the JMS provider fails to acknowledge the messages due to some internal error.
1058     * @throws javax.jms.IllegalStateException if this method is called on a closed session.
1059     * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
1060     */
1061    public void acknowledge(ActiveMQMessage caller) throws JMSException {
1062        checkClosed();
1063        /**
1064         * Find the caller and ensure it is marked as consumed
1065         * This is to ensure acknowledge called by a 
1066         * MessageListener works correctly
1067         */
1068        ActiveMQMessage msg = (ActiveMQMessage)deliveredMessages.get(caller);
1069        if (msg != null){
1070            msg.setMessageConsumed(true);
1071        }
1072       
1073        doAcknowledge(false);
1074    }
1075
1076    protected void doAcknowledge(boolean isClosing) throws JMSException {
1077        if (!closed) {
1078            if (this.acknowledgeMode == Session.CLIENT_ACKNOWLEDGE) {
1079                ActiveMQMessage msg = null;
1080                while((msg = (ActiveMQMessage)deliveredMessages.removeFirst())!=null){
1081                    boolean messageConsumed = isClosing ? false : msg.isMessageConsumed();
1082                    if (!msg.isTransientConsumed()){
1083                        sendMessageAck(msg, messageConsumed, false);
1084                    }else {
1085                        if (!messageConsumed){
1086                            connection.addToTransientConsumedRedeliverCache(msg);
1087                        }
1088                    }
1089                }
1090                deliveredMessages.clear();
1091            }
1092        }
1093    }
1094
1095    protected void beforeMessageDelivered(ActiveMQMessage message) {
1096        if (message != null && !closed) {
1097            deliveredMessages.add(message);
1098        }
1099    }
1100
1101    protected void afterMessageDelivered(boolean sendAcknowledge, ActiveMQMessage message, boolean messageConsumed,
1102            boolean messageExpired, boolean beforeCalled) {
1103        if (message != null && !closed) {
1104            if ((isClientAcknowledge() && !messageExpired) || (isTransacted() && message.isTransientConsumed())) {
1105                message.setMessageConsumed(messageConsumed);
1106                if (!beforeCalled) {
1107                    deliveredMessages.add(message);
1108                }
1109            }
1110            else {
1111                if (beforeCalled) {
1112                    deliveredMessages.remove(message);
1113                }
1114            }
1115            //don't send acks for expired messages unless sendAcknowledge is set
1116            //the sendAcknowledge flag is set for all messages expect those destined
1117            //for transient Topic subscribers
1118            if (sendAcknowledge && !isClientAcknowledge()) {
1119                try {
1120                    doStartTransaction();
1121                    sendMessageAck(message,messageConsumed,messageExpired);
1122                }
1123                catch (JMSException e) {
1124                    log.warn("failed to notify Broker that message is delivered", e);
1125                }
1126            }
1127        }
1128    }
1129    
1130    /**
1131     * remove a temporary destination
1132     * @param destination
1133     * @throws JMSException if active subscribers already exist
1134     */
1135    public void removeTemporaryDestination(ActiveMQDestination destination) throws JMSException{
1136        this.connection.stopTemporaryDestination(destination);
1137    }
1138    
1139    private void sendMessageAck(ActiveMQMessage message, boolean messageConsumed, boolean messageExpired)
1140            throws JMSException {
1141        if (message.isMessagePart()) {
1142            ActiveMQMessage[] parts = (ActiveMQMessage[]) connection.getAssemblies().remove(message.getParentMessageID());
1143            if (parts != null) {
1144                for (int i = 0;i < parts.length;i++) {
1145                    parts[i].setConsumerIdentifer(message.getConsumerIdentifer());
1146                    doSendMessageAck(parts[i], messageConsumed, messageExpired);
1147                }
1148            }
1149            else {
1150                JMSException jmsEx = new JMSException("Could not find parts for fragemented message: " + message);
1151                connection.onException(jmsEx);
1152            }
1153        }
1154        else {
1155            doSendMessageAck(message, messageConsumed, messageExpired);
1156        }
1157    }
1158    
1159    private void doSendMessageAck(ActiveMQMessage message, boolean messageConsumed, boolean messageExpired)
1160            throws JMSException {
1161        if (message != null && !message.isAdvisory()) {
1162            MessageAck ack = new MessageAck();
1163            ack.setConsumerId(message.getConsumerIdentifer());
1164            ack.setTransactionId(transactionContext.getTransactionId());
1165            ack.setExternalMessageId(message.isExternalMessageId());
1166            ack.setMessageID(message.getJMSMessageID());
1167            ack.setSequenceNumber(message.getSequenceNumber());
1168            ack.setProducerKey(message.getProducerKey());
1169            ack.setMessageRead(messageConsumed);
1170            ack.setDestination(message.getJMSActiveMQDestination());
1171            ack.setPersistent(message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT);
1172            ack.setExpired(messageExpired);
1173            ack.setSessionId(getSessionId());
1174            this.connection.asyncSendPacket(ack);
1175        }
1176    }
1177
1178    /**
1179     * @param consumer
1180     * @throws JMSExcep