1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.activemq;
18
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.io.OutputStream;
22 import java.net.URI;
23 import java.net.URISyntaxException;
24 import java.util.HashMap;
25 import java.util.Iterator;
26 import java.util.Map;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.CopyOnWriteArrayList;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.ThreadPoolExecutor;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicBoolean;
35 import java.util.concurrent.atomic.AtomicInteger;
36
37 import javax.jms.Connection;
38 import javax.jms.ConnectionConsumer;
39 import javax.jms.ConnectionMetaData;
40 import javax.jms.DeliveryMode;
41 import javax.jms.Destination;
42 import javax.jms.ExceptionListener;
43 import javax.jms.IllegalStateException;
44 import javax.jms.JMSException;
45 import javax.jms.Queue;
46 import javax.jms.QueueConnection;
47 import javax.jms.QueueSession;
48 import javax.jms.ServerSessionPool;
49 import javax.jms.Session;
50 import javax.jms.Topic;
51 import javax.jms.TopicConnection;
52 import javax.jms.TopicSession;
53 import javax.jms.XAConnection;
54
55 import org.apache.activemq.blob.BlobTransferPolicy;
56 import org.apache.activemq.command.ActiveMQDestination;
57 import org.apache.activemq.command.ActiveMQMessage;
58 import org.apache.activemq.command.ActiveMQTempDestination;
59 import org.apache.activemq.command.ActiveMQTempQueue;
60 import org.apache.activemq.command.ActiveMQTempTopic;
61 import org.apache.activemq.command.BrokerInfo;
62 import org.apache.activemq.command.Command;
63 import org.apache.activemq.command.CommandTypes;
64 import org.apache.activemq.command.ConnectionControl;
65 import org.apache.activemq.command.ConnectionError;
66 import org.apache.activemq.command.ConnectionId;
67 import org.apache.activemq.command.ConnectionInfo;
68 import org.apache.activemq.command.ConsumerControl;
69 import org.apache.activemq.command.ConsumerId;
70 import org.apache.activemq.command.ConsumerInfo;
71 import org.apache.activemq.command.ControlCommand;
72 import org.apache.activemq.command.DestinationInfo;
73 import org.apache.activemq.command.ExceptionResponse;
74 import org.apache.activemq.command.Message;
75 import org.apache.activemq.command.MessageDispatch;
76 import org.apache.activemq.command.MessageId;
77 import org.apache.activemq.command.ProducerAck;
78 import org.apache.activemq.command.ProducerId;
79 import org.apache.activemq.command.RemoveSubscriptionInfo;
80 import org.apache.activemq.command.Response;
81 import org.apache.activemq.command.SessionId;
82 import org.apache.activemq.command.ShutdownInfo;
83 import org.apache.activemq.command.WireFormatInfo;
84 import org.apache.activemq.management.JMSConnectionStatsImpl;
85 import org.apache.activemq.management.JMSStatsImpl;
86 import org.apache.activemq.management.StatsCapable;
87 import org.apache.activemq.management.StatsImpl;
88 import org.apache.activemq.state.CommandVisitorAdapter;
89 import org.apache.activemq.thread.TaskRunnerFactory;
90 import org.apache.activemq.transport.Transport;
91 import org.apache.activemq.transport.TransportListener;
92 import org.apache.activemq.util.IdGenerator;
93 import org.apache.activemq.util.IntrospectionSupport;
94 import org.apache.activemq.util.JMSExceptionSupport;
95 import org.apache.activemq.util.LongSequenceGenerator;
96 import org.apache.activemq.util.ServiceSupport;
97 import org.apache.activemq.advisory.DestinationSource;
98 import org.apache.commons.logging.Log;
99 import org.apache.commons.logging.LogFactory;
100
101 public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection {
102
/** Default user name; mirrors {@code ActiveMQConnectionFactory.DEFAULT_USER}. */
public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
/** Default password; mirrors {@code ActiveMQConnectionFactory.DEFAULT_PASSWORD}. */
public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
/** Default broker URL; mirrors {@code ActiveMQConnectionFactory.DEFAULT_BROKER_URL}. */
public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;

private static final Log LOG = LogFactory.getLog(ActiveMQConnection.class);
// Shared by all connections; the constructor uses it to build each ConnectionId.
private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();

// Temporary destinations keyed by themselves.
// NOTE(review): populated outside this section - confirm who adds/removes entries.
public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();

// Dispatch settings; both default to true.
protected boolean dispatchAsync=true;
protected boolean alwaysSessionAsync = true;

// Task runner for session work; shut down in close().
private TaskRunnerFactory sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000);
// Single-threaded executor created in the constructor; shut down in close().
private final ThreadPoolExecutor asyncConnectionThread;
117
// Connection state variables
// Identity of this connection (connection id, user name, password, client id).
private final ConnectionInfo info;
private ExceptionListener exceptionListener;
// Checked by setClientID() to reject a second assignment.
private boolean clientIDSet;
// Checked by setClientID() and close(); close() only notifies the broker when this is true.
private boolean isConnectionInfoSentToBroker;
// Set to true by setClientID() and setDefaultClientID().
private boolean userSpecifiedClientID;

// Configuration options variables
private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
private BlobTransferPolicy blobTransferPolicy;
private RedeliveryPolicy redeliveryPolicy;
private MessageTransformer transformer;

private boolean disableTimeStampsByDefault;
private boolean optimizedMessageDispatch = true;
private boolean copyMessageOnSend = true;
private boolean useCompression;
private boolean objectMessageSerializationDefered;
private boolean useAsyncSend;
private boolean optimizeAcknowledge;
private boolean nestedMapAndListEnabled = true;
private boolean useRetroactiveConsumer;
private boolean exclusiveConsumer;
private boolean alwaysSyncSend;
// Timeout handed to doSyncSendPacket() in close() - presumably milliseconds; TODO confirm.
private int closeTimeout = 15000;
private boolean watchTopicAdvisories = true;
private long warnAboutUnstartedConnectionTimeout = 500L;
private int sendTimeout =0;
// Collaborators supplied to the constructor, plus the stats object it creates.
private final Transport transport;
private final IdGenerator clientIdGenerator;
private final JMSStatsImpl factoryStats;
private final JMSConnectionStatsImpl stats;

// Lifecycle flags; start()/stop()/close() flip "started", close() drives "closing" and "closed".
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean closing = new AtomicBoolean(false);
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean transportFailed = new AtomicBoolean(false);
// Child resources; close() disposes every session, connection consumer and stream listed here.
private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>();
private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>();
private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();

// Maps ConsumerIds to their ActiveMQDispatcher
private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
// Maps ProducerIds to their ActiveMQMessageProducer
private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
// Session id with sequence -1, created in the constructor from this connection's id.
private final SessionId connectionSessionId;
private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();

// Disposed and cleared by close() when present.
private AdvisoryConsumer advisoryConsumer;
private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
private BrokerInfo brokerInfo;
private IOException firstFailureError;
private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;

// Assume that protocol is the latest. Change to the actual protocol
// version when a WireFormatInfo is received.
private AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
// Wall-clock creation time (System.currentTimeMillis()) recorded by the constructor.
private long timeCreated;
// Duplicate checking is switched on in the constructor when the transport is fault tolerant.
private ConnectionAudit connectionAudit = new ConnectionAudit();
// Stopped and cleared by close() when present.
private DestinationSource destinationSource;
184
/**
 * Construct an <code>ActiveMQConnection</code> on top of the given
 * transport. The new connection registers itself as the transport's
 * listener and adds itself to the factory statistics.
 *
 * @param transport the transport this connection uses; it is also asked
 *                whether it is fault tolerant to configure duplicate
 *                checking
 * @param clientIdGenerator generator stored for later use by this
 *                connection
 * @param factoryStats factory-level statistics this connection is added
 *                to (and removed from again in <code>close()</code>)
 * @throws Exception declared by the signature; nothing in this body
 *                 throws it explicitly
 */
protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {

    this.transport = transport;
    this.clientIdGenerator = clientIdGenerator;
    this.factoryStats = factoryStats;

    // Configure a single threaded executor (core = max = 1, 5 second
    // keep-alive) backed by an unbounded queue. Its worker is a daemon
    // thread named after the transport.
    asyncConnectionThread = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, "ActiveMQ Connection Worker: " + transport);
            thread.setDaemon(true);
            return thread;
        }
    });
    // The call below is disabled, so core-thread timeout is NOT turned on
    // here even though the executor was meant to let its core thread
    // time out when idle.
    // asyncConnectionThread.allowCoreThreadTimeOut(true);

    this.info = new ConnectionInfo(new ConnectionId(CONNECTION_ID_GENERATOR.generateId()));
    this.info.setManageable(true);
    // Connection-level session id uses the fixed sequence -1.
    this.connectionSessionId = new SessionId(info.getConnectionId(), -1);

    // From here on "this" is visible to the transport and the stats
    // objects, so the remaining statements stay in this order.
    this.transport.setTransportListener(this);

    this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
    this.factoryStats.addConnection(this);
    this.timeCreated = System.currentTimeMillis();
    this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
}
220
221 protected void setUserName(String userName) {
222 this.info.setUserName(userName);
223 }
224
225 protected void setPassword(String password) {
226 this.info.setPassword(password);
227 }
228
229 /**
230 * A static helper method to create a new connection
231 *
232 * @return an ActiveMQConnection
233 * @throws JMSException
234 */
235 public static ActiveMQConnection makeConnection() throws JMSException {
236 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
237 return (ActiveMQConnection)factory.createConnection();
238 }
239
240 /**
241 * A static helper method to create a new connection
242 *
243 * @param uri
244 * @return and ActiveMQConnection
245 * @throws JMSException
246 */
247 public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
248 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
249 return (ActiveMQConnection)factory.createConnection();
250 }
251
252 /**
253 * A static helper method to create a new connection
254 *
255 * @param user
256 * @param password
257 * @param uri
258 * @return an ActiveMQConnection
259 * @throws JMSException
260 */
261 public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
262 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
263 return (ActiveMQConnection)factory.createConnection();
264 }
265
266 /**
267 * @return a number unique for this connection
268 */
269 public JMSConnectionStatsImpl getConnectionStats() {
270 return stats;
271 }
272
273 /**
274 * Creates a <CODE>Session</CODE> object.
275 *
276 * @param transacted indicates whether the session is transacted
277 * @param acknowledgeMode indicates whether the consumer or the client will
278 * acknowledge any messages it receives; ignored if the
279 * session is transacted. Legal values are
280 * <code>Session.AUTO_ACKNOWLEDGE</code>,
281 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
282 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
283 * @return a newly created session
284 * @throws JMSException if the <CODE>Connection</CODE> object fails to
285 * create a session due to some internal error or lack of
286 * support for the specific transaction and acknowledgement
287 * mode.
288 * @see Session#AUTO_ACKNOWLEDGE
289 * @see Session#CLIENT_ACKNOWLEDGE
290 * @see Session#DUPS_OK_ACKNOWLEDGE
291 * @since 1.1
292 */
293 public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
294 checkClosedOrFailed();
295 ensureConnectionInfoSent();
296 return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
297 ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
298 }
299
300 /**
301 * @return sessionId
302 */
303 protected SessionId getNextSessionId() {
304 return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
305 }
306
307 /**
308 * Gets the client identifier for this connection.
309 * <P>
310 * This value is specific to the JMS provider. It is either preconfigured by
311 * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
312 * dynamically by the application by calling the <code>setClientID</code>
313 * method.
314 *
315 * @return the unique client identifier
316 * @throws JMSException if the JMS provider fails to return the client ID
317 * for this connection due to some internal error.
318 */
319 public String getClientID() throws JMSException {
320 checkClosedOrFailed();
321 return this.info.getClientId();
322 }
323
324 /**
325 * Sets the client identifier for this connection.
326 * <P>
327 * The preferred way to assign a JMS client's client identifier is for it to
328 * be configured in a client-specific <CODE>ConnectionFactory</CODE>
329 * object and transparently assigned to the <CODE>Connection</CODE> object
330 * it creates.
331 * <P>
332 * Alternatively, a client can set a connection's client identifier using a
333 * provider-specific value. The facility to set a connection's client
334 * identifier explicitly is not a mechanism for overriding the identifier
335 * that has been administratively configured. It is provided for the case
336 * where no administratively specified identifier exists. If one does exist,
337 * an attempt to change it by setting it must throw an
338 * <CODE>IllegalStateException</CODE>. If a client sets the client
339 * identifier explicitly, it must do so immediately after it creates the
340 * connection and before any other action on the connection is taken. After
341 * this point, setting the client identifier is a programming error that
342 * should throw an <CODE>IllegalStateException</CODE>.
343 * <P>
344 * The purpose of the client identifier is to associate a connection and its
345 * objects with a state maintained on behalf of the client by a provider.
346 * The only such state identified by the JMS API is that required to support
347 * durable subscriptions.
348 * <P>
349 * If another connection with the same <code>clientID</code> is already
350 * running when this method is called, the JMS provider should detect the
351 * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
352 *
353 * @param newClientID the unique client identifier
354 * @throws JMSException if the JMS provider fails to set the client ID for
355 * this connection due to some internal error.
356 * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
357 * invalid or duplicate client ID.
358 * @throws javax.jms.IllegalStateException if the JMS client attempts to set
359 * a connection's client ID at the wrong time or when it has
360 * been administratively configured.
361 */
362 public void setClientID(String newClientID) throws JMSException {
363 checkClosedOrFailed();
364
365 if (this.clientIDSet) {
366 throw new IllegalStateException("The clientID has already been set");
367 }
368
369 if (this.isConnectionInfoSentToBroker) {
370 throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
371 }
372
373 this.info.setClientId(newClientID);
374 this.userSpecifiedClientID = true;
375 ensureConnectionInfoSent();
376 }
377
378 /**
379 * Sets the default client id that the connection will use if explicitly not
380 * set with the setClientId() call.
381 */
382 public void setDefaultClientID(String clientID) throws JMSException {
383 this.info.setClientId(clientID);
384 this.userSpecifiedClientID = true;
385 }
386
387 /**
388 * Gets the metadata for this connection.
389 *
390 * @return the connection metadata
391 * @throws JMSException if the JMS provider fails to get the connection
392 * metadata for this connection.
393 * @see javax.jms.ConnectionMetaData
394 */
395 public ConnectionMetaData getMetaData() throws JMSException {
396 checkClosedOrFailed();
397 return ActiveMQConnectionMetaData.INSTANCE;
398 }
399
400 /**
401 * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
402 * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
403 * associated with it.
404 *
405 * @return the <CODE>ExceptionListener</CODE> for this connection, or
406 * null. if no <CODE>ExceptionListener</CODE> is associated with
407 * this connection.
408 * @throws JMSException if the JMS provider fails to get the
409 * <CODE>ExceptionListener</CODE> for this connection.
410 * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
411 */
412 public ExceptionListener getExceptionListener() throws JMSException {
413 checkClosedOrFailed();
414 return this.exceptionListener;
415 }
416
417 /**
418 * Sets an exception listener for this connection.
419 * <P>
420 * If a JMS provider detects a serious problem with a connection, it informs
421 * the connection's <CODE> ExceptionListener</CODE>, if one has been
422 * registered. It does this by calling the listener's <CODE>onException
423 * </CODE>
424 * method, passing it a <CODE>JMSException</CODE> object describing the
425 * problem.
426 * <P>
427 * An exception listener allows a client to be notified of a problem
428 * asynchronously. Some connections only consume messages, so they would
429 * have no other way to learn their connection has failed.
430 * <P>
431 * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
432 * <P>
433 * A JMS provider should attempt to resolve connection problems itself
434 * before it notifies the client of them.
435 *
436 * @param listener the exception listener
437 * @throws JMSException if the JMS provider fails to set the exception
438 * listener for this connection.
439 */
440 public void setExceptionListener(ExceptionListener listener) throws JMSException {
441 checkClosedOrFailed();
442 this.exceptionListener = listener;
443 }
444
445 /**
446 * Starts (or restarts) a connection's delivery of incoming messages. A call
447 * to <CODE>start</CODE> on a connection that has already been started is
448 * ignored.
449 *
450 * @throws JMSException if the JMS provider fails to start message delivery
451 * due to some internal error.
452 * @see javax.jms.Connection#stop()
453 */
454 public void start() throws JMSException {
455 checkClosedOrFailed();
456 ensureConnectionInfoSent();
457 if (started.compareAndSet(false, true)) {
458 for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
459 ActiveMQSession session = i.next();
460 session.start();
461 }
462 }
463 }
464
465 /**
466 * Temporarily stops a connection's delivery of incoming messages. Delivery
467 * can be restarted using the connection's <CODE>start</CODE> method. When
468 * the connection is stopped, delivery to all the connection's message
469 * consumers is inhibited: synchronous receives block, and messages are not
470 * delivered to message listeners.
471 * <P>
472 * This call blocks until receives and/or message listeners in progress have
473 * completed.
474 * <P>
475 * Stopping a connection has no effect on its ability to send messages. A
476 * call to <CODE>stop</CODE> on a connection that has already been stopped
477 * is ignored.
478 * <P>
479 * A call to <CODE>stop</CODE> must not return until delivery of messages
480 * has paused. This means that a client can rely on the fact that none of
481 * its message listeners will be called and that all threads of control
482 * waiting for <CODE>receive</CODE> calls to return will not return with a
483 * message until the connection is restarted. The receive timers for a
484 * stopped connection continue to advance, so receives may time out while
485 * the connection is stopped.
486 * <P>
487 * If message listeners are running when <CODE>stop</CODE> is invoked, the
488 * <CODE>stop</CODE> call must wait until all of them have returned before
489 * it may return. While these message listeners are completing, they must
490 * have the full services of the connection available to them.
491 *
492 * @throws JMSException if the JMS provider fails to stop message delivery
493 * due to some internal error.
494 * @see javax.jms.Connection#start()
495 */
496 public void stop() throws JMSException {
497 checkClosedOrFailed();
498 if (started.compareAndSet(true, false)) {
499 for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
500 ActiveMQSession s = i.next();
501 s.stop();
502 }
503 }
504 }
505
506 /**
507 * Closes the connection.
508 * <P>
509 * Since a provider typically allocates significant resources outside the
510 * JVM on behalf of a connection, clients should close these resources when
511 * they are not needed. Relying on garbage collection to eventually reclaim
512 * these resources may not be timely enough.
513 * <P>
514 * There is no need to close the sessions, producers, and consumers of a
515 * closed connection.
516 * <P>
517 * Closing a connection causes all temporary destinations to be deleted.
518 * <P>
519 * When this method is invoked, it should not return until message
520 * processing has been shut down in an orderly fashion. This means that all
521 * message listeners that may have been running have returned, and that all
522 * pending receives have returned. A close terminates all pending message
523 * receives on the connection's sessions' consumers. The receives may return
524 * with a message or with null, depending on whether there was a message
525 * available at the time of the close. If one or more of the connection's
526 * sessions' message listeners is processing a message at the time when
527 * connection <CODE>close</CODE> is invoked, all the facilities of the
528 * connection and its sessions must remain available to those listeners
529 * until they return control to the JMS provider.
530 * <P>
531 * Closing a connection causes any of its sessions' transactions in progress
532 * to be rolled back. In the case where a session's work is coordinated by
533 * an external transaction manager, a session's <CODE>commit</CODE> and
534 * <CODE> rollback</CODE> methods are not used and the result of a closed
535 * session's work is determined later by the transaction manager. Closing a
536 * connection does NOT force an acknowledgment of client-acknowledged
537 * sessions.
538 * <P>
539 * Invoking the <CODE>acknowledge</CODE> method of a received message from
540 * a closed connection's session must throw an
541 * <CODE>IllegalStateException</CODE>. Closing a closed connection must
542 * NOT throw an exception.
543 *
544 * @throws JMSException if the JMS provider fails to close the connection
545 * due to some internal error. For example, a failure to
546 * release resources or to close a socket connection can
547 * cause this exception to be thrown.
548 */
549 public void close() throws JMSException {
550 try {
551 // If we were running, lets stop first.
552 stop();
553
554 synchronized (this) {
555 if (!closed.get()) {
556 closing.set(true);
557
558 if (destinationSource != null) {
559 destinationSource.stop();
560 destinationSource = null;
561 }
562 if (advisoryConsumer != null) {
563 advisoryConsumer.dispose();
564 advisoryConsumer = null;
565 }
566
567 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
568 ActiveMQSession s = i.next();
569 s.dispose();
570 }
571 for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
572 ActiveMQConnectionConsumer c = i.next();
573 c.dispose();
574 }
575 for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
576 ActiveMQInputStream c = i.next();
577 c.dispose();
578 }
579 for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
580 ActiveMQOutputStream c = i.next();
581 c.dispose();
582 }
583
584 if (isConnectionInfoSentToBroker) {
585 // If we announced ourselfs to the broker.. Try to let
586 // the broker
587 // know that the connection is being shutdown.
588 doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
589 doAsyncSendPacket(new ShutdownInfo());
590 }
591
592 ServiceSupport.dispose(this.transport);
593
594 started.set(false);
595
596 // TODO if we move the TaskRunnerFactory to the connection
597 // factory
598 // then we may need to call
599 // factory.onConnectionClose(this);
600 sessionTaskRunner.shutdown();
601
602 if (asyncConnectionThread != null){
603 asyncConnectionThread.shutdown();
604 }
605
606 closed.set(true);
607 closing.set(false);
608 }
609 }
610 } finally {
611 factoryStats.removeConnection(this);
612 }
613 }
614
615 /**
616 * Tells the broker to terminate its VM. This can be used to cleanly
617 * terminate a broker running in a standalone java process. Server must have
618 * property enable.vm.shutdown=true defined to allow this to work.
619 */
620 // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
621 // implemented.
622 /*
623 * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
624 * command = new BrokerAdminCommand();
625 * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
626 * asyncSendPacket(command); }
627 */
628
629 /**
630 * Create a durable connection consumer for this connection (optional
631 * operation). This is an expert facility not used by regular JMS clients.
632 *
633 * @param topic topic to access
634 * @param subscriptionName durable subscription name
635 * @param messageSelector only messages with properties matching the message
636 * selector expression are delivered. A value of null or an
637 * empty string indicates that there is no message selector
638 * for the message consumer.
639 * @param sessionPool the server session pool to associate with this durable
640 * connection consumer
641 * @param maxMessages the maximum number of messages that can be assigned to
642 * a server session at one time
643 * @return the durable connection consumer
644 * @throws JMSException if the <CODE>Connection</CODE> object fails to
645 * create a connection consumer due to some internal error
646 * or invalid arguments for <CODE>sessionPool</CODE> and
647 * <CODE>messageSelector</CODE>.
648 * @throws javax.jms.InvalidDestinationException if an invalid destination
649 * is specified.
650 * @throws javax.jms.InvalidSelectorException if the message selector is
651 * invalid.
652 * @see javax.jms.ConnectionConsumer
653 * @since 1.1
654 */
655 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
656 throws JMSException {
657 return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
658 }
659
660 /**
661 * Create a durable connection consumer for this connection (optional
662 * operation). This is an expert facility not used by regular JMS clients.
663 *
664 * @param topic topic to access
665 * @param subscriptionName durable subscription name
666 * @param messageSelector only messages with properties matching the message
667 * selector expression are delivered. A value of null or an
668 * empty string indicates that there is no message selector
669 * for the message consumer.
670 * @param sessionPool the server session pool to associate with this durable
671 * connection consumer
672 * @param maxMessages the maximum number of messages that can be assigned to
673 * a server session at one time
674 * @param noLocal set true if you want to filter out messages published
675 * locally
676 * @return the durable connection consumer
677 * @throws JMSException if the <CODE>Connection</CODE> object fails to
678 * create a connection consumer due to some internal error
679 * or invalid arguments for <CODE>sessionPool</CODE> and
680 * <CODE>messageSelector</CODE>.
681 * @throws javax.jms.InvalidDestinationException if an invalid destination
682 * is specified.
683 * @throws javax.jms.InvalidSelectorException if the message selector is
684 * invalid.
685 * @see javax.jms.ConnectionConsumer
686 * @since 1.1
687 */
688 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
689 boolean noLocal) throws JMSException {
690 checkClosedOrFailed();
691 ensureConnectionInfoSent();
692 SessionId sessionId = new SessionId(info.getConnectionId(), -1);
693 ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
694 info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
695 info.setSubscriptionName(subscriptionName);
696 info.setSelector(messageSelector);
697 info.setPrefetchSize(maxMessages);
698 info.setDispatchAsync(isDispatchAsync());
699
700 // Allows the options on the destination to configure the consumerInfo
701 if (info.getDestination().getOptions() != null) {
702 Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
703 IntrospectionSupport.setProperties(this.info, options, "consumer.");
704 }
705
706 return new ActiveMQConnectionConsumer(this, sessionPool, info);
707 }
708
709 // Properties
710 // -------------------------------------------------------------------------
711
712 /**
713 * Returns true if this connection has been started
714 *
715 * @return true if this Connection is started
716 */
717 public boolean isStarted() {
718 return started.get();
719 }
720
721 /**
722 * Returns true if the connection is closed
723 */
724 public boolean isClosed() {
725 return closed.get();
726 }
727
728 /**
729 * Returns true if the connection is in the process of being closed
730 */
731 public boolean isClosing() {
732 return closing.get();
733 }
734
735 /**
736 * Returns true if the underlying transport has failed
737 */
738 public boolean isTransportFailed() {
739 return transportFailed.get();
740 }
741
742 /**
743 * @return Returns the prefetchPolicy.
744 */
745 public ActiveMQPrefetchPolicy getPrefetchPolicy() {
746 return prefetchPolicy;
747 }
748
749 /**
750 * Sets the <a
751 * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
752 * policy</a> for consumers created by this connection.
753 */
754 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
755 this.prefetchPolicy = prefetchPolicy;
756 }
757
758 /**
759 */
760 public Transport getTransportChannel() {
761 return transport;
762 }
763
764 /**
765 * @return Returns the clientID of the connection, forcing one to be
766 * generated if one has not yet been configured.
767 */
768 public String getInitializedClientID() throws JMSException {
769 ensureConnectionInfoSent();
770 return info.getClientId();
771 }
772
773 /**
774 * @return Returns the timeStampsDisableByDefault.
775 */
776 public boolean isDisableTimeStampsByDefault() {
777 return disableTimeStampsByDefault;
778 }
779
780 /**
781 * Sets whether or not timestamps on messages should be disabled or not. If
782 * you disable them it adds a small performance boost.
783 */
784 public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
785 this.disableTimeStampsByDefault = timeStampsDisableByDefault;
786 }
787
788 /**
789 * @return Returns the dispatchOptimizedMessage.
790 */
791 public boolean isOptimizedMessageDispatch() {
792 return optimizedMessageDispatch;
793 }
794
795 /**
796 * If this flag is set then an larger prefetch limit is used - only
797 * applicable for durable topic subscribers.
798 */
799 public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
800 this.optimizedMessageDispatch = dispatchOptimizedMessage;
801 }
802
803 /**
804 * @return Returns the closeTimeout.
805 */
806 public int getCloseTimeout() {
807 return closeTimeout;
808 }
809
810 /**
811 * Sets the timeout before a close is considered complete. Normally a
812 * close() on a connection waits for confirmation from the broker; this
813 * allows that operation to timeout to save the client hanging if there is
814 * no broker
815 */
816 public void setCloseTimeout(int closeTimeout) {
817 this.closeTimeout = closeTimeout;
818 }
819
820 /**
821 * @return ConnectionInfo
822 */
823 public ConnectionInfo getConnectionInfo() {
824 return this.info;
825 }
826
827 public boolean isUseRetroactiveConsumer() {
828 return useRetroactiveConsumer;
829 }
830
831 /**
832 * Sets whether or not retroactive consumers are enabled. Retroactive
833 * consumers allow non-durable topic subscribers to receive old messages
834 * that were published before the non-durable subscriber started.
835 */
836 public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
837 this.useRetroactiveConsumer = useRetroactiveConsumer;
838 }
839
840 public boolean isNestedMapAndListEnabled() {
841 return nestedMapAndListEnabled;
842 }
843
844 /**
845 * Enables/disables whether or not Message properties and MapMessage entries
846 * support <a
847 * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
848 * Structures</a> of Map and List objects
849 */
850 public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
851 this.nestedMapAndListEnabled = structuredMapsEnabled;
852 }
853
854 public boolean isExclusiveConsumer() {
855 return exclusiveConsumer;
856 }
857
858 /**
859 * Enables or disables whether or not queue consumers should be exclusive or
860 * not for example to preserve ordering when not using <a
861 * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
862 *
863 * @param exclusiveConsumer
864 */
865 public void setExclusiveConsumer(boolean exclusiveConsumer) {
866 this.exclusiveConsumer = exclusiveConsumer;
867 }
868
869 /**
870 * Adds a transport listener so that a client can be notified of events in
871 * the underlying transport
872 */
873 public void addTransportListener(TransportListener transportListener) {
874 transportListeners.add(transportListener);
875 }
876
877 public void removeTransportListener(TransportListener transportListener) {
878 transportListeners.remove(transportListener);
879 }
880
881 public TaskRunnerFactory getSessionTaskRunner() {
882 return sessionTaskRunner;
883 }
884
885 public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
886 this.sessionTaskRunner = sessionTaskRunner;
887 }
888
889 public MessageTransformer getTransformer() {
890 return transformer;
891 }
892
893 /**
894 * Sets the transformer used to transform messages before they are sent on
895 * to the JMS bus or when they are received from the bus but before they are
896 * delivered to the JMS client
897 */
898 public void setTransformer(MessageTransformer transformer) {
899 this.transformer = transformer;
900 }
901
902 /**
903 * @return the statsEnabled
904 */
905 public boolean isStatsEnabled() {
906 return this.stats.isEnabled();
907 }
908
909 /**
910 * @param statsEnabled the statsEnabled to set
911 */
912 public void setStatsEnabled(boolean statsEnabled) {
913 this.stats.setEnabled(statsEnabled);
914 }
915
916 /**
917 * Returns the {@link DestinationSource} object which can be used to listen to destinations
918 * being created or destroyed or to enquire about the current destinations available on the broker
919 *
920 * @return a lazily created destination source
921 * @throws JMSException
922 */
923 public DestinationSource getDestinationSource() throws JMSException {
924 if (destinationSource == null) {
925 destinationSource = new DestinationSource(this);
926 destinationSource.start();
927 }
928 return destinationSource;
929 }
930
931 // Implementation methods
932 // -------------------------------------------------------------------------
933
934 /**
935 * Used internally for adding Sessions to the Connection
936 *
937 * @param session
938 * @throws JMSException
939 * @throws JMSException
940 */
941 protected void addSession(ActiveMQSession session) throws JMSException {
942 this.sessions.add(session);
943 if (sessions.size() > 1 || session.isTransacted()) {
944 optimizedMessageDispatch = false;
945 }
946 }
947
948 /**
949 * Used interanlly for removing Sessions from a Connection
950 *
951 * @param session
952 */
953 protected void removeSession(ActiveMQSession session) {
954 this.sessions.remove(session);
955 this.removeDispatcher(session);
956 }
957
958 /**
959 * Add a ConnectionConsumer
960 *
961 * @param connectionConsumer
962 * @throws JMSException
963 */
964 protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
965 this.connectionConsumers.add(connectionConsumer);
966 }
967
968 /**
969 * Remove a ConnectionConsumer
970 *
971 * @param connectionConsumer
972 */
973 protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
974 this.connectionConsumers.remove(connectionConsumer);
975 this.removeDispatcher(connectionConsumer);
976 }
977
978 /**
979 * Creates a <CODE>TopicSession</CODE> object.
980 *
981 * @param transacted indicates whether the session is transacted
982 * @param acknowledgeMode indicates whether the consumer or the client will
983 * acknowledge any messages it receives; ignored if the
984 * session is transacted. Legal values are
985 * <code>Session.AUTO_ACKNOWLEDGE</code>,
986 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
987 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
988 * @return a newly created topic session
989 * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
990 * to create a session due to some internal error or lack of
991 * support for the specific transaction and acknowledgement
992 * mode.
993 * @see Session#AUTO_ACKNOWLEDGE
994 * @see Session#CLIENT_ACKNOWLEDGE
995 * @see Session#DUPS_OK_ACKNOWLEDGE
996 */
997 public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
998 return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
999 }
1000
1001 /**
1002 * Creates a connection consumer for this connection (optional operation).
1003 * This is an expert facility not used by regular JMS clients.
1004 *
1005 * @param topic the topic to access
1006 * @param messageSelector only messages with properties matching the message
1007 * selector expression are delivered. A value of null or an
1008 * empty string indicates that there is no message selector
1009 * for the message consumer.
1010 * @param sessionPool the server session pool to associate with this
1011 * connection consumer
1012 * @param maxMessages the maximum number of messages that can be assigned to
1013 * a server session at one time
1014 * @return the connection consumer
1015 * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1016 * to create a connection consumer due to some internal
1017 * error or invalid arguments for <CODE>sessionPool</CODE>
1018 * and <CODE>messageSelector</CODE>.
1019 * @throws javax.jms.InvalidDestinationException if an invalid topic is
1020 * specified.
1021 * @throws javax.jms.InvalidSelectorException if the message selector is
1022 * invalid.
1023 * @see javax.jms.ConnectionConsumer
1024 */
1025 public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1026 return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
1027 }
1028
1029 /**
1030 * Creates a connection consumer for this connection (optional operation).
1031 * This is an expert facility not used by regular JMS clients.
1032 *
1033 * @param queue the queue to access
1034 * @param messageSelector only messages with properties matching the message
1035 * selector expression are delivered. A value of null or an
1036 * empty string indicates that there is no message selector
1037 * for the message consumer.
1038 * @param sessionPool the server session pool to associate with this
1039 * connection consumer
1040 * @param maxMessages the maximum number of messages that can be assigned to
1041 * a server session at one time
1042 * @return the connection consumer
1043 * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1044 * to create a connection consumer due to some internal
1045 * error or invalid arguments for <CODE>sessionPool</CODE>
1046 * and <CODE>messageSelector</CODE>.
1047 * @throws javax.jms.InvalidDestinationException if an invalid queue is
1048 * specified.
1049 * @throws javax.jms.InvalidSelectorException if the message selector is
1050 * invalid.
1051 * @see javax.jms.ConnectionConsumer
1052 */
1053 public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1054 return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
1055 }
1056
1057 /**
1058 * Creates a connection consumer for this connection (optional operation).
1059 * This is an expert facility not used by regular JMS clients.
1060 *
1061 * @param destination the destination to access
1062 * @param messageSelector only messages with properties matching the message
1063 * selector expression are delivered. A value of null or an
1064 * empty string indicates that there is no message selector
1065 * for the message consumer.
1066 * @param sessionPool the server session pool to associate with this
1067 * connection consumer
1068 * @param maxMessages the maximum number of messages that can be assigned to
1069 * a server session at one time
1070 * @return the connection consumer
1071 * @throws JMSException if the <CODE>Connection</CODE> object fails to
1072 * create a connection consumer due to some internal error
1073 * or invalid arguments for <CODE>sessionPool</CODE> and
1074 * <CODE>messageSelector</CODE>.
1075 * @throws javax.jms.InvalidDestinationException if an invalid destination
1076 * is specified.
1077 * @throws javax.jms.InvalidSelectorException if the message selector is
1078 * invalid.
1079 * @see javax.jms.ConnectionConsumer
1080 * @since 1.1
1081 */
1082 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1083 return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
1084 }
1085
1086 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
1087 throws JMSException {
1088
1089 checkClosedOrFailed();
1090 ensureConnectionInfoSent();
1091
1092 ConsumerId consumerId = createConsumerId();
1093 ConsumerInfo info = new ConsumerInfo(consumerId);
1094 info.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
1095 info.setSelector(messageSelector);
1096 info.setPrefetchSize(maxMessages);
1097 info.setNoLocal(noLocal);
1098 info.setDispatchAsync(isDispatchAsync());
1099
1100 // Allows the options on the destination to configure the consumerInfo
1101 if (info.getDestination().getOptions() != null) {
1102 Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
1103 IntrospectionSupport.setProperties(info, options, "consumer.");
1104 }
1105
1106 return new ActiveMQConnectionConsumer(this, sessionPool, info);
1107 }
1108
1109 /**
1110 * @return
1111 */
1112 private ConsumerId createConsumerId() {
1113 return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
1114 }
1115
1116 /**
1117 * @return
1118 */
1119 private ProducerId createProducerId() {
1120 return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId());
1121 }
1122
1123 /**
1124 * Creates a <CODE>QueueSession</CODE> object.
1125 *
1126 * @param transacted indicates whether the session is transacted
1127 * @param acknowledgeMode indicates whether the consumer or the client will
1128 * acknowledge any messages it receives; ignored if the
1129 * session is transacted. Legal values are
1130 * <code>Session.AUTO_ACKNOWLEDGE</code>,
1131 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1132 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1133 * @return a newly created queue session
1134 * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1135 * to create a session due to some internal error or lack of
1136 * support for the specific transaction and acknowledgement
1137 * mode.
1138 * @see Session#AUTO_ACKNOWLEDGE
1139 * @see Session#CLIENT_ACKNOWLEDGE
1140 * @see Session#DUPS_OK_ACKNOWLEDGE
1141 */
1142 public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
1143 return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1144 }
1145
1146 /**
1147 * Ensures that the clientID was manually specified and not auto-generated.
1148 * If the clientID was not specified this method will throw an exception.
1149 * This method is used to ensure that the clientID + durableSubscriber name
1150 * are used correctly.
1151 *
1152 * @throws JMSException
1153 */
1154 public void checkClientIDWasManuallySpecified() throws JMSException {
1155 if (!userSpecifiedClientID) {
1156 throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1157 }
1158 }
1159
1160 /**
1161 * send a Packet through the Connection - for internal use only
1162 *
1163 * @param command
1164 * @throws JMSException
1165 */
1166 public void asyncSendPacket(Command command) throws JMSException {
1167 if (isClosed()) {
1168 throw new ConnectionClosedException();
1169 } else {
1170 doAsyncSendPacket(command);
1171 }
1172 }
1173
1174 private void doAsyncSendPacket(Command command) throws JMSException {
1175 try {
1176 this.transport.oneway(command);
1177 } catch (IOException e) {
1178 throw JMSExceptionSupport.create(e);
1179 }
1180 }
1181
1182 /**
1183 * Send a packet through a Connection - for internal use only
1184 *
1185 * @param command
1186 * @return
1187 * @throws JMSException
1188 */
1189 public Response syncSendPacket(Command command) throws JMSException {
1190 if (isClosed()) {
1191 throw new ConnectionClosedException();
1192 } else {
1193
1194 try {
1195 Response response = (Response)this.transport.request(command);
1196 if (response.isException()) {
1197 ExceptionResponse er = (ExceptionResponse)response;
1198 if (er.getException() instanceof JMSException) {
1199 throw (JMSException)er.getException();
1200 } else {
1201 throw JMSExceptionSupport.create(er.getException());
1202 }
1203 }
1204 return response;
1205 } catch (IOException e) {
1206 throw JMSExceptionSupport.create(e);
1207 }
1208 }
1209 }
1210
1211 /**
1212 * Send a packet through a Connection - for internal use only
1213 *
1214 * @param command
1215 * @return
1216 * @throws JMSException
1217 */
1218 public Response syncSendPacket(Command command, int timeout) throws JMSException {
1219 if (isClosed() || closing.get()) {
1220 throw new ConnectionClosedException();
1221 } else {
1222 return doSyncSendPacket(command, timeout);
1223 }
1224 }
1225
1226 private Response doSyncSendPacket(Command command, int timeout)
1227 throws JMSException {
1228 try {
1229 Response response = (Response)this.transport.request(command, timeout);
1230 if (response != null && response.isException()) {
1231 ExceptionResponse er = (ExceptionResponse)response;
1232 if (er.getException() instanceof JMSException) {
1233 throw (JMSException)er.getException();
1234 } else {
1235 throw JMSExceptionSupport.create(er.getException());
1236 }
1237 }
1238 return response;
1239 } catch (IOException e) {
1240 throw JMSExceptionSupport.create(e);
1241 }
1242 }
1243
1244 /**
1245 * @return statistics for this Connection
1246 */
1247 public StatsImpl getStats() {
1248 return stats;
1249 }
1250
1251 /**
1252 * simply throws an exception if the Connection is already closed or the
1253 * Transport has failed
1254 *
1255 * @throws JMSException
1256 */
1257 protected synchronized void checkClosedOrFailed() throws JMSException {
1258 checkClosed();
1259 if (transportFailed.get()) {
1260 throw new ConnectionFailedException(firstFailureError);
1261 }
1262 }
1263
1264 /**
1265 * simply throws an exception if the Connection is already closed
1266 *
1267 * @throws JMSException
1268 */
1269 protected synchronized void checkClosed() throws JMSException {
1270 if (closed.get()) {
1271 throw new ConnectionClosedException();
1272 }
1273 }
1274
1275 /**
1276 * Send the ConnectionInfo to the Broker
1277 *
1278 * @throws JMSException
1279 */
1280 protected synchronized void ensureConnectionInfoSent() throws JMSException {
1281 // Can we skip sending the ConnectionInfo packet??
1282 if (isConnectionInfoSentToBroker || closed.get()) {
1283 return;
1284 }
1285
1286 if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
1287 info.setClientId(clientIdGenerator.generateId());
1288 }
1289 syncSendPacket(info);
1290
1291 this.isConnectionInfoSentToBroker = true;
1292 // Add a temp destination advisory consumer so that
1293 // We know what the valid temporary destinations are on the
1294 // broker without having to do an RPC to the broker.
1295
1296 ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
1297 if (watchTopicAdvisories) {
1298 advisoryConsumer = new AdvisoryConsumer(this, consumerId);
1299 }
1300 }
1301
1302 public synchronized boolean isWatchTopicAdvisories() {
1303 return watchTopicAdvisories;
1304 }
1305
1306 public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
1307 this.watchTopicAdvisories = watchTopicAdvisories;
1308 }
1309
1310 /**
1311 * @return Returns the useAsyncSend.
1312 */
1313 public boolean isUseAsyncSend() {
1314 return useAsyncSend;
1315 }
1316
1317 /**
1318 * Forces the use of <a
1319 * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
1320 * adds a massive performance boost; but means that the send() method will
1321 * return immediately whether the message has been sent or not which could
1322 * lead to message loss.
1323 */
1324 public void setUseAsyncSend(boolean useAsyncSend) {
1325 this.useAsyncSend = useAsyncSend;
1326 }
1327
1328 /**
1329 * @return true if always sync send messages
1330 */
1331 public boolean isAlwaysSyncSend() {
1332 return this.alwaysSyncSend;
1333 }
1334
1335 /**
1336 * Set true if always require messages to be sync sent
1337 *
1338 * @param alwaysSyncSend
1339 */
1340 public void setAlwaysSyncSend(boolean alwaysSyncSend) {
1341 this.alwaysSyncSend = alwaysSyncSend;
1342 }
1343
1344 /**
1345 * Cleans up this connection so that it's state is as if the connection was
1346 * just created. This allows the Resource Adapter to clean up a connection
1347 * so that it can be reused without having to close and recreate the
1348 * connection.
1349 */
1350 public void cleanup() throws JMSException {
1351
1352 if (advisoryConsumer != null) {
1353 advisoryConsumer.dispose();
1354 advisoryConsumer = null;
1355 }
1356
1357 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1358 ActiveMQSession s = i.next();
1359 s.dispose();
1360 }
1361 for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
1362 ActiveMQConnectionConsumer c = i.next();
1363 c.dispose();
1364 }
1365 for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
1366 ActiveMQInputStream c = i.next();
1367 c.dispose();
1368 }
1369 for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
1370 ActiveMQOutputStream c = i.next();
1371 c.dispose();
1372 }
1373
1374 if (isConnectionInfoSentToBroker) {
1375 if (!transportFailed.get() && !closing.get()) {
1376 asyncSendPacket(info.createRemoveCommand());
1377 }
1378 isConnectionInfoSentToBroker = false;
1379 }
1380 if (userSpecifiedClientID) {
1381 info.setClientId(null);
1382 userSpecifiedClientID = false;
1383 }
1384 clientIDSet = false;
1385
1386 started.set(false);
1387 }
1388
1389 /**
1390 * Changes the associated username/password that is associated with this
1391 * connection. If the connection has been used, you must called cleanup()
1392 * before calling this method.
1393 *
1394 * @throws IllegalStateException if the connection is in used.
1395 */
1396 public void changeUserInfo(String userName, String password) throws JMSException {
1397 if (isConnectionInfoSentToBroker) {
1398 throw new IllegalStateException("changeUserInfo used Connection is not allowed");
1399 }
1400 this.info.setUserName(userName);
1401 this.info.setPassword(password);
1402 }
1403
1404 /**
1405 * @return Returns the resourceManagerId.
1406 * @throws JMSException
1407 */
1408 public String getResourceManagerId() throws JMSException {
1409 waitForBrokerInfo();
1410 if (brokerInfo == null) {
1411 throw new JMSException("Connection failed before Broker info was received.");
1412 }
1413 return brokerInfo.getBrokerId().getValue();
1414 }
1415
1416 /**
1417 * Returns the broker name if one is available or null if one is not
1418 * available yet.
1419 */
1420 public String getBrokerName() {
1421 try {
1422 brokerInfoReceived.await(5, TimeUnit.SECONDS);
1423 if (brokerInfo == null) {
1424 return null;
1425 }
1426 return brokerInfo.getBrokerName();
1427 } catch (InterruptedException e) {
1428 Thread.currentThread().interrupt();
1429 return null;
1430 }
1431 }
1432
1433 /**
1434 * Returns the broker information if it is available or null if it is not
1435 * available yet.
1436 */
1437 public BrokerInfo getBrokerInfo() {
1438 return brokerInfo;
1439 }
1440
1441 /**
1442 * @return Returns the RedeliveryPolicy.
1443 * @throws JMSException
1444 */
1445 public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
1446 return redeliveryPolicy;
1447 }
1448
1449 /**
1450 * Sets the redelivery policy to be used when messages are rolled back
1451 */
1452 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
1453 this.redeliveryPolicy = redeliveryPolicy;
1454 }
1455
1456 public BlobTransferPolicy getBlobTransferPolicy() {
1457 if (blobTransferPolicy == null) {
1458 blobTransferPolicy = createBlobTransferPolicy();
1459 }
1460 return blobTransferPolicy;
1461 }
1462
1463 /**
1464 * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1465 * OBjects) are transferred from producers to brokers to consumers
1466 */
1467 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1468 this.blobTransferPolicy = blobTransferPolicy;
1469 }
1470
1471 /**
1472 * @return Returns the alwaysSessionAsync.
1473 */
1474 public boolean isAlwaysSessionAsync() {
1475 return alwaysSessionAsync;
1476 }
1477
1478 /**
1479 * If this flag is set then a separate thread is not used for dispatching
1480 * messages for each Session in the Connection. However, a separate thread
1481 * is always used if there is more than one session, or the session isn't in
1482 * auto acknowledge or duplicates ok mode
1483 */
1484 public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
1485 this.alwaysSessionAsync = alwaysSessionAsync;
1486 }
1487
1488 /**
1489 * @return Returns the optimizeAcknowledge.
1490 */
1491 public boolean isOptimizeAcknowledge() {
1492 return optimizeAcknowledge;
1493 }
1494
1495 /**
1496 * Enables an optimised acknowledgement mode where messages are acknowledged
1497 * in batches rather than individually
1498 *
1499 * @param optimizeAcknowledge The optimizeAcknowledge to set.
1500 */
1501 public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
1502 this.optimizeAcknowledge = optimizeAcknowledge;
1503 }
1504
1505 public long getWarnAboutUnstartedConnectionTimeout() {
1506 return warnAboutUnstartedConnectionTimeout;
1507 }
1508
1509 /**
1510 * Enables the timeout from a connection creation to when a warning is
1511 * generated if the connection is not properly started via {@link #start()}
1512 * and a message is received by a consumer. It is a very common gotcha to
1513 * forget to <a
1514 * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1515 * the connection</a> so this option makes the default case to create a
1516 * warning if the user forgets. To disable the warning just set the value to <
1517 * 0 (say -1).
1518 */
1519 public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1520 this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1521 }
1522
1523 /**
1524 * @return the sendTimeout
1525 */
1526 public int getSendTimeout() {
1527 return sendTimeout;
1528 }
1529
1530 /**
1531 * @param sendTimeout the sendTimeout to set
1532 */
1533 public void setSendTimeout(int sendTimeout) {
1534 this.sendTimeout = sendTimeout;
1535 }
1536
1537
1538 /**
1539 * Returns the time this connection was created
1540 */
1541 public long getTimeCreated() {
1542 return timeCreated;
1543 }
1544
1545 private void waitForBrokerInfo() throws JMSException {
1546 try {
1547 brokerInfoReceived.await();
1548 } catch (InterruptedException e) {
1549 Thread.currentThread().interrupt();
1550 throw JMSExceptionSupport.create(e);
1551 }
1552 }
1553
1554 // Package protected so that it can be used in unit tests
1555 public Transport getTransport() {
1556 return transport;
1557 }
1558
1559 public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
1560 producers.put(producerId, producer);
1561 }
1562
1563 public void removeProducer(ProducerId producerId) {
1564 producers.remove(producerId);
1565 }
1566
1567 public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
1568 dispatchers.put(consumerId, dispatcher);
1569 }
1570
1571 public void removeDispatcher(ConsumerId consumerId) {
1572 dispatchers.remove(consumerId);
1573 }
1574
1575 /**
1576 * @param o - the command to consume
1577 */
1578 public void onCommand(final Object o) {
1579 final Command command = (Command)o;
1580 if (!closed.get() && command != null) {
1581 try {
1582 command.visit(new CommandVisitorAdapter() {
1583 @Override
1584 public Response processMessageDispatch(MessageDispatch md) throws Exception {
1585 ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
1586 if (dispatcher != null) {
1587 // Copy in case a embedded broker is dispatching via
1588 // vm://
1589 // md.getMessage() == null to signal end of queue
1590 // browse.
1591 Message msg = md.getMessage();
1592 if (msg != null) {
1593 msg = msg.copy();
1594 msg.setReadOnlyBody(true);
1595 msg.setReadOnlyProperties(true);
1596 msg.setRedeliveryCounter(md.getRedeliveryCounter());
1597 msg.setConnection(ActiveMQConnection.this);
1598 md.setMessage(msg);
1599 }
1600 dispatcher.dispatch(md);
1601 }
1602 return null;
1603 }
1604
1605 @Override
1606 public Response processProducerAck(ProducerAck pa) throws Exception {
1607 if (pa != null && pa.getProducerId() != null) {
1608 ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
1609 if (producer != null) {
1610 producer.onProducerAck(pa);
1611 }
1612 }
1613 return null;
1614 }
1615
1616 @Override
1617 public Response processBrokerInfo(BrokerInfo info) throws Exception {
1618 brokerInfo = info;
1619 brokerInfoReceived.countDown();
1620 optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
1621 getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
1622 return null;
1623 }
1624
1625 @Override
1626 public Response processConnectionError(final ConnectionError error) throws Exception {
1627 asyncConnectionThread.execute(new Runnable() {
1628 public void run() {
1629 onAsyncException(error.getException());
1630 }
1631 });
1632 return null;
1633 }
1634
1635 @Override
1636 public Response processControlCommand(ControlCommand command) throws Exception {
1637 onControlCommand(command);
1638 return null;
1639 }
1640
1641 @Override
1642 public Response processConnectionControl(ConnectionControl control) throws Exception {
1643 onConnectionControl((ConnectionControl)command);
1644 return null;
1645 }
1646
1647 @Override
1648 public Response processConsumerControl(ConsumerControl control) throws Exception {
1649 onConsumerControl((ConsumerControl)command);
1650 return null;
1651 }
1652
1653 @Override
1654 public Response processWireFormat(WireFormatInfo info) throws Exception {
1655 onWireFormatInfo((WireFormatInfo)command);
1656 return null;
1657 }
1658 });
1659 } catch (Exception e) {
1660 onAsyncException(e);
1661 }
1662
1663 }
1664 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1665 TransportListener listener = iter.next();
1666 listener.onCommand(command);
1667 }
1668 }
1669
1670 protected void onWireFormatInfo(WireFormatInfo info) {
1671 protocolVersion.set(info.getVersion());
1672 }
1673
1674 /**
1675 * Used for handling async exceptions
1676 *
1677 * @param error
1678 */
1679 public void onAsyncException(Throwable error) {
1680 if (!closed.get() && !closing.get()) {
1681 if (this.exceptionListener != null) {
1682
1683 if (!(error instanceof JMSException)) {
1684 error = JMSExceptionSupport.create(error);
1685 }
1686 final JMSException e = (JMSException)error;
1687
1688 asyncConnectionThread.execute(new Runnable() {
1689 public void run() {
1690 ActiveMQConnection.this.exceptionListener.onException(e);
1691 }
1692 });
1693
1694 } else {
1695 LOG.debug("Async exception with no exception listener: " + error, error);
1696 }
1697 }
1698 }
1699
1700 public void onException(final IOException error) {
1701 onAsyncException(error);
1702 if (!closing.get() && !closed.get()) {
1703 asyncConnectionThread.execute(new Runnable() {
1704 public void run() {
1705 transportFailed(error);
1706 ServiceSupport.dispose(ActiveMQConnection.this.transport);
1707 brokerInfoReceived.countDown();
1708
1709 for (Iterator<TransportListener> iter = transportListeners
1710 .iterator(); iter.hasNext();) {
1711 TransportListener listener = iter.next();
1712 listener.onException(error);
1713 }
1714 }
1715 });
1716 }
1717 }
1718
1719 public void transportInterupted() {
1720 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1721 ActiveMQSession s = i.next();
1722 s.clearMessagesInProgress();
1723 }
1724 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1725 TransportListener listener = iter.next();
1726 listener.transportInterupted();
1727 }
1728 }
1729
1730 public void transportResumed() {
1731 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1732 TransportListener listener = iter.next();
1733 listener.transportResumed();
1734 }
1735 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1736 ActiveMQSession s = i.next();
1737 s.deliverAcks();
1738 }
1739 }
1740
1741 /**
1742 * Create the DestinationInfo object for the temporary destination.
1743 *
1744 * @param topic - if its true topic, else queue.
1745 * @return DestinationInfo
1746 * @throws JMSException
1747 */
1748 protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
1749
1750 // Check if Destination info is of temporary type.
1751 ActiveMQTempDestination dest;
1752 if (topic) {
1753 dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
1754 } else {
1755 dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
1756 }
1757
1758 DestinationInfo info = new DestinationInfo();
1759 info.setConnectionId(this.info.getConnectionId());
1760 info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
1761 info.setDestination(dest);
1762 syncSendPacket(info);
1763
1764 dest.setConnection(this);
1765 activeTempDestinations.put(dest, dest);
1766 return dest;
1767 }
1768
1769 /**
1770 * @param destination
1771 * @throws JMSException
1772 */
1773 public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
1774
1775 checkClosedOrFailed();
1776
1777 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1778 ActiveMQSession s = i.next();
1779 if (s.isInUse(destination)) {
1780 throw new JMSException("A consumer is consuming from the temporary destination");
1781 }
1782 }
1783
1784 activeTempDestinations.remove(destination);
1785
1786 DestinationInfo info = new DestinationInfo();
1787 info.setConnectionId(this.info.getConnectionId());
1788 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
1789 info.setDestination(destination);
1790 info.setTimeout(0);
1791 syncSendPacket(info);
1792 }
1793
1794 public boolean isDeleted(ActiveMQDestination dest) {
1795
1796 // If we are not watching the advisories.. then
1797 // we will assume that the temp destination does exist.
1798 if (advisoryConsumer == null) {
1799 return false;
1800 }
1801
1802 return !activeTempDestinations.contains(dest);
1803 }
1804
1805 public boolean isCopyMessageOnSend() {
1806 return copyMessageOnSend;
1807 }
1808
1809 public LongSequenceGenerator getLocalTransactionIdGenerator() {
1810 return localTransactionIdGenerator;
1811 }
1812
1813 public boolean isUseCompression() {
1814 return useCompression;
1815 }
1816
1817 /**
1818 * Enables the use of compression of the message bodies
1819 */
1820 public void setUseCompression(boolean useCompression) {
1821 this.useCompression = useCompression;
1822 }
1823
1824 public void destroyDestination(ActiveMQDestination destination) throws JMSException {
1825
1826 checkClosedOrFailed();
1827 ensureConnectionInfoSent();
1828
1829 DestinationInfo info = new DestinationInfo();
1830 info.setConnectionId(this.info.getConnectionId());
1831 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
1832 info.setDestination(destination);
1833 info.setTimeout(0);
1834 syncSendPacket(info);
1835
1836 }
1837
1838 public boolean isDispatchAsync() {
1839 return dispatchAsync;
1840 }
1841
1842 /**
1843 * Enables or disables the default setting of whether or not consumers have
1844 * their messages <a
1845 * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
1846 * synchronously or asynchronously by the broker</a>. For non-durable
1847 * topics for example we typically dispatch synchronously by default to
1848 * minimize context switches which boost performance. However sometimes its
1849 * better to go slower to ensure that a single blocked consumer socket does
1850 * not block delivery to other consumers.
1851 *
1852 * @param asyncDispatch If true then consumers created on this connection
1853 * will default to having their messages dispatched
1854 * asynchronously. The default value is false.
1855 */
1856 public void setDispatchAsync(boolean asyncDispatch) {
1857 this.dispatchAsync = asyncDispatch;
1858 }
1859
1860 public boolean isObjectMessageSerializationDefered() {
1861 return objectMessageSerializationDefered;
1862 }
1863
1864 /**
1865 * When an object is set on an ObjectMessage, the JMS spec requires the
1866 * object to be serialized by that set method. Enabling this flag causes the
1867 * object to not get serialized. The object may subsequently get serialized
1868 * if the message needs to be sent over a socket or stored to disk.
1869 */
1870 public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
1871 this.objectMessageSerializationDefered = objectMessageSerializationDefered;
1872 }
1873
1874 public InputStream createInputStream(Destination dest) throws JMSException {
1875 return createInputStream(dest, null);
1876 }
1877
1878 public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException {
1879 return createInputStream(dest, messageSelector, false);
1880 }
1881
1882 public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
1883 return doCreateInputStream(dest, messageSelector, noLocal, null);
1884 }
1885
1886 public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
1887 return createInputStream(dest, null, false);
1888 }
1889
1890 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException {
1891 return createDurableInputStream(dest, name, messageSelector, false);
1892 }
1893
1894 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
1895 return doCreateInputStream(dest, messageSelector, noLocal, name);
1896 }
1897
1898 private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName) throws JMSException {
1899 checkClosedOrFailed();
1900 ensureConnectionInfoSent();
1901 return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch());
1902 }
1903
1904 /**
1905 * Creates a persistent output stream; individual messages will be written
1906 * to disk/database by the broker
1907 */
1908 public OutputStream createOutputStream(Destination dest) throws JMSException {
1909 return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
1910 }
1911
1912 /**
1913 * Creates a non persistent output stream; messages will not be written to
1914 * disk
1915 */
1916 public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException {
1917 return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
1918 }
1919
1920 /**
1921 * Creates an output stream allowing full control over the delivery mode,
1922 * the priority and time to live of the messages and the properties added to
1923 * messages on the stream.
1924 *
1925 * @param streamProperties defines a map of key-value pairs where the keys
1926 * are strings and the values are primitive values (numbers
1927 * and strings) which are appended to the messages similarly
1928 * to using the
1929 * {@link javax.jms.Message#setObjectProperty(String, Object)}
1930 * method
1931 */
1932 public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
1933 checkClosedOrFailed();
1934 ensureConnectionInfoSent();
1935 return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
1936 }
1937
1938 /**
1939 * Unsubscribes a durable subscription that has been created by a client.
1940 * <P>
1941 * This method deletes the state being maintained on behalf of the
1942 * subscriber by its provider.
1943 * <P>
1944 * It is erroneous for a client to delete a durable subscription while there
1945 * is an active <CODE>MessageConsumer </CODE> or
1946 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
1947 * message is part of a pending transaction or has not been acknowledged in
1948 * the session.
1949 *
1950 * @param name the name used to identify this subscription
1951 * @throws JMSException if the session fails to unsubscribe to the durable
1952 * subscription due to some internal error.
1953 * @throws InvalidDestinationException if an invalid subscription name is
1954 * specified.
1955 * @since 1.1
1956 */
1957 public void unsubscribe(String name) throws JMSException {
1958 checkClosedOrFailed();
1959 RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
1960 rsi.setConnectionId(getConnectionInfo().getConnectionId());
1961 rsi.setSubscriptionName(name);
1962 rsi.setClientId(getConnectionInfo().getClientId());
1963 syncSendPacket(rsi);
1964 }
1965
1966 /**
1967 * Internal send method optimized: - It does not copy the message - It can
1968 * only handle ActiveMQ messages. - You can specify if the send is async or
1969 * sync - Does not allow you to send /w a transaction.
1970 */
1971 void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
1972 checkClosedOrFailed();
1973
1974 if (destination.isTemporary() && isDeleted(destination)) {
1975 throw new JMSException("Cannot publish to a deleted Destination: " + destination);
1976 }
1977
1978 msg.setJMSDestination(destination);
1979 msg.setJMSDeliveryMode(deliveryMode);
1980 long expiration = 0L;
1981
1982 if (!isDisableTimeStampsByDefault()) {
1983 long timeStamp = System.currentTimeMillis();
1984 msg.setJMSTimestamp(timeStamp);
1985 if (timeToLive > 0) {
1986 expiration = timeToLive + timeStamp;
1987 }
1988 }
1989
1990 msg.setJMSExpiration(expiration);
1991 msg.setJMSPriority(priority);
1992
1993 msg.setJMSRedelivered(false);
1994 msg.setMessageId(messageId);
1995
1996 msg.onSend();
1997
1998 msg.setProducerId(msg.getMessageId().getProducerId());
1999
2000 if (LOG.isDebugEnabled()) {
2001 LOG.debug("Sending message: " + msg);
2002 }
2003
2004 if (async) {
2005 asyncSendPacket(msg);
2006 } else {
2007 syncSendPacket(msg);
2008 }
2009
2010 }
2011
2012 public void addOutputStream(ActiveMQOutputStream stream) {
2013 outputStreams.add(stream);
2014 }
2015
2016 public void removeOutputStream(ActiveMQOutputStream stream) {
2017 outputStreams.remove(stream);
2018 }
2019
2020 public void addInputStream(ActiveMQInputStream stream) {
2021 inputStreams.add(stream);
2022 }
2023
2024 public void removeInputStream(ActiveMQInputStream stream) {
2025 inputStreams.remove(stream);
2026 }
2027
2028 protected void onControlCommand(ControlCommand command) {
2029 String text = command.getCommand();
2030 if (text != null) {
2031 if (text.equals("shutdown")) {
2032 LOG.info("JVM told to shutdown");
2033 System.exit(0);
2034 }
2035 }
2036 }
2037
2038 protected void onConnectionControl(ConnectionControl command) {
2039 if (command.isFaultTolerant()) {
2040 this.optimizeAcknowledge = false;
2041 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2042 ActiveMQSession s = i.next();
2043 s.setOptimizeAcknowledge(false);
2044 }
2045 }
2046 }
2047
2048 protected void onConsumerControl(ConsumerControl command) {
2049 if (command.isClose()) {
2050 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2051 ActiveMQSession s = i.next();
2052 s.close(command.getConsumerId());
2053 }
2054 } else {
2055 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2056 ActiveMQSession s = i.next();
2057 s.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
2058 }
2059 }
2060 }
2061
2062 protected void transportFailed(IOException error) {
2063 transportFailed.set(true);
2064 if (firstFailureError == null) {
2065 firstFailureError = error;
2066 }
2067 }
2068
2069 /**
2070 * Should a JMS message be copied to a new JMS Message object as part of the
2071 * send() method in JMS. This is enabled by default to be compliant with the
2072 * JMS specification. You can disable it if you do not mutate JMS messages
2073 * after they are sent for a performance boost
2074 */
2075 public void setCopyMessageOnSend(boolean copyMessageOnSend) {
2076 this.copyMessageOnSend = copyMessageOnSend;
2077 }
2078
2079 public String toString() {
2080 return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
2081 }
2082
2083 protected BlobTransferPolicy createBlobTransferPolicy() {
2084 return new BlobTransferPolicy();
2085 }
2086
2087 public int getProtocolVersion() {
2088 return protocolVersion.get();
2089 }
2090
2091 public int getProducerWindowSize() {
2092 return producerWindowSize;
2093 }
2094
2095 public void setProducerWindowSize(int producerWindowSize) {
2096 this.producerWindowSize = producerWindowSize;
2097 }
2098
2099 protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
2100 connectionAudit.removeDispatcher(dispatcher);
2101 }
2102
2103 protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2104 return connectionAudit.isDuplicate(dispatcher, message);
2105 }
2106
2107 protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2108 connectionAudit.rollbackDuplicate(dispatcher, message);
2109 }
2110 }