Home » apache-activemq-5.1.0-src » org.apache » activemq » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq;
   18   
   19   import java.io.IOException;
   20   import java.io.InputStream;
   21   import java.io.OutputStream;
   22   import java.net.URI;
   23   import java.net.URISyntaxException;
   24   import java.util.HashMap;
   25   import java.util.Iterator;
   26   import java.util.Map;
   27   import java.util.concurrent.ConcurrentHashMap;
   28   import java.util.concurrent.CopyOnWriteArrayList;
   29   import java.util.concurrent.CountDownLatch;
   30   import java.util.concurrent.LinkedBlockingQueue;
   31   import java.util.concurrent.ThreadFactory;
   32   import java.util.concurrent.ThreadPoolExecutor;
   33   import java.util.concurrent.TimeUnit;
   34   import java.util.concurrent.atomic.AtomicBoolean;
   35   import java.util.concurrent.atomic.AtomicInteger;
   36   
   37   import javax.jms.Connection;
   38   import javax.jms.ConnectionConsumer;
   39   import javax.jms.ConnectionMetaData;
   40   import javax.jms.DeliveryMode;
   41   import javax.jms.Destination;
   42   import javax.jms.ExceptionListener;
   43   import javax.jms.IllegalStateException;
   44   import javax.jms.JMSException;
   45   import javax.jms.Queue;
   46   import javax.jms.QueueConnection;
   47   import javax.jms.QueueSession;
   48   import javax.jms.ServerSessionPool;
   49   import javax.jms.Session;
   50   import javax.jms.Topic;
   51   import javax.jms.TopicConnection;
   52   import javax.jms.TopicSession;
   53   import javax.jms.XAConnection;
   54   
   55   import org.apache.activemq.blob.BlobTransferPolicy;
   56   import org.apache.activemq.command.ActiveMQDestination;
   57   import org.apache.activemq.command.ActiveMQMessage;
   58   import org.apache.activemq.command.ActiveMQTempDestination;
   59   import org.apache.activemq.command.ActiveMQTempQueue;
   60   import org.apache.activemq.command.ActiveMQTempTopic;
   61   import org.apache.activemq.command.BrokerInfo;
   62   import org.apache.activemq.command.Command;
   63   import org.apache.activemq.command.CommandTypes;
   64   import org.apache.activemq.command.ConnectionControl;
   65   import org.apache.activemq.command.ConnectionError;
   66   import org.apache.activemq.command.ConnectionId;
   67   import org.apache.activemq.command.ConnectionInfo;
   68   import org.apache.activemq.command.ConsumerControl;
   69   import org.apache.activemq.command.ConsumerId;
   70   import org.apache.activemq.command.ConsumerInfo;
   71   import org.apache.activemq.command.ControlCommand;
   72   import org.apache.activemq.command.DestinationInfo;
   73   import org.apache.activemq.command.ExceptionResponse;
   74   import org.apache.activemq.command.Message;
   75   import org.apache.activemq.command.MessageDispatch;
   76   import org.apache.activemq.command.MessageId;
   77   import org.apache.activemq.command.ProducerAck;
   78   import org.apache.activemq.command.ProducerId;
   79   import org.apache.activemq.command.RemoveSubscriptionInfo;
   80   import org.apache.activemq.command.Response;
   81   import org.apache.activemq.command.SessionId;
   82   import org.apache.activemq.command.ShutdownInfo;
   83   import org.apache.activemq.command.WireFormatInfo;
   84   import org.apache.activemq.management.JMSConnectionStatsImpl;
   85   import org.apache.activemq.management.JMSStatsImpl;
   86   import org.apache.activemq.management.StatsCapable;
   87   import org.apache.activemq.management.StatsImpl;
   88   import org.apache.activemq.state.CommandVisitorAdapter;
   89   import org.apache.activemq.thread.TaskRunnerFactory;
   90   import org.apache.activemq.transport.Transport;
   91   import org.apache.activemq.transport.TransportListener;
   92   import org.apache.activemq.util.IdGenerator;
   93   import org.apache.activemq.util.IntrospectionSupport;
   94   import org.apache.activemq.util.JMSExceptionSupport;
   95   import org.apache.activemq.util.LongSequenceGenerator;
   96   import org.apache.activemq.util.ServiceSupport;
   97   import org.apache.activemq.advisory.DestinationSource;
   98   import org.apache.commons.logging.Log;
   99   import org.apache.commons.logging.LogFactory;
  100   
  101   public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection {
  102   
  103       public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
  104       public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
  105       public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
  106   
  107       private static final Log LOG = LogFactory.getLog(ActiveMQConnection.class);
  108       private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
  109   
  110       public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();
  111   
  112       protected boolean dispatchAsync=true;
  113       protected boolean alwaysSessionAsync = true;
  114   
  115       private TaskRunnerFactory sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000);
  116       private final ThreadPoolExecutor asyncConnectionThread;
  117   
  118       // Connection state variables
  119       private final ConnectionInfo info;
  120       private ExceptionListener exceptionListener;
  121       private boolean clientIDSet;
  122       private boolean isConnectionInfoSentToBroker;
  123       private boolean userSpecifiedClientID;
  124   
  125       // Configuration options variables
  126       private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
  127       private BlobTransferPolicy blobTransferPolicy;
  128       private RedeliveryPolicy redeliveryPolicy;
  129       private MessageTransformer transformer;
  130   
  131       private boolean disableTimeStampsByDefault;
  132       private boolean optimizedMessageDispatch = true;
  133       private boolean copyMessageOnSend = true;
  134       private boolean useCompression;
  135       private boolean objectMessageSerializationDefered;
  136       private boolean useAsyncSend;
  137       private boolean optimizeAcknowledge;
  138       private boolean nestedMapAndListEnabled = true;
  139       private boolean useRetroactiveConsumer;
  140       private boolean exclusiveConsumer;
  141       private boolean alwaysSyncSend;
  142       private int closeTimeout = 15000;
  143       private boolean watchTopicAdvisories = true;
  144       private long warnAboutUnstartedConnectionTimeout = 500L;
  145       private int sendTimeout =0;
  146   
  147       private final Transport transport;
  148       private final IdGenerator clientIdGenerator;
  149       private final JMSStatsImpl factoryStats;
  150       private final JMSConnectionStatsImpl stats;
  151   
  152       private final AtomicBoolean started = new AtomicBoolean(false);
  153       private final AtomicBoolean closing = new AtomicBoolean(false);
  154       private final AtomicBoolean closed = new AtomicBoolean(false);
  155       private final AtomicBoolean transportFailed = new AtomicBoolean(false);
  156       private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
  157       private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
  158       private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>();
  159       private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>();
  160       private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
  161   
  162       // Maps ConsumerIds to ActiveMQConsumer objects
  163       private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
  164       private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
  165       private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
  166       private final SessionId connectionSessionId;
  167       private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
  168       private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
  169       private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
  170       private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
  171   
  172       private AdvisoryConsumer advisoryConsumer;
  173       private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
  174       private BrokerInfo brokerInfo;
  175       private IOException firstFailureError;
  176       private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;
  177   
  178       // Assume that protocol is the latest. Change to the actual protocol
  179       // version when a WireFormatInfo is received.
  180       private AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
  181       private long timeCreated;
  182       private ConnectionAudit connectionAudit = new ConnectionAudit();
  183       private DestinationSource destinationSource;
  184   
  185       /**
  186        * Construct an <code>ActiveMQConnection</code>
  187        * 
  188        * @param transport
  189        * @param factoryStats
  190        * @throws Exception
  191        */
  192       protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
  193   
  194           this.transport = transport;
  195           this.clientIdGenerator = clientIdGenerator;
  196           this.factoryStats = factoryStats;
  197   
  198           // Configure a single threaded executor who's core thread can timeout if
  199           // idle
  200           asyncConnectionThread = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
  201               public Thread newThread(Runnable r) {
  202                   Thread thread = new Thread(r, "ActiveMQ Connection Worker: " + transport);
  203                   thread.setDaemon(true);
  204                   return thread;
  205               }
  206           });
  207           // asyncConnectionThread.allowCoreThreadTimeOut(true);
  208   
  209           this.info = new ConnectionInfo(new ConnectionId(CONNECTION_ID_GENERATOR.generateId()));
  210           this.info.setManageable(true);
  211           this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
  212   
  213           this.transport.setTransportListener(this);
  214   
  215           this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
  216           this.factoryStats.addConnection(this);
  217           this.timeCreated = System.currentTimeMillis();
  218           this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
  219       }
  220   
  221       protected void setUserName(String userName) {
  222           this.info.setUserName(userName);
  223       }
  224   
  225       protected void setPassword(String password) {
  226           this.info.setPassword(password);
  227       }
  228   
  229       /**
  230        * A static helper method to create a new connection
  231        * 
  232        * @return an ActiveMQConnection
  233        * @throws JMSException
  234        */
  235       public static ActiveMQConnection makeConnection() throws JMSException {
  236           ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
  237           return (ActiveMQConnection)factory.createConnection();
  238       }
  239   
  240       /**
  241        * A static helper method to create a new connection
  242        * 
  243        * @param uri
  244        * @return and ActiveMQConnection
  245        * @throws JMSException
  246        */
  247       public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
  248           ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
  249           return (ActiveMQConnection)factory.createConnection();
  250       }
  251   
  252       /**
  253        * A static helper method to create a new connection
  254        * 
  255        * @param user
  256        * @param password
  257        * @param uri
  258        * @return an ActiveMQConnection
  259        * @throws JMSException
  260        */
  261       public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
  262           ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
  263           return (ActiveMQConnection)factory.createConnection();
  264       }
  265   
  266       /**
  267        * @return a number unique for this connection
  268        */
  269       public JMSConnectionStatsImpl getConnectionStats() {
  270           return stats;
  271       }
  272   
  273       /**
  274        * Creates a <CODE>Session</CODE> object.
  275        * 
  276        * @param transacted indicates whether the session is transacted
  277        * @param acknowledgeMode indicates whether the consumer or the client will
  278        *                acknowledge any messages it receives; ignored if the
  279        *                session is transacted. Legal values are
  280        *                <code>Session.AUTO_ACKNOWLEDGE</code>,
  281        *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
  282        *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
  283        * @return a newly created session
  284        * @throws JMSException if the <CODE>Connection</CODE> object fails to
  285        *                 create a session due to some internal error or lack of
  286        *                 support for the specific transaction and acknowledgement
  287        *                 mode.
  288        * @see Session#AUTO_ACKNOWLEDGE
  289        * @see Session#CLIENT_ACKNOWLEDGE
  290        * @see Session#DUPS_OK_ACKNOWLEDGE
  291        * @since 1.1
  292        */
  293       public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
  294           checkClosedOrFailed();
  295           ensureConnectionInfoSent();
  296           return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
  297               ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
  298       }
  299   
  300       /**
  301        * @return sessionId
  302        */
  303       protected SessionId getNextSessionId() {
  304           return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
  305       }
  306   
  307       /**
  308        * Gets the client identifier for this connection.
  309        * <P>
  310        * This value is specific to the JMS provider. It is either preconfigured by
  311        * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
  312        * dynamically by the application by calling the <code>setClientID</code>
  313        * method.
  314        * 
  315        * @return the unique client identifier
  316        * @throws JMSException if the JMS provider fails to return the client ID
  317        *                 for this connection due to some internal error.
  318        */
  319       public String getClientID() throws JMSException {
  320           checkClosedOrFailed();
  321           return this.info.getClientId();
  322       }
  323   
  324       /**
  325        * Sets the client identifier for this connection.
  326        * <P>
  327        * The preferred way to assign a JMS client's client identifier is for it to
  328        * be configured in a client-specific <CODE>ConnectionFactory</CODE>
  329        * object and transparently assigned to the <CODE>Connection</CODE> object
  330        * it creates.
  331        * <P>
  332        * Alternatively, a client can set a connection's client identifier using a
  333        * provider-specific value. The facility to set a connection's client
  334        * identifier explicitly is not a mechanism for overriding the identifier
  335        * that has been administratively configured. It is provided for the case
  336        * where no administratively specified identifier exists. If one does exist,
  337        * an attempt to change it by setting it must throw an
  338        * <CODE>IllegalStateException</CODE>. If a client sets the client
  339        * identifier explicitly, it must do so immediately after it creates the
  340        * connection and before any other action on the connection is taken. After
  341        * this point, setting the client identifier is a programming error that
  342        * should throw an <CODE>IllegalStateException</CODE>.
  343        * <P>
  344        * The purpose of the client identifier is to associate a connection and its
  345        * objects with a state maintained on behalf of the client by a provider.
  346        * The only such state identified by the JMS API is that required to support
  347        * durable subscriptions.
  348        * <P>
  349        * If another connection with the same <code>clientID</code> is already
  350        * running when this method is called, the JMS provider should detect the
  351        * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
  352        * 
  353        * @param newClientID the unique client identifier
  354        * @throws JMSException if the JMS provider fails to set the client ID for
  355        *                 this connection due to some internal error.
  356        * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
  357        *                 invalid or duplicate client ID.
  358        * @throws javax.jms.IllegalStateException if the JMS client attempts to set
  359        *                 a connection's client ID at the wrong time or when it has
  360        *                 been administratively configured.
  361        */
  362       public void setClientID(String newClientID) throws JMSException {
  363           checkClosedOrFailed();
  364   
  365           if (this.clientIDSet) {
  366               throw new IllegalStateException("The clientID has already been set");
  367           }
  368   
  369           if (this.isConnectionInfoSentToBroker) {
  370               throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
  371           }
  372   
  373           this.info.setClientId(newClientID);
  374           this.userSpecifiedClientID = true;
  375           ensureConnectionInfoSent();
  376       }
  377   
  378       /**
  379        * Sets the default client id that the connection will use if explicitly not
  380        * set with the setClientId() call.
  381        */
  382       public void setDefaultClientID(String clientID) throws JMSException {
  383           this.info.setClientId(clientID);
  384           this.userSpecifiedClientID = true;
  385       }
  386   
  387       /**
  388        * Gets the metadata for this connection.
  389        * 
  390        * @return the connection metadata
  391        * @throws JMSException if the JMS provider fails to get the connection
  392        *                 metadata for this connection.
  393        * @see javax.jms.ConnectionMetaData
  394        */
  395       public ConnectionMetaData getMetaData() throws JMSException {
  396           checkClosedOrFailed();
  397           return ActiveMQConnectionMetaData.INSTANCE;
  398       }
  399   
  400       /**
  401        * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
  402        * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
  403        * associated with it.
  404        * 
  405        * @return the <CODE>ExceptionListener</CODE> for this connection, or
  406        *         null. if no <CODE>ExceptionListener</CODE> is associated with
  407        *         this connection.
  408        * @throws JMSException if the JMS provider fails to get the
  409        *                 <CODE>ExceptionListener</CODE> for this connection.
  410        * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
  411        */
  412       public ExceptionListener getExceptionListener() throws JMSException {
  413           checkClosedOrFailed();
  414           return this.exceptionListener;
  415       }
  416   
  417       /**
  418        * Sets an exception listener for this connection.
  419        * <P>
  420        * If a JMS provider detects a serious problem with a connection, it informs
  421        * the connection's <CODE> ExceptionListener</CODE>, if one has been
  422        * registered. It does this by calling the listener's <CODE>onException
  423        * </CODE>
  424        * method, passing it a <CODE>JMSException</CODE> object describing the
  425        * problem.
  426        * <P>
  427        * An exception listener allows a client to be notified of a problem
  428        * asynchronously. Some connections only consume messages, so they would
  429        * have no other way to learn their connection has failed.
  430        * <P>
  431        * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
  432        * <P>
  433        * A JMS provider should attempt to resolve connection problems itself
  434        * before it notifies the client of them.
  435        * 
  436        * @param listener the exception listener
  437        * @throws JMSException if the JMS provider fails to set the exception
  438        *                 listener for this connection.
  439        */
  440       public void setExceptionListener(ExceptionListener listener) throws JMSException {
  441           checkClosedOrFailed();
  442           this.exceptionListener = listener;
  443       }
  444   
  445       /**
  446        * Starts (or restarts) a connection's delivery of incoming messages. A call
  447        * to <CODE>start</CODE> on a connection that has already been started is
  448        * ignored.
  449        * 
  450        * @throws JMSException if the JMS provider fails to start message delivery
  451        *                 due to some internal error.
  452        * @see javax.jms.Connection#stop()
  453        */
  454       public void start() throws JMSException {
  455           checkClosedOrFailed();
  456           ensureConnectionInfoSent();
  457           if (started.compareAndSet(false, true)) {
  458               for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
  459                   ActiveMQSession session = i.next();
  460                   session.start();
  461               }
  462           }
  463       }
  464   
  465       /**
  466        * Temporarily stops a connection's delivery of incoming messages. Delivery
  467        * can be restarted using the connection's <CODE>start</CODE> method. When
  468        * the connection is stopped, delivery to all the connection's message
  469        * consumers is inhibited: synchronous receives block, and messages are not
  470        * delivered to message listeners.
  471        * <P>
  472        * This call blocks until receives and/or message listeners in progress have
  473        * completed.
  474        * <P>
  475        * Stopping a connection has no effect on its ability to send messages. A
  476        * call to <CODE>stop</CODE> on a connection that has already been stopped
  477        * is ignored.
  478        * <P>
  479        * A call to <CODE>stop</CODE> must not return until delivery of messages
  480        * has paused. This means that a client can rely on the fact that none of
  481        * its message listeners will be called and that all threads of control
  482        * waiting for <CODE>receive</CODE> calls to return will not return with a
  483        * message until the connection is restarted. The receive timers for a
  484        * stopped connection continue to advance, so receives may time out while
  485        * the connection is stopped.
  486        * <P>
  487        * If message listeners are running when <CODE>stop</CODE> is invoked, the
  488        * <CODE>stop</CODE> call must wait until all of them have returned before
  489        * it may return. While these message listeners are completing, they must
  490        * have the full services of the connection available to them.
  491        * 
  492        * @throws JMSException if the JMS provider fails to stop message delivery
  493        *                 due to some internal error.
  494        * @see javax.jms.Connection#start()
  495        */
  496       public void stop() throws JMSException {
  497           checkClosedOrFailed();
  498           if (started.compareAndSet(true, false)) {
  499               for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
  500                   ActiveMQSession s = i.next();
  501                   s.stop();
  502               }
  503           }
  504       }
  505   
  506       /**
  507        * Closes the connection.
  508        * <P>
  509        * Since a provider typically allocates significant resources outside the
  510        * JVM on behalf of a connection, clients should close these resources when
  511        * they are not needed. Relying on garbage collection to eventually reclaim
  512        * these resources may not be timely enough.
  513        * <P>
  514        * There is no need to close the sessions, producers, and consumers of a
  515        * closed connection.
  516        * <P>
  517        * Closing a connection causes all temporary destinations to be deleted.
  518        * <P>
  519        * When this method is invoked, it should not return until message
  520        * processing has been shut down in an orderly fashion. This means that all
  521        * message listeners that may have been running have returned, and that all
  522        * pending receives have returned. A close terminates all pending message
  523        * receives on the connection's sessions' consumers. The receives may return
  524        * with a message or with null, depending on whether there was a message
  525        * available at the time of the close. If one or more of the connection's
  526        * sessions' message listeners is processing a message at the time when
  527        * connection <CODE>close</CODE> is invoked, all the facilities of the
  528        * connection and its sessions must remain available to those listeners
  529        * until they return control to the JMS provider.
  530        * <P>
  531        * Closing a connection causes any of its sessions' transactions in progress
  532        * to be rolled back. In the case where a session's work is coordinated by
  533        * an external transaction manager, a session's <CODE>commit</CODE> and
  534        * <CODE> rollback</CODE> methods are not used and the result of a closed
  535        * session's work is determined later by the transaction manager. Closing a
  536        * connection does NOT force an acknowledgment of client-acknowledged
  537        * sessions.
  538        * <P>
  539        * Invoking the <CODE>acknowledge</CODE> method of a received message from
  540        * a closed connection's session must throw an
  541        * <CODE>IllegalStateException</CODE>. Closing a closed connection must
  542        * NOT throw an exception.
  543        * 
  544        * @throws JMSException if the JMS provider fails to close the connection
  545        *                 due to some internal error. For example, a failure to
  546        *                 release resources or to close a socket connection can
  547        *                 cause this exception to be thrown.
  548        */
  549       public void close() throws JMSException {
  550           try {
  551               // If we were running, lets stop first.
  552               stop();
  553   
  554               synchronized (this) {
  555                   if (!closed.get()) {
  556                       closing.set(true);
  557   
  558                       if (destinationSource != null) {
  559                           destinationSource.stop();
  560                           destinationSource = null;
  561                       }
  562                       if (advisoryConsumer != null) {
  563                           advisoryConsumer.dispose();
  564                           advisoryConsumer = null;
  565                       }
  566   
  567                       for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
  568                           ActiveMQSession s = i.next();
  569                           s.dispose();
  570                       }
  571                       for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
  572                           ActiveMQConnectionConsumer c = i.next();
  573                           c.dispose();
  574                       }
  575                       for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
  576                           ActiveMQInputStream c = i.next();
  577                           c.dispose();
  578                       }
  579                       for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
  580                           ActiveMQOutputStream c = i.next();
  581                           c.dispose();
  582                       }
  583   
  584                       if (isConnectionInfoSentToBroker) {
  585                           // If we announced ourselfs to the broker.. Try to let
  586                           // the broker
  587                           // know that the connection is being shutdown.
  588                           doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
  589                           doAsyncSendPacket(new ShutdownInfo());
  590                       }
  591   
  592                       ServiceSupport.dispose(this.transport);
  593   
  594                       started.set(false);
  595   
  596                       // TODO if we move the TaskRunnerFactory to the connection
  597                       // factory
  598                       // then we may need to call
  599                       // factory.onConnectionClose(this);
  600                       sessionTaskRunner.shutdown();
  601                       
  602                       if (asyncConnectionThread != null){
  603                       	asyncConnectionThread.shutdown();
  604                       }
  605   
  606                       closed.set(true);
  607                       closing.set(false);
  608                   }
  609               }
  610           } finally {
  611               factoryStats.removeConnection(this);
  612           }
  613       }
  614   
  615       /**
  616        * Tells the broker to terminate its VM. This can be used to cleanly
  617        * terminate a broker running in a standalone java process. Server must have
  618        * property enable.vm.shutdown=true defined to allow this to work.
  619        */
  620       // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
  621       // implemented.
  622       /*
  623        * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
  624        * command = new BrokerAdminCommand();
  625        * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
  626        * asyncSendPacket(command); }
  627        */
  628   
  629       /**
  630        * Create a durable connection consumer for this connection (optional
  631        * operation). This is an expert facility not used by regular JMS clients.
  632        * 
  633        * @param topic topic to access
  634        * @param subscriptionName durable subscription name
  635        * @param messageSelector only messages with properties matching the message
  636        *                selector expression are delivered. A value of null or an
  637        *                empty string indicates that there is no message selector
  638        *                for the message consumer.
  639        * @param sessionPool the server session pool to associate with this durable
  640        *                connection consumer
  641        * @param maxMessages the maximum number of messages that can be assigned to
  642        *                a server session at one time
  643        * @return the durable connection consumer
  644        * @throws JMSException if the <CODE>Connection</CODE> object fails to
  645        *                 create a connection consumer due to some internal error
  646        *                 or invalid arguments for <CODE>sessionPool</CODE> and
  647        *                 <CODE>messageSelector</CODE>.
  648        * @throws javax.jms.InvalidDestinationException if an invalid destination
  649        *                 is specified.
  650        * @throws javax.jms.InvalidSelectorException if the message selector is
  651        *                 invalid.
  652        * @see javax.jms.ConnectionConsumer
  653        * @since 1.1
  654        */
  655       public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
  656           throws JMSException {
  657           return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
  658       }
  659   
  660       /**
  661        * Create a durable connection consumer for this connection (optional
  662        * operation). This is an expert facility not used by regular JMS clients.
  663        * 
  664        * @param topic topic to access
  665        * @param subscriptionName durable subscription name
  666        * @param messageSelector only messages with properties matching the message
  667        *                selector expression are delivered. A value of null or an
  668        *                empty string indicates that there is no message selector
  669        *                for the message consumer.
  670        * @param sessionPool the server session pool to associate with this durable
  671        *                connection consumer
  672        * @param maxMessages the maximum number of messages that can be assigned to
  673        *                a server session at one time
  674        * @param noLocal set true if you want to filter out messages published
  675        *                locally
  676        * @return the durable connection consumer
  677        * @throws JMSException if the <CODE>Connection</CODE> object fails to
  678        *                 create a connection consumer due to some internal error
  679        *                 or invalid arguments for <CODE>sessionPool</CODE> and
  680        *                 <CODE>messageSelector</CODE>.
  681        * @throws javax.jms.InvalidDestinationException if an invalid destination
  682        *                 is specified.
  683        * @throws javax.jms.InvalidSelectorException if the message selector is
  684        *                 invalid.
  685        * @see javax.jms.ConnectionConsumer
  686        * @since 1.1
  687        */
  688       public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
  689                                                                 boolean noLocal) throws JMSException {
  690           checkClosedOrFailed();
  691           ensureConnectionInfoSent();
  692           SessionId sessionId = new SessionId(info.getConnectionId(), -1);
  693           ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
  694           info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
  695           info.setSubscriptionName(subscriptionName);
  696           info.setSelector(messageSelector);
  697           info.setPrefetchSize(maxMessages);
  698           info.setDispatchAsync(isDispatchAsync());
  699   
  700           // Allows the options on the destination to configure the consumerInfo
  701           if (info.getDestination().getOptions() != null) {
  702               Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
  703               IntrospectionSupport.setProperties(this.info, options, "consumer.");
  704           }
  705   
  706           return new ActiveMQConnectionConsumer(this, sessionPool, info);
  707       }
  708   
  709       // Properties
  710       // -------------------------------------------------------------------------
  711   
  712       /**
  713        * Returns true if this connection has been started
  714        * 
  715        * @return true if this Connection is started
  716        */
  717       public boolean isStarted() {
  718           return started.get();
  719       }
  720   
  721       /**
  722        * Returns true if the connection is closed
  723        */
  724       public boolean isClosed() {
  725           return closed.get();
  726       }
  727   
  728       /**
  729        * Returns true if the connection is in the process of being closed
  730        */
  731       public boolean isClosing() {
  732           return closing.get();
  733       }
  734   
  735       /**
  736        * Returns true if the underlying transport has failed
  737        */
  738       public boolean isTransportFailed() {
  739           return transportFailed.get();
  740       }
  741   
  742       /**
  743        * @return Returns the prefetchPolicy.
  744        */
  745       public ActiveMQPrefetchPolicy getPrefetchPolicy() {
  746           return prefetchPolicy;
  747       }
  748   
  749       /**
  750        * Sets the <a
  751        * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
  752        * policy</a> for consumers created by this connection.
  753        */
  754       public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
  755           this.prefetchPolicy = prefetchPolicy;
  756       }
  757   
  758       /**
  759        */
  760       public Transport getTransportChannel() {
  761           return transport;
  762       }
  763   
  764       /**
  765        * @return Returns the clientID of the connection, forcing one to be
  766        *         generated if one has not yet been configured.
  767        */
  768       public String getInitializedClientID() throws JMSException {
  769           ensureConnectionInfoSent();
  770           return info.getClientId();
  771       }
  772   
  773       /**
  774        * @return Returns the timeStampsDisableByDefault.
  775        */
  776       public boolean isDisableTimeStampsByDefault() {
  777           return disableTimeStampsByDefault;
  778       }
  779   
  780       /**
  781        * Sets whether or not timestamps on messages should be disabled or not. If
  782        * you disable them it adds a small performance boost.
  783        */
  784       public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
  785           this.disableTimeStampsByDefault = timeStampsDisableByDefault;
  786       }
  787   
  788       /**
  789        * @return Returns the dispatchOptimizedMessage.
  790        */
  791       public boolean isOptimizedMessageDispatch() {
  792           return optimizedMessageDispatch;
  793       }
  794   
  795       /**
  796        * If this flag is set then an larger prefetch limit is used - only
  797        * applicable for durable topic subscribers.
  798        */
  799       public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
  800           this.optimizedMessageDispatch = dispatchOptimizedMessage;
  801       }
  802   
  803       /**
  804        * @return Returns the closeTimeout.
  805        */
  806       public int getCloseTimeout() {
  807           return closeTimeout;
  808       }
  809   
  810       /**
  811        * Sets the timeout before a close is considered complete. Normally a
  812        * close() on a connection waits for confirmation from the broker; this
  813        * allows that operation to timeout to save the client hanging if there is
  814        * no broker
  815        */
  816       public void setCloseTimeout(int closeTimeout) {
  817           this.closeTimeout = closeTimeout;
  818       }
  819   
  820       /**
  821        * @return ConnectionInfo
  822        */
  823       public ConnectionInfo getConnectionInfo() {
  824           return this.info;
  825       }
  826   
  827       public boolean isUseRetroactiveConsumer() {
  828           return useRetroactiveConsumer;
  829       }
  830   
  831       /**
  832        * Sets whether or not retroactive consumers are enabled. Retroactive
  833        * consumers allow non-durable topic subscribers to receive old messages
  834        * that were published before the non-durable subscriber started.
  835        */
  836       public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
  837           this.useRetroactiveConsumer = useRetroactiveConsumer;
  838       }
  839   
  840       public boolean isNestedMapAndListEnabled() {
  841           return nestedMapAndListEnabled;
  842       }
  843   
  844       /**
  845        * Enables/disables whether or not Message properties and MapMessage entries
  846        * support <a
  847        * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
  848        * Structures</a> of Map and List objects
  849        */
  850       public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
  851           this.nestedMapAndListEnabled = structuredMapsEnabled;
  852       }
  853   
  854       public boolean isExclusiveConsumer() {
  855           return exclusiveConsumer;
  856       }
  857   
  858       /**
  859        * Enables or disables whether or not queue consumers should be exclusive or
  860        * not for example to preserve ordering when not using <a
  861        * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
  862        * 
  863        * @param exclusiveConsumer
  864        */
  865       public void setExclusiveConsumer(boolean exclusiveConsumer) {
  866           this.exclusiveConsumer = exclusiveConsumer;
  867       }
  868   
  869       /**
  870        * Adds a transport listener so that a client can be notified of events in
  871        * the underlying transport
  872        */
  873       public void addTransportListener(TransportListener transportListener) {
  874           transportListeners.add(transportListener);
  875       }
  876   
  877       public void removeTransportListener(TransportListener transportListener) {
  878           transportListeners.remove(transportListener);
  879       }
  880   
  881       public TaskRunnerFactory getSessionTaskRunner() {
  882           return sessionTaskRunner;
  883       }
  884   
  885       public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
  886           this.sessionTaskRunner = sessionTaskRunner;
  887       }
  888   
  889       public MessageTransformer getTransformer() {
  890           return transformer;
  891       }
  892   
  893       /**
  894        * Sets the transformer used to transform messages before they are sent on
  895        * to the JMS bus or when they are received from the bus but before they are
  896        * delivered to the JMS client
  897        */
  898       public void setTransformer(MessageTransformer transformer) {
  899           this.transformer = transformer;
  900       }
  901   
  902       /**
  903        * @return the statsEnabled
  904        */
  905       public boolean isStatsEnabled() {
  906           return this.stats.isEnabled();
  907       }
  908   
  909       /**
  910        * @param statsEnabled the statsEnabled to set
  911        */
  912       public void setStatsEnabled(boolean statsEnabled) {
  913           this.stats.setEnabled(statsEnabled);
  914       }
  915   
  916       /**
  917        * Returns the {@link DestinationSource} object which can be used to listen to destinations
  918        * being created or destroyed or to enquire about the current destinations available on the broker
  919        *
  920        * @return a lazily created destination source
  921        * @throws JMSException
  922        */
  923       public DestinationSource getDestinationSource() throws JMSException {
  924           if (destinationSource == null) {
  925               destinationSource = new DestinationSource(this);
  926               destinationSource.start();
  927           }
  928           return destinationSource;
  929       }
  930   
  931       // Implementation methods
  932       // -------------------------------------------------------------------------
  933   
  934       /**
  935        * Used internally for adding Sessions to the Connection
  936        * 
  937        * @param session
  938        * @throws JMSException
  939        * @throws JMSException
  940        */
  941       protected void addSession(ActiveMQSession session) throws JMSException {
  942           this.sessions.add(session);
  943           if (sessions.size() > 1 || session.isTransacted()) {
  944               optimizedMessageDispatch = false;
  945           }
  946       }
  947   
  948       /**
  949        * Used interanlly for removing Sessions from a Connection
  950        * 
  951        * @param session
  952        */
  953       protected void removeSession(ActiveMQSession session) {
  954           this.sessions.remove(session);
  955           this.removeDispatcher(session);
  956       }
  957   
  958       /**
  959        * Add a ConnectionConsumer
  960        * 
  961        * @param connectionConsumer
  962        * @throws JMSException
  963        */
  964       protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
  965           this.connectionConsumers.add(connectionConsumer);
  966       }
  967   
  968       /**
  969        * Remove a ConnectionConsumer
  970        * 
  971        * @param connectionConsumer
  972        */
  973       protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
  974           this.connectionConsumers.remove(connectionConsumer);
  975           this.removeDispatcher(connectionConsumer);
  976       }
  977   
  978       /**
  979        * Creates a <CODE>TopicSession</CODE> object.
  980        * 
  981        * @param transacted indicates whether the session is transacted
  982        * @param acknowledgeMode indicates whether the consumer or the client will
  983        *                acknowledge any messages it receives; ignored if the
  984        *                session is transacted. Legal values are
  985        *                <code>Session.AUTO_ACKNOWLEDGE</code>,
  986        *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
  987        *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
  988        * @return a newly created topic session
  989        * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
  990        *                 to create a session due to some internal error or lack of
  991        *                 support for the specific transaction and acknowledgement
  992        *                 mode.
  993        * @see Session#AUTO_ACKNOWLEDGE
  994        * @see Session#CLIENT_ACKNOWLEDGE
  995        * @see Session#DUPS_OK_ACKNOWLEDGE
  996        */
  997       public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
  998           return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
  999       }
 1000   
 1001       /**
 1002        * Creates a connection consumer for this connection (optional operation).
 1003        * This is an expert facility not used by regular JMS clients.
 1004        * 
 1005        * @param topic the topic to access
 1006        * @param messageSelector only messages with properties matching the message
 1007        *                selector expression are delivered. A value of null or an
 1008        *                empty string indicates that there is no message selector
 1009        *                for the message consumer.
 1010        * @param sessionPool the server session pool to associate with this
 1011        *                connection consumer
 1012        * @param maxMessages the maximum number of messages that can be assigned to
 1013        *                a server session at one time
 1014        * @return the connection consumer
 1015        * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
 1016        *                 to create a connection consumer due to some internal
 1017        *                 error or invalid arguments for <CODE>sessionPool</CODE>
 1018        *                 and <CODE>messageSelector</CODE>.
 1019        * @throws javax.jms.InvalidDestinationException if an invalid topic is
 1020        *                 specified.
 1021        * @throws javax.jms.InvalidSelectorException if the message selector is
 1022        *                 invalid.
 1023        * @see javax.jms.ConnectionConsumer
 1024        */
 1025       public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
 1026           return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
 1027       }
 1028   
 1029       /**
 1030        * Creates a connection consumer for this connection (optional operation).
 1031        * This is an expert facility not used by regular JMS clients.
 1032        * 
 1033        * @param queue the queue to access
 1034        * @param messageSelector only messages with properties matching the message
 1035        *                selector expression are delivered. A value of null or an
 1036        *                empty string indicates that there is no message selector
 1037        *                for the message consumer.
 1038        * @param sessionPool the server session pool to associate with this
 1039        *                connection consumer
 1040        * @param maxMessages the maximum number of messages that can be assigned to
 1041        *                a server session at one time
 1042        * @return the connection consumer
 1043        * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
 1044        *                 to create a connection consumer due to some internal
 1045        *                 error or invalid arguments for <CODE>sessionPool</CODE>
 1046        *                 and <CODE>messageSelector</CODE>.
 1047        * @throws javax.jms.InvalidDestinationException if an invalid queue is
 1048        *                 specified.
 1049        * @throws javax.jms.InvalidSelectorException if the message selector is
 1050        *                 invalid.
 1051        * @see javax.jms.ConnectionConsumer
 1052        */
 1053       public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
 1054           return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
 1055       }
 1056   
 1057       /**
 1058        * Creates a connection consumer for this connection (optional operation).
 1059        * This is an expert facility not used by regular JMS clients.
 1060        * 
 1061        * @param destination the destination to access
 1062        * @param messageSelector only messages with properties matching the message
 1063        *                selector expression are delivered. A value of null or an
 1064        *                empty string indicates that there is no message selector
 1065        *                for the message consumer.
 1066        * @param sessionPool the server session pool to associate with this
 1067        *                connection consumer
 1068        * @param maxMessages the maximum number of messages that can be assigned to
 1069        *                a server session at one time
 1070        * @return the connection consumer
 1071        * @throws JMSException if the <CODE>Connection</CODE> object fails to
 1072        *                 create a connection consumer due to some internal error
 1073        *                 or invalid arguments for <CODE>sessionPool</CODE> and
 1074        *                 <CODE>messageSelector</CODE>.
 1075        * @throws javax.jms.InvalidDestinationException if an invalid destination
 1076        *                 is specified.
 1077        * @throws javax.jms.InvalidSelectorException if the message selector is
 1078        *                 invalid.
 1079        * @see javax.jms.ConnectionConsumer
 1080        * @since 1.1
 1081        */
 1082       public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
 1083           return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
 1084       }
 1085   
 1086       public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
 1087           throws JMSException {
 1088   
 1089           checkClosedOrFailed();
 1090           ensureConnectionInfoSent();
 1091   
 1092           ConsumerId consumerId = createConsumerId();
 1093           ConsumerInfo info = new ConsumerInfo(consumerId);
 1094           info.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
 1095           info.setSelector(messageSelector);
 1096           info.setPrefetchSize(maxMessages);
 1097           info.setNoLocal(noLocal);
 1098           info.setDispatchAsync(isDispatchAsync());
 1099   
 1100           // Allows the options on the destination to configure the consumerInfo
 1101           if (info.getDestination().getOptions() != null) {
 1102               Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
 1103               IntrospectionSupport.setProperties(info, options, "consumer.");
 1104           }
 1105   
 1106           return new ActiveMQConnectionConsumer(this, sessionPool, info);
 1107       }
 1108   
 1109       /**
 1110        * @return
 1111        */
 1112       private ConsumerId createConsumerId() {
 1113           return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
 1114       }
 1115   
 1116       /**
 1117        * @return
 1118        */
 1119       private ProducerId createProducerId() {
 1120           return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId());
 1121       }
 1122   
 1123       /**
 1124        * Creates a <CODE>QueueSession</CODE> object.
 1125        * 
 1126        * @param transacted indicates whether the session is transacted
 1127        * @param acknowledgeMode indicates whether the consumer or the client will
 1128        *                acknowledge any messages it receives; ignored if the
 1129        *                session is transacted. Legal values are
 1130        *                <code>Session.AUTO_ACKNOWLEDGE</code>,
 1131        *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
 1132        *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
 1133        * @return a newly created queue session
 1134        * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
 1135        *                 to create a session due to some internal error or lack of
 1136        *                 support for the specific transaction and acknowledgement
 1137        *                 mode.
 1138        * @see Session#AUTO_ACKNOWLEDGE
 1139        * @see Session#CLIENT_ACKNOWLEDGE
 1140        * @see Session#DUPS_OK_ACKNOWLEDGE
 1141        */
 1142       public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
 1143           return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
 1144       }
 1145   
 1146       /**
 1147        * Ensures that the clientID was manually specified and not auto-generated.
 1148        * If the clientID was not specified this method will throw an exception.
 1149        * This method is used to ensure that the clientID + durableSubscriber name
 1150        * are used correctly.
 1151        * 
 1152        * @throws JMSException
 1153        */
 1154       public void checkClientIDWasManuallySpecified() throws JMSException {
 1155           if (!userSpecifiedClientID) {
 1156               throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
 1157           }
 1158       }
 1159   
 1160       /**
 1161        * send a Packet through the Connection - for internal use only
 1162        * 
 1163        * @param command
 1164        * @throws JMSException
 1165        */
 1166       public void asyncSendPacket(Command command) throws JMSException {
 1167           if (isClosed()) {
 1168               throw new ConnectionClosedException();
 1169           } else {
 1170               doAsyncSendPacket(command);
 1171           }
 1172       }
 1173   
 1174   	private void doAsyncSendPacket(Command command) throws JMSException {
 1175   		try {
 1176   		    this.transport.oneway(command);
 1177   		} catch (IOException e) {
 1178   		    throw JMSExceptionSupport.create(e);
 1179   		}
 1180   	}
 1181   
 1182       /**
 1183        * Send a packet through a Connection - for internal use only
 1184        * 
 1185        * @param command
 1186        * @return
 1187        * @throws JMSException
 1188        */
 1189       public Response syncSendPacket(Command command) throws JMSException {
 1190           if (isClosed()) {
 1191               throw new ConnectionClosedException();
 1192           } else {
 1193   
 1194               try {
 1195                   Response response = (Response)this.transport.request(command);
 1196                   if (response.isException()) {
 1197                       ExceptionResponse er = (ExceptionResponse)response;
 1198                       if (er.getException() instanceof JMSException) {
 1199                           throw (JMSException)er.getException();
 1200                       } else {
 1201                           throw JMSExceptionSupport.create(er.getException());
 1202                       }
 1203                   }
 1204                   return response;
 1205               } catch (IOException e) {
 1206                   throw JMSExceptionSupport.create(e);
 1207               }
 1208           }
 1209       }
 1210   
 1211       /**
 1212        * Send a packet through a Connection - for internal use only
 1213        * 
 1214        * @param command
 1215        * @return
 1216        * @throws JMSException
 1217        */
 1218       public Response syncSendPacket(Command command, int timeout) throws JMSException {
 1219           if (isClosed() || closing.get()) {
 1220               throw new ConnectionClosedException();
 1221           } else {
 1222               return doSyncSendPacket(command, timeout);
 1223           }
 1224       }
 1225   
 1226   	private Response doSyncSendPacket(Command command, int timeout)
 1227   			throws JMSException {
 1228   		try {
 1229   		    Response response = (Response)this.transport.request(command, timeout);
 1230   		    if (response != null && response.isException()) {
 1231   		        ExceptionResponse er = (ExceptionResponse)response;
 1232   		        if (er.getException() instanceof JMSException) {
 1233   		            throw (JMSException)er.getException();
 1234   		        } else {
 1235   		            throw JMSExceptionSupport.create(er.getException());
 1236   		        }
 1237   		    }
 1238   		    return response;
 1239   		} catch (IOException e) {
 1240   		    throw JMSExceptionSupport.create(e);
 1241   		}
 1242   	}
 1243   
 1244       /**
 1245        * @return statistics for this Connection
 1246        */
 1247       public StatsImpl getStats() {
 1248           return stats;
 1249       }
 1250   
 1251       /**
 1252        * simply throws an exception if the Connection is already closed or the
 1253        * Transport has failed
 1254        * 
 1255        * @throws JMSException
 1256        */
 1257       protected synchronized void checkClosedOrFailed() throws JMSException {
 1258           checkClosed();
 1259           if (transportFailed.get()) {
 1260               throw new ConnectionFailedException(firstFailureError);
 1261           }
 1262       }
 1263   
 1264       /**
 1265        * simply throws an exception if the Connection is already closed
 1266        * 
 1267        * @throws JMSException
 1268        */
 1269       protected synchronized void checkClosed() throws JMSException {
 1270           if (closed.get()) {
 1271               throw new ConnectionClosedException();
 1272           }
 1273       }
 1274   
 1275       /**
 1276        * Send the ConnectionInfo to the Broker
 1277        * 
 1278        * @throws JMSException
 1279        */
 1280       protected synchronized void ensureConnectionInfoSent() throws JMSException {
 1281           // Can we skip sending the ConnectionInfo packet??
 1282           if (isConnectionInfoSentToBroker || closed.get()) {
 1283               return;
 1284           }
 1285   
 1286           if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
 1287               info.setClientId(clientIdGenerator.generateId());
 1288           }
 1289           syncSendPacket(info);
 1290   
 1291           this.isConnectionInfoSentToBroker = true;
 1292           // Add a temp destination advisory consumer so that
 1293           // We know what the valid temporary destinations are on the
 1294           // broker without having to do an RPC to the broker.
 1295   
 1296           ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
 1297           if (watchTopicAdvisories) {
 1298               advisoryConsumer = new AdvisoryConsumer(this, consumerId);
 1299           }
 1300       }
 1301   
 1302       public synchronized boolean isWatchTopicAdvisories() {
 1303           return watchTopicAdvisories;
 1304       }
 1305   
 1306       public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
 1307           this.watchTopicAdvisories = watchTopicAdvisories;
 1308       }
 1309   
 1310       /**
 1311        * @return Returns the useAsyncSend.
 1312        */
 1313       public boolean isUseAsyncSend() {
 1314           return useAsyncSend;
 1315       }
 1316   
 1317       /**
 1318        * Forces the use of <a
 1319        * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
 1320        * adds a massive performance boost; but means that the send() method will
 1321        * return immediately whether the message has been sent or not which could
 1322        * lead to message loss.
 1323        */
 1324       public void setUseAsyncSend(boolean useAsyncSend) {
 1325           this.useAsyncSend = useAsyncSend;
 1326       }
 1327   
 1328       /**
 1329        * @return true if always sync send messages
 1330        */
 1331       public boolean isAlwaysSyncSend() {
 1332           return this.alwaysSyncSend;
 1333       }
 1334   
 1335       /**
 1336        * Set true if always require messages to be sync sent
 1337        * 
 1338        * @param alwaysSyncSend
 1339        */
 1340       public void setAlwaysSyncSend(boolean alwaysSyncSend) {
 1341           this.alwaysSyncSend = alwaysSyncSend;
 1342       }
 1343   
 1344       /**
 1345        * Cleans up this connection so that it's state is as if the connection was
 1346        * just created. This allows the Resource Adapter to clean up a connection
 1347        * so that it can be reused without having to close and recreate the
 1348        * connection.
 1349        */
 1350       public void cleanup() throws JMSException {
 1351   
 1352           if (advisoryConsumer != null) {
 1353               advisoryConsumer.dispose();
 1354               advisoryConsumer = null;
 1355           }
 1356   
 1357           for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
 1358               ActiveMQSession s = i.next();
 1359               s.dispose();
 1360           }
 1361           for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
 1362               ActiveMQConnectionConsumer c = i.next();
 1363               c.dispose();
 1364           }
 1365           for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
 1366               ActiveMQInputStream c = i.next();
 1367               c.dispose();
 1368           }
 1369           for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
 1370               ActiveMQOutputStream c = i.next();
 1371               c.dispose();
 1372           }
 1373   
 1374           if (isConnectionInfoSentToBroker) {
 1375               if (!transportFailed.get() && !closing.get()) {
 1376                   asyncSendPacket(info.createRemoveCommand());
 1377               }
 1378               isConnectionInfoSentToBroker = false;
 1379           }
 1380           if (userSpecifiedClientID) {
 1381               info.setClientId(null);
 1382               userSpecifiedClientID = false;
 1383           }
 1384           clientIDSet = false;
 1385   
 1386           started.set(false);
 1387       }
 1388   
 1389       /**
 1390        * Changes the associated username/password that is associated with this
 1391        * connection. If the connection has been used, you must called cleanup()
 1392        * before calling this method.
 1393        * 
 1394        * @throws IllegalStateException if the connection is in used.
 1395        */
 1396       public void changeUserInfo(String userName, String password) throws JMSException {
 1397           if (isConnectionInfoSentToBroker) {
 1398               throw new IllegalStateException("changeUserInfo used Connection is not allowed");
 1399           }
 1400           this.info.setUserName(userName);
 1401           this.info.setPassword(password);
 1402       }
 1403   
 1404       /**
 1405        * @return Returns the resourceManagerId.
 1406        * @throws JMSException
 1407        */
 1408       public String getResourceManagerId() throws JMSException {
 1409           waitForBrokerInfo();
 1410           if (brokerInfo == null) {
 1411               throw new JMSException("Connection failed before Broker info was received.");
 1412           }
 1413           return brokerInfo.getBrokerId().getValue();
 1414       }
 1415   
 1416       /**
 1417        * Returns the broker name if one is available or null if one is not
 1418        * available yet.
 1419        */
 1420       public String getBrokerName() {
 1421           try {
 1422               brokerInfoReceived.await(5, TimeUnit.SECONDS);
 1423               if (brokerInfo == null) {
 1424                   return null;
 1425               }
 1426               return brokerInfo.getBrokerName();
 1427           } catch (InterruptedException e) {
 1428               Thread.currentThread().interrupt();
 1429               return null;
 1430           }
 1431       }
 1432   
 1433       /**
 1434        * Returns the broker information if it is available or null if it is not
 1435        * available yet.
 1436        */
 1437       public BrokerInfo getBrokerInfo() {
 1438           return brokerInfo;
 1439       }
 1440   
 1441       /**
 1442        * @return Returns the RedeliveryPolicy.
 1443        * @throws JMSException
 1444        */
 1445       public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
 1446           return redeliveryPolicy;
 1447       }
 1448   
 1449       /**
 1450        * Sets the redelivery policy to be used when messages are rolled back
 1451        */
 1452       public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
 1453           this.redeliveryPolicy = redeliveryPolicy;
 1454       }
 1455   
 1456       public BlobTransferPolicy getBlobTransferPolicy() {
 1457           if (blobTransferPolicy == null) {
 1458               blobTransferPolicy = createBlobTransferPolicy();
 1459           }
 1460           return blobTransferPolicy;
 1461       }
 1462   
 1463       /**
 1464        * Sets the policy used to describe how out-of-band BLOBs (Binary Large
 1465        * OBjects) are transferred from producers to brokers to consumers
 1466        */
 1467       public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
 1468           this.blobTransferPolicy = blobTransferPolicy;
 1469       }
 1470   
 1471       /**
 1472        * @return Returns the alwaysSessionAsync.
 1473        */
 1474       public boolean isAlwaysSessionAsync() {
 1475           return alwaysSessionAsync;
 1476       }
 1477   
 1478       /**
 1479        * If this flag is set then a separate thread is not used for dispatching
 1480        * messages for each Session in the Connection. However, a separate thread
 1481        * is always used if there is more than one session, or the session isn't in
 1482        * auto acknowledge or duplicates ok mode
 1483        */
 1484       public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
 1485           this.alwaysSessionAsync = alwaysSessionAsync;
 1486       }
 1487   
 1488       /**
 1489        * @return Returns the optimizeAcknowledge.
 1490        */
 1491       public boolean isOptimizeAcknowledge() {
 1492           return optimizeAcknowledge;
 1493       }
 1494   
 1495       /**
 1496        * Enables an optimised acknowledgement mode where messages are acknowledged
 1497        * in batches rather than individually
 1498        * 
 1499        * @param optimizeAcknowledge The optimizeAcknowledge to set.
 1500        */
 1501       public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
 1502           this.optimizeAcknowledge = optimizeAcknowledge;
 1503       }
 1504   
 1505       public long getWarnAboutUnstartedConnectionTimeout() {
 1506           return warnAboutUnstartedConnectionTimeout;
 1507       }
 1508   
 1509       /**
 1510        * Enables the timeout from a connection creation to when a warning is
 1511        * generated if the connection is not properly started via {@link #start()}
 1512        * and a message is received by a consumer. It is a very common gotcha to
 1513        * forget to <a
 1514        * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
 1515        * the connection</a> so this option makes the default case to create a
 1516        * warning if the user forgets. To disable the warning just set the value to <
 1517        * 0 (say -1).
 1518        */
 1519       public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
 1520           this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
 1521       }
 1522       
 1523       /**
 1524        * @return the sendTimeout
 1525        */
 1526       public int getSendTimeout() {
 1527           return sendTimeout;
 1528       }
 1529   
 1530       /**
 1531        * @param sendTimeout the sendTimeout to set
 1532        */
 1533       public void setSendTimeout(int sendTimeout) {
 1534           this.sendTimeout = sendTimeout;
 1535       }
 1536   
 1537   
 1538       /**
 1539        * Returns the time this connection was created
 1540        */
 1541       public long getTimeCreated() {
 1542           return timeCreated;
 1543       }
 1544   
 1545       private void waitForBrokerInfo() throws JMSException {
 1546           try {
 1547               brokerInfoReceived.await();
 1548           } catch (InterruptedException e) {
 1549               Thread.currentThread().interrupt();
 1550               throw JMSExceptionSupport.create(e);
 1551           }
 1552       }
 1553   
 1554       // Package protected so that it can be used in unit tests
 1555       public Transport getTransport() {
 1556           return transport;
 1557       }
 1558   
 1559       public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
 1560           producers.put(producerId, producer);
 1561       }
 1562   
 1563       public void removeProducer(ProducerId producerId) {
 1564           producers.remove(producerId);
 1565       }
 1566   
 1567       public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
 1568           dispatchers.put(consumerId, dispatcher);
 1569       }
 1570   
 1571       public void removeDispatcher(ConsumerId consumerId) {
 1572           dispatchers.remove(consumerId);
 1573       }
 1574   
 1575       /**
 1576        * @param o - the command to consume
 1577        */
 1578       public void onCommand(final Object o) {
 1579           final Command command = (Command)o;
 1580           if (!closed.get() && command != null) {
 1581               try {
 1582                   command.visit(new CommandVisitorAdapter() {
 1583                       @Override
 1584                       public Response processMessageDispatch(MessageDispatch md) throws Exception {
 1585                           ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
 1586                           if (dispatcher != null) {
 1587                               // Copy in case a embedded broker is dispatching via
 1588                               // vm://
 1589                               // md.getMessage() == null to signal end of queue
 1590                               // browse.
 1591                               Message msg = md.getMessage();
 1592                               if (msg != null) {
 1593                                   msg = msg.copy();
 1594                                   msg.setReadOnlyBody(true);
 1595                                   msg.setReadOnlyProperties(true);
 1596                                   msg.setRedeliveryCounter(md.getRedeliveryCounter());
 1597                                   msg.setConnection(ActiveMQConnection.this);
 1598                                   md.setMessage(msg);
 1599                               }
 1600                               dispatcher.dispatch(md);
 1601                           }
 1602                           return null;
 1603                       }
 1604   
 1605                       @Override
 1606                       public Response processProducerAck(ProducerAck pa) throws Exception {
 1607                           if (pa != null && pa.getProducerId() != null) {
 1608                               ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
 1609                               if (producer != null) {
 1610                                   producer.onProducerAck(pa);
 1611                               }
 1612                           }
 1613                           return null;
 1614                       }
 1615   
 1616                       @Override
 1617                       public Response processBrokerInfo(BrokerInfo info) throws Exception {
 1618                           brokerInfo = info;
 1619                           brokerInfoReceived.countDown();
 1620                           optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
 1621                           getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
 1622                           return null;
 1623                       }
 1624   
 1625                       @Override
 1626                       public Response processConnectionError(final ConnectionError error) throws Exception {
 1627                           asyncConnectionThread.execute(new Runnable() {
 1628                               public void run() {
 1629                                   onAsyncException(error.getException());
 1630                               }
 1631                           });
 1632                           return null;
 1633                       }
 1634   
 1635                       @Override
 1636                       public Response processControlCommand(ControlCommand command) throws Exception {
 1637                           onControlCommand(command);
 1638                           return null;
 1639                       }
 1640   
 1641                       @Override
 1642                       public Response processConnectionControl(ConnectionControl control) throws Exception {
 1643                           onConnectionControl((ConnectionControl)command);
 1644                           return null;
 1645                       }
 1646   
 1647                       @Override
 1648                       public Response processConsumerControl(ConsumerControl control) throws Exception {
 1649                           onConsumerControl((ConsumerControl)command);
 1650                           return null;
 1651                       }
 1652   
 1653                       @Override
 1654                       public Response processWireFormat(WireFormatInfo info) throws Exception {
 1655                           onWireFormatInfo((WireFormatInfo)command);
 1656                           return null;
 1657                       }
 1658                   });
 1659               } catch (Exception e) {
 1660                   onAsyncException(e);
 1661               }
 1662   
 1663           }
 1664           for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
 1665               TransportListener listener = iter.next();
 1666               listener.onCommand(command);
 1667           }
 1668       }
 1669   
 1670       protected void onWireFormatInfo(WireFormatInfo info) {
 1671           protocolVersion.set(info.getVersion());
 1672       }
 1673   
 1674       /**
 1675        * Used for handling async exceptions
 1676        * 
 1677        * @param error
 1678        */
 1679       public void onAsyncException(Throwable error) {
 1680           if (!closed.get() && !closing.get()) {
 1681               if (this.exceptionListener != null) {
 1682   
 1683                   if (!(error instanceof JMSException)) {
 1684                       error = JMSExceptionSupport.create(error);
 1685                   }
 1686                   final JMSException e = (JMSException)error;
 1687   
 1688                   asyncConnectionThread.execute(new Runnable() {
 1689                       public void run() {
 1690                           ActiveMQConnection.this.exceptionListener.onException(e);
 1691                       }
 1692                   });
 1693   
 1694               } else {
 1695                   LOG.debug("Async exception with no exception listener: " + error, error);
 1696               }
 1697           }
 1698       }
 1699   
 1700       public void onException(final IOException error) {
 1701   		onAsyncException(error);
 1702   		if (!closing.get() && !closed.get()) {
 1703   			asyncConnectionThread.execute(new Runnable() {
 1704   				public void run() {
 1705   					transportFailed(error);
 1706   					ServiceSupport.dispose(ActiveMQConnection.this.transport);
 1707   					brokerInfoReceived.countDown();
 1708   
 1709   					for (Iterator<TransportListener> iter = transportListeners
 1710   							.iterator(); iter.hasNext();) {
 1711   						TransportListener listener = iter.next();
 1712   						listener.onException(error);
 1713   					}
 1714   				}
 1715   			});
 1716   		}
 1717   	}
 1718   
 1719       public void transportInterupted() {
 1720           for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
 1721               ActiveMQSession s = i.next();
 1722               s.clearMessagesInProgress();
 1723           }
 1724           for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
 1725               TransportListener listener = iter.next();
 1726               listener.transportInterupted();
 1727           }
 1728       }
 1729   
 1730       public void transportResumed() {
 1731           for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
 1732               TransportListener listener = iter.next();
 1733               listener.transportResumed();
 1734           }
 1735           for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
 1736               ActiveMQSession s = i.next();
 1737               s.deliverAcks();
 1738           }
 1739       }
 1740   
 1741       /**
 1742        * Create the DestinationInfo object for the temporary destination.
 1743        * 
 1744        * @param topic - if its true topic, else queue.
 1745        * @return DestinationInfo
 1746        * @throws JMSException
 1747        */
 1748       protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
 1749   
 1750           // Check if Destination info is of temporary type.
 1751           ActiveMQTempDestination dest;
 1752           if (topic) {
 1753               dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
 1754           } else {
 1755               dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
 1756           }
 1757   
 1758           DestinationInfo info = new DestinationInfo();
 1759           info.setConnectionId(this.info.getConnectionId());
 1760           info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
 1761           info.setDestination(dest);
 1762           syncSendPacket(info);
 1763   
 1764           dest.setConnection(this);
 1765           activeTempDestinations.put(dest, dest);
 1766           return dest;
 1767       }
 1768   
 1769       /**
 1770        * @param destination
 1771        * @throws JMSException
 1772        */
 1773       public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
 1774   
 1775           checkClosedOrFailed();
 1776   
 1777           for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
 1778               ActiveMQSession s = i.next();
 1779               if (s.isInUse(destination)) {
 1780                   throw new JMSException("A consumer is consuming from the temporary destination");
 1781               }
 1782           }
 1783   
 1784           activeTempDestinations.remove(destination);
 1785   
 1786           DestinationInfo info = new DestinationInfo();
 1787           info.setConnectionId(this.info.getConnectionId());
 1788           info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
 1789           info.setDestination(destination);
 1790           info.setTimeout(0);
 1791           syncSendPacket(info);
 1792       }
 1793   
 1794       public boolean isDeleted(ActiveMQDestination dest) {
 1795   
 1796           // If we are not watching the advisories.. then
 1797           // we will assume that the temp destination does exist.
 1798           if (advisoryConsumer == null) {
 1799               return false;
 1800           }
 1801   
 1802           return !activeTempDestinations.contains(dest);
 1803       }
 1804   
 1805       public boolean isCopyMessageOnSend() {
 1806           return copyMessageOnSend;
 1807       }
 1808   
 1809       public LongSequenceGenerator getLocalTransactionIdGenerator() {
 1810           return localTransactionIdGenerator;
 1811       }
 1812   
 1813       public boolean isUseCompression() {
 1814           return useCompression;
 1815       }
 1816   
 1817       /**
 1818        * Enables the use of compression of the message bodies
 1819        */
 1820       public void setUseCompression(boolean useCompression) {
 1821           this.useCompression = useCompression;
 1822       }
 1823   
 1824       public void destroyDestination(ActiveMQDestination destination) throws JMSException {
 1825   
 1826           checkClosedOrFailed();
 1827           ensureConnectionInfoSent();
 1828   
 1829           DestinationInfo info = new DestinationInfo();
 1830           info.setConnectionId(this.info.getConnectionId());
 1831           info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
 1832           info.setDestination(destination);
 1833           info.setTimeout(0);
 1834           syncSendPacket(info);
 1835   
 1836       }
 1837   
 1838       public boolean isDispatchAsync() {
 1839           return dispatchAsync;
 1840       }
 1841   
 1842       /**
 1843        * Enables or disables the default setting of whether or not consumers have
 1844        * their messages <a
 1845        * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
 1846        * synchronously or asynchronously by the broker</a>. For non-durable
 1847        * topics for example we typically dispatch synchronously by default to
 1848        * minimize context switches which boost performance. However sometimes its
 1849        * better to go slower to ensure that a single blocked consumer socket does
 1850        * not block delivery to other consumers.
 1851        * 
 1852        * @param asyncDispatch If true then consumers created on this connection
 1853        *                will default to having their messages dispatched
 1854        *                asynchronously. The default value is false.
 1855        */
 1856       public void setDispatchAsync(boolean asyncDispatch) {
 1857           this.dispatchAsync = asyncDispatch;
 1858       }
 1859   
 1860       public boolean isObjectMessageSerializationDefered() {
 1861           return objectMessageSerializationDefered;
 1862       }
 1863   
 1864       /**
 1865        * When an object is set on an ObjectMessage, the JMS spec requires the
 1866        * object to be serialized by that set method. Enabling this flag causes the
 1867        * object to not get serialized. The object may subsequently get serialized
 1868        * if the message needs to be sent over a socket or stored to disk.
 1869        */
 1870       public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
 1871           this.objectMessageSerializationDefered = objectMessageSerializationDefered;
 1872       }
 1873   
 1874       public InputStream createInputStream(Destination dest) throws JMSException {
 1875           return createInputStream(dest, null);
 1876       }
 1877   
 1878       public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException {
 1879           return createInputStream(dest, messageSelector, false);
 1880       }
 1881   
 1882       public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
 1883           return doCreateInputStream(dest, messageSelector, noLocal, null);
 1884       }
 1885   
 1886       public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
 1887           return createInputStream(dest, null, false);
 1888       }
 1889   
 1890       public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException {
 1891           return createDurableInputStream(dest, name, messageSelector, false);
 1892       }
 1893   
 1894       public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
 1895           return doCreateInputStream(dest, messageSelector, noLocal, name);
 1896       }
 1897   
 1898       private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName) throws JMSException {
 1899           checkClosedOrFailed();
 1900           ensureConnectionInfoSent();
 1901           return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch());
 1902       }
 1903   
 1904       /**
 1905        * Creates a persistent output stream; individual messages will be written
 1906        * to disk/database by the broker
 1907        */
 1908       public OutputStream createOutputStream(Destination dest) throws JMSException {
 1909           return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
 1910       }
 1911   
 1912       /**
 1913        * Creates a non persistent output stream; messages will not be written to
 1914        * disk
 1915        */
 1916       public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException {
 1917           return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
 1918       }
 1919   
 1920       /**
 1921        * Creates an output stream allowing full control over the delivery mode,
 1922        * the priority and time to live of the messages and the properties added to
 1923        * messages on the stream.
 1924        * 
 1925        * @param streamProperties defines a map of key-value pairs where the keys
 1926        *                are strings and the values are primitive values (numbers
 1927        *                and strings) which are appended to the messages similarly
 1928        *                to using the
 1929        *                {@link javax.jms.Message#setObjectProperty(String, Object)}
 1930        *                method
 1931        */
 1932       public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
 1933           checkClosedOrFailed();
 1934           ensureConnectionInfoSent();
 1935           return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
 1936       }
 1937   
 1938       /**
 1939        * Unsubscribes a durable subscription that has been created by a client.
 1940        * <P>
 1941        * This method deletes the state being maintained on behalf of the
 1942        * subscriber by its provider.
 1943        * <P>
 1944        * It is erroneous for a client to delete a durable subscription while there
 1945        * is an active <CODE>MessageConsumer </CODE> or
 1946        * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
 1947        * message is part of a pending transaction or has not been acknowledged in
 1948        * the session.
 1949        * 
 1950        * @param name the name used to identify this subscription
 1951        * @throws JMSException if the session fails to unsubscribe to the durable
 1952        *                 subscription due to some internal error.
 1953        * @throws InvalidDestinationException if an invalid subscription name is
 1954        *                 specified.
 1955        * @since 1.1
 1956        */
 1957       public void unsubscribe(String name) throws JMSException {
 1958           checkClosedOrFailed();
 1959           RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
 1960           rsi.setConnectionId(getConnectionInfo().getConnectionId());
 1961           rsi.setSubscriptionName(name);
 1962           rsi.setClientId(getConnectionInfo().getClientId());
 1963           syncSendPacket(rsi);
 1964       }
 1965   
 1966       /**
 1967        * Internal send method optimized: - It does not copy the message - It can
 1968        * only handle ActiveMQ messages. - You can specify if the send is async or
 1969        * sync - Does not allow you to send /w a transaction.
 1970        */
 1971       void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
 1972           checkClosedOrFailed();
 1973   
 1974           if (destination.isTemporary() && isDeleted(destination)) {
 1975               throw new JMSException("Cannot publish to a deleted Destination: " + destination);
 1976           }
 1977   
 1978           msg.setJMSDestination(destination);
 1979           msg.setJMSDeliveryMode(deliveryMode);
 1980           long expiration = 0L;
 1981   
 1982           if (!isDisableTimeStampsByDefault()) {
 1983               long timeStamp = System.currentTimeMillis();
 1984               msg.setJMSTimestamp(timeStamp);
 1985               if (timeToLive > 0) {
 1986                   expiration = timeToLive + timeStamp;
 1987               }
 1988           }
 1989   
 1990           msg.setJMSExpiration(expiration);
 1991           msg.setJMSPriority(priority);
 1992   
 1993           msg.setJMSRedelivered(false);
 1994           msg.setMessageId(messageId);
 1995   
 1996           msg.onSend();
 1997   
 1998           msg.setProducerId(msg.getMessageId().getProducerId());
 1999   
 2000           if (LOG.isDebugEnabled()) {
 2001               LOG.debug("Sending message: " + msg);
 2002           }
 2003   
 2004           if (async) {
 2005               asyncSendPacket(msg);
 2006           } else {
 2007               syncSendPacket(msg);
 2008           }
 2009   
 2010       }
 2011   
 2012       public void addOutputStream(ActiveMQOutputStream stream) {
 2013           outputStreams.add(stream);
 2014       }
 2015   
 2016       public void removeOutputStream(ActiveMQOutputStream stream) {
 2017           outputStreams.remove(stream);
 2018       }
 2019   
 2020       public void addInputStream(ActiveMQInputStream stream) {
 2021           inputStreams.add(stream);
 2022       }
 2023   
 2024       public void removeInputStream(ActiveMQInputStream stream) {
 2025           inputStreams.remove(stream);
 2026       }
 2027   
 2028       protected void onControlCommand(ControlCommand command) {
 2029           String text = command.getCommand();
 2030           if (text != null) {
 2031               if (text.equals("shutdown")) {
 2032                   LOG.info("JVM told to shutdown");
 2033                   System.exit(0);
 2034               }
 2035           }
 2036       }
 2037   
 2038       protected void onConnectionControl(ConnectionControl command) {
 2039           if (command.isFaultTolerant()) {
 2040               this.optimizeAcknowledge = false;
 2041               for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
 2042                   ActiveMQSession s = i.next();
 2043                   s.setOptimizeAcknowledge(false);
 2044               }
 2045           }
 2046       }
 2047   
 2048       protected void onConsumerControl(ConsumerControl command) {
 2049           if (command.isClose()) {
 2050               for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
 2051                   ActiveMQSession s = i.next();
 2052                   s.close(command.getConsumerId());
 2053               }
 2054           } else {
 2055               for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
 2056                   ActiveMQSession s = i.next();
 2057                   s.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
 2058               }
 2059           }
 2060       }
 2061   
 2062       protected void transportFailed(IOException error) {
 2063           transportFailed.set(true);
 2064           if (firstFailureError == null) {
 2065               firstFailureError = error;
 2066           }
 2067       }
 2068   
 2069       /**
 2070        * Should a JMS message be copied to a new JMS Message object as part of the
 2071        * send() method in JMS. This is enabled by default to be compliant with the
 2072        * JMS specification. You can disable it if you do not mutate JMS messages
 2073        * after they are sent for a performance boost
 2074        */
 2075       public void setCopyMessageOnSend(boolean copyMessageOnSend) {
 2076           this.copyMessageOnSend = copyMessageOnSend;
 2077       }
 2078   
 2079       public String toString() {
 2080           return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
 2081       }
 2082   
 2083       protected BlobTransferPolicy createBlobTransferPolicy() {
 2084           return new BlobTransferPolicy();
 2085       }
 2086   
 2087       public int getProtocolVersion() {
 2088           return protocolVersion.get();
 2089       }
 2090   
 2091       public int getProducerWindowSize() {
 2092           return producerWindowSize;
 2093       }
 2094   
 2095       public void setProducerWindowSize(int producerWindowSize) {
 2096           this.producerWindowSize = producerWindowSize;
 2097       }
 2098   
 2099       protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
 2100           connectionAudit.removeDispatcher(dispatcher);
 2101       }
 2102   
 2103       protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
 2104           return connectionAudit.isDuplicate(dispatcher, message);
 2105       }
 2106   
 2107       protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
 2108           connectionAudit.rollbackDuplicate(dispatcher, message);
 2109       }
 2110   }

Save This Page
Home » apache-activemq-5.1.0-src » org.apache » activemq » [javadoc | source]