Source code: org/activemq/ActiveMQConnection.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.activemq;
20
21 import java.io.IOException;
22 import java.util.Iterator;
23 import java.util.Map;
24
25 import javax.jms.Connection;
26 import javax.jms.ConnectionConsumer;
27 import javax.jms.ConnectionMetaData;
28 import javax.jms.DeliveryMode;
29 import javax.jms.Destination;
30 import javax.jms.ExceptionListener;
31 import javax.jms.IllegalStateException;
32 import javax.jms.JMSException;
33 import javax.jms.Queue;
34 import javax.jms.QueueConnection;
35 import javax.jms.QueueSession;
36 import javax.jms.ServerSessionPool;
37 import javax.jms.Session;
38 import javax.jms.Topic;
39 import javax.jms.TopicConnection;
40 import javax.jms.TopicSession;
41 import javax.jms.XAConnection;
42
43 import org.activemq.advisories.TempDestinationAdvisor;
44 import org.activemq.advisories.TempDestinationAdvisoryEvent;
45 import org.activemq.capacity.CapacityMonitorEvent;
46 import org.activemq.capacity.CapacityMonitorEventListener;
47 import org.activemq.filter.AndFilter;
48 import org.activemq.filter.Filter;
49 import org.activemq.filter.FilterFactory;
50 import org.activemq.filter.FilterFactoryImpl;
51 import org.activemq.filter.NoLocalFilter;
52 import org.activemq.io.util.ByteArray;
53 import org.activemq.io.util.ByteArrayCompression;
54 import org.activemq.io.util.ByteArrayFragmentation;
55 import org.activemq.io.util.MemoryBoundedObjectManager;
56 import org.activemq.io.util.MemoryBoundedQueue;
57 import org.activemq.io.util.MemoryBoundedQueueManager;
58 import org.activemq.management.JMSConnectionStatsImpl;
59 import org.activemq.management.JMSStatsImpl;
60 import org.activemq.management.StatsCapable;
61 import org.activemq.management.StatsImpl;
62 import org.activemq.message.ActiveMQDestination;
63 import org.activemq.message.ActiveMQMessage;
64 import org.activemq.message.ActiveMQObjectMessage;
65 import org.activemq.message.BrokerAdminCommand;
66 import org.activemq.message.CapacityInfo;
67 import org.activemq.message.CleanupConnectionInfo;
68 import org.activemq.message.ConnectionInfo;
69 import org.activemq.message.ConsumerInfo;
70 import org.activemq.message.Packet;
71 import org.activemq.message.PacketListener;
72 import org.activemq.message.ProducerInfo;
73 import org.activemq.message.Receipt;
74 import org.activemq.message.ResponseReceipt;
75 import org.activemq.message.SessionInfo;
76 import org.activemq.message.TransactionInfo;
77 import org.activemq.message.WireFormatInfo;
78 import org.activemq.message.XATransactionInfo;
79 import org.activemq.transport.TransportChannel;
80 import org.activemq.transport.TransportStatusEvent;
81 import org.activemq.transport.TransportStatusEventListener;
82 import org.activemq.util.IdGenerator;
83 import org.activemq.util.JMSExceptionHelper;
84 import org.apache.commons.logging.Log;
85 import org.apache.commons.logging.LogFactory;
86
87 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
88 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
89 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
90 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
91
92 /**
93 * A <CODE>Connection</CODE> object is a client's active connection to its JMS
94 * provider. It typically allocates provider resources outside the Java virtual
95 * machine (JVM).
96 * <P>
97 * Connections support concurrent use.
98 * <P>
99 * A connection serves several purposes:
100 * <UL>
101 * <LI>It encapsulates an open connection with a JMS provider. It typically
102 * represents an open TCP/IP socket between a client and the service provider
103 * software.
104 * <LI>Its creation is where client authentication takes place.
105 * <LI>It can specify a unique client identifier.
106 * <LI>It provides a <CODE>ConnectionMetaData</CODE> object.
107 * <LI>It supports an optional <CODE>ExceptionListener</CODE> object.
108 * </UL>
109 * <P>
110 * Because the creation of a connection involves setting up authentication and
111 * communication, a connection is a relatively heavyweight object. Most clients
112 * will do all their messaging with a single connection. Other more advanced
113 * applications may use several connections. The JMS API does not architect a
114 * reason for using multiple connections; however, there may be operational
115 * reasons for doing so.
116 * <P>
117 * A JMS client typically creates a connection, one or more sessions, and a
118 * number of message producers and consumers. When a connection is created, it
119 * is in stopped mode. That means that no messages are being delivered.
120 * <P>
121 * It is typical to leave the connection in stopped mode until setup is complete
122 * (that is, until all message consumers have been created). At that point, the
123 * client calls the connection's <CODE>start</CODE> method, and messages begin
124 * arriving at the connection's consumers. This setup convention minimizes any
125 * client confusion that may result from asynchronous message delivery while the
126 * client is still in the process of setting itself up.
127 * <P>
128 * A connection can be started immediately, and the setup can be done
129 * afterwards. Clients that do this must be prepared to handle asynchronous
130 * message delivery while they are still in the process of setting up.
131 * <P>
132 * A message producer can send messages while a connection is stopped. <p/>This
133 * class is also a <CODE>TopicConnection </CODE>. A <CODE>TopicConnection</CODE>
134 * object is an active connection to a publish/subscribe JMS provider. A client
135 * uses a <CODE>TopicConnection</CODE> object to create one or more <CODE>TopicSession</CODE>
136 * objects for producing and consuming messages.
137 * <P>
138 * A <CODE>TopicConnection</CODE> can be used to create a <CODE>TopicSession</CODE>,
139 * from which specialized topic-related objects can be created. A more general,
140 * and recommended approach is to use the <CODE>Connection </CODE> object.
141 * <P>
142 * <p/><p/>This class is also a <CODE>QueueConnection</CODE>. A A <CODE>QueueConnection</CODE>
143 * object is an active connection to a point-to-point JMS provider. A client
144 * uses a <CODE>QueueConnection</CODE> object to create one or more <CODE>QueueSession</CODE>
145 * objects for producing and consuming messages.
146 * <P>
147 * A <CODE>QueueConnection</CODE> can be used to create a <CODE>QueueSession</CODE>,
148 * from which specialized queue-related objects can be created. A more general,
149 * and recommended, approach is to use the <CODE>Connection </CODE> object.
150 * <P>
151 * A <CODE>QueueConnection</CODE> cannot be used to create objects specific to
152 * the publish/subscribe domain. The <CODE>createDurableConnectionConsumer</CODE>
153 * method inherits from <CODE>Connection</CODE>, but must throw an <CODE>IllegalStateException</CODE>
154 * if used from <CODE>QueueConnection</CODE>. // *
155 *
156 * @version $Revision: 1.1.1.1 $
157 * @see javax.jms.Connection
158 * @see javax.jms.ConnectionFactory
159 * @see javax.jms.QueueConnection
160 * @see javax.jms.TopicConnection
161 * @see javax.jms.TopicConnectionFactory
162 * @see javax.jms.QueueConnection
163 * @see javax.jms.QueueConnectionFactory
164 */
165 public class ActiveMQConnection implements Connection, PacketListener,
166 ExceptionListener, TopicConnection, QueueConnection, StatsCapable,
167 CapacityMonitorEventListener, TransportStatusEventListener, Closeable {
168
169 /**
170 * Default UserName for the Connection
171 */
172 public static final String DEFAULT_USER = "defaultUser";
173
174 /**
175 * Default URL for the ActiveMQ Broker
176 */
177 public static final String DEFAULT_BROKER_URL = "tcp://localhost:61616";
178
179 /**
180 * Default client URL. If using a message broker in a hub(s)/spoke
181 * architecture - use the DEFAULT_BROKER_URL
182 *
183 * @see ActiveMQConnection#DEFAULT_BROKER_URL
184 */
185 public static final String DEFAULT_URL = "peer://development";
186
187 /**
188 * Default Password for the Connection
189 */
190 public static final String DEFAULT_PASSWORD = "defaultPassword";
191
192 private static final Log log = LogFactory.getLog(ActiveMQConnection.class);
193
194 private static final int DEFAULT_CONNECTION_MEMORY_LIMIT = 10 * 1024 * 1024;
195
196 // properties
197 private ActiveMQConnectionFactory factory;
198
199 private String userName;
200
201 private String password;
202
203 protected String clientID;
204
205 private int sendCloseTimeout = 2000;
206
207 private TransportChannel transportChannel;
208
209 private ExceptionListener exceptionListener;
210
211 private ActiveMQPrefetchPolicy prefetchPolicy;
212
213 private JMSStatsImpl factoryStats;
214
215 private MemoryBoundedObjectManager memoryManager;
216
217 private MemoryBoundedQueueManager boundedQueueManager;
218
219 protected IdGenerator handleIdGenerator;
220
221 private IdGenerator clientIdGenerator;
222
223 protected IdGenerator packetIdGenerator;
224
225 private IdGenerator sessionIdGenerator;
226
227 private JMSConnectionStatsImpl stats;
228
229 // internal state
230 private CopyOnWriteArrayList sessions;
231
232 private CopyOnWriteArrayList messageDispatchers;
233
234 private CopyOnWriteArrayList connectionConsumers;
235
236 private SynchronizedInt consumerNumberGenerator;
237
238 private ActiveMQConnectionMetaData connectionMetaData;
239
240 private boolean closed;
241
242 private SynchronizedBoolean started;
243
244 private boolean clientIDSet;
245
246 private boolean isConnectionInfoSentToBroker;
247
248 private boolean isTransportOK;
249
250 private boolean startedTransport;
251
252 private long startTime;
253
254 private long flowControlSleepTime = 0;
255
256 private boolean quickClose;
257
258 private boolean internalConnection;// used for notifying that the
259 // connection is used for networks etc.
260
261 private boolean userSpecifiedClientID;
262
263 /**
264 * Should we use an async send for persistent non transacted messages ?
265 */
266 protected boolean useAsyncSend = true;
267
268 private int sendConnectionInfoTimeout = 30000;
269
270 private boolean disableTimeStampsByDefault = false;
271
272 private boolean J2EEcompliant = true;
273
274 private boolean prepareMessageBodyOnSend = true;
275
276 private boolean copyMessageOnSend = true;
277
278 // compression and fragmentation variables
279
280 private boolean doMessageCompression = true;
281
282 private int messageCompressionLimit = ByteArrayCompression.DEFAULT_COMPRESSION_LIMIT;// data
283 // size
284 // above
285 // which
286 // compression
287 // will
288 // be
289 // used
290
291 private int messageCompressionLevel = ByteArrayCompression.DEFAULT_COMPRESSION_LEVEL;
292
293 private int messageCompressionStrategy = ByteArrayCompression.DEFAULT_COMPRESSION_STRATEGY;// default
294 // compression
295 // strategy
296
297 private boolean doMessageFragmentation = false;
298
299 private int messageFragmentationLimit = ByteArrayFragmentation.DEFAULT_FRAGMENTATION_LIMIT;
300
301 private boolean cachingEnabled = true;
302
303 private boolean optimizedMessageDispatch = false;
304
305 private CopyOnWriteArrayList transientConsumedRedeliverCache;
306
307 private FilterFactory filterFactory;
308
309 private Map tempDestinationMap;
310
311 private Map validDestinationsMap;
312
313 private String resourceManagerId;
314 //used for assembling message fragments
315 private final ConcurrentHashMap assemblies= new ConcurrentHashMap();
316 private final ByteArrayFragmentation fragmentation = new ByteArrayFragmentation();
317
318 /**
319 * A static helper method to create a new connection
320 *
321 * @return an ActiveMQConnection
322 * @throws JMSException
323 */
324 public static ActiveMQConnection makeConnection() throws JMSException {
325 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
326 return (ActiveMQConnection) factory.createConnection();
327 }
328
329 /**
330 * A static helper method to create a new connection
331 *
332 * @param uri
333 * @return and ActiveMQConnection
334 * @throws JMSException
335 */
336 public static ActiveMQConnection makeConnection(String uri)
337 throws JMSException {
338 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
339 return (ActiveMQConnection) factory.createConnection();
340 }
341
342 /**
343 * A static helper method to create a new connection
344 *
345 * @param user
346 * @param password
347 * @param uri
348 * @return an ActiveMQConnection
349 * @throws JMSException
350 */
351 public static ActiveMQConnection makeConnection(String user,
352 String password, String uri) throws JMSException {
353 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user,
354 password, uri);
355 return (ActiveMQConnection) factory.createConnection();
356 }
357
358 /**
359 * Constructs a connection from an existing TransportChannel and
360 * user/password.
361 *
362 * @param factory
363 * @param theUserName
364 * the users name
365 * @param thePassword
366 * the password
367 * @param transportChannel
368 * the transport channel to communicate with the server
369 * @throws JMSException
370 */
371 public ActiveMQConnection(ActiveMQConnectionFactory factory,
372 String theUserName, String thePassword,
373 TransportChannel transportChannel) throws JMSException {
374 this(factory, theUserName, thePassword);
375 this.transportChannel = transportChannel;
376 this.transportChannel.setPacketListener(this);
377 this.transportChannel.setExceptionListener(this);
378 this.transportChannel.addTransportStatusEventListener(this);
379 this.isTransportOK = true;
380 }
381
382 protected ActiveMQConnection(ActiveMQConnectionFactory factory,
383 String theUserName, String thePassword) {
384 this.factory = factory;
385 this.userName = theUserName;
386 this.password = thePassword;
387 this.clientIdGenerator = new IdGenerator();
388 this.packetIdGenerator = new IdGenerator();
389 this.handleIdGenerator = new IdGenerator();
390 this.sessionIdGenerator = new IdGenerator();
391 this.consumerNumberGenerator = new SynchronizedInt(0);
392 this.sessions = new CopyOnWriteArrayList();
393 this.messageDispatchers = new CopyOnWriteArrayList();
394 this.connectionConsumers = new CopyOnWriteArrayList();
395 this.connectionMetaData = new ActiveMQConnectionMetaData();
396 this.started = new SynchronizedBoolean(false);
397 this.startTime = System.currentTimeMillis();
398 this.prefetchPolicy = new ActiveMQPrefetchPolicy();
399 this.memoryManager = new MemoryBoundedObjectManager(clientID,
400 DEFAULT_CONNECTION_MEMORY_LIMIT);
401 this.boundedQueueManager = new MemoryBoundedQueueManager(memoryManager);
402 this.memoryManager.addCapacityEventListener(this);
403 boolean transactional = this instanceof XAConnection;
404 factoryStats = factory.getFactoryStats();
405 factoryStats.addConnection(this);
406 stats = new JMSConnectionStatsImpl(sessions, transactional);
407 this.transientConsumedRedeliverCache = new CopyOnWriteArrayList();
408 this.tempDestinationMap = new ConcurrentHashMap();
409 this.validDestinationsMap = new ConcurrentHashMap();
410 factory.onConnectionCreate(this);
411 }
412
413 /**
414 * @return statistics for this Connection
415 */
416 public StatsImpl getStats() {
417 return stats;
418 }
419
420 /**
421 * @return a number unique for this connection
422 */
423 public JMSConnectionStatsImpl getConnectionStats() {
424 return stats;
425 }
426
427 /**
428 * Creates a <CODE>Session</CODE> object.
429 *
430 * @param transacted
431 * indicates whether the session is transacted
432 * @param acknowledgeMode
433 * indicates whether the consumer or the client will acknowledge
434 * any messages it receives; ignored if the session is
435 * transacted. Legal values are
436 * <code>Session.AUTO_ACKNOWLEDGE</code>,
437 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
438 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
439 * @return a newly created session
440 * @throws JMSException
441 * if the <CODE>Connection</CODE> object fails to create a
442 * session due to some internal error or lack of support for the
443 * specific transaction and acknowledgement mode.
444 * @see Session#AUTO_ACKNOWLEDGE
445 * @see Session#CLIENT_ACKNOWLEDGE
446 * @see Session#DUPS_OK_ACKNOWLEDGE
447 * @since 1.1
448 */
449 public Session createSession(boolean transacted, int acknowledgeMode)
450 throws JMSException {
451 checkClosed();
452 sendConnectionInfoToBroker();
453 return new ActiveMQSession(
454 this,
455 (transacted ? Session.SESSION_TRANSACTED
456 : (acknowledgeMode == Session.SESSION_TRANSACTED ? Session.AUTO_ACKNOWLEDGE
457 : acknowledgeMode)));
458 }
459
460 /**
461 * Creates a <CODE>Session</CODE> object.
462 *
463 * @param transacted
464 * indicates whether the session is transacted
465 * @param acknowledgeMode
466 * indicates whether the consumer or the client will acknowledge
467 * any messages it receives; ignored if the session is
468 * transacted. Legal values are
469 * <code>Session.AUTO_ACKNOWLEDGE</code>,
470 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
471 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
472 * @param optimizedDispatch
473 * @return a newly created session
474 * @throws JMSException
475 * if the <CODE>Connection</CODE> object fails to create a
476 * session due to some internal error or lack of support for the
477 * specific transaction and acknowledgement mode.
478 * @see Session#AUTO_ACKNOWLEDGE
479 * @see Session#CLIENT_ACKNOWLEDGE
480 * @see Session#DUPS_OK_ACKNOWLEDGE
481 * @since 1.1
482 */
483 public Session createSession(boolean transacted, int acknowledgeMode,
484 boolean optimizedDispatch) throws JMSException {
485 checkClosed();
486 sendConnectionInfoToBroker();
487 return new ActiveMQSession(this,
488 (transacted ? Session.SESSION_TRANSACTED : acknowledgeMode),
489 optimizedDispatch);
490 }
491
492 /**
493 * Gets the client identifier for this connection.
494 * <P>
495 * This value is specific to the JMS provider. It is either preconfigured by
496 * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
497 * dynamically by the application by calling the <code>setClientID</code>
498 * method.
499 *
500 * @return the unique client identifier
501 * @throws JMSException
502 * if the JMS provider fails to return the client ID for this
503 * connection due to some internal error.
504 */
505 public String getClientID() throws JMSException {
506 checkClosed();
507 return this.clientID;
508 }
509
510 /**
511 * Sets the client identifier for this connection.
512 * <P>
513 * The preferred way to assign a JMS client's client identifier is for it to
514 * be configured in a client-specific <CODE>ConnectionFactory</CODE>
515 * object and transparently assigned to the <CODE>Connection</CODE> object
516 * it creates.
517 * <P>
518 * Alternatively, a client can set a connection's client identifier using a
519 * provider-specific value. The facility to set a connection's client
520 * identifier explicitly is not a mechanism for overriding the identifier
521 * that has been administratively configured. It is provided for the case
522 * where no administratively specified identifier exists. If one does exist,
523 * an attempt to change it by setting it must throw an <CODE>IllegalStateException</CODE>.
524 * If a client sets the client identifier explicitly, it must do so
525 * immediately after it creates the connection and before any other action
526 * on the connection is taken. After this point, setting the client
527 * identifier is a programming error that should throw an <CODE>IllegalStateException</CODE>.
528 * <P>
529 * The purpose of the client identifier is to associate a connection and its
530 * objects with a state maintained on behalf of the client by a provider.
531 * The only such state identified by the JMS API is that required to support
532 * durable subscriptions.
533 * <P>
534 * If another connection with the same <code>clientID</code> is already
535 * running when this method is called, the JMS provider should detect the
536 * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
537 *
538 * @param newClientID
539 * the unique client identifier
540 * @throws JMSException
541 * if the JMS provider fails to set the client ID for this
542 * connection due to some internal error.
543 * @throws javax.jms.InvalidClientIDException
544 * if the JMS client specifies an invalid or duplicate client
545 * ID.
546 * @throws javax.jms.IllegalStateException
547 * if the JMS client attempts to set a connection's client ID at
548 * the wrong time or when it has been administratively
549 * configured.
550 */
551 public void setClientID(String newClientID) throws JMSException {
552 if (this.clientIDSet) {
553 throw new IllegalStateException("The clientID has already been set");
554 }
555 if (this.isConnectionInfoSentToBroker) {
556 throw new IllegalStateException(
557 "Setting clientID on a used Connection is not allowed");
558 }
559 checkClosed();
560 this.clientID = newClientID;
561 this.userSpecifiedClientID = true;
562 ensureClientIDInitialised();
563 }
564
565 /**
566 * Gets the metadata for this connection.
567 *
568 * @return the connection metadata
569 * @throws JMSException
570 * if the JMS provider fails to get the connection metadata for
571 * this connection.
572 * @see javax.jms.ConnectionMetaData
573 */
574 public ConnectionMetaData getMetaData() throws JMSException {
575 checkClosed();
576 return this.connectionMetaData;
577 }
578
579 /**
580 * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
581 * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
582 * associated with it.
583 *
584 * @return the <CODE>ExceptionListener</CODE> for this connection, or
585 * null. if no <CODE>ExceptionListener</CODE> is associated with
586 * this connection.
587 * @throws JMSException
588 * if the JMS provider fails to get the <CODE>ExceptionListener</CODE>
589 * for this connection.
590 * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
591 */
592 public ExceptionListener getExceptionListener() throws JMSException {
593 checkClosed();
594 return this.exceptionListener;
595 }
596
597 /**
598 * Sets an exception listener for this connection.
599 * <P>
600 * If a JMS provider detects a serious problem with a connection, it informs
601 * the connection's <CODE> ExceptionListener</CODE>, if one has been
602 * registered. It does this by calling the listener's <CODE>onException
603 * </CODE> method, passing it a <CODE>JMSException</CODE> object
604 * describing the problem.
605 * <P>
606 * An exception listener allows a client to be notified of a problem
607 * asynchronously. Some connections only consume messages, so they would
608 * have no other way to learn their connection has failed.
609 * <P>
610 * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
611 * <P>
612 * A JMS provider should attempt to resolve connection problems itself
613 * before it notifies the client of them.
614 *
615 * @param listener
616 * the exception listener
617 * @throws JMSException
618 * if the JMS provider fails to set the exception listener for
619 * this connection.
620 */
621 public void setExceptionListener(ExceptionListener listener)
622 throws JMSException {
623 checkClosed();
624 this.exceptionListener = listener;
625 this.transportChannel.setExceptionListener(listener);
626 }
627
628 /**
629 * Starts (or restarts) a connection's delivery of incoming messages. A call
630 * to <CODE>start</CODE> on a connection that has already been started is
631 * ignored.
632 *
633 * @throws JMSException
634 * if the JMS provider fails to start message delivery due to
635 * some internal error.
636 * @see javax.jms.Connection#stop()
637 */
638 public void start() throws JMSException {
639 checkClosed();
640 if (started.commit(false, true)) {
641 // We have a change in connection info to send.
642 // send the Connection info again
643 sendConnectionInfoToBroker(sendConnectionInfoTimeout, true, false);
644 for (Iterator i = sessions.iterator(); i.hasNext();) {
645 ActiveMQSession s = (ActiveMQSession) i.next();
646 s.start();
647 }
648 }
649 }
650
651 /**
652 * @return true if this Connection is started
653 */
654 protected boolean isStarted() {
655 return started.get();
656 }
657
658 /**
659 * Temporarily stops a connection's delivery of incoming messages. Delivery
660 * can be restarted using the connection's <CODE>start</CODE> method. When
661 * the connection is stopped, delivery to all the connection's message
662 * consumers is inhibited: synchronous receives block, and messages are not
663 * delivered to message listeners.
664 * <P>
665 * This call blocks until receives and/or message listeners in progress have
666 * completed.
667 * <P>
668 * Stopping a connection has no effect on its ability to send messages. A
669 * call to <CODE>stop</CODE> on a connection that has already been stopped
670 * is ignored.
671 * <P>
672 * A call to <CODE>stop</CODE> must not return until delivery of messages
673 * has paused. This means that a client can rely on the fact that none of
674 * its message listeners will be called and that all threads of control
675 * waiting for <CODE>receive</CODE> calls to return will not return with a
676 * message until the connection is restarted. The receive timers for a
677 * stopped connection continue to advance, so receives may time out while
678 * the connection is stopped.
679 * <P>
680 * If message listeners are running when <CODE>stop</CODE> is invoked, the
681 * <CODE>stop</CODE> call must wait until all of them have returned before
682 * it may return. While these message listeners are completing, they must
683 * have the full services of the connection available to them.
684 *
685 * @throws JMSException
686 * if the JMS provider fails to stop message delivery due to
687 * some internal error.
688 * @see javax.jms.Connection#start()
689 */
690 public void stop() throws JMSException {
691 checkClosed();
692 if (started.commit(true, false)) {
693 for (Iterator i = sessions.iterator(); i.hasNext();) {
694 ActiveMQSession s = (ActiveMQSession) i.next();
695 s.stop();
696 }
697 sendConnectionInfoToBroker(2000, true, false);
698 }
699 }
700
701 /**
702 * Closes the connection.
703 * <P>
704 * Since a provider typically allocates significant resources outside the
705 * JVM on behalf of a connection, clients should close these resources when
706 * they are not needed. Relying on garbage collection to eventually reclaim
707 * these resources may not be timely enough.
708 * <P>
709 * There is no need to close the sessions, producers, and consumers of a
710 * closed connection.
711 * <P>
712 * Closing a connection causes all temporary destinations to be deleted.
713 * <P>
714 * When this method is invoked, it should not return until message
715 * processing has been shut down in an orderly fashion. This means that all
716 * message listeners that may have been running have returned, and that all
717 * pending receives have returned. A close terminates all pending message
718 * receives on the connection's sessions' consumers. The receives may return
719 * with a message or with null, depending on whether there was a message
720 * available at the time of the close. If one or more of the connection's
721 * sessions' message listeners is processing a message at the time when
722 * connection <CODE>close</CODE> is invoked, all the facilities of the
723 * connection and its sessions must remain available to those listeners
724 * until they return control to the JMS provider.
725 * <P>
726 * Closing a connection causes any of its sessions' transactions in progress
727 * to be rolled back. In the case where a session's work is coordinated by
728 * an external transaction manager, a session's <CODE>commit</CODE> and
729 * <CODE> rollback</CODE> methods are not used and the result of a closed
730 * session's work is determined later by the transaction manager. Closing a
731 * connection does NOT force an acknowledgment of client-acknowledged
732 * sessions.
733 * <P>
734 * Invoking the <CODE>acknowledge</CODE> method of a received message from
735 * a closed connection's session must throw an <CODE>IllegalStateException</CODE>.
736 * Closing a closed connection must NOT throw an exception.
737 *
738 * @throws JMSException
739 * if the JMS provider fails to close the connection due to some
740 * internal error. For example, a failure to release resources
741 * or to close a socket connection can cause this exception to
742 * be thrown.
743 */
744 public void close() throws JMSException {
745 this.transportChannel.setPendingStop(true);
746 synchronized (this) {
747 if (!closed) {
748 memoryManager.removeCapacityEventListener(this);
749 try {
750 closeTemporaryDestinations();
751 for (Iterator i = this.sessions.iterator(); i.hasNext();) {
752 ActiveMQSession s = (ActiveMQSession) i.next();
753 s.close();
754 }
755 for (Iterator i = this.connectionConsumers.iterator(); i
756 .hasNext();) {
757 ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer) i
758 .next();
759 c.close();
760 }
761 try {
762 sendConnectionInfoToBroker(sendCloseTimeout, true, true);
763 } catch (TimeoutExpiredException e) {
764 log
765 .warn("Failed to send close to broker, timeout expired of: "
766 + sendCloseTimeout + " millis");
767 }
768 this.connectionConsumers.clear();
769 this.messageDispatchers.clear();
770 this.transportChannel.stop();
771 } finally {
772 this.sessions.clear();
773 started.set(false);
774 factory.onConnectionClose(this);
775 }
776 closed = true;
777 transientConsumedRedeliverCache.clear();
778 validDestinationsMap.clear();
779 factoryStats.removeConnection(this);
780 }
781 }
782
783 }
784
785 /**
786 * Tells the broker to terminate its VM. This can be used to cleanly terminate a broker running in
787 * a standalone java process. Server must have property enable.vm.shutdown=true defined
788 * to allow this to work.
789 */
790 public void terminateBrokerVM() throws JMSException {
791 BrokerAdminCommand command = new BrokerAdminCommand();
792 command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
793 asyncSendPacket(command);
794 }
795
796 /**
797 * simply throws an exception if the Connection is already closed
798 *
799 * @throws JMSException
800 */
801 protected synchronized void checkClosed() throws JMSException {
802 if (!startedTransport) {
803 startedTransport = true;
804 this.transportChannel.setCachingEnabled(isCachingEnabled());
805 if (useAsyncSend == false) {
806 this.transportChannel.setNoDelay(true);
807 }
808
809 this.transportChannel.setUsedInternally(internalConnection);
810 this.transportChannel.start();
811 if (transportChannel.doesSupportWireFormatVersioning()) {
812 WireFormatInfo info = new WireFormatInfo();
813 info.setVersion(transportChannel.getCurrentWireFormatVersion());
814 this.asyncSendPacket(info);
815 }
816 }
817 if (this.closed) {
818 throw new ConnectionClosedException();
819 }
820 }
821
822 /**
823 * Creates a connection consumer for this connection (optional operation).
824 * This is an expert facility not used by regular JMS clients.
825 *
826 * @param destination
827 * the destination to access
828 * @param messageSelector
829 * only messages with properties matching the message selector
830 * expression are delivered. A value of null or an empty string
831 * indicates that there is no message selector for the message
832 * consumer.
833 * @param sessionPool
834 * the server session pool to associate with this connection
835 * consumer
836 * @param maxMessages
837 * the maximum number of messages that can be assigned to a
838 * server session at one time
839 * @return the connection consumer
840 * @throws JMSException
841 * if the <CODE>Connection</CODE> object fails to create a
842 * connection consumer due to some internal error or invalid
843 * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
844 * @throws javax.jms.InvalidDestinationException
845 * if an invalid destination is specified.
846 * @throws javax.jms.InvalidSelectorException
847 * if the message selector is invalid.
848 * @see javax.jms.ConnectionConsumer
849 * @since 1.1
850 */
851 public ConnectionConsumer createConnectionConsumer(Destination destination,
852 String messageSelector, ServerSessionPool sessionPool,
853 int maxMessages) throws JMSException {
854 checkClosed();
855 ensureClientIDInitialised();
856 ConsumerInfo info = new ConsumerInfo();
857 info.setConsumerId(handleIdGenerator.generateId());
858 info.setDestination(ActiveMQMessageTransformation
859 .transformDestination(destination));
860 info.setSelector(messageSelector);
861 info.setConsumerNo(handleIdGenerator.getNextShortSequence());
862 return new ActiveMQConnectionConsumer(this, sessionPool, info,
863 maxMessages);
864 }
865
866 /**
867 * Creates a connection consumer for this connection (optional operation).
868 * This is an expert facility not used by regular JMS clients.
869 *
870 * @param destination
871 * the destination to access
872 * @param messageSelector
873 * only messages with properties matching the message selector
874 * expression are delivered. A value of null or an empty string
875 * indicates that there is no message selector for the message
876 * consumer.
877 * @param sessionPool
878 * the server session pool to associate with this connection
879 * consumer
880 * @param maxMessages
881 * the maximum number of messages that can be assigned to a
882 * server session at one time
883 * @param noLocal
884 * set true if you want to filter out messages published locally
885 *
886 * @return the connection consumer
887 * @throws JMSException
888 * if the <CODE>Connection</CODE> object fails to create a
889 * connection consumer due to some internal error or invalid
890 * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
891 * @throws javax.jms.InvalidDestinationException
892 * if an invalid destination is specified.
893 * @throws javax.jms.InvalidSelectorException
894 * if the message selector is invalid.
895 * @see javax.jms.ConnectionConsumer
896 * @since 1.1
897 */
898 public ConnectionConsumer createConnectionConsumer(Destination destination,
899 String messageSelector, ServerSessionPool sessionPool,
900 int maxMessages, boolean noLocal) throws JMSException {
901
902 checkClosed();
903 ensureClientIDInitialised();
904 ConsumerInfo info = new ConsumerInfo();
905 info.setConsumerId(handleIdGenerator.generateId());
906 info.setDestination(ActiveMQMessageTransformation
907 .transformDestination(destination));
908 info.setSelector(messageSelector);
909 info.setConsumerNo(handleIdGenerator.getNextShortSequence());
910 info.setNoLocal(noLocal);
911 return new ActiveMQConnectionConsumer(this, sessionPool, info,
912 maxMessages);
913 }
914
915
916
917 /**
918 * Create a durable connection consumer for this connection (optional
919 * operation). This is an expert facility not used by regular JMS clients.
920 *
921 * @param topic
922 * topic to access
923 * @param subscriptionName
924 * durable subscription name
925 * @param messageSelector
926 * only messages with properties matching the message selector
927 * expression are delivered. A value of null or an empty string
928 * indicates that there is no message selector for the message
929 * consumer.
930 * @param sessionPool
931 * the server session pool to associate with this durable
932 * connection consumer
933 * @param maxMessages
934 * the maximum number of messages that can be assigned to a
935 * server session at one time
936 * @return the durable connection consumer
937 * @throws JMSException
938 * if the <CODE>Connection</CODE> object fails to create a
939 * connection consumer due to some internal error or invalid
940 * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
941 * @throws javax.jms.InvalidDestinationException
942 * if an invalid destination is specified.
943 * @throws javax.jms.InvalidSelectorException
944 * if the message selector is invalid.
945 * @see javax.jms.ConnectionConsumer
946 * @since 1.1
947 */
948 public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
949 String subscriptionName, String messageSelector,
950 ServerSessionPool sessionPool, int maxMessages) throws JMSException {
951 checkClosed();
952 ensureClientIDInitialised();
953 ConsumerInfo info = new ConsumerInfo();
954 info.setConsumerId(this.handleIdGenerator.generateId());
955 info.setDestination(ActiveMQMessageTransformation
956 .transformDestination(topic));
957 info.setSelector(messageSelector);
958 info.setConsumerName(subscriptionName);
959 info.setConsumerNo(handleIdGenerator.getNextShortSequence());
960 return new ActiveMQConnectionConsumer(this, sessionPool, info,
961 maxMessages);
962 }
963
964 /**
965 * Create a durable connection consumer for this connection (optional
966 * operation). This is an expert facility not used by regular JMS clients.
967 *
968 * @param topic
969 * topic to access
970 * @param subscriptionName
971 * durable subscription name
972 * @param messageSelector
973 * only messages with properties matching the message selector
974 * expression are delivered. A value of null or an empty string
975 * indicates that there is no message selector for the message
976 * consumer.
977 * @param sessionPool
978 * the server session pool to associate with this durable
979 * connection consumer
980 * @param maxMessages
981 * the maximum number of messages that can be assigned to a
982 * server session at one time
983 * @param noLocal
984 * set true if you want to filter out messages published locally
985 *
986 * @return the durable connection consumer
987 * @throws JMSException
988 * if the <CODE>Connection</CODE> object fails to create a
989 * connection consumer due to some internal error or invalid
990 * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
991 * @throws javax.jms.InvalidDestinationException
992 * if an invalid destination is specified.
993 * @throws javax.jms.InvalidSelectorException
994 * if the message selector is invalid.
995 * @see javax.jms.ConnectionConsumer
996 * @since 1.1
997 */
998 public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
999 String subscriptionName, String messageSelector,
1000 ServerSessionPool sessionPool, int maxMessages, boolean noLocal) throws JMSException {
1001 checkClosed();
1002 ensureClientIDInitialised();
1003 ConsumerInfo info = new ConsumerInfo();
1004 info.setConsumerId(this.handleIdGenerator.generateId());
1005 info.setDestination(ActiveMQMessageTransformation
1006 .transformDestination(topic));
1007 info.setSelector(messageSelector);
1008 info.setConsumerName(subscriptionName);
1009 info.setNoLocal(noLocal);
1010 info.setConsumerNo(handleIdGenerator.getNextShortSequence());
1011 return new ActiveMQConnectionConsumer(this, sessionPool, info,
1012 maxMessages);
1013 }
1014
1015 /**
1016 * Implementation of the PacketListener interface - consume a packet
1017 *
1018 * @param packet -
1019 * the Packet to consume
1020 * @see org.activemq.message.PacketListener#consume(org.activemq.message.Packet)
1021 */
1022 public void consume(Packet packet) {
1023 if (!closed && packet != null) {
1024 if (packet.isJMSMessage()) {
1025 ActiveMQMessage message = (ActiveMQMessage) packet;
1026 message.setReadOnly(true);
1027 message.setConsumerIdentifer(clientID);
1028
1029 // lets check for expired messages which is only relevant for
1030 // multicast based stuff
1031 // as a pointcast based network should filter out this stuff
1032 if (transportChannel.isMulticast()) {
1033 long expiration = message.getJMSExpiration();
1034 if (expiration > 0) {
1035 long timeStamp = System.currentTimeMillis();
1036 if (timeStamp > expiration) {
1037 if (log.isDebugEnabled()) {
1038 log.debug("Discarding expired message: " + message);
1039 }
1040 return;
1041 }
1042 }
1043 }
1044
1045 try {
1046 message = assembleMessage(message);
1047 if( message !=null ) {
1048 int count = 0;
1049 for (Iterator i = this.messageDispatchers.iterator(); i.hasNext();) {
1050 ActiveMQMessageDispatcher dispatcher = (ActiveMQMessageDispatcher) i.next();
1051 if (dispatcher.isTarget(message)) {
1052 if (count > 0) {
1053 // separate message for each Session etc.
1054 message = message.deepCopy();
1055 }
1056 dispatcher.dispatch(message);
1057 count++;
1058 }
1059 }
1060 }
1061 } catch (JMSException jmsEx) {
1062 handleAsyncException(jmsEx);
1063 }
1064 } else if (packet.getPacketType() == Packet.CAPACITY_INFO) {
1065 CapacityInfo info = (CapacityInfo) packet;
1066 flowControlSleepTime = info.getFlowControlTimeout();
1067 // System.out.println("SET FLOW TIMEOUT = " +
1068 // flowControlSleepTime + " FOR " + info);
1069 } else if (packet.getPacketType() == Packet.KEEP_ALIVE && packet.isReceiptRequired()) {
1070 Receipt receipt = new Receipt();
1071 receipt.setCorrelationId(packet.getId());
1072 receipt.setReceiptRequired(false);
1073 try {
1074 asyncSendPacket(receipt);
1075 } catch (JMSException jmsEx) {
1076 handleAsyncException(jmsEx);
1077 }
1078 }
1079 }
1080 }
1081
1082 private final ActiveMQMessage assembleMessage(ActiveMQMessage message) {
1083 ActiveMQMessage result = message;
1084 if (message != null && !isInternalConnection() && message.isMessagePart()) {
1085 if (message.getNumberOfParts() == 1) {
1086 //passed though from another session - i.e.
1087 //a network or remote connection and now assembled
1088 message.resetMessagePart();
1089 result = message;
1090 }
1091 else {
1092 result = null;
1093 String parentId = message.getParentMessageID();
1094 ActiveMQMessage[] array = (ActiveMQMessage[]) assemblies.get(parentId);
1095 if (array == null) {
1096 array = new ActiveMQMessage[message.getNumberOfParts()];
1097 assemblies.put(parentId, array);
1098 }
1099 array[message.getPartNumber()] = message;
1100 boolean complete = true;
1101 for (int i = 0;i < array.length;i++) {
1102 complete &= array[i] != null;
1103 }
1104 if (complete) {
1105 result = array[0];
1106 ByteArray[] bas = new ByteArray[array.length];
1107 try {
1108 for (int i = 0;i < bas.length;i++) {
1109 bas[i] = array[i].getBodyAsBytes();
1110 if (i >= 1){
1111 array[i].clearBody();
1112 }
1113 }
1114 ByteArray ba = fragmentation.assemble(bas);
1115 result.setBodyAsBytes(ba);
1116 }
1117 catch (IOException ioe) {
1118 JMSException jmsEx = new JMSException("Failed to assemble fragment message: " + parentId);
1119 jmsEx.setLinkedException(ioe);
1120 onException(jmsEx);
1121 }catch(JMSException jmsEx){
1122 onException(jmsEx);
1123 }
1124 }
1125 }
1126 }
1127 return result;
1128 }
1129
1130 /**
1131 * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
1132 */
1133 public void onException(JMSException jmsEx) {
1134 // Got an exception propagated up from the transport channel
1135 handleAsyncException(jmsEx);
1136 isTransportOK = false;
1137 try {
1138 close();
1139 } catch (JMSException ex) {
1140 log.debug("Exception closing the connection", ex);
1141 }
1142 }
1143
1144 /**
1145 * Creates a <CODE>TopicSession</CODE> object.
1146 *
1147 * @param transacted
1148 * indicates whether the session is transacted
1149 * @param acknowledgeMode
1150 * indicates whether the consumer or the client will acknowledge
1151 * any messages it receives; ignored if the session is
1152 * transacted. Legal values are
1153 * <code>Session.AUTO_ACKNOWLEDGE</code>,
1154 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1155 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1156 * @return a newly created topic session
1157 * @throws JMSException
1158 * if the <CODE>TopicConnection</CODE> object fails to create
1159 * a session due to some internal error or lack of support for
1160 * the specific transaction and acknowledgement mode.
1161 * @see Session#AUTO_ACKNOWLEDGE
1162 * @see Session#CLIENT_ACKNOWLEDGE
1163 * @see Session#DUPS_OK_ACKNOWLEDGE
1164 */
1165 public TopicSession createTopicSession(boolean transacted,
1166 int acknowledgeMode) throws JMSException {
1167 checkClosed();
1168 sendConnectionInfoToBroker();
1169 return new ActiveMQTopicSession((ActiveMQSession) createSession(
1170 transacted, acknowledgeMode));
1171 }
1172
1173 /**
1174 * Creates a connection consumer for this connection (optional operation).
1175 * This is an expert facility not used by regular JMS clients.
1176 *
1177 * @param topic
1178 * the topic to access
1179 * @param messageSelector
1180 * only messages with properties matching the message selector
1181 * expression are delivered. A value of null or an empty string
1182 * indicates that there is no message selector for the message
1183 * consumer.
1184 * @param sessionPool
1185 * the server session pool to associate with this connection
1186 * consumer
1187 * @param maxMessages
1188 * the maximum number of messages that can be assigned to a
1189 * server session at one time
1190 * @return the connection consumer
1191 * @throws JMSException
1192 * if the <CODE>TopicConnection</CODE> object fails to create
1193 * a connection consumer due to some internal error or invalid
1194 * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
1195 * @throws InvalidDestinationException
1196 * if an invalid topic is specified.
1197 * @throws InvalidSelectorException
1198 * if the message selector is invalid.
1199 * @see javax.jms.ConnectionConsumer
1200 */
1201 public ConnectionConsumer createConnectionConsumer(Topic topic,
1202 String messageSelector, ServerSessionPool sessionPool,
1203 int maxMessages) throws JMSException {
1204 checkClosed();
1205 ensureClientIDInitialised();
1206 ConsumerInfo info = new ConsumerInfo();
1207 info.setConsumerId(this.handleIdGenerator.generateId());
1208 info.setDestination(ActiveMQMessageTransformation
1209 .transformDestination(topic));
1210 info.setSelector(messageSelector);
1211 info.setConsumerNo(handleIdGenerator.getNextShortSequence());
1212 return new ActiveMQConnectionConsumer(this, sessionPool, info,
1213 maxMessages);
1214 }
1215
1216 /**
1217 * Creates a <CODE>QueueSession</CODE> object.
1218 *
1219 * @param transacted
1220 * indicates whether the session is transacted
1221 * @param acknowledgeMode
1222 * indicates whether the consumer or the client will acknowledge
1223 * any messages it receives; ignored if the session is
1224 * transacted. Legal values are
1225 * <code>Session.AUTO_ACKNOWLEDGE</code>,
1226 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1227 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1228 * @return a newly created queue session
1229 * @throws JMSException
1230 * if the <CODE>QueueConnection</CODE> object fails to create
1231 * a session due to some internal error or lack of support for
1232 * the specific transaction and acknowledgement mode.
1233 * @see Session#AUTO_ACKNOWLEDGE
1234 * @see Session#CLIENT_ACKNOWLEDGE
1235 * @see Session#DUPS_OK_ACKNOWLEDGE
1236 */
1237 public QueueSession createQueueSession(boolean transacted,
1238 int acknowledgeMode) throws JMSException {
1239 checkClosed();
1240 sendConnectionInfoToBroker();
1241 return new ActiveMQQueueSession((ActiveMQSession) createSession(
1242 transacted, acknowledgeMode));
1243 }
1244
1245 /**
1246 * Creates a connection consumer for this connection (optional operation).
1247 * This is an expert facility not used by regular JMS clients.
1248 *
1249 * @param queue
1250 * the queue to access
1251 * @param messageSelector
1252 * only messages with properties matching the message selector
1253 * expression are delivered. A value of null or an empty string
1254 * indicates that there is no message selector for the message
1255 * consumer.
1256 * @param sessionPool
1257 * the server session pool to associate with this connection
1258 * consumer
1259 * @param maxMessages
1260 * the maximum number of messages that can be assigned to a
1261 * server session at one time
1262 * @return the connection consumer
1263 * @throws JMSException
1264 * if the <CODE>QueueConnection</CODE> object fails to create
1265 * a connection consumer due to some internal error or invalid
1266 * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
1267 * @throws InvalidDestinationException
1268 * if an invalid queue is specified.
1269 * @throws InvalidSelectorException
1270 * if the message selector is invalid.
1271 * @see javax.jms.ConnectionConsumer
1272 */
1273 public ConnectionConsumer createConnectionConsumer(Queue queue,
1274 String messageSelector, ServerSessionPool sessionPool,
1275 int maxMessages) throws JMSException {
1276 checkClosed();
1277 ensureClientIDInitialised();
1278 ConsumerInfo info = new ConsumerInfo();
1279 info.setConsumerId(this.handleIdGenerator.generateId());
1280 info.setDestination(ActiveMQMessageTransformation
1281 .transformDestination(queue));
1282 info.setSelector(messageSelector);
1283 info.setConsumerNo(handleIdGenerator.getNextShortSequence());
1284 return new ActiveMQConnectionConsumer(this, sessionPool, info,
1285 maxMessages);
1286 }
1287
1288 /**
1289 * Ensures that the clientID was manually specified and not auto-generated.
1290 * If the clientID was not specified this method will throw an exception.
1291 * This method is used to ensure that the clientID + durableSubscriber name
1292 * are used correctly.
1293 *
1294 * @throws JMSException
1295 */
1296 public void checkClientIDWasManuallySpecified() throws JMSException {
1297 if (!userSpecifiedClientID) {
1298 throw new JMSException(
1299 "You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1300 }
1301 }
1302
1303 /**
1304 * handle disconnect/reconnect events
1305 *
1306 * @param event
1307 */
1308 public void statusChanged(TransportStatusEvent event) {
1309 log.info("channel status changed: " + event);
1310 if (event.getChannelStatus() == TransportStatusEvent.RECONNECTED) {
1311 isTransportOK = true;
1312 doReconnect();
1313
1314 } else if (event.getChannelStatus() == TransportStatusEvent.DISCONNECTED) {
1315 isTransportOK = false;
1316 clearMessagesInProgress();
1317 }
1318 }
1319
1320 /**
1321 * send a Packet through the Connection - for internal use only
1322 *
1323 * @param packet
1324 * @throws JMSException
1325 */
1326 public void asyncSendPacket(Packet packet) throws JMSException {
1327 asyncSendPacket(packet, true);
1328 }
1329
1330 /**
1331 * send a Packet through the Connection - for internal use only
1332 *
1333 * @param packet
1334 * @param doSendWhileReconnecting
1335 * @throws JMSException
1336 */
1337 public