Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/ActiveMQConnection.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.activemq;
20  
21  import java.io.IOException;
22  import java.util.Iterator;
23  import java.util.Map;
24  
25  import javax.jms.Connection;
26  import javax.jms.ConnectionConsumer;
27  import javax.jms.ConnectionMetaData;
28  import javax.jms.DeliveryMode;
29  import javax.jms.Destination;
30  import javax.jms.ExceptionListener;
31  import javax.jms.IllegalStateException;
32  import javax.jms.JMSException;
33  import javax.jms.Queue;
34  import javax.jms.QueueConnection;
35  import javax.jms.QueueSession;
36  import javax.jms.ServerSessionPool;
37  import javax.jms.Session;
38  import javax.jms.Topic;
39  import javax.jms.TopicConnection;
40  import javax.jms.TopicSession;
41  import javax.jms.XAConnection;
42  
43  import org.activemq.advisories.TempDestinationAdvisor;
44  import org.activemq.advisories.TempDestinationAdvisoryEvent;
45  import org.activemq.capacity.CapacityMonitorEvent;
46  import org.activemq.capacity.CapacityMonitorEventListener;
47  import org.activemq.filter.AndFilter;
48  import org.activemq.filter.Filter;
49  import org.activemq.filter.FilterFactory;
50  import org.activemq.filter.FilterFactoryImpl;
51  import org.activemq.filter.NoLocalFilter;
52  import org.activemq.io.util.ByteArray;
53  import org.activemq.io.util.ByteArrayCompression;
54  import org.activemq.io.util.ByteArrayFragmentation;
55  import org.activemq.io.util.MemoryBoundedObjectManager;
56  import org.activemq.io.util.MemoryBoundedQueue;
57  import org.activemq.io.util.MemoryBoundedQueueManager;
58  import org.activemq.management.JMSConnectionStatsImpl;
59  import org.activemq.management.JMSStatsImpl;
60  import org.activemq.management.StatsCapable;
61  import org.activemq.management.StatsImpl;
62  import org.activemq.message.ActiveMQDestination;
63  import org.activemq.message.ActiveMQMessage;
64  import org.activemq.message.ActiveMQObjectMessage;
65  import org.activemq.message.BrokerAdminCommand;
66  import org.activemq.message.CapacityInfo;
67  import org.activemq.message.CleanupConnectionInfo;
68  import org.activemq.message.ConnectionInfo;
69  import org.activemq.message.ConsumerInfo;
70  import org.activemq.message.Packet;
71  import org.activemq.message.PacketListener;
72  import org.activemq.message.ProducerInfo;
73  import org.activemq.message.Receipt;
74  import org.activemq.message.ResponseReceipt;
75  import org.activemq.message.SessionInfo;
76  import org.activemq.message.TransactionInfo;
77  import org.activemq.message.WireFormatInfo;
78  import org.activemq.message.XATransactionInfo;
79  import org.activemq.transport.TransportChannel;
80  import org.activemq.transport.TransportStatusEvent;
81  import org.activemq.transport.TransportStatusEventListener;
82  import org.activemq.util.IdGenerator;
83  import org.activemq.util.JMSExceptionHelper;
84  import org.apache.commons.logging.Log;
85  import org.apache.commons.logging.LogFactory;
86  
87  import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
88  import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
89  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
90  import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
91  
92  /**
93   * A <CODE>Connection</CODE> object is a client's active connection to its JMS
94   * provider. It typically allocates provider resources outside the Java virtual
95   * machine (JVM).
96   * <P>
97   * Connections support concurrent use.
98   * <P>
99   * A connection serves several purposes:
100  * <UL>
101  * <LI>It encapsulates an open connection with a JMS provider. It typically
102  * represents an open TCP/IP socket between a client and the service provider
103  * software.
104  * <LI>Its creation is where client authentication takes place.
105  * <LI>It can specify a unique client identifier.
106  * <LI>It provides a <CODE>ConnectionMetaData</CODE> object.
107  * <LI>It supports an optional <CODE>ExceptionListener</CODE> object.
108  * </UL>
109  * <P>
110  * Because the creation of a connection involves setting up authentication and
111  * communication, a connection is a relatively heavyweight object. Most clients
112  * will do all their messaging with a single connection. Other more advanced
113  * applications may use several connections. The JMS API does not architect a
114  * reason for using multiple connections; however, there may be operational
115  * reasons for doing so.
116  * <P>
117  * A JMS client typically creates a connection, one or more sessions, and a
118  * number of message producers and consumers. When a connection is created, it
119  * is in stopped mode. That means that no messages are being delivered.
120  * <P>
121  * It is typical to leave the connection in stopped mode until setup is complete
122  * (that is, until all message consumers have been created). At that point, the
123  * client calls the connection's <CODE>start</CODE> method, and messages begin
124  * arriving at the connection's consumers. This setup convention minimizes any
125  * client confusion that may result from asynchronous message delivery while the
126  * client is still in the process of setting itself up.
127  * <P>
128  * A connection can be started immediately, and the setup can be done
129  * afterwards. Clients that do this must be prepared to handle asynchronous
130  * message delivery while they are still in the process of setting up.
131  * <P>
132  * A message producer can send messages while a connection is stopped. <p/>This
133  * class is also a <CODE>TopicConnection </CODE>. A <CODE>TopicConnection</CODE>
134  * object is an active connection to a publish/subscribe JMS provider. A client
135  * uses a <CODE>TopicConnection</CODE> object to create one or more <CODE>TopicSession</CODE>
136  * objects for producing and consuming messages.
137  * <P>
138  * A <CODE>TopicConnection</CODE> can be used to create a <CODE>TopicSession</CODE>,
139  * from which specialized topic-related objects can be created. A more general,
140  * and recommended approach is to use the <CODE>Connection </CODE> object.
141  * <P>
142  * <p/><p/>This class is also a <CODE>QueueConnection</CODE>. A A <CODE>QueueConnection</CODE>
143  * object is an active connection to a point-to-point JMS provider. A client
144  * uses a <CODE>QueueConnection</CODE> object to create one or more <CODE>QueueSession</CODE>
145  * objects for producing and consuming messages.
146  * <P>
147  * A <CODE>QueueConnection</CODE> can be used to create a <CODE>QueueSession</CODE>,
148  * from which specialized queue-related objects can be created. A more general,
149  * and recommended, approach is to use the <CODE>Connection </CODE> object.
150  * <P>
151  * A <CODE>QueueConnection</CODE> cannot be used to create objects specific to
152  * the publish/subscribe domain. The <CODE>createDurableConnectionConsumer</CODE>
153  * method inherits from <CODE>Connection</CODE>, but must throw an <CODE>IllegalStateException</CODE>
154  * if used from <CODE>QueueConnection</CODE>. // *
155  * 
156  * @version $Revision: 1.1.1.1 $
157  * @see javax.jms.Connection
158  * @see javax.jms.ConnectionFactory
159  * @see javax.jms.QueueConnection
160  * @see javax.jms.TopicConnection
161  * @see javax.jms.TopicConnectionFactory
162  * @see javax.jms.QueueConnection
163  * @see javax.jms.QueueConnectionFactory
164  */
165 public class ActiveMQConnection implements Connection, PacketListener,
166     ExceptionListener, TopicConnection, QueueConnection, StatsCapable,
167     CapacityMonitorEventListener, TransportStatusEventListener, Closeable {
168 
169   /**
170    * Default UserName for the Connection
171    */
172   public static final String DEFAULT_USER = "defaultUser";
173 
174   /**
175    * Default URL for the ActiveMQ Broker
176    */
177   public static final String DEFAULT_BROKER_URL = "tcp://localhost:61616";
178 
179   /**
180    * Default client URL. If using a message broker in a hub(s)/spoke
181    * architecture - use the DEFAULT_BROKER_URL
182    * 
183    * @see ActiveMQConnection#DEFAULT_BROKER_URL
184    */
185   public static final String DEFAULT_URL = "peer://development";
186 
187   /**
188    * Default Password for the Connection
189    */
190   public static final String DEFAULT_PASSWORD = "defaultPassword";
191 
192   private static final Log log = LogFactory.getLog(ActiveMQConnection.class);
193 
194   private static final int DEFAULT_CONNECTION_MEMORY_LIMIT = 10 * 1024 * 1024;
195 
196   // properties
197   private ActiveMQConnectionFactory factory;
198 
199   private String userName;
200 
201   private String password;
202 
203   protected String clientID;
204 
205   private int sendCloseTimeout = 2000;
206 
207   private TransportChannel transportChannel;
208 
209   private ExceptionListener exceptionListener;
210 
211   private ActiveMQPrefetchPolicy prefetchPolicy;
212 
213   private JMSStatsImpl factoryStats;
214 
215   private MemoryBoundedObjectManager memoryManager;
216 
217   private MemoryBoundedQueueManager boundedQueueManager;
218 
219   protected IdGenerator handleIdGenerator;
220 
221   private IdGenerator clientIdGenerator;
222 
223   protected IdGenerator packetIdGenerator;
224 
225   private IdGenerator sessionIdGenerator;
226 
227   private JMSConnectionStatsImpl stats;
228 
229   // internal state
230   private CopyOnWriteArrayList sessions;
231 
232   private CopyOnWriteArrayList messageDispatchers;
233 
234   private CopyOnWriteArrayList connectionConsumers;
235 
236   private SynchronizedInt consumerNumberGenerator;
237 
238   private ActiveMQConnectionMetaData connectionMetaData;
239 
240   private boolean closed;
241 
242   private SynchronizedBoolean started;
243 
244   private boolean clientIDSet;
245 
246   private boolean isConnectionInfoSentToBroker;
247 
248   private boolean isTransportOK;
249 
250   private boolean startedTransport;
251 
252   private long startTime;
253 
254   private long flowControlSleepTime = 0;
255 
256   private boolean quickClose;
257 
258   private boolean internalConnection;// used for notifying that the
259                     // connection is used for networks etc.
260 
261   private boolean userSpecifiedClientID;
262 
263   /**
264    * Should we use an async send for persistent non transacted messages ?
265    */
266   protected boolean useAsyncSend = true;
267 
268   private int sendConnectionInfoTimeout = 30000;
269 
270   private boolean disableTimeStampsByDefault = false;
271 
272   private boolean J2EEcompliant = true;
273 
274   private boolean prepareMessageBodyOnSend = true;
275 
276   private boolean copyMessageOnSend = true;
277 
278   // compression and fragmentation variables
279 
280   private boolean doMessageCompression = true;
281 
282   private int messageCompressionLimit = ByteArrayCompression.DEFAULT_COMPRESSION_LIMIT;// data
283                                               // size
284                                               // above
285                                               // which
286                                               // compression
287                                               // will
288                                               // be
289                                               // used
290 
291   private int messageCompressionLevel = ByteArrayCompression.DEFAULT_COMPRESSION_LEVEL;
292 
293   private int messageCompressionStrategy = ByteArrayCompression.DEFAULT_COMPRESSION_STRATEGY;// default
294                                                 // compression
295                                                 // strategy
296 
297   private boolean doMessageFragmentation = false;
298 
299   private int messageFragmentationLimit = ByteArrayFragmentation.DEFAULT_FRAGMENTATION_LIMIT;
300 
301   private boolean cachingEnabled = true;
302 
303   private boolean optimizedMessageDispatch = false;
304 
305   private CopyOnWriteArrayList transientConsumedRedeliverCache;
306 
307   private FilterFactory filterFactory;
308 
309   private Map tempDestinationMap;
310 
311   private Map validDestinationsMap;
312 
313   private String resourceManagerId;
314     //used for assembling message fragments
315     private final ConcurrentHashMap assemblies= new ConcurrentHashMap();
316     private final ByteArrayFragmentation fragmentation = new ByteArrayFragmentation();
317 
318   /**
319    * A static helper method to create a new connection
320    * 
321    * @return an ActiveMQConnection
322    * @throws JMSException
323    */
324   public static ActiveMQConnection makeConnection() throws JMSException {
325     ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
326     return (ActiveMQConnection) factory.createConnection();
327   }
328 
329   /**
330    * A static helper method to create a new connection
331    * 
332    * @param uri
333    * @return and ActiveMQConnection
334    * @throws JMSException
335    */
336   public static ActiveMQConnection makeConnection(String uri)
337       throws JMSException {
338     ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
339     return (ActiveMQConnection) factory.createConnection();
340   }
341 
342   /**
343    * A static helper method to create a new connection
344    * 
345    * @param user
346    * @param password
347    * @param uri
348    * @return an ActiveMQConnection
349    * @throws JMSException
350    */
351   public static ActiveMQConnection makeConnection(String user,
352       String password, String uri) throws JMSException {
353     ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user,
354         password, uri);
355     return (ActiveMQConnection) factory.createConnection();
356   }
357 
358   /**
359    * Constructs a connection from an existing TransportChannel and
360    * user/password.
361    * 
362    * @param factory
363    * @param theUserName
364    *            the users name
365    * @param thePassword
366    *            the password
367    * @param transportChannel
368    *            the transport channel to communicate with the server
369    * @throws JMSException
370    */
371   public ActiveMQConnection(ActiveMQConnectionFactory factory,
372       String theUserName, String thePassword,
373       TransportChannel transportChannel) throws JMSException {
374     this(factory, theUserName, thePassword);
375     this.transportChannel = transportChannel;
376     this.transportChannel.setPacketListener(this);
377     this.transportChannel.setExceptionListener(this);
378     this.transportChannel.addTransportStatusEventListener(this);
379     this.isTransportOK = true;
380   }
381 
382   protected ActiveMQConnection(ActiveMQConnectionFactory factory,
383       String theUserName, String thePassword) {
384     this.factory = factory;
385     this.userName = theUserName;
386     this.password = thePassword;
387     this.clientIdGenerator = new IdGenerator();
388     this.packetIdGenerator = new IdGenerator();
389     this.handleIdGenerator = new IdGenerator();
390     this.sessionIdGenerator = new IdGenerator();
391     this.consumerNumberGenerator = new SynchronizedInt(0);
392     this.sessions = new CopyOnWriteArrayList();
393     this.messageDispatchers = new CopyOnWriteArrayList();
394     this.connectionConsumers = new CopyOnWriteArrayList();
395     this.connectionMetaData = new ActiveMQConnectionMetaData();
396     this.started = new SynchronizedBoolean(false);
397     this.startTime = System.currentTimeMillis();
398     this.prefetchPolicy = new ActiveMQPrefetchPolicy();
399     this.memoryManager = new MemoryBoundedObjectManager(clientID,
400         DEFAULT_CONNECTION_MEMORY_LIMIT);
401     this.boundedQueueManager = new MemoryBoundedQueueManager(memoryManager);
402     this.memoryManager.addCapacityEventListener(this);
403     boolean transactional = this instanceof XAConnection;
404     factoryStats = factory.getFactoryStats();
405     factoryStats.addConnection(this);
406     stats = new JMSConnectionStatsImpl(sessions, transactional);
407     this.transientConsumedRedeliverCache = new CopyOnWriteArrayList();
408     this.tempDestinationMap = new ConcurrentHashMap();
409     this.validDestinationsMap = new ConcurrentHashMap();
410     factory.onConnectionCreate(this);
411   }
412 
413   /**
414    * @return statistics for this Connection
415    */
416   public StatsImpl getStats() {
417     return stats;
418   }
419 
420   /**
421    * @return a number unique for this connection
422    */
423   public JMSConnectionStatsImpl getConnectionStats() {
424     return stats;
425   }
426 
427   /**
428    * Creates a <CODE>Session</CODE> object.
429    * 
430    * @param transacted
431    *            indicates whether the session is transacted
432    * @param acknowledgeMode
433    *            indicates whether the consumer or the client will acknowledge
434    *            any messages it receives; ignored if the session is
435    *            transacted. Legal values are
436    *            <code>Session.AUTO_ACKNOWLEDGE</code>,
437    *            <code>Session.CLIENT_ACKNOWLEDGE</code>, and
438    *            <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
439    * @return a newly created session
440    * @throws JMSException
441    *             if the <CODE>Connection</CODE> object fails to create a
442    *             session due to some internal error or lack of support for the
443    *             specific transaction and acknowledgement mode.
444    * @see Session#AUTO_ACKNOWLEDGE
445    * @see Session#CLIENT_ACKNOWLEDGE
446    * @see Session#DUPS_OK_ACKNOWLEDGE
447    * @since 1.1
448    */
449   public Session createSession(boolean transacted, int acknowledgeMode)
450       throws JMSException {
451     checkClosed();
452     sendConnectionInfoToBroker();
453     return new ActiveMQSession(
454         this,
455         (transacted ? Session.SESSION_TRANSACTED
456             : (acknowledgeMode == Session.SESSION_TRANSACTED ? Session.AUTO_ACKNOWLEDGE
457                 : acknowledgeMode)));
458   }
459 
460   /**
461    * Creates a <CODE>Session</CODE> object.
462    * 
463    * @param transacted
464    *            indicates whether the session is transacted
465    * @param acknowledgeMode
466    *            indicates whether the consumer or the client will acknowledge
467    *            any messages it receives; ignored if the session is
468    *            transacted. Legal values are
469    *            <code>Session.AUTO_ACKNOWLEDGE</code>,
470    *            <code>Session.CLIENT_ACKNOWLEDGE</code>, and
471    *            <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
472    * @param optimizedDispatch
473    * @return a newly created session
474    * @throws JMSException
475    *             if the <CODE>Connection</CODE> object fails to create a
476    *             session due to some internal error or lack of support for the
477    *             specific transaction and acknowledgement mode.
478    * @see Session#AUTO_ACKNOWLEDGE
479    * @see Session#CLIENT_ACKNOWLEDGE
480    * @see Session#DUPS_OK_ACKNOWLEDGE
481    * @since 1.1
482    */
483   public Session createSession(boolean transacted, int acknowledgeMode,
484       boolean optimizedDispatch) throws JMSException {
485     checkClosed();
486     sendConnectionInfoToBroker();
487     return new ActiveMQSession(this,
488         (transacted ? Session.SESSION_TRANSACTED : acknowledgeMode),
489         optimizedDispatch);
490   }
491 
492   /**
493    * Gets the client identifier for this connection.
494    * <P>
495    * This value is specific to the JMS provider. It is either preconfigured by
496    * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
497    * dynamically by the application by calling the <code>setClientID</code>
498    * method.
499    * 
500    * @return the unique client identifier
501    * @throws JMSException
502    *             if the JMS provider fails to return the client ID for this
503    *             connection due to some internal error.
504    */
505   public String getClientID() throws JMSException {
506     checkClosed();
507     return this.clientID;
508   }
509 
510   /**
511    * Sets the client identifier for this connection.
512    * <P>
513    * The preferred way to assign a JMS client's client identifier is for it to
514    * be configured in a client-specific <CODE>ConnectionFactory</CODE>
515    * object and transparently assigned to the <CODE>Connection</CODE> object
516    * it creates.
517    * <P>
518    * Alternatively, a client can set a connection's client identifier using a
519    * provider-specific value. The facility to set a connection's client
520    * identifier explicitly is not a mechanism for overriding the identifier
521    * that has been administratively configured. It is provided for the case
522    * where no administratively specified identifier exists. If one does exist,
523    * an attempt to change it by setting it must throw an <CODE>IllegalStateException</CODE>.
524    * If a client sets the client identifier explicitly, it must do so
525    * immediately after it creates the connection and before any other action
526    * on the connection is taken. After this point, setting the client
527    * identifier is a programming error that should throw an <CODE>IllegalStateException</CODE>.
528    * <P>
529    * The purpose of the client identifier is to associate a connection and its
530    * objects with a state maintained on behalf of the client by a provider.
531    * The only such state identified by the JMS API is that required to support
532    * durable subscriptions.
533    * <P>
534    * If another connection with the same <code>clientID</code> is already
535    * running when this method is called, the JMS provider should detect the
536    * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
537    * 
538    * @param newClientID
539    *            the unique client identifier
540    * @throws JMSException
541    *             if the JMS provider fails to set the client ID for this
542    *             connection due to some internal error.
543    * @throws javax.jms.InvalidClientIDException
544    *             if the JMS client specifies an invalid or duplicate client
545    *             ID.
546    * @throws javax.jms.IllegalStateException
547    *             if the JMS client attempts to set a connection's client ID at
548    *             the wrong time or when it has been administratively
549    *             configured.
550    */
551   public void setClientID(String newClientID) throws JMSException {
552     if (this.clientIDSet) {
553       throw new IllegalStateException("The clientID has already been set");
554     }
555     if (this.isConnectionInfoSentToBroker) {
556       throw new IllegalStateException(
557           "Setting clientID on a used Connection is not allowed");
558     }
559     checkClosed();
560     this.clientID = newClientID;
561     this.userSpecifiedClientID = true;
562     ensureClientIDInitialised();
563   }
564 
565   /**
566    * Gets the metadata for this connection.
567    * 
568    * @return the connection metadata
569    * @throws JMSException
570    *             if the JMS provider fails to get the connection metadata for
571    *             this connection.
572    * @see javax.jms.ConnectionMetaData
573    */
574   public ConnectionMetaData getMetaData() throws JMSException {
575     checkClosed();
576     return this.connectionMetaData;
577   }
578 
579   /**
580    * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
581    * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
582    * associated with it.
583    * 
584    * @return the <CODE>ExceptionListener</CODE> for this connection, or
585    *         null. if no <CODE>ExceptionListener</CODE> is associated with
586    *         this connection.
587    * @throws JMSException
588    *             if the JMS provider fails to get the <CODE>ExceptionListener</CODE>
589    *             for this connection.
590    * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
591    */
592   public ExceptionListener getExceptionListener() throws JMSException {
593     checkClosed();
594     return this.exceptionListener;
595   }
596 
597   /**
598    * Sets an exception listener for this connection.
599    * <P>
600    * If a JMS provider detects a serious problem with a connection, it informs
601    * the connection's <CODE> ExceptionListener</CODE>, if one has been
602    * registered. It does this by calling the listener's <CODE>onException
603    * </CODE> method, passing it a <CODE>JMSException</CODE> object
604    * describing the problem.
605    * <P>
606    * An exception listener allows a client to be notified of a problem
607    * asynchronously. Some connections only consume messages, so they would
608    * have no other way to learn their connection has failed.
609    * <P>
610    * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
611    * <P>
612    * A JMS provider should attempt to resolve connection problems itself
613    * before it notifies the client of them.
614    * 
615    * @param listener
616    *            the exception listener
617    * @throws JMSException
618    *             if the JMS provider fails to set the exception listener for
619    *             this connection.
620    */
621   public void setExceptionListener(ExceptionListener listener)
622       throws JMSException {
623     checkClosed();
624     this.exceptionListener = listener;
625     this.transportChannel.setExceptionListener(listener);
626   }
627 
628   /**
629    * Starts (or restarts) a connection's delivery of incoming messages. A call
630    * to <CODE>start</CODE> on a connection that has already been started is
631    * ignored.
632    * 
633    * @throws JMSException
634    *             if the JMS provider fails to start message delivery due to
635    *             some internal error.
636    * @see javax.jms.Connection#stop()
637    */
638   public void start() throws JMSException {
639     checkClosed();
640     if (started.commit(false, true)) {
641       // We have a change in connection info to send.
642       // send the Connection info again
643       sendConnectionInfoToBroker(sendConnectionInfoTimeout, true, false);
644       for (Iterator i = sessions.iterator(); i.hasNext();) {
645         ActiveMQSession s = (ActiveMQSession) i.next();
646         s.start();
647       }
648     }
649   }
650 
651   /**
652    * @return true if this Connection is started
653    */
654   protected boolean isStarted() {
655     return started.get();
656   }
657 
658   /**
659    * Temporarily stops a connection's delivery of incoming messages. Delivery
660    * can be restarted using the connection's <CODE>start</CODE> method. When
661    * the connection is stopped, delivery to all the connection's message
662    * consumers is inhibited: synchronous receives block, and messages are not
663    * delivered to message listeners.
664    * <P>
665    * This call blocks until receives and/or message listeners in progress have
666    * completed.
667    * <P>
668    * Stopping a connection has no effect on its ability to send messages. A
669    * call to <CODE>stop</CODE> on a connection that has already been stopped
670    * is ignored.
671    * <P>
672    * A call to <CODE>stop</CODE> must not return until delivery of messages
673    * has paused. This means that a client can rely on the fact that none of
674    * its message listeners will be called and that all threads of control
675    * waiting for <CODE>receive</CODE> calls to return will not return with a
676    * message until the connection is restarted. The receive timers for a
677    * stopped connection continue to advance, so receives may time out while
678    * the connection is stopped.
679    * <P>
680    * If message listeners are running when <CODE>stop</CODE> is invoked, the
681    * <CODE>stop</CODE> call must wait until all of them have returned before
682    * it may return. While these message listeners are completing, they must
683    * have the full services of the connection available to them.
684    * 
685    * @throws JMSException
686    *             if the JMS provider fails to stop message delivery due to
687    *             some internal error.
688    * @see javax.jms.Connection#start()
689    */
690   public void stop() throws JMSException {
691     checkClosed();
692     if (started.commit(true, false)) {
693       for (Iterator i = sessions.iterator(); i.hasNext();) {
694         ActiveMQSession s = (ActiveMQSession) i.next();
695         s.stop();
696       }
697       sendConnectionInfoToBroker(2000, true, false);
698     }
699   }
700 
701   /**
702    * Closes the connection.
703    * <P>
704    * Since a provider typically allocates significant resources outside the
705    * JVM on behalf of a connection, clients should close these resources when
706    * they are not needed. Relying on garbage collection to eventually reclaim
707    * these resources may not be timely enough.
708    * <P>
709    * There is no need to close the sessions, producers, and consumers of a
710    * closed connection.
711    * <P>
712    * Closing a connection causes all temporary destinations to be deleted.
713    * <P>
714    * When this method is invoked, it should not return until message
715    * processing has been shut down in an orderly fashion. This means that all
716    * message listeners that may have been running have returned, and that all
717    * pending receives have returned. A close terminates all pending message
718    * receives on the connection's sessions' consumers. The receives may return
719    * with a message or with null, depending on whether there was a message
720    * available at the time of the close. If one or more of the connection's
721    * sessions' message listeners is processing a message at the time when
722    * connection <CODE>close</CODE> is invoked, all the facilities of the
723    * connection and its sessions must remain available to those listeners
724    * until they return control to the JMS provider.
725    * <P>
726    * Closing a connection causes any of its sessions' transactions in progress
727    * to be rolled back. In the case where a session's work is coordinated by
728    * an external transaction manager, a session's <CODE>commit</CODE> and
729    * <CODE> rollback</CODE> methods are not used and the result of a closed
730    * session's work is determined later by the transaction manager. Closing a
731    * connection does NOT force an acknowledgment of client-acknowledged
732    * sessions.
733    * <P>
734    * Invoking the <CODE>acknowledge</CODE> method of a received message from
735    * a closed connection's session must throw an <CODE>IllegalStateException</CODE>.
736    * Closing a closed connection must NOT throw an exception.
737    * 
738    * @throws JMSException
739    *             if the JMS provider fails to close the connection due to some
740    *             internal error. For example, a failure to release resources
741    *             or to close a socket connection can cause this exception to
742    *             be thrown.
743    */
744   public void close() throws JMSException {
745     this.transportChannel.setPendingStop(true);
746     synchronized (this) {
747       if (!closed) {
748         memoryManager.removeCapacityEventListener(this);
749         try {
750           closeTemporaryDestinations();
751           for (Iterator i = this.sessions.iterator(); i.hasNext();) {
752             ActiveMQSession s = (ActiveMQSession) i.next();
753             s.close();
754           }
755           for (Iterator i = this.connectionConsumers.iterator(); i
756               .hasNext();) {
757             ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer) i
758                 .next();
759             c.close();
760           }
761           try {
762             sendConnectionInfoToBroker(sendCloseTimeout, true, true);
763           } catch (TimeoutExpiredException e) {
764             log
765                 .warn("Failed to send close to broker, timeout expired of: "
766                     + sendCloseTimeout + " millis");
767           }
768           this.connectionConsumers.clear();
769           this.messageDispatchers.clear();
770           this.transportChannel.stop();
771         } finally {
772           this.sessions.clear();
773           started.set(false);
774           factory.onConnectionClose(this);
775         }
776         closed = true;
777         transientConsumedRedeliverCache.clear();
778         validDestinationsMap.clear();
779                  factoryStats.removeConnection(this);
780       }
781     }
782 
783   }
784 
785     /**
786      * Tells the broker to terminate its VM.  This can be used to cleanly terminate a broker running in
787      * a standalone java process.  Server must have property enable.vm.shutdown=true defined
788      * to allow this to work.
789      */
790     public void terminateBrokerVM() throws JMSException {
791         BrokerAdminCommand command = new BrokerAdminCommand();
792         command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
793         asyncSendPacket(command);
794     }
795 
796   /**
797    * simply throws an exception if the Connection is already closed
798    * 
799    * @throws JMSException
800    */
801   protected synchronized void checkClosed() throws JMSException {
802     if (!startedTransport) {
803       startedTransport = true;
804       this.transportChannel.setCachingEnabled(isCachingEnabled());
805       if (useAsyncSend == false) {
806         this.transportChannel.setNoDelay(true);
807       }
808 
809       this.transportChannel.setUsedInternally(internalConnection);
810       this.transportChannel.start();
811       if (transportChannel.doesSupportWireFormatVersioning()) {
812         WireFormatInfo info = new WireFormatInfo();
813         info.setVersion(transportChannel.getCurrentWireFormatVersion());
814         this.asyncSendPacket(info);
815       }
816     }
817     if (this.closed) {
818       throw new ConnectionClosedException();
819     }
820   }
821 
822   /**
823    * Creates a connection consumer for this connection (optional operation).
824    * This is an expert facility not used by regular JMS clients.
825    * 
826    * @param destination
827    *            the destination to access
828    * @param messageSelector
829    *            only messages with properties matching the message selector
830    *            expression are delivered. A value of null or an empty string
831    *            indicates that there is no message selector for the message
832    *            consumer.
833    * @param sessionPool
834    *            the server session pool to associate with this connection
835    *            consumer
836    * @param maxMessages
837    *            the maximum number of messages that can be assigned to a
838    *            server session at one time
839    * @return the connection consumer
840    * @throws JMSException
841    *             if the <CODE>Connection</CODE> object fails to create a
842    *             connection consumer due to some internal error or invalid
843    *             arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
844    * @throws javax.jms.InvalidDestinationException
845    *             if an invalid destination is specified.
846    * @throws javax.jms.InvalidSelectorException
847    *             if the message selector is invalid.
848    * @see javax.jms.ConnectionConsumer
849    * @since 1.1
850    */
851   public ConnectionConsumer createConnectionConsumer(Destination destination,
852       String messageSelector, ServerSessionPool sessionPool,
853       int maxMessages) throws JMSException {
854     checkClosed();
855     ensureClientIDInitialised();
856     ConsumerInfo info = new ConsumerInfo();
857     info.setConsumerId(handleIdGenerator.generateId());
858     info.setDestination(ActiveMQMessageTransformation
859         .transformDestination(destination));
860     info.setSelector(messageSelector);
861     info.setConsumerNo(handleIdGenerator.getNextShortSequence());
862     return new ActiveMQConnectionConsumer(this, sessionPool, info,
863         maxMessages);
864   }
865     
866     /**
867      * Creates a connection consumer for this connection (optional operation).
868      * This is an expert facility not used by regular JMS clients.
869      * 
870      * @param destination
871      *            the destination to access
872      * @param messageSelector
873      *            only messages with properties matching the message selector
874      *            expression are delivered. A value of null or an empty string
875      *            indicates that there is no message selector for the message
876      *            consumer.
877      * @param sessionPool
878      *            the server session pool to associate with this connection
879      *            consumer
880      * @param maxMessages
881      *            the maximum number of messages that can be assigned to a
882      *            server session at one time
883      * @param noLocal
884      *            set true if you want to filter out messages published locally
885      *           
886      * @return the connection consumer
887      * @throws JMSException
888      *             if the <CODE>Connection</CODE> object fails to create a
889      *             connection consumer due to some internal error or invalid
890      *             arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
891      * @throws javax.jms.InvalidDestinationException
892      *             if an invalid destination is specified.
893      * @throws javax.jms.InvalidSelectorException
894      *             if the message selector is invalid.
895      * @see javax.jms.ConnectionConsumer
896      * @since 1.1
897      */
898     public ConnectionConsumer createConnectionConsumer(Destination destination,
899             String messageSelector, ServerSessionPool sessionPool,
900             int maxMessages, boolean noLocal) throws JMSException {
901         
902         checkClosed();
903         ensureClientIDInitialised();
904         ConsumerInfo info = new ConsumerInfo();
905         info.setConsumerId(handleIdGenerator.generateId());
906         info.setDestination(ActiveMQMessageTransformation
907                 .transformDestination(destination));
908         info.setSelector(messageSelector);
909         info.setConsumerNo(handleIdGenerator.getNextShortSequence());
910         info.setNoLocal(noLocal);
911         return new ActiveMQConnectionConsumer(this, sessionPool, info,
912                 maxMessages);
913     }
914 
915 
916 
917   /**
918    * Create a durable connection consumer for this connection (optional
919    * operation). This is an expert facility not used by regular JMS clients.
920    * 
921    * @param topic
922    *            topic to access
923    * @param subscriptionName
924    *            durable subscription name
925    * @param messageSelector
926    *            only messages with properties matching the message selector
927    *            expression are delivered. A value of null or an empty string
928    *            indicates that there is no message selector for the message
929    *            consumer.
930    * @param sessionPool
931    *            the server session pool to associate with this durable
932    *            connection consumer
933    * @param maxMessages
934    *            the maximum number of messages that can be assigned to a
935    *            server session at one time
936    * @return the durable connection consumer
937    * @throws JMSException
938    *             if the <CODE>Connection</CODE> object fails to create a
939    *             connection consumer due to some internal error or invalid
940    *             arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
941    * @throws javax.jms.InvalidDestinationException
942    *             if an invalid destination is specified.
943    * @throws javax.jms.InvalidSelectorException
944    *             if the message selector is invalid.
945    * @see javax.jms.ConnectionConsumer
946    * @since 1.1
947    */
948   public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
949       String subscriptionName, String messageSelector,
950       ServerSessionPool sessionPool, int maxMessages) throws JMSException {
951     checkClosed();
952     ensureClientIDInitialised();
953     ConsumerInfo info = new ConsumerInfo();
954     info.setConsumerId(this.handleIdGenerator.generateId());
955     info.setDestination(ActiveMQMessageTransformation
956         .transformDestination(topic));
957     info.setSelector(messageSelector);
958     info.setConsumerName(subscriptionName);
959     info.setConsumerNo(handleIdGenerator.getNextShortSequence());
960     return new ActiveMQConnectionConsumer(this, sessionPool, info,
961         maxMessages);
962   }
963     
964     /**
965      * Create a durable connection consumer for this connection (optional
966      * operation). This is an expert facility not used by regular JMS clients.
967      * 
968      * @param topic
969      *            topic to access
970      * @param subscriptionName
971      *            durable subscription name
972      * @param messageSelector
973      *            only messages with properties matching the message selector
974      *            expression are delivered. A value of null or an empty string
975      *            indicates that there is no message selector for the message
976      *            consumer.
977      * @param sessionPool
978      *            the server session pool to associate with this durable
979      *            connection consumer
980      * @param maxMessages
981      *            the maximum number of messages that can be assigned to a
982      *            server session at one time
983      * @param noLocal
984      *            set true if you want to filter out messages published locally
985      *            
986      * @return the durable connection consumer
987      * @throws JMSException
988      *             if the <CODE>Connection</CODE> object fails to create a
989      *             connection consumer due to some internal error or invalid
990      *             arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
991      * @throws javax.jms.InvalidDestinationException
992      *             if an invalid destination is specified.
993      * @throws javax.jms.InvalidSelectorException
994      *             if the message selector is invalid.
995      * @see javax.jms.ConnectionConsumer
996      * @since 1.1
997      */
998     public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
999             String subscriptionName, String messageSelector,
1000            ServerSessionPool sessionPool, int maxMessages, boolean noLocal) throws JMSException {
1001        checkClosed();
1002        ensureClientIDInitialised();
1003        ConsumerInfo info = new ConsumerInfo();
1004        info.setConsumerId(this.handleIdGenerator.generateId());
1005        info.setDestination(ActiveMQMessageTransformation
1006                .transformDestination(topic));
1007        info.setSelector(messageSelector);
1008        info.setConsumerName(subscriptionName);
1009        info.setNoLocal(noLocal);
1010        info.setConsumerNo(handleIdGenerator.getNextShortSequence());
1011        return new ActiveMQConnectionConsumer(this, sessionPool, info,
1012                maxMessages);
1013    }
1014
1015  /**
1016     * Implementation of the PacketListener interface - consume a packet
1017     * 
1018     * @param packet -
1019     *            the Packet to consume
1020     * @see org.activemq.message.PacketListener#consume(org.activemq.message.Packet)
1021     */
1022    public void consume(Packet packet) {
1023        if (!closed && packet != null) {
1024            if (packet.isJMSMessage()) {
1025                ActiveMQMessage message = (ActiveMQMessage) packet;
1026                message.setReadOnly(true);
1027                message.setConsumerIdentifer(clientID);
1028
1029                // lets check for expired messages which is only relevant for
1030                // multicast based stuff
1031                // as a pointcast based network should filter out this stuff
1032                if (transportChannel.isMulticast()) {
1033                    long expiration = message.getJMSExpiration();
1034                    if (expiration > 0) {
1035                        long timeStamp = System.currentTimeMillis();
1036                        if (timeStamp > expiration) {
1037                            if (log.isDebugEnabled()) {
1038                                log.debug("Discarding expired message: " + message);
1039                            }
1040                            return;
1041                        }
1042                    }
1043                }
1044
1045                try {                    
1046                    message = assembleMessage(message);
1047                    if( message !=null ) {
1048                        int count = 0;
1049                        for (Iterator i = this.messageDispatchers.iterator(); i.hasNext();) {
1050                            ActiveMQMessageDispatcher dispatcher = (ActiveMQMessageDispatcher) i.next();
1051                            if (dispatcher.isTarget(message)) {
1052                                if (count > 0) {
1053                                    // separate message for each Session etc.
1054                                    message = message.deepCopy();
1055                                }
1056                                dispatcher.dispatch(message);
1057                                count++;
1058                            }
1059                        }
1060                    }
1061                } catch (JMSException jmsEx) {
1062                    handleAsyncException(jmsEx);
1063                }
1064            } else if (packet.getPacketType() == Packet.CAPACITY_INFO) {
1065                CapacityInfo info = (CapacityInfo) packet;
1066                flowControlSleepTime = info.getFlowControlTimeout();
1067                // System.out.println("SET FLOW TIMEOUT = " +
1068                // flowControlSleepTime + " FOR " + info);
1069            } else if (packet.getPacketType() == Packet.KEEP_ALIVE && packet.isReceiptRequired()) {
1070                Receipt receipt = new Receipt();
1071                receipt.setCorrelationId(packet.getId());
1072                receipt.setReceiptRequired(false);
1073                try {
1074                    asyncSendPacket(receipt);
1075                } catch (JMSException jmsEx) {
1076                    handleAsyncException(jmsEx);
1077                }
1078            }
1079        }
1080    }
1081
1082    private final ActiveMQMessage assembleMessage(ActiveMQMessage message) {
1083        ActiveMQMessage result = message;
1084        if (message != null && !isInternalConnection() && message.isMessagePart()) {
1085            if (message.getNumberOfParts() == 1) {
1086                //passed though from another session - i.e.
1087                //a network or remote connection and now assembled
1088                message.resetMessagePart();
1089                result = message;
1090            }
1091            else {
1092                result = null;
1093                String parentId = message.getParentMessageID();
1094                ActiveMQMessage[] array = (ActiveMQMessage[]) assemblies.get(parentId);
1095                if (array == null) {
1096                    array = new ActiveMQMessage[message.getNumberOfParts()];
1097                    assemblies.put(parentId, array);
1098                }
1099                array[message.getPartNumber()] = message;
1100                boolean complete = true;
1101                for (int i = 0;i < array.length;i++) {
1102                    complete &= array[i] != null;
1103                }
1104                if (complete) {
1105                    result = array[0];
1106                    ByteArray[] bas = new ByteArray[array.length];
1107                    try {
1108                        for (int i = 0;i < bas.length;i++) {
1109                            bas[i] = array[i].getBodyAsBytes();
1110                            if (i >= 1){
1111                                array[i].clearBody();
1112                            }
1113                        }
1114                        ByteArray ba = fragmentation.assemble(bas);
1115                        result.setBodyAsBytes(ba);
1116                    }
1117                    catch (IOException ioe) {
1118                        JMSException jmsEx = new JMSException("Failed to assemble fragment message: " + parentId);
1119                        jmsEx.setLinkedException(ioe);
1120                        onException(jmsEx);
1121                    }catch(JMSException jmsEx){
1122                        onException(jmsEx);  
1123                    }
1124                }
1125            }
1126        }
1127        return result;
1128    }
1129
1130  /**
1131   * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
1132   */
1133  public void onException(JMSException jmsEx) {
1134    // Got an exception propagated up from the transport channel
1135    handleAsyncException(jmsEx);
1136    isTransportOK = false;
1137    try {
1138      close();
1139    } catch (JMSException ex) {
1140      log.debug("Exception closing the connection", ex);
1141    }
1142  }
1143
1144  /**
1145   * Creates a <CODE>TopicSession</CODE> object.
1146   * 
1147   * @param transacted
1148   *            indicates whether the session is transacted
1149   * @param acknowledgeMode
1150   *            indicates whether the consumer or the client will acknowledge
1151   *            any messages it receives; ignored if the session is
1152   *            transacted. Legal values are
1153   *            <code>Session.AUTO_ACKNOWLEDGE</code>,
1154   *            <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1155   *            <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1156   * @return a newly created topic session
1157   * @throws JMSException
1158   *             if the <CODE>TopicConnection</CODE> object fails to create
1159   *             a session due to some internal error or lack of support for
1160   *             the specific transaction and acknowledgement mode.
1161   * @see Session#AUTO_ACKNOWLEDGE
1162   * @see Session#CLIENT_ACKNOWLEDGE
1163   * @see Session#DUPS_OK_ACKNOWLEDGE
1164   */
1165  public TopicSession createTopicSession(boolean transacted,
1166      int acknowledgeMode) throws JMSException {
1167      checkClosed();
1168      sendConnectionInfoToBroker();
1169    return new ActiveMQTopicSession((ActiveMQSession) createSession(
1170        transacted, acknowledgeMode));
1171  }
1172
1173  /**
1174   * Creates a connection consumer for this connection (optional operation).
1175   * This is an expert facility not used by regular JMS clients.
1176   * 
1177   * @param topic
1178   *            the topic to access
1179   * @param messageSelector
1180   *            only messages with properties matching the message selector
1181   *            expression are delivered. A value of null or an empty string
1182   *            indicates that there is no message selector for the message
1183   *            consumer.
1184   * @param sessionPool
1185   *            the server session pool to associate with this connection
1186   *            consumer
1187   * @param maxMessages
1188   *            the maximum number of messages that can be assigned to a
1189   *            server session at one time
1190   * @return the connection consumer
1191   * @throws JMSException
1192   *             if the <CODE>TopicConnection</CODE> object fails to create
1193   *             a connection consumer due to some internal error or invalid
1194   *             arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
1195   * @throws InvalidDestinationException
1196   *             if an invalid topic is specified.
1197   * @throws InvalidSelectorException
1198   *             if the message selector is invalid.
1199   * @see javax.jms.ConnectionConsumer
1200   */
1201  public ConnectionConsumer createConnectionConsumer(Topic topic,
1202      String messageSelector, ServerSessionPool sessionPool,
1203      int maxMessages) throws JMSException {
1204    checkClosed();
1205    ensureClientIDInitialised();
1206    ConsumerInfo info = new ConsumerInfo();
1207    info.setConsumerId(this.handleIdGenerator.generateId());
1208    info.setDestination(ActiveMQMessageTransformation
1209        .transformDestination(topic));
1210    info.setSelector(messageSelector);
1211    info.setConsumerNo(handleIdGenerator.getNextShortSequence());
1212    return new ActiveMQConnectionConsumer(this, sessionPool, info,
1213        maxMessages);
1214  }
1215
1216  /**
1217   * Creates a <CODE>QueueSession</CODE> object.
1218   * 
1219   * @param transacted
1220   *            indicates whether the session is transacted
1221   * @param acknowledgeMode
1222   *            indicates whether the consumer or the client will acknowledge
1223   *            any messages it receives; ignored if the session is
1224   *            transacted. Legal values are
1225   *            <code>Session.AUTO_ACKNOWLEDGE</code>,
1226   *            <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1227   *            <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1228   * @return a newly created queue session
1229   * @throws JMSException
1230   *             if the <CODE>QueueConnection</CODE> object fails to create
1231   *             a session due to some internal error or lack of support for
1232   *             the specific transaction and acknowledgement mode.
1233   * @see Session#AUTO_ACKNOWLEDGE
1234   * @see Session#CLIENT_ACKNOWLEDGE
1235   * @see Session#DUPS_OK_ACKNOWLEDGE
1236   */
1237  public QueueSession createQueueSession(boolean transacted,
1238      int acknowledgeMode) throws JMSException {
1239      checkClosed();
1240      sendConnectionInfoToBroker();
1241    return new ActiveMQQueueSession((ActiveMQSession) createSession(
1242        transacted, acknowledgeMode));
1243  }
1244
1245  /**
1246   * Creates a connection consumer for this connection (optional operation).
1247   * This is an expert facility not used by regular JMS clients.
1248   * 
1249   * @param queue
1250   *            the queue to access
1251   * @param messageSelector
1252   *            only messages with properties matching the message selector
1253   *            expression are delivered. A value of null or an empty string
1254   *            indicates that there is no message selector for the message
1255   *            consumer.
1256   * @param sessionPool
1257   *            the server session pool to associate with this connection
1258   *            consumer
1259   * @param maxMessages
1260   *            the maximum number of messages that can be assigned to a
1261   *            server session at one time
1262   * @return the connection consumer
1263   * @throws JMSException
1264   *             if the <CODE>QueueConnection</CODE> object fails to create
1265   *             a connection consumer due to some internal error or invalid
1266   *             arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
1267   * @throws InvalidDestinationException
1268   *             if an invalid queue is specified.
1269   * @throws InvalidSelectorException
1270   *             if the message selector is invalid.
1271   * @see javax.jms.ConnectionConsumer
1272   */
1273  public ConnectionConsumer createConnectionConsumer(Queue queue,
1274      String messageSelector, ServerSessionPool sessionPool,
1275      int maxMessages) throws JMSException {
1276    checkClosed();
1277    ensureClientIDInitialised();
1278    ConsumerInfo info = new ConsumerInfo();
1279    info.setConsumerId(this.handleIdGenerator.generateId());
1280    info.setDestination(ActiveMQMessageTransformation
1281        .transformDestination(queue));
1282    info.setSelector(messageSelector);
1283    info.setConsumerNo(handleIdGenerator.getNextShortSequence());
1284    return new ActiveMQConnectionConsumer(this, sessionPool, info,
1285        maxMessages);
1286  }
1287
1288  /**
1289   * Ensures that the clientID was manually specified and not auto-generated.
1290   * If the clientID was not specified this method will throw an exception.
1291   * This method is used to ensure that the clientID + durableSubscriber name
1292   * are used correctly.
1293   * 
1294   * @throws JMSException
1295   */
1296  public void checkClientIDWasManuallySpecified() throws JMSException {
1297    if (!userSpecifiedClientID) {
1298      throw new JMSException(
1299          "You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1300    }
1301  }
1302
1303  /**
1304   * handle disconnect/reconnect events
1305   * 
1306   * @param event
1307   */
1308  public void statusChanged(TransportStatusEvent event) {
1309    log.info("channel status changed: " + event);
1310    if (event.getChannelStatus() == TransportStatusEvent.RECONNECTED) {
1311      isTransportOK = true;
1312      doReconnect();
1313
1314    } else if (event.getChannelStatus() == TransportStatusEvent.DISCONNECTED) {
1315      isTransportOK = false;
1316      clearMessagesInProgress();
1317    }
1318  }
1319
1320  /**
1321   * send a Packet through the Connection - for internal use only
1322   * 
1323   * @param packet
1324   * @throws JMSException
1325   */
1326  public void asyncSendPacket(Packet packet) throws JMSException {
1327    asyncSendPacket(packet, true);
1328  }
1329
1330  /**
1331   * send a Packet through the Connection - for internal use only
1332   * 
1333   * @param packet
1334   * @param doSendWhileReconnecting
1335   * @throws JMSException
1336   */
1337  public