Source code: org/activemq/ActiveMQSession.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.activemq;
20 import java.io.IOException;
21 import java.io.Serializable;
22 import java.util.Iterator;
23 import java.util.LinkedList;
24 import java.util.List;
25 import java.util.ListIterator;
26
27 import javax.jms.BytesMessage;
28 import javax.jms.DeliveryMode;
29 import javax.jms.Destination;
30 import javax.jms.IllegalStateException;
31 import javax.jms.InvalidDestinationException;
32 import javax.jms.InvalidSelectorException;
33 import javax.jms.JMSException;
34 import javax.jms.MapMessage;
35 import javax.jms.Message;
36 import javax.jms.MessageConsumer;
37 import javax.jms.MessageListener;
38 import javax.jms.MessageProducer;
39 import javax.jms.ObjectMessage;
40 import javax.jms.Queue;
41 import javax.jms.QueueBrowser;
42 import javax.jms.QueueReceiver;
43 import javax.jms.QueueSender;
44 import javax.jms.QueueSession;
45 import javax.jms.Session;
46 import javax.jms.StreamMessage;
47 import javax.jms.TemporaryQueue;
48 import javax.jms.TemporaryTopic;
49 import javax.jms.TextMessage;
50 import javax.jms.Topic;
51 import javax.jms.TopicPublisher;
52 import javax.jms.TopicSession;
53 import javax.jms.TopicSubscriber;
54 import javax.jms.TransactionRolledBackException;
55
56 import org.activemq.io.util.ByteArray;
57 import org.activemq.io.util.ByteArrayCompression;
58 import org.activemq.io.util.ByteArrayFragmentation;
59 import org.activemq.management.JMSSessionStatsImpl;
60 import org.activemq.management.StatsCapable;
61 import org.activemq.management.StatsImpl;
62 import org.activemq.message.ActiveMQBytesMessage;
63 import org.activemq.message.ActiveMQDestination;
64 import org.activemq.message.ActiveMQMapMessage;
65 import org.activemq.message.ActiveMQMessage;
66 import org.activemq.message.ActiveMQObjectMessage;
67 import org.activemq.message.ActiveMQQueue;
68 import org.activemq.message.ActiveMQStreamMessage;
69 import org.activemq.message.ActiveMQTemporaryQueue;
70 import org.activemq.message.ActiveMQTemporaryTopic;
71 import org.activemq.message.ActiveMQTextMessage;
72 import org.activemq.message.ActiveMQTopic;
73 import org.activemq.message.ConsumerInfo;
74 import org.activemq.message.DurableUnsubscribe;
75 import org.activemq.message.MessageAck;
76 import org.activemq.message.MessageAcknowledge;
77 import org.activemq.message.ProducerInfo;
78 import org.activemq.service.impl.DefaultQueueList;
79 import org.activemq.util.IdGenerator;
80 import org.apache.commons.logging.Log;
81 import org.apache.commons.logging.LogFactory;
82
83 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
84 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
85
86 /**
87 * <P>
88 * A <CODE>Session</CODE> object is a single-threaded context for producing and consuming messages. Although it may
89 * allocate provider resources outside the Java virtual machine (JVM), it is considered a lightweight JMS object.
90 * <P>
91 * A session serves several purposes:
92 * <UL>
93 * <LI>It is a factory for its message producers and consumers.
94 * <LI>It supplies provider-optimized message factories.
95 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and <CODE>TemporaryQueues</CODE>.
96 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE> objects for those clients that need to
97 * dynamically manipulate provider-specific destination names.
98 * <LI>It supports a single series of transactions that combine work spanning its producers and consumers into atomic
99 * units.
100 * <LI>It defines a serial order for the messages it consumes and the messages it produces.
101 * <LI>It retains messages it consumes until they have been acknowledged.
102 * <LI>It serializes execution of message listeners registered with its message consumers.
103 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
104 * </UL>
105 * <P>
106 * A session can create and service multiple message producers and consumers.
107 * <P>
108 * One typical use is to have a thread block on a synchronous <CODE>MessageConsumer</CODE> until a message arrives.
109 * The thread may then use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s.
110 * <P>
111 * If a client desires to have one thread produce messages while others consume them, the client should use a separate
112 * session for its producing thread.
113 * <P>
114 * Once a connection has been started, any session with one or more registered message listeners is dedicated to the
115 * thread of control that delivers messages to it. It is erroneous for client code to use this session or any of its
116 * constituent objects from another thread of control. The only exception to this rule is the use of the session or
117 * connection <CODE>close</CODE> method.
118 * <P>
119 * It should be easy for most clients to partition their work naturally into sessions. This model allows clients to
120 * start simply and incrementally add message processing complexity as their need for concurrency grows.
121 * <P>
122 * The <CODE>close</CODE> method is the only session method that can be called while some other session method is
123 * being executed in another thread.
124 * <P>
125 * A session may be specified as transacted. Each transacted session supports a single series of transactions. Each
126 * transaction groups a set of message sends and a set of message receives into an atomic unit of work. In effect,
127 * transactions organize a session's input message stream and output message stream into series of atomic units. When a
128 * transaction commits, its atomic unit of input is acknowledged and its associated atomic unit of output is sent. If a
129 * transaction rollback is done, the transaction's sent messages are destroyed and the session's input is automatically
130 * recovered.
131 * <P>
132 * The content of a transaction's input and output units is simply those messages that have been produced and consumed
133 * within the session's current transaction.
134 * <P>
135 * A transaction is completed using either its session's <CODE>commit</CODE> method or its session's <CODE>rollback
136 * </CODE> method. The completion of a session's current transaction automatically begins the next. The result is that a
137 * transacted session always has a current transaction within which its work is done.
138 * <P>
139 * The Java Transaction Service (JTS) or some other transaction monitor may be used to combine a session's transaction
140 * with transactions on other resources (databases, other JMS sessions, etc.). Since Java distributed transactions are
141 * controlled via the Java Transaction API (JTA), use of the session's <CODE>commit</CODE> and <CODE>rollback</CODE>
142 * methods in this context is prohibited.
143 * <P>
144 * The JMS API does not require support for JTA; however, it does define how a provider supplies this support.
145 * <P>
146 * Although it is also possible for a JMS client to handle distributed transactions directly, it is unlikely that many
147 * JMS clients will do this. Support for JTA in the JMS API is targeted at systems vendors who will be integrating the
148 * JMS API into their application server products.
149 *
150 * @version $Revision: 1.1.1.1 $
151 * @see javax.jms.Session
152 * @see javax.jms.QueueSession
153 * @see javax.jms.TopicSession
154 * @see javax.jms.XASession
155 */
156 public class ActiveMQSession
157 implements
158 Session,
159 QueueSession,
160 TopicSession,
161 ActiveMQMessageDispatcher,
162 MessageAcknowledge,
163 StatsCapable {
164
165 public static interface DeliveryListener {
166 public void beforeDelivery(ActiveMQSession session, Message msg);
167 public void afterDelivery(ActiveMQSession session, Message msg);
168 }
169
170 protected static final int CONSUMER_DISPATCH_UNSET = 1;
171 protected static final int CONSUMER_DISPATCH_ASYNC = 2;
172 protected static final int CONSUMER_DISPATCH_SYNC = 3;
173 private static final Log log = LogFactory.getLog(ActiveMQSession.class);
174 protected ActiveMQConnection connection;
175 protected int acknowledgeMode;
176 protected CopyOnWriteArrayList consumers;
177 protected CopyOnWriteArrayList producers;
178 private IdGenerator temporaryDestinationGenerator;
179 private MessageListener messageListener;
180 protected boolean closed;
181 private SynchronizedBoolean started;
182 private short sessionId;
183 private long startTime;
184 private DefaultQueueList deliveredMessages;
185 private ActiveMQSessionExecutor messageExecutor;
186 private JMSSessionStatsImpl stats;
187 private int consumerDispatchState;
188 private ByteArrayCompression compression;
189 private TransactionContext transactionContext;
190 private boolean internalSession;
191 private DeliveryListener deliveryListener;
192
193 /**
194 * Construct the Session
195 *
196 * @param theConnection
197 * @param theAcknowledgeMode n.b if transacted - the acknowledgeMode == Session.SESSION_TRANSACTED
198 * @throws JMSException on internal error
199 */
protected ActiveMQSession(ActiveMQConnection theConnection, int theAcknowledgeMode) throws JMSException {
    // The dispatch optimisation flag defaults to whatever the connection is configured with.
    this(theConnection,
         theAcknowledgeMode,
         theConnection.isOptimizedMessageDispatch());
}
203
/**
 * Construct the Session and register it with its connection.
 * <P>
 * Registration with the connection is the last step, so the connection never sees a partially initialised
 * session and a failure during initialisation does not leave a half-built session registered.
 *
 * @param theConnection the connection that owns this session
 * @param theAcknowledgeMode n.b if transacted - the acknowledgeMode == Session.SESSION_TRANSACTED
 * @param optimizedDispatch passed to the session executor's <CODE>setOptimizedMessageDispatch</CODE>
 * @throws JMSException on internal error
 */
protected ActiveMQSession(ActiveMQConnection theConnection, int theAcknowledgeMode,boolean optimizedDispatch) throws JMSException {
    this.connection = theConnection;
    this.acknowledgeMode = theAcknowledgeMode;
    setTransactionContext(new TransactionContext(theConnection));
    this.consumers = new CopyOnWriteArrayList();
    this.producers = new CopyOnWriteArrayList();
    this.temporaryDestinationGenerator = new IdGenerator();
    this.started = new SynchronizedBoolean(false);
    this.sessionId = connection.generateSessionId();
    this.startTime = System.currentTimeMillis();
    this.deliveredMessages = new DefaultQueueList();
    this.messageExecutor = new ActiveMQSessionExecutor(this, connection.getMemoryBoundedQueue("Session("
            + sessionId + ")"));
    this.messageExecutor.setOptimizedMessageDispatch(optimizedDispatch);
    this.stats = new JMSSessionStatsImpl(producers, consumers);
    this.consumerDispatchState = CONSUMER_DISPATCH_UNSET;
    this.compression = new ByteArrayCompression();
    this.compression.setCompressionLevel(theConnection.getMessageCompressionLevel());
    this.compression.setCompressionStrategy(theConnection.getMessageCompressionStrategy());
    this.compression.setCompressionLimit(theConnection.getMessageCompressionLimit());
    this.internalSession = theConnection.isInternalConnection();

    // Publish 'this' only after every field above has been assigned.
    connection.addSession(this);
}
236
237 public void setTransactionContext(TransactionContext transactionContext) {
238 if( this.transactionContext!=null ) {
239 this.transactionContext.removeSession(this);
240 }
241 this.transactionContext = transactionContext;
242 this.transactionContext.addSession(this);
243 }
244
245 public TransactionContext getTransactionContext() {
246 return transactionContext;
247 }
248
249 public StatsImpl getStats() {
250 return stats;
251 }
252
253 public JMSSessionStatsImpl getSessionStats() {
254 return stats;
255 }
256
257 /**
258 * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE> object is used to send a message
259 * containing a stream of uninterpreted bytes.
260 *
261 * @return an ActiveMQBytesMessage
262 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
263 */
264 public BytesMessage createBytesMessage() throws JMSException {
265 checkClosed();
266 return new ActiveMQBytesMessage();
267 }
268
269 /**
270 * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE> object is used to send a self-defining
271 * set of name-value pairs, where names are <CODE>String</CODE> objects and values are primitive values in the
272 * Java programming language.
273 *
274 * @return an ActiveMQMapMessage
275 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
276 */
277 public MapMessage createMapMessage() throws JMSException {
278 checkClosed();
279 return new ActiveMQMapMessage();
280 }
281
282 /**
283 * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE> interface is the root interface of all JMS
284 * messages. A <CODE>Message</CODE> object holds all the standard message header information. It can be sent when
285 * a message containing only header information is sufficient.
286 *
287 * @return an ActiveMQMessage
288 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
289 */
290 public Message createMessage() throws JMSException {
291 checkClosed();
292 return new ActiveMQMessage();
293 }
294
295 /**
296 * Creates an <CODE>ObjectMessage</CODE> object. An <CODE>ObjectMessage</CODE> object is used to send a message
297 * that contains a serializable Java object.
298 *
299 * @return an ActiveMQObjectMessage
300 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
301 */
302 public ObjectMessage createObjectMessage() throws JMSException {
303 checkClosed();
304 return new ActiveMQObjectMessage();
305 }
306
307 /**
308 * Creates an initialized <CODE>ObjectMessage</CODE> object. An <CODE>ObjectMessage</CODE> object is used to
309 * send a message that contains a serializable Java object.
310 *
311 * @param object the object to use to initialize this message
312 * @return an ActiveMQObjectMessage
313 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
314 */
315 public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
316 checkClosed();
317 ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
318 msg.setObject(object);
319 return msg;
320 }
321
322 /**
323 * Creates a <CODE>StreamMessage</CODE> object. A <CODE>StreamMessage</CODE> object is used to send a
324 * self-defining stream of primitive values in the Java programming language.
325 *
326 * @return an ActiveMQStreamMessage
327 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
328 */
329 public StreamMessage createStreamMessage() throws JMSException {
330 checkClosed();
331 return new ActiveMQStreamMessage();
332 }
333
334 /**
335 * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> object is used to send a message
336 * containing a <CODE>String</CODE> object.
337 *
338 * @return an ActiveMQTextMessage
339 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
340 */
341 public TextMessage createTextMessage() throws JMSException {
342 checkClosed();
343 return new ActiveMQTextMessage();
344 }
345
346 /**
347 * Creates an initialized <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> object is used to send a
348 * message containing a <CODE>String</CODE>.
349 *
350 * @param text the string used to initialize this message
351 * @return an ActiveMQTextMessage
352 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
353 */
354 public TextMessage createTextMessage(String text) throws JMSException {
355 checkClosed();
356 ActiveMQTextMessage msg = new ActiveMQTextMessage();
357 msg.setText(text);
358 return msg;
359 }
360
361 /**
362 * Indicates whether the session is in transacted mode.
363 *
364 * @return true if the session is in transacted mode
365 * @throws JMSException if there is some internal error.
366 */
367 public boolean getTransacted() throws JMSException {
368 checkClosed();
369 return this.acknowledgeMode == Session.SESSION_TRANSACTED || transactionContext.isInXATransaction();
370 }
371
372 /**
373 * Returns the acknowledgement mode of the session. The acknowledgement mode is set at the time that the session is
374 * created. If the session is transacted, the acknowledgement mode is ignored.
375 *
376 * @return If the session is not transacted, returns the current acknowledgement mode for the session. If the
377 * session is transacted, returns SESSION_TRANSACTED.
378 * @throws JMSException if there is some internal error.
379 * @see javax.jms.Connection#createSession(boolean,int)
380 * @since 1.1
381 */
382 public int getAcknowledgeMode() throws JMSException {
383 checkClosed();
384 return this.acknowledgeMode;
385 }
386
387 /**
388 * Commits all messages done in this transaction and releases any locks currently held.
389 *
390 * @throws JMSException if the JMS provider fails to commit the transaction due to some internal error.
391 * @throws TransactionRolledBackException if the transaction is rolled back due to some internal error during
392 * commit.
393 * @throws javax.jms.IllegalStateException if the method is not called by a transacted session.
394 */
395 public void commit() throws JMSException {
396 checkClosed();
397 if (!getTransacted()) {
398 throw new javax.jms.IllegalStateException("Not a transacted session");
399 }
400 transactionContext.commit();
401 }
402
403 /**
404 * Rolls back any messages done in this transaction and releases any locks currently held.
405 *
406 * @throws JMSException if the JMS provider fails to roll back the transaction due to some internal error.
407 * @throws javax.jms.IllegalStateException if the method is not called by a transacted session.
408 */
409 public void rollback() throws JMSException {
410 checkClosed();
411 if (!getTransacted()) {
412 throw new javax.jms.IllegalStateException("Not a transacted session");
413 }
414 transactionContext.rollback();
415 }
416
417 public void clearDeliveredMessages() {
418 deliveredMessages.clear();
419 }
420
421 /**
422 * Closes the session.
423 * <P>
424 * Since a provider may allocate some resources on behalf of a session outside the JVM, clients should close the
425 * resources when they are not needed. Relying on garbage collection to eventually reclaim these resources may not
426 * be timely enough.
427 * <P>
428 * There is no need to close the producers and consumers of a closed session.
429 * <P>
430 * This call will block until a <CODE>receive</CODE> call or message listener in progress has completed. A blocked
431 * message consumer <CODE>receive</CODE> call returns <CODE>null</CODE> when this session is closed.
432 * <P>
433 * Closing a transacted session must roll back the transaction in progress.
434 * <P>
435 * This method is the only <CODE>Session</CODE> method that can be called concurrently.
436 * <P>
437 * Invoking any other <CODE>Session</CODE> method on a closed session must throw a <CODE>
438 * JMSException.IllegalStateException</CODE>. Closing a closed session must <I>not </I> throw an exception.
439 *
440 * @throws JMSException if the JMS provider fails to close the session due to some internal error.
441 */
442 public void close() throws JMSException {
443 if (!this.closed) {
444 if (getTransactionContext().isInLocalTransaction()) {
445 rollback();
446 }
447 doClose();
448 closed = true;
449 }
450 }
451
452 protected void doClose() throws JMSException {
453 doAcknowledge(true);
454 deliveredMessages.clear();
455 for (Iterator i = consumers.iterator();i.hasNext();) {
456 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
457 consumer.close();
458 }
459 for (Iterator i = producers.iterator();i.hasNext();) {
460 ActiveMQMessageProducer producer = (ActiveMQMessageProducer) i.next();
461 producer.close();
462 }
463 consumers.clear();
464 producers.clear();
465 this.connection.removeSession(this);
466 this.transactionContext.removeSession(this);
467 messageExecutor.close();
468 }
469
470 /**
471 * @throws IllegalStateException if the Session is closed
472 */
473 protected void checkClosed() throws IllegalStateException {
474 if (this.closed) {
475 throw new IllegalStateException("The Session is closed");
476 }
477 }
478
479 /**
480 * Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message.
481 * <P>
482 * All consumers deliver messages in a serial order. Acknowledging a received message automatically acknowledges all
483 * messages that have been delivered to the client.
484 * <P>
485 * Restarting a session causes it to take the following actions:
486 * <UL>
487 * <LI>Stop message delivery
488 * <LI>Mark all messages that might have been delivered but not acknowledged as "redelivered"
489 * <LI>Restart the delivery sequence including all unacknowledged messages that had been previously delivered.
490 * Redelivered messages do not have to be delivered in exactly their original delivery order.
491 * </UL>
492 *
493 * @throws JMSException if the JMS provider fails to stop and restart message delivery due to some internal error.
494 * @throws IllegalStateException if the method is called by a transacted session.
495 */
496 public void recover() throws JMSException {
497 checkClosed();
498 if (getTransacted()) {
499 throw new IllegalStateException("This session is transacted");
500 }
501 redeliverUnacknowledgedMessages();
502 }
503
504 /**
505 * Returns the session's distinguished message listener (optional).
506 *
507 * @return the message listener associated with this session
508 * @throws JMSException if the JMS provider fails to get the message listener due to an internal error.
509 * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
510 * @see javax.jms.ServerSessionPool
511 * @see javax.jms.ServerSession
512 */
513 public MessageListener getMessageListener() throws JMSException {
514 checkClosed();
515 return this.messageListener;
516 }
517
518 /**
519 * Sets the session's distinguished message listener (optional).
520 * <P>
521 * When the distinguished message listener is set, no other form of message receipt in the session can be used;
522 * however, all forms of sending messages are still supported.
523 * <P>
524 * This is an expert facility not used by regular JMS clients.
525 *
526 * @param listener the message listener to associate with this session
527 * @throws JMSException if the JMS provider fails to set the message listener due to an internal error.
528 * @see javax.jms.Session#getMessageListener()
529 * @see javax.jms.ServerSessionPool
530 * @see javax.jms.ServerSession
531 */
532 public void setMessageListener(MessageListener listener) throws JMSException {
533 checkClosed();
534 this.messageListener = listener;
535 if (listener != null) {
536 messageExecutor.setDispatchedBySessionPool(true);
537 }
538 }
539
540 /**
541 * Optional operation, intended to be used only by Application Servers, not by ordinary JMS clients.
542 *
543 * @see javax.jms.ServerSession
544 */
545 public void run() {
546 ActiveMQMessage message;
547 while ((message = messageExecutor.dequeueNoWait()) != null) {
548 if( deliveryListener!=null )
549 deliveryListener.beforeDelivery(this, message);
550 beforeMessageDelivered(message);
551 deliver(message);
552 if( deliveryListener!=null )
553 deliveryListener.afterDelivery(this, message);
554 }
555 }
556
557 /**
558 * Delivers a message to the session's messageListener
559 * @param message The message to deliver
560 */
561 private void deliver(ActiveMQMessage message) {
562 if (!message.isExpired() && this.messageListener != null) {
563 try {
564
565 if( log.isDebugEnabled() ) {
566 log.debug("Message delivered to session message listener: "+message);
567 }
568
569 this.messageListener.onMessage(message);
570 this.afterMessageDelivered(true, message, true, false, true);
571 }
572 catch (Throwable t) {
573 log.info("Caught :" + t, t);
574 this.afterMessageDelivered(true, message, false, false, true);
575 }
576 }
577 else {
578 this.afterMessageDelivered(true, message, false, message.isExpired(), true);
579 }
580 }
581
582 /**
583 * Creates a <CODE>MessageProducer</CODE> to send messages to the specified destination.
584 * <P>
585 * A client uses a <CODE>MessageProducer</CODE> object to send messages to a destination. Since <CODE>Queue
586 * </CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the
587 * destination parameter to create a <CODE>MessageProducer</CODE> object.
588 *
589 * @param destination the <CODE>Destination</CODE> to send to, or null if this is a producer which does not have a
590 * specified destination.
591 * @return the MessageProducer
592 * @throws JMSException if the session fails to create a MessageProducer due to some internal error.
593 * @throws InvalidDestinationException if an invalid destination is specified.
594 * @since 1.1
595 */
596 public MessageProducer createProducer(Destination destination) throws JMSException {
597 checkClosed();
598 return new ActiveMQMessageProducer(this, ActiveMQMessageTransformation.transformDestination(destination));
599 }
600
601 /**
602 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. Since <CODE>Queue</CODE> and <CODE>
603 * Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the destination parameter to
604 * create a <CODE>MessageConsumer</CODE>.
605 *
606 * @param destination the <CODE>Destination</CODE> to access.
607 * @return the MessageConsumer
608 * @throws JMSException if the session fails to create a consumer due to some internal error.
609 * @throws InvalidDestinationException if an invalid destination is specified.
610 * @since 1.1
611 */
612 public MessageConsumer createConsumer(Destination destination) throws JMSException {
613 checkClosed();
614 int prefetch = destination instanceof Topic ? connection.getPrefetchPolicy().getTopicPrefetch() : connection
615 .getPrefetchPolicy().getQueuePrefetch();
616 return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
617 "", this.connection.getNextConsumerNumber(), prefetch, false, false);
618 }
619
620 /**
621 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, using a message selector. Since <CODE>
622 * Queue</CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the
623 * destination parameter to create a <CODE>MessageConsumer</CODE>.
624 * <P>
625 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages that have been sent to a destination.
626 *
627 * @param destination the <CODE>Destination</CODE> to access
628 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
629 * value of null or an empty string indicates that there is no message selector for the message consumer.
630 * @return the MessageConsumer
631 * @throws JMSException if the session fails to create a MessageConsumer due to some internal error.
632 * @throws InvalidDestinationException if an invalid destination is specified.
633 * @throws InvalidSelectorException if the message selector is invalid.
634 * @since 1.1
635 */
636 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
637 checkClosed();
638 int prefetch = destination instanceof Topic ? connection.getPrefetchPolicy().getTopicPrefetch() : connection
639 .getPrefetchPolicy().getQueuePrefetch();
640 return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
641 messageSelector, this.connection.getNextConsumerNumber(), prefetch, false, false);
642 }
643
644 /**
645 * Creates <CODE>MessageConsumer</CODE> for the specified destination, using a message selector. This method can
646 * specify whether messages published by its own connection should be delivered to it, if the destination is a
647 * topic.
648 * <P>
649 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be
650 * used in the destination parameter to create a <CODE>MessageConsumer</CODE>.
651 * <P>
652 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages that have been published to a
653 * destination.
654 * <P>
655 * In some cases, a connection may both publish and subscribe to a topic. The consumer <CODE>NoLocal</CODE>
656 * attribute allows a consumer to inhibit the delivery of messages published by its own connection. The default
657 * value for this attribute is False. The <CODE>noLocal</CODE> value must be supported by destinations that are
658 * topics.
659 *
660 * @param destination the <CODE>Destination</CODE> to access
661 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
662 * value of null or an empty string indicates that there is no message selector for the message consumer.
663 * @param NoLocal - if true, and the destination is a topic, inhibits the delivery of messages published by its own
664 * connection. The behavior for <CODE>NoLocal</CODE> is not specified if the destination is a queue.
665 * @return the MessageConsumer
666 * @throws JMSException if the session fails to create a MessageConsumer due to some internal error.
667 * @throws InvalidDestinationException if an invalid destination is specified.
668 * @throws InvalidSelectorException if the message selector is invalid.
669 * @since 1.1
670 */
671 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal)
672 throws JMSException {
673 checkClosed();
674 int prefetch = connection.getPrefetchPolicy().getTopicPrefetch();
675 return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
676 messageSelector, this.connection.getNextConsumerNumber(), prefetch, NoLocal, false);
677 }
678
679 /**
680 * Creates a queue identity given a <CODE>Queue</CODE> name.
681 * <P>
682 * This facility is provided for the rare cases where clients need to dynamically manipulate queue identity. It
683 * allows the creation of a queue identity with a provider-specific name. Clients that depend on this ability are
684 * not portable.
685 * <P>
686 * Note that this method is not for creating the physical queue. The physical creation of queues is an
687 * administrative task and is not to be initiated by the JMS API. The one exception is the creation of temporary
688 * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE> method.
689 *
690 * @param queueName the name of this <CODE>Queue</CODE>
691 * @return a <CODE>Queue</CODE> with the given name
692 * @throws JMSException if the session fails to create a queue due to some internal error.
693 * @since 1.1
694 */
695 public Queue createQueue(String queueName) throws JMSException {
696 checkClosed();
697 return new ActiveMQQueue(queueName);
698 }
699
700 /**
701 * Creates a topic identity given a <CODE>Topic</CODE> name.
702 * <P>
703 * This facility is provided for the rare cases where clients need to dynamically manipulate topic identity. This
704 * allows the creation of a topic identity with a provider-specific name. Clients that depend on this ability are
705 * not portable.
706 * <P>
707 * Note that this method is not for creating the physical topic. The physical creation of topics is an
708 * administrative task and is not to be initiated by the JMS API. The one exception is the creation of temporary
709 * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE> method.
710 *
711 * @param topicName the name of this <CODE>Topic</CODE>
712 * @return a <CODE>Topic</CODE> with the given name
713 * @throws JMSException if the session fails to create a topic due to some internal error.
714 * @since 1.1
715 */
716 public Topic createTopic(String topicName) throws JMSException {
717 checkClosed();
718 return new ActiveMQTopic(topicName);
719 }
720
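/*
 * NOTE: orphaned documentation fragment. It is immediately followed by the Javadoc of
 * createDurableSubscriber and is therefore not attached to any method; the text matches the
 * documentation of createBrowser(Queue).
 *
 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue.
 *
 * @param queue the <CODE>queue</CODE> to access
 * @exception InvalidDestinationException if an invalid destination is specified
 * @since 1.1
 */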
728 /**
729 * Creates a durable subscriber to the specified topic.
730 * <P>
731 * If a client needs to receive all the messages published on a topic, including the ones published while the
732 * subscriber is inactive, it uses a durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a record of
733 * this durable subscription and insures that all messages from the topic's publishers are retained until they are
734 * acknowledged by this durable subscriber or they have expired.
735 * <P>
736 * Sessions with durable subscribers must always provide the same client identifier. In addition, each client must
737 * specify a name that uniquely identifies (within client identifier) each durable subscription it creates. Only one
738 * session at a time can have a <CODE>TopicSubscriber</CODE> for a particular durable subscription.
739 * <P>
740 * A client can change an existing durable subscription by creating a durable <CODE>TopicSubscriber</CODE> with
741 * the same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to
742 * unsubscribing (deleting) the old one and creating a new one.
743 * <P>
744 * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
745 * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
746 * value for this attribute is false.
747 *
748 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
749 * @param name the name used to identify this subscription
750 * @return the TopicSubscriber
751 * @throws JMSException if the session fails to create a subscriber due to some internal error.
752 * @throws InvalidDestinationException if an invalid topic is specified.
753 * @since 1.1
754 */
755 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
756 checkClosed();
757 return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), name, "",
758 this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy().getDurableTopicPrefetch(),
759 false, false);
760 }
761
762 /**
763 * Creates a durable subscriber to the specified topic, using a message selector and specifying whether messages
764 * published by its own connection should be delivered to it.
765 * <P>
766 * If a client needs to receive all the messages published on a topic, including the ones published while the
767 * subscriber is inactive, it uses a durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a record of
768 * this durable subscription and insures that all messages from the topic's publishers are retained until they are
769 * acknowledged by this durable subscriber or they have expired.
770 * <P>
771 * Sessions with durable subscribers must always provide the same client identifier. In addition, each client must
772 * specify a name which uniquely identifies (within client identifier) each durable subscription it creates. Only
773 * one session at a time can have a <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
774 * inactive durable subscriber is one that exists but does not currently have a message consumer associated with it.
775 * <P>
776 * A client can change an existing durable subscription by creating a durable <CODE>TopicSubscriber</CODE> with
777 * the same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to
778 * unsubscribing (deleting) the old one and creating a new one.
779 *
780 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
781 * @param name the name used to identify this subscription
782 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
783 * value of null or an empty string indicates that there is no message selector for the message consumer.
784 * @param noLocal if set, inhibits the delivery of messages published by its own connection
785 * @return the Queue Browser
786 * @throws JMSException if the session fails to create a subscriber due to some internal error.
787 * @throws InvalidDestinationException if an invalid topic is specified.
788 * @throws InvalidSelectorException if the message selector is invalid.
789 * @since 1.1
790 */
791 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
792 throws JMSException {
793 checkClosed();
794 return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), name,
795 messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
796 .getDurableTopicPrefetch(), noLocal, false);
797 }
798
799 /**
800 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue.
801 *
802 * @param queue the <CODE>queue</CODE> to access
803 * @return the Queue Browser
804 * @throws JMSException if the session fails to create a browser due to some internal error.
805 * @throws InvalidDestinationException if an invalid destination is specified
806 * @since 1.1
807 */
808 public QueueBrowser createBrowser(Queue queue) throws JMSException {
809 checkClosed();
810 return new ActiveMQQueueBrowser(this, (ActiveMQQueue) ActiveMQMessageTransformation.transformDestination(queue), "",
811 this.connection.getNextConsumerNumber());
812 }
813
814 /**
815 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue using a message
816 * selector.
817 *
818 * @param queue the <CODE>queue</CODE> to access
819 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
820 * value of null or an empty string indicates that there is no message selector for the message consumer.
821 * @return the Queue Browser
822 * @throws JMSException if the session fails to create a browser due to some internal error.
823 * @throws InvalidDestinationException if an invalid destination is specified
824 * @throws InvalidSelectorException if the message selector is invalid.
825 * @since 1.1
826 */
827 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
828 checkClosed();
829 return new ActiveMQQueueBrowser(this, (ActiveMQQueue) ActiveMQMessageTransformation.transformDestination(queue),
830 messageSelector, this.connection.getNextConsumerNumber());
831 }
832
833 /**
834 * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that of the <CODE>Connection</CODE> unless
835 * it is deleted earlier.
836 *
837 * @return a temporary queue identity
838 * @throws JMSException if the session fails to create a temporary queue due to some internal error.
839 * @since 1.1
840 */
841 public TemporaryQueue createTemporaryQueue() throws JMSException {
842 checkClosed();
843 String tempQueueName = "TemporaryQueue-"
844 + ActiveMQDestination.createTemporaryName(this.connection.getInitializedClientID());
845 tempQueueName += this.temporaryDestinationGenerator.generateId();
846 ActiveMQTemporaryQueue tempQueue = new ActiveMQTemporaryQueue(tempQueueName);
847 tempQueue.setSessionCreatedBy(this);
848 this.connection.startTemporaryDestination(tempQueue);
849 return tempQueue;
850 }
851
852 /**
853 * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that of the <CODE>Connection</CODE> unless
854 * it is deleted earlier.
855 *
856 * @return a temporary topic identity
857 * @throws JMSException if the session fails to create a temporary topic due to some internal error.
858 * @since 1.1
859 */
860 public TemporaryTopic createTemporaryTopic() throws JMSException {
861 checkClosed();
862 String tempTopicName = "TemporaryTopic-"
863 + ActiveMQDestination.createTemporaryName(this.connection.getInitializedClientID());
864 tempTopicName += this.temporaryDestinationGenerator.generateId();
865 ActiveMQTemporaryTopic tempTopic = new ActiveMQTemporaryTopic(tempTopicName);
866 tempTopic.setSessionCreatedBy(this);
867 this.connection.startTemporaryDestination(tempTopic);
868 return tempTopic;
869 }
870
871 /**
872 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from the specified queue.
873 *
874 * @param queue the <CODE>Queue</CODE> to access
875 * @return @throws JMSException if the session fails to create a receiver due to some internal error.
876 * @throws JMSException
877 * @throws InvalidDestinationException if an invalid queue is specified.
878 */
879 public QueueReceiver createReceiver(Queue queue) throws JMSException {
880 checkClosed();
881 return new ActiveMQQueueReceiver(this, ActiveMQDestination.transformDestination(queue), "", this.connection
882 .getNextConsumerNumber(), this.connection.getPrefetchPolicy().getQueuePrefetch());
883 }
884
885 /**
886 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from the specified queue using a message
887 * selector.
888 *
889 * @param queue the <CODE>Queue</CODE> to access
890 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
891 * value of null or an empty string indicates that there is no message selector for the message consumer.
892 * @return QueueReceiver
893 * @throws JMSException if the session fails to create a receiver due to some internal error.
894 * @throws InvalidDestinationException if an invalid queue is specified.
895 * @throws InvalidSelectorException if the message selector is invalid.
896 */
897 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
898 checkClosed();
899 return new ActiveMQQueueReceiver(this, ActiveMQMessageTransformation.transformDestination(queue),
900 messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
901 .getQueuePrefetch());
902 }
903
904 /**
905 * Creates a <CODE>QueueSender</CODE> object to send messages to the specified queue.
906 *
907 * @param queue the <CODE>Queue</CODE> to access, or null if this is an unidentified producer
908 * @return QueueSender
909 * @throws JMSException if the session fails to create a sender due to some internal error.
910 * @throws InvalidDestinationException if an invalid queue is specified.
911 */
912 public QueueSender createSender(Queue queue) throws JMSException {
913 checkClosed();
914 return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue));
915 }
916
917 /**
918 * Creates a nondurable subscriber to the specified topic. <p/>
919 * <P>
920 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages that have been published to a topic.
921 * <p/>
922 * <P>
923 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They receive only messages that are published
924 * while they are active. <p/>
925 * <P>
926 * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
927 * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
928 * value for this attribute is false.
929 *
930 * @param topic the <CODE>Topic</CODE> to subscribe to
931 * @return TopicSubscriber
932 * @throws JMSException if the session fails to create a subscriber due to some internal error.
933 * @throws InvalidDestinationException if an invalid topic is specified.
934 */
935 public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
936 checkClosed();
937 return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), null, null,
938 this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy().getTopicPrefetch(), false,
939 false);
940 }
941
942 /**
943 * Creates a nondurable subscriber to the specified topic, using a message selector or specifying whether messages
944 * published by its own connection should be delivered to it. <p/>
945 * <P>
946 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages that have been published to a topic.
947 * <p/>
948 * <P>
949 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They receive only messages that are published
950 * while they are active. <p/>
951 * <P>
952 * Messages filtered out by a subscriber's message selector will never be delivered to the subscriber. From the
953 * subscriber's perspective, they do not exist. <p/>
954 * <P>
955 * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
956 * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
957 * value for this attribute is false.
958 *
959 * @param topic the <CODE>Topic</CODE> to subscribe to
960 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
961 * value of null or an empty string indicates that there is no message selector for the message consumer.
962 * @param noLocal if set, inhibits the delivery of messages published by its own connection
963 * @return TopicSubscriber
964 * @throws JMSException if the session fails to create a subscriber due to some internal error.
965 * @throws InvalidDestinationException if an invalid topic is specified.
966 * @throws InvalidSelectorException if the message selector is invalid.
967 */
968 public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
969 checkClosed();
970 return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), null,
971 messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
972 .getTopicPrefetch(), noLocal, false);
973 }
974
975 /**
976 * Creates a publisher for the specified topic. <p/>
977 * <P>
978 * A client uses a <CODE>TopicPublisher</CODE> object to publish messages on a topic. Each time a client creates a
979 * <CODE>TopicPublisher</CODE> on a topic, it defines a new sequence of messages that have no ordering
980 * relationship with the messages it has previously sent.
981 *
982 * @param topic the <CODE>Topic</CODE> to publish to, or null if this is an unidentified producer
983 * @return TopicPublisher
984 * @throws JMSException if the session fails to create a publisher due to some internal error.
985 * @throws InvalidDestinationException if an invalid topic is specified.
986 */
987 public TopicPublisher createPublisher(Topic topic) throws JMSException {
988 checkClosed();
989 return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic));
990 }
991
992 /**
993 * Unsubscribes a durable subscription that has been created by a client.
994 * <P>
995 * This method deletes the state being maintained on behalf of the subscriber by its provider.
996 * <P>
997 * It is erroneous for a client to delete a durable subscription while there is an active <CODE>MessageConsumer
998 * </CODE> or <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed message is part of a pending
999 * transaction or has not been acknowledged in the session.
1000 *
1001 * @param name the name used to identify this subscription
1002 * @throws JMSException if the session fails to unsubscribe to the durable subscription due to some internal error.
1003 * @throws InvalidDestinationException if an invalid subscription name is specified.
1004 * @since 1.1
1005 */
1006 public void unsubscribe(String name) throws JMSException {
1007 checkClosed();
1008 DurableUnsubscribe ds = new DurableUnsubscribe();
1009 ds.setClientId(this.connection.getClientID());
1010 ds.setSubscriberName(name);
1011 this.connection.syncSendPacket(ds);
1012 }
1013
1014 /**
1015 * Tests to see if the Message Dispatcher is a target for this message
1016 *
1017 * @param message the message to test
1018 * @return true if the Message Dispatcher can dispatch the message
1019 */
1020 public boolean isTarget(ActiveMQMessage message) {
1021 for (Iterator i = this.consumers.iterator();i.hasNext();) {
1022 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
1023 if (message.isConsumerTarget(consumer.getConsumerNumber())) {
1024 return true;
1025 }
1026 }
1027 return false;
1028 }
1029
1030 /**
1031 * Dispatch an ActiveMQMessage
1032 *
1033 * @param message
1034 */
1035 public void dispatch(ActiveMQMessage message) {
1036 message.setMessageAcknowledge(this);
1037 messageExecutor.execute(message);
1038 }
1039
1040 /**
1041 * Acknowledges all consumed messages of the session of this consumed message.
1042 * <P>
1043 * All consumed JMS messages support the <CODE>acknowledge</CODE> method for use when a client has specified that
1044 * its JMS session's consumed messages are to be explicitly acknowledged. By invoking <CODE>acknowledge</CODE> on
1045 * a consumed message, a client acknowledges all messages consumed by the session that the message was delivered to.
1046 * <P>
1047 * Calls to <CODE>acknowledge</CODE> are ignored for both transacted sessions and sessions specified to use
1048 * implicit acknowledgement modes.
1049 * <P>
1050 * A client may individually acknowledge each message as it is consumed, or it may choose to acknowledge messages as
1051 * an application-defined group (which is done by calling acknowledge on the last received message of the group,
1052 * thereby acknowledging all messages consumed by the session.)
1053 * <P>
1054 * Messages that have been received but not acknowledged may be redelivered.
1055 * @param caller - the message calling acknowledge on the session
1056 *
1057 * @throws JMSException if the JMS provider fails to acknowledge the messages due to some internal error.
1058 * @throws javax.jms.IllegalStateException if this method is called on a closed session.
1059 * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
1060 */
1061 public void acknowledge(ActiveMQMessage caller) throws JMSException {
1062 checkClosed();
1063 /**
1064 * Find the caller and ensure it is marked as consumed
1065 * This is to ensure acknowledge called by a
1066 * MessageListener works correctly
1067 */
1068 ActiveMQMessage msg = (ActiveMQMessage)deliveredMessages.get(caller);
1069 if (msg != null){
1070 msg.setMessageConsumed(true);
1071 }
1072
1073 doAcknowledge(false);
1074 }
1075
1076 protected void doAcknowledge(boolean isClosing) throws JMSException {
1077 if (!closed) {
1078 if (this.acknowledgeMode == Session.CLIENT_ACKNOWLEDGE) {
1079 ActiveMQMessage msg = null;
1080 while((msg = (ActiveMQMessage)deliveredMessages.removeFirst())!=null){
1081 boolean messageConsumed = isClosing ? false : msg.isMessageConsumed();
1082 if (!msg.isTransientConsumed()){
1083 sendMessageAck(msg, messageConsumed, false);
1084 }else {
1085 if (!messageConsumed){
1086 connection.addToTransientConsumedRedeliverCache(msg);
1087 }
1088 }
1089 }
1090 deliveredMessages.clear();
1091 }
1092 }
1093 }
1094
1095 protected void beforeMessageDelivered(ActiveMQMessage message) {
1096 if (message != null && !closed) {
1097 deliveredMessages.add(message);
1098 }
1099 }
1100
1101 protected void afterMessageDelivered(boolean sendAcknowledge, ActiveMQMessage message, boolean messageConsumed,
1102 boolean messageExpired, boolean beforeCalled) {
1103 if (message != null && !closed) {
1104 if ((isClientAcknowledge() && !messageExpired) || (isTransacted() && message.isTransientConsumed())) {
1105 message.setMessageConsumed(messageConsumed);
1106 if (!beforeCalled) {
1107 deliveredMessages.add(message);
1108 }
1109 }
1110 else {
1111 if (beforeCalled) {
1112 deliveredMessages.remove(message);
1113 }
1114 }
1115 //don't send acks for expired messages unless sendAcknowledge is set
1116 //the sendAcknowledge flag is set for all messages expect those destined
1117 //for transient Topic subscribers
1118 if (sendAcknowledge && !isClientAcknowledge()) {
1119 try {
1120 doStartTransaction();
1121 sendMessageAck(message,messageConsumed,messageExpired);
1122 }
1123 catch (JMSException e) {
1124 log.warn("failed to notify Broker that message is delivered", e);
1125 }
1126 }
1127 }
1128 }
1129
1130 /**
1131 * remove a temporary destination
1132 * @param destination
1133 * @throws JMSException if active subscribers already exist
1134 */
1135 public void removeTemporaryDestination(ActiveMQDestination destination) throws JMSException{
1136 this.connection.stopTemporaryDestination(destination);
1137 }
1138
1139 private void sendMessageAck(ActiveMQMessage message, boolean messageConsumed, boolean messageExpired)
1140 throws JMSException {
1141 if (message.isMessagePart()) {
1142 ActiveMQMessage[] parts = (ActiveMQMessage[]) connection.getAssemblies().remove(message.getParentMessageID());
1143 if (parts != null) {
1144 for (int i = 0;i < parts.length;i++) {
1145 parts[i].setConsumerIdentifer(message.getConsumerIdentifer());
1146 doSendMessageAck(parts[i], messageConsumed, messageExpired);
1147 }
1148 }
1149 else {
1150 JMSException jmsEx = new JMSException("Could not find parts for fragemented message: " + message);
1151 connection.onException(jmsEx);
1152 }
1153 }
1154 else {
1155 doSendMessageAck(message, messageConsumed, messageExpired);
1156 }
1157 }
1158
1159 private void doSendMessageAck(ActiveMQMessage message, boolean messageConsumed, boolean messageExpired)
1160 throws JMSException {
1161 if (message != null && !message.isAdvisory()) {
1162 MessageAck ack = new MessageAck();
1163 ack.setConsumerId(message.getConsumerIdentifer());
1164 ack.setTransactionId(transactionContext.getTransactionId());
1165 ack.setExternalMessageId(message.isExternalMessageId());
1166 ack.setMessageID(message.getJMSMessageID());
1167 ack.setSequenceNumber(message.getSequenceNumber());
1168 ack.setProducerKey(message.getProducerKey());
1169 ack.setMessageRead(messageConsumed);
1170 ack.setDestination(message.getJMSActiveMQDestination());
1171 ack.setPersistent(message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT);
1172 ack.setExpired(messageExpired);
1173 ack.setSessionId(getSessionId());
1174 this.connection.asyncSendPacket(ack);
1175 }
1176 }
1177
1178 /**
1179 * @param consumer
1180 * @throws JMSExcep