Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/ActiveMQConnectionFactory.java


1   /**
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activemq;
19  
20  import java.net.URI;
21  import java.net.URISyntaxException;
22  import java.util.ArrayList;
23  import java.util.Iterator;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Properties;
27  
28  import javax.jms.Connection;
29  import javax.jms.ConnectionFactory;
30  import javax.jms.JMSException;
31  import javax.jms.QueueConnection;
32  import javax.jms.QueueConnectionFactory;
33  import javax.jms.TopicConnection;
34  import javax.jms.TopicConnectionFactory;
35  import javax.naming.Context;
36  
37  import org.activemq.broker.Broker;
38  import org.activemq.broker.BrokerConnector;
39  import org.activemq.broker.BrokerContainer;
40  import org.activemq.broker.BrokerContainerFactory;
41  import org.activemq.broker.BrokerContext;
42  import org.activemq.broker.impl.BrokerClientImpl;
43  import org.activemq.broker.impl.BrokerConnectorImpl;
44  import org.activemq.broker.impl.BrokerContainerFactoryImpl;
45  import org.activemq.io.WireFormat;
46  import org.activemq.io.WireFormatLoader;
47  import org.activemq.io.impl.DefaultWireFormat;
48  import org.activemq.io.util.ByteArrayCompression;
49  import org.activemq.io.util.ByteArrayFragmentation;
50  import org.activemq.jndi.JNDIBaseStorable;
51  import org.activemq.management.JMSStatsImpl;
52  import org.activemq.management.StatsCapable;
53  import org.activemq.management.StatsImpl;
54  import org.activemq.message.ActiveMQQueue;
55  import org.activemq.message.ActiveMQTopic;
56  import org.activemq.message.ConnectionInfo;
57  import org.activemq.message.ConsumerInfo;
58  import org.activemq.service.Service;
59  import org.activemq.transport.TransportChannel;
60  import org.activemq.transport.TransportChannelFactory;
61  import org.activemq.transport.TransportChannelListener;
62  import org.activemq.transport.TransportChannelProvider;
63  import org.activemq.transport.vm.VmTransportChannel;
64  import org.activemq.util.BeanUtils;
65  import org.activemq.util.IdGenerator;
66  import org.activemq.util.URIHelper;
67  import org.apache.commons.logging.Log;
68  import org.apache.commons.logging.LogFactory;
69  
70  /**
71   * A ConnectionFactory is an Administered object, and is used for creating
72   * Connections.
73   * <p/>
74   * This class also implements QueueConnectionFactory and TopicConnectionFactory and is an Administered object.
75   * You can use this connection to create both QueueConnections and TopicConnections.
76   *
77   * @version $Revision: 1.2 $
78   * @see javax.jms.ConnectionFactory
79   */
80  public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, Service, StatsCapable {
81  
82      private static final Log log = LogFactory.getLog(ActiveMQConnectionFactory.class);
83  
84      private BrokerContext brokerContext = BrokerContext.getInstance(); // the singleton context unless setBrokerContext() replaces it
85      private BrokerContainerFactory brokerContainerFactory; // created on demand by getBrokerContainerFactory()
86      protected BrokerContainer brokerContainer; // given to a constructor or resolved on demand by getContainer()
87  
88      protected String userName;
89      protected String password;
90      protected String brokerURL;
91      protected String clientID;
92      protected String brokerName; // stays null until it is set or until getBrokerName() fills it in
93      private boolean useEmbeddedBroker;
94      /**
95       * Should we use an async send for persistent non transacted messages ?
96       */
97      protected boolean useAsyncSend = true;
98      protected boolean disableTimeStampsByDefault = false;
99      protected boolean J2EEcompliant = true;
100     
101     /* The URLs of the embedded broker connectors that this object registered; stop() deregisters them */
102     private List startedEmbeddedBrokers = new ArrayList();
103 
104     private JMSStatsImpl stats = new JMSStatsImpl();
105     private WireFormat wireFormat = new DefaultWireFormat(); // getWireFormat() hands out copies of this instance
106     private IdGenerator idGenerator = new IdGenerator(); // generates the broker name when none was set
107     private int connectionCount;
108     private String brokerXmlConfig;
109     
110     
111     //compression and fragmentation variables (copied to every connection in createConnection)
112     
113     protected boolean doMessageCompression = true;
114     protected int messageCompressionLimit = ByteArrayCompression.DEFAULT_COMPRESSION_LIMIT;//data size above which compression will be used
115     protected int messageCompressionLevel = ByteArrayCompression.DEFAULT_COMPRESSION_LEVEL;
116     protected int messageCompressionStrategy = ByteArrayCompression.DEFAULT_COMPRESSION_STRATEGY;//default compression strategy
117     
118     protected boolean doMessageFragmentation = false;
119     protected int messageFragmentationLimit = ByteArrayFragmentation.DEFAULT_FRAGMENTATION_LIMIT;
120     
121     protected boolean cachingEnabled = true;
122     
123     protected boolean prepareMessageBodyOnSend = true; //causes pre-serialization of messages
124     
125     protected boolean quickClose = false;
126     
127     protected boolean internalConnection = false;//connections are used internally - for networks etc.
128 
129     protected boolean optimizedMessageDispatch = false;//set to true for better consumption for transient topics
130     
131     protected boolean copyMessageOnSend = true;//set false for better throughput
132 
133     private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); // shared by all connections of this factory
134 
135     /**
136      * Creates a factory using the default user name, password and broker URL defined on ActiveMQConnection
137      */
138     public ActiveMQConnectionFactory() {
139         this( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_URL);
140     }
141 
142     /** Creates a factory for the given broker URL using the default user name and password of ActiveMQConnection */
143     public ActiveMQConnectionFactory(String brokerURL) {
144         this(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, brokerURL);
145     }
146 
147     public ActiveMQConnectionFactory(String userName, String password, String brokerURL) {
148         this.userName = userName;
149         this.password = password;
150         this.brokerURL = brokerURL;
151         // Anything after '?' in the URL is parsed as options and handed to BeanUtils.populate for this factory;
152         // a null URL simply has no options (it used to fail right here with a NullPointerException).
153         if (brokerURL != null && brokerURL.indexOf("?") >= 0) {
154             Map properties = URIHelper.parseQuery(brokerURL.substring(brokerURL.indexOf("?") + 1));
155             if (!properties.isEmpty()) {
156                 BeanUtils.populate(this, properties);
157             }
158         }
159     }
160 
161     /**
162      * Constructs a {@link ConnectionFactory} with an already configured and <b>started</b> {@link BrokerContainer}
163      * ready for use in embedded mode. The broker URL is <code>vm://</code> followed by the name of the container's broker.
164      *
165      * @param container the broker container to use; its broker's name becomes part of the broker URL
166      */
167     public ActiveMQConnectionFactory(BrokerContainer container) {
168         this(container, "vm://" + container.getBroker().getName());
169     }
170 
171     /**
172      * Constructs a {@link ConnectionFactory} with an already configured and <b>started</b> {@link BrokerContainer}
173      * ready for use in embedded mode and the brokerURL connection.
174      *
175      * @param container the broker container to use; options after '?' in brokerURL are handed to BeanUtils.populate
176      */
177     public ActiveMQConnectionFactory(BrokerContainer container, String brokerURL) {
178         this.brokerContainer = container;
179         this.useEmbeddedBroker = true; // embedded mode as documented; with false, getEmbeddedBroker() returned null
180         this.brokerURL = brokerURL;
181         // a null URL has no options to parse (guards the NullPointerException indexOf would otherwise throw)
182         if (brokerURL != null && brokerURL.indexOf("?") >= 0) {
183             String options = brokerURL.substring(brokerURL.indexOf("?")+1);
184             Map properties = URIHelper.parseQuery(options);
185             if (!properties.isEmpty()) {
186                 BeanUtils.populate(this, properties);
187             }
188         }
189     }
190 
191     /** Returns the statistics object of this factory (the same instance that getFactoryStats() returns) */
192     public StatsImpl getStats() {
193         return stats;
194     }
195 
196     public JMSStatsImpl getFactoryStats() {
197         return stats; // same object as getStats(), exposed with its concrete JMSStatsImpl type
198     }
199 
200     /**
201      * @return the broker URL that createConnection() hands to createTransportChannel()
202      */
203     public String getBrokerURL() {
204         return brokerURL;
205     }
206 
207     /**
208      * @param brokerURL the broker URL to use; unlike the constructors, this setter does not parse query options
209      */
210     public void setBrokerURL(String brokerURL) {
211         this.brokerURL = brokerURL;
212     }
213 
214     /**
215      * @return the client ID that createConnection() sets on new connections; may be null
216      */
217     public String getClientID() {
218         return clientID;
219     }
220 
221     /**
222      * @param clientID the client ID for new connections; when null or empty, none is set on them
223      */
224     public void setClientID(String clientID) {
225         this.clientID = clientID;
226     }
227 
228     /**
229      * @return the password used by the no-argument create...Connection() methods
230      */
231     public String getPassword() {
232         return password;
233     }
234 
235     /**
236      * @param password the password to use when a connection is created without explicit credentials
237      */
238     public void setPassword(String password) {
239         this.password = password;
240     }
241 
242     /**
243      * @return the user name used by the no-argument create...Connection() methods
244      */
245     public String getUserName() {
246         return userName;
247     }
248 
249     /**
250      * @param userName the user name to use when a connection is created without explicit credentials
251      */
252     public void setUserName(String userName) {
253         this.userName = userName;
254     }
255 
256     /**
257      * Whether this connection factory is flagged to use an embedded broker
258      *
259      * @return true if the flag is set; a transport may still require an embedded broker when it is false
260      */
261     public boolean isUseEmbeddedBroker() {
262         return useEmbeddedBroker;
263     }
264 
265     /**
266      * Allows embedded brokers to be associated with a connection factory
267      *
268      * @param useEmbeddedBroker true to make createTransportChannel() set up an embedded broker connector
269      */
270     public void setUseEmbeddedBroker(boolean useEmbeddedBroker) {
271         this.useEmbeddedBroker = useEmbeddedBroker;
272     }
273 
274     /**
275      * The name of the broker to use if creating an embedded broker
276      *
277      * @return the broker name; if none was set, one is generated by the IdGenerator and remembered
278      */
279     public String getBrokerName() {
280         if (brokerName == null) {
281             // lets auto-create a broker name
282             brokerName = idGenerator.generateId();
283         }
284         return brokerName;
285     }
286     
287     /**
288      * Returns the broker name, adopting the given value as the name when none has been set yet
289      * @param url the value to store as the broker name if it is currently null
290      * @return the broker name, which is url only if no name existed before this call
291      */
292     public String getBrokerName(String url) {
293         if (brokerName == null) {
294             brokerName = url;
295         }
296         return brokerName;
297     }
298 
299     public void setBrokerName(String brokerName) {
300         this.brokerName = brokerName; // setting null re-enables the lazy defaults of both getBrokerName() variants
301     }
302 
303     /**
304      * @return whether new connections are told to use async send (true by default)
305      */
306     public boolean isUseAsyncSend() {
307         return useAsyncSend;
308     }
309 
310     /**
311      * @param useAsyncSend the flag that createConnection() copies to each new connection
312      */
313     public void setUseAsyncSend(boolean useAsyncSend) {
314         this.useAsyncSend = useAsyncSend;
315     }
316 
317     public WireFormat getWireFormat() {
318         return wireFormat.copy();//every caller gets its own copy, never the configured instance - needed especially if wire format caching is enabled
319     }
320 
321     /**
322      * Allows the prefetch policy to be configured
323      *
324      * @return the policy instance that createConnection() passes to every new connection (not a copy)
325      */
326     public ActiveMQPrefetchPolicy getPrefetchPolicy() {
327         return prefetchPolicy;
328     }
329 
330     /**
331      * Sets the prefetch policy
332      *
333      * @param prefetchPolicy the policy to pass to connections created from now on
334      */
335     public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
336         this.prefetchPolicy = prefetchPolicy;
337     }
338 
339     /**
340      * Set this flag for fast throughput!
341      * <P>
342      * Sets disableTimeStampsByDefault, useAsyncSend, cachingEnabled and optimizedMessageDispatch to the given
343      * value, and prepareMessageBodyOnSend and copyMessageOnSend to its opposite </P>
344      * @param value - the flag to set; note that false switches useAsyncSend and caching off as well
345      */
346     public void setTurboBoost(boolean value){
347         
348         disableTimeStampsByDefault = value;
349         useAsyncSend = value;
350         cachingEnabled = value;
351         optimizedMessageDispatch = value;
352         prepareMessageBodyOnSend = !value;
353         copyMessageOnSend = !value;
354     }
355     
356     /**
357      * @return true if disableTimeStampsByDefault, useAsyncSend and cachingEnabled are all true (the other flags are not checked)
358      */
359     public boolean isTurboBoost(){
360         return disableTimeStampsByDefault && useAsyncSend && cachingEnabled;
361     }
362     
363     /**
364      * @return the optimizedMessageDispatch flag copied to each new connection (false by default)
365      */
366     public boolean isOptimizedMessageDispatch() {
367         return optimizedMessageDispatch;
368     }
369     /**
370      * @param optimizedMessageDispatch set to true for better consumption for transient topics
371      */
372     public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
373         this.optimizedMessageDispatch = optimizedMessageDispatch;
374     }
375     /**
376      * @return the disableTimeStampsByDefault flag copied to each new connection (false by default)
377      */
378     public boolean isDisableTimeStampsByDefault() {
379         return disableTimeStampsByDefault;
380     }
381     /**
382      * @param disableTimeStampsByDefault the flag value to copy to connections created from now on
383      */
384     public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) {
385         this.disableTimeStampsByDefault = disableTimeStampsByDefault;
386     }
387     /**
388      * @return the J2EEcompliant flag copied to each new connection (true by default)
389      */
390     public boolean isJ2EEcompliant() {
391         return J2EEcompliant;
392     }
393     /**
394      * @param ecompliant the J2EEcompliant flag value to copy to connections created from now on
395      */
396     public void setJ2EEcompliant(boolean ecompliant) {
397         J2EEcompliant = ecompliant;
398     }
399 
400     /**
401      * @return the internalConnection flag copied to each new connection (false by default)
402      */
403     public boolean isInternalConnection() {
404         return internalConnection;
405     }
406     /**
407      * @param internalConnection true when the connections are used internally - for networks etc.
408      */
409     public void setInternalConnection(boolean internalConnection) {
410         this.internalConnection = internalConnection;
411     }
412     /**
413      * @return the quickClose flag copied to each new connection (false by default)
414      */
415     public boolean isQuickClose() {
416         return quickClose;
417     }
418     /**
419      * @param quickClose the flag value to copy to connections created from now on
420      */
421     public void setQuickClose(boolean quickClose) {
422         this.quickClose = quickClose;
423     }
424     /**
425      * @return the doMessageCompression flag copied to each new connection (true by default)
426      */
427     public boolean isDoMessageCompression() {
428         return doMessageCompression;
429     }
430     /**
431      * @param doMessageCompression the flag value to copy to connections created from now on
432      */
433     public void setDoMessageCompression(boolean doMessageCompression) {
434         this.doMessageCompression = doMessageCompression;
435     }
436     /**
437      * @return the doMessageFragmentation flag copied to each new connection (false by default)
438      */
439     public boolean isDoMessageFragmentation() {
440         return doMessageFragmentation;
441     }
442     /**
443      * @param doMessageFragmentation the flag value to copy to connections created from now on
444      */
445     public void setDoMessageFragmentation(boolean doMessageFragmentation) {
446         this.doMessageFragmentation = doMessageFragmentation;
447     }
448     /**
449      * @return the data size above which compression will be used
450      */
451     public int getMessageCompressionLimit() {
452         return messageCompressionLimit;
453     }
454     /**
455      * @param messageCompressionLimit the data size above which compression will be used
456      */
457     public void setMessageCompressionLimit(int messageCompressionLimit) {
458         this.messageCompressionLimit = messageCompressionLimit;
459     }
460     /**
461      * @return the compression strategy copied to each new connection (ByteArrayCompression's default unless changed)
462      */
463     public int getMessageCompressionStrategy() {
464         return messageCompressionStrategy;
465     }
466     /**
467      * @param messageCompressionStrategy the strategy value to copy to connections created from now on
468      */
469     public void setMessageCompressionStrategy(int messageCompressionStrategy) {
470         this.messageCompressionStrategy = messageCompressionStrategy;
471     }
472     /**
473      * @return the fragmentation limit copied to each new connection (ByteArrayFragmentation's default unless changed)
474      */
475     public int getMessageFragmentationLimit() {
476         return messageFragmentationLimit;
477     }
478     /**
479      * @param messageFragmentationLimit the limit value to copy to connections created from now on
480      */
481     public void setMessageFragmentationLimit(int messageFragmentationLimit) {
482         this.messageFragmentationLimit = messageFragmentationLimit;
483     }
484     
485     /**
486      * @return the cachingEnabled flag copied to each new connection (true by default)
487      */
488     public boolean isCachingEnabled() {
489         return cachingEnabled;
490     }
491     /**
492      * @param cachingEnabled the flag value to copy to connections created from now on
493      */
494     public void setCachingEnabled(boolean cachingEnabled) {
495         this.cachingEnabled = cachingEnabled;
496     }
497     /**
498      * Causes pre-serialization of messages before send
499      * By default this is on
500      * @return the prepareMessageBodyOnSend flag copied to each new connection
501      */
502     public boolean isPrepareMessageBodyOnSend() {
503         return prepareMessageBodyOnSend;
504     }
505     /**
506      * Causes pre-serialization of messages before send
507      * By default this is on
508      * @param prePrepareMessageOnSend the new value of the prepareMessageBodyOnSend flag
509      */
510     public void setPrepareMessageBodyOnSend(boolean prePrepareMessageOnSend) {
511         this.prepareMessageBodyOnSend = prePrepareMessageOnSend;
512     }
513     /**
514      * @return the copyMessageOnSend flag copied to each new connection (true by default)
515      */
516     public boolean isCopyMessageOnSend() {
517         return copyMessageOnSend;
518     }
519     /**
520      * @param copyMessageOnSend set false for better throughput
521      */
522     public void setCopyMessageOnSend(boolean copyMessageOnSend) {
523         this.copyMessageOnSend = copyMessageOnSend;
524     }
525     /**
526      * Allows a custom wire format to be used; otherwise the default Java wire format is used
527      * which is designed for minimum size and maximum speed on the Java platform
528      *
529      * @param wireFormat the wire format instance that getWireFormat() will copy from now on
530      */
531     public void setWireFormat(WireFormat wireFormat) {
532         this.wireFormat = wireFormat;
533     }
534     
535     /**
536      * set the WireFormat by name - e.g. 'default','amqpfast' etc.
537      * 
538      * @param format the name that WireFormatLoader resolves to a wire format
539      * @throws JMSException propagated from WireFormatLoader.getWireFormat
540      */
541  
542     public void setWireFormat(String format) throws JMSException{
543         this.wireFormat = WireFormatLoader.getWireFormat(format);
544     }
545 
546     public String getBrokerXmlConfig() {
547         return brokerXmlConfig; // null unless an XML configuration was set
548     }
549 
550     public BrokerContainer getBrokerContainer() {
551         return brokerContainer; // may be null: the container is only set by a constructor or by getContainer()
552     }
553 
554     /**
555      * Sets the <a href="http://activemq.org/Xml+Configuration">XML configuration file</a>
556      * used to configure the ActiveMQ broker via Spring if using embedded mode.
557      * It is read by createBrokerContainerFactory(), i.e. only when the broker container factory is first created.
558      * @param brokerXmlConfig is the filename which is assumed to be on the classpath unless a URL
559      *                        is specified. So a value of <code>foo/bar.xml</code> would be assumed to be on the classpath
560      *                        whereas <code>file:dir/file.xml</code> would use the file system.
561      *                        Any valid URL string is supported.
562      * @see #setUseEmbeddedBroker(boolean)
563      */
564     public void setBrokerXmlConfig(String brokerXmlConfig) {
565         this.brokerXmlConfig = brokerXmlConfig;
566     }
567 
568     public BrokerContainerFactory getBrokerContainerFactory() throws JMSException {
569         if (brokerContainerFactory == null) { // first use: build the factory once and keep it
570             brokerContainerFactory = createBrokerContainerFactory();
571         }
572         return brokerContainerFactory;
573     }
574 
575     public void setBrokerContainerFactory(BrokerContainerFactory brokerContainerFactory) {
576         this.brokerContainerFactory = brokerContainerFactory; // null makes the getter create a fresh factory again
577     }
578 
579     /**
580      * Returns the context used to store broker containers and connectors, which is the
581      * BrokerContext singleton unless another context was set
582      */
583     public BrokerContext getBrokerContext() {
584         return brokerContext;
585     }
586 
587     public void setBrokerContext(BrokerContext brokerContext) {
588         this.brokerContext = brokerContext; // the static registerBroker/unregisterBroker helpers keep using the singleton
589     }
590 
591     /**
592      * Create a JMS Connection using the user name and password configured on this factory
593      *
594      * @return the JMS Connection
595      * @throws JMSException if an error occurs creating the Connection
596      */
597     public Connection createConnection() throws JMSException {
598         return this.createConnection(this.userName, this.password);
599     }
600 
601     /**
602      * @param userName the user name passed to the new connection
603      * @param password the password passed to the new connection
604      * @return the Connection, on a new transport channel for brokerURL and configured with this factory's settings
605      * @throws JMSException if an error occurs creating the Connection
606      */
607     public Connection createConnection(String userName, String password) throws JMSException {
608         ActiveMQConnection connection = new ActiveMQConnection(this, userName, password, createTransportChannel(this.brokerURL));
609         connection.setCachingEnabled(isCachingEnabled());
610         connection.setUseAsyncSend(isUseAsyncSend());
611         connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault());
612         connection.setJ2EEcompliant(isJ2EEcompliant());
613         connection.setDoMessageCompression(isDoMessageCompression());
614         connection.setMessageCompressionLevel(messageCompressionLevel); // field read directly; the other settings go through accessors
615         connection.setMessageCompressionLimit(getMessageCompressionLimit());
616         connection.setMessageCompressionStrategy(getMessageCompressionStrategy());
617         connection.setDoMessageFragmentation(isDoMessageFragmentation());
618         connection.setMessageFragmentationLimit(getMessageFragmentationLimit());
619         connection.setPrepareMessageBodyOnSend(isPrepareMessageBodyOnSend());
620         connection.setInternalConnection(isInternalConnection());
621         connection.setQuickClose(isQuickClose());
622         connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch());
623         connection.setCopyMessageOnSend(isCopyMessageOnSend());
624         connection.setPrefetchPolicy(getPrefetchPolicy());
625         if (this.clientID != null && this.clientID.length() > 0) { // a null or empty client ID is never pushed to the connection
626             connection.setClientID(this.clientID);
627         }
628         return connection;
629     }
630 
631     /**
632      * Create a JMS QueueConnection using the user name and password configured on this factory
633      *
634      * @return the JMS QueueConnection
635      * @throws JMSException if an error occurs creating the Connection
636      */
637     public QueueConnection createQueueConnection() throws JMSException {
638         return this.createQueueConnection(this.userName, this.password);
639     }
640 
641     /**
642      * @param userName the user name passed to the new connection
643      * @param password the password passed to the new connection
644      * @return the result of createConnection(userName, password), cast to QueueConnection
645      * @throws JMSException if an error occurs creating the Connection
646      */
647     public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
648         return (QueueConnection) createConnection(userName, password); // assumes ActiveMQConnection is a QueueConnection - TODO confirm
649     }
650 
651     /**
652      * Create a JMS TopicConnection using the user name and password configured on this factory
653      *
654      * @return the JMS TopicConnection
655      * @throws JMSException if an error occurs creating the Connection
656      */
657     public TopicConnection createTopicConnection() throws JMSException {
658         return this.createTopicConnection(this.userName, this.password);
659     }
660 
661     /**
662      * @param userName the user name passed to the new connection
663      * @param password the password passed to the new connection
664      * @return the result of createConnection(userName, password), cast to TopicConnection
665      * @throws JMSException if an error occurs creating the Connection
666      */
667     public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
668         return (TopicConnection) createConnection(userName, password); // assumes ActiveMQConnection is a TopicConnection - TODO confirm
669     }
670 
671     /** Does nothing in this class; its counterpart stop() is where the shutdown work happens */
672     public void start() throws JMSException {
673     }
674 
675     /**
676      * A hook to allow any embedded JMS Broker's to be closed down: deregisters every connector URL that this
677      * factory registered and stops the broker container, if there is one. A repeated call finds nothing left to do.
678      * @throws JMSException if shutting down fails
679      */
680     public synchronized void stop() throws JMSException {
681         // Deregister all embedded broker connectors that we registered, forgetting each one as we go
682         for (Iterator iter = startedEmbeddedBrokers.iterator(); iter.hasNext();) {
683             brokerContext.deregisterConnector((String) iter.next());
684             iter.remove(); // otherwise a later stop() would deregister the same, by then stale, URLs again
685         }
686         if (brokerContainer != null) {
687             brokerContainer.stop();
688             brokerContainer = null;
689         }
690     }
691 
692     /** Returns the broker of this factory's container, or null when the useEmbeddedBroker flag is not set */
693     public Broker getEmbeddedBroker() throws JMSException {
694         if (isUseEmbeddedBroker()) {
695             return getContainer(getBrokerName(), getBrokerName()).getBroker(); // the broker name doubles as the url argument
696         }
697         return null;
698     }
699 
700     public static synchronized void registerBroker(String theURLString, BrokerConnector brokerConnector) {
701         BrokerContext.getInstance().registerConnector(theURLString, brokerConnector); // always the singleton, not an instance's brokerContext
702     }
703 
704     public static synchronized void unregisterBroker(String theURLString) {
705         BrokerContext.getInstance().deregisterConnector(theURLString); // always the singleton, not an instance's brokerContext
706     }
707 
708 
709     // Implementation methods
710     //-------------------------------------------------------------------------
711 
712 
713     /**
714      * Initialize this instance from the properties that represent it in JNDI
715      *
716      * @param props the properties to read; Context.PROVIDER_URL takes precedence over "brokerURL"
717      */
718     protected void buildFromProperties(Properties props) {
719         this.userName = props.getProperty("userName", this.userName);
720         this.password = props.getProperty("password", this.password);
721         String temp = props.getProperty(Context.PROVIDER_URL);
722         if (temp == null || temp.length() == 0) {
723              temp = props.getProperty("brokerURL");
724         }
725         if (temp != null && temp.length() > 0) { // without a usable URL property the current brokerURL is kept
726             this.brokerURL = temp;
727         }
728         this.brokerName = props.getProperty("brokerName", this.brokerName);
729         this.clientID = props.getProperty("clientID"); // unlike its neighbours, this becomes null when the property is absent
730         this.useAsyncSend = getBoolean(props, "useAsyncSend", true); // absent means true
731         this.useEmbeddedBroker = getBoolean(props, "useEmbeddedBroker"); // absent means false
732         this.brokerXmlConfig = props.getProperty("brokerXmlConfig", this.brokerXmlConfig);
733         this.J2EEcompliant = getBoolean(props,"J2EEcompliant",true); // absent means true
734         if (props.containsKey("turboBoost")){ // NOTE(review): a present "false" also resets useAsyncSend and caching via setTurboBoost
735             this.setTurboBoost(getBoolean(props, "turboBoost"));
736         }
737     }
738 
739     /**
740      * Stores the settings that represent this instance in JNDI into the given properties.
741      * Settings that are null are left out, because Properties rejects null values with a
742      * NullPointerException (brokerName, for one, is null until it is set or generated).
743      *
744      * @param props the properties to fill in; Context.PROVIDER_URL and "brokerURL" both receive the broker URL
745      */
746     protected void populateProperties(Properties props) {
747         // Properties is Hashtable-based and cannot hold null, so only non-null settings are stored
748         if (this.userName != null) { props.put("userName", this.userName); }
749         if (this.password != null) { props.put("password", this.password); }
750         if (this.brokerURL != null) { props.put("brokerURL", this.brokerURL); }
751         if (this.brokerURL != null) { props.put(Context.PROVIDER_URL, this.brokerURL); }
752         if (this.brokerName != null) { props.put("brokerName", this.brokerName); }
753         if (this.clientID != null) { props.put("clientID", this.clientID); }
754         if (this.brokerXmlConfig != null) { props.put("brokerXmlConfig", this.brokerXmlConfig); }
755         // the flags are always written, as the strings "true" or "false"
756         props.put("useAsyncSend", (useAsyncSend) ? "true" : "false");
757         props.put("useEmbeddedBroker", (useEmbeddedBroker) ? "true" : "false");
758         props.put("J2EEcompliant", (this.J2EEcompliant) ? "true" : "false");
759         props.put("turboBoost", (isTurboBoost()) ? "true" : "false");
760     }
761 
762     /**
763      * Helper method to return the property value as a boolean flag, treating a missing property as false
764      *
765      * @param props the properties to look in
766      * @param key the name of the property
767      * @return true only if the property exists and equals "true", ignoring case
768      */
769     protected boolean getBoolean(Properties props, String key) {
770         return getBoolean(props, key, false);
771     }
772 
773     /**
774      * Helper method to return the property value as a boolean flag
775      *
776      * @param props the properties to look in
777      * @param key the name of the property
778      * @param defaultValue the result when the property is missing
779      * @return defaultValue if the property is missing, otherwise whether its value equals "true", ignoring case
780      */
781     protected boolean getBoolean(Properties props, String key, boolean defaultValue) {
782         String value = props.getProperty(key);
783         return value != null ? value.equalsIgnoreCase("true") : defaultValue; // any present value other than "true" yields false
784     }
785 
786     protected BrokerContainerFactory createBrokerContainerFactory() throws JMSException {
787         if (brokerXmlConfig != null) { // an XML configuration was set: let XmlConfigHelper build the factory from it
788             return XmlConfigHelper.createBrokerContainerFactory(brokerXmlConfig);
789         }
790         return new BrokerContainerFactoryImpl(); // no XML configuration: fall back to the plain implementation
791     }
792 
793     /**
794      * Factory method to create a TransportChannel from a URL, wiring in an embedded broker connector when one is called for
795      * @param theURLString the broker URL; also the key under which a connector is looked up or registered
796      * @return the TransportChannel, passed through ensureServerIsAvailable() when a broker is embedded
797      * @throws JMSException
798      */
799     protected TransportChannel createTransportChannel(String theURLString) throws JMSException {
800         URI uri = createURI(theURLString);
801         TransportChannelFactory factory = TransportChannelProvider.getFactory(uri);
802         BrokerConnector brokerConnector = null;
803         boolean created = false;
804         TransportChannel transportChannel = null;
805         boolean embedServer = isUseEmbeddedBroker() || factory.requiresEmbeddedBroker(); // embed if configured to, or if the transport demands it
806         if (embedServer) {
807             synchronized (this) { // same monitor as stop() and getContainer(), which touch the same state
808                 if (factory.requiresEmbeddedBroker()) {
809                     transportChannel = factory.create(getWireFormat(), uri);
810                     brokerConnector = transportChannel.getEmbeddedBrokerConnector();
811                 }
812                 if (brokerConnector == null) { // no connector from the channel: reuse a registered one or create it
813                     brokerConnector = brokerContext.getConnectorByURL(theURLString);
814                     if (brokerConnector == null) {
815                         brokerConnector = createBrokerConnector(theURLString);
816                         brokerContext.registerConnector(theURLString, brokerConnector);
817                         startedEmbeddedBrokers.add(theURLString); // remembered so that stop() can deregister it
818                         created = true;
819                     }
820                 }
821                 else {
822                     created = true; // the channel supplied its own connector; this is reported as created too
823                 }
824             }
825         }
826         if (transportChannel == null){ // not created above, i.e. the transport does not require an embedded broker
827             transportChannel = factory.create(getWireFormat(), uri);
828         }
829        
830         if (embedServer) {
831             return ensureServerIsAvailable(uri, transportChannel, brokerConnector, created);
832         }
833         return transportChannel;
834     }
835 
836     protected synchronized BrokerContainer getContainer(String url, String name) throws JMSException {
837         if (brokerContainer == null) { // resolved once and kept; later calls ignore url and name
838             brokerContainer = brokerContext.getBrokerContainerByName(url, name, getBrokerContainerFactory());
839         }
840         return brokerContainer;
841     }
842 
843     protected BrokerConnector createBrokerConnector(String url) throws JMSException {
844         // Creates and starts a connector for url on this factory's container, then pauses for a second
845         BrokerConnector brokerConnector = new BrokerConnectorImpl(getContainer(url, getBrokerName()), url, getWireFormat());
846         brokerConnector.start();
847         log.info("Embedded JMS Broker has started");
848         // lets wait a little for the server to startup
849         try {
850             Thread.sleep(1000);
851         }
852         catch (InterruptedException e) {
853             log.warn("caught exception sleeping",e);
854             Thread.currentThread().interrupt(); // keep the interrupt visible to callers instead of swallowing it
855         }
856         return brokerConnector;
857     }
858 
859 
860     protected TransportChannel ensureServerIsAvailable(URI remoteLocation, TransportChannel channel, BrokerConnector brokerConnector, boolean created) throws JMSException {
861         ensureVmServerIsAvailable(channel, brokerConnector);
862         if (channel.isMulticast()) {
863             return ensureMulticastChannelIsAvailable(remoteLocation, channel, brokerConnector, created);
864         }
865         return channel;
866     }
867 
868     private void ensureVmServerIsAvailable(TransportChannel channel, BrokerConnector brokerConnector) throws JMSException {
869         if (channel instanceof VmTransportChannel && brokerConnector instanceof TransportChannelListener) {
870             VmTransportChannel answer = (VmTransportChannel) channel;
871             answer.connect(brokerConnector);
872         }
873     }
874 
875     protected TransportChannel ensureMulticastChannelIsAvailable(URI remoteLocation, TransportChannel channel, BrokerConnector brokerConnector, boolean created) throws JMSException {
876         if (created) {
877             BrokerConnectorImpl brokerImpl = (BrokerConnectorImpl) brokerConnector;
878 
879             BrokerClientImpl client = new BrokerClientImpl();
880             client.initialize(brokerImpl, channel);
881             channel.start();
882             String brokerClientID = createMulticastClientID();
883             channel.setClientID(brokerClientID);
884 
885             // lets spoof a consumer for topics which will replicate messages
886             // over the multicast transport
887             ConnectionInfo info = new ConnectionInfo();
888             info.setHostName(IdGenerator.getHostName());
889             info.setClientId(brokerClientID);
890             info.setStarted(true);
891             client.consumeConnectionInfo(info);
892 
893             ConsumerInfo consumerInfo = new ConsumerInfo();
894             consumerInfo.setDestination(new ActiveMQTopic(">"));
895             consumerInfo.setNoLocal(true);
896             consumerInfo.setClientId(brokerClientID);
897             consumerInfo.setConsumerId(idGenerator.generateId());
898             consumerInfo.setStarted(true);
899             client.consumeConsumerInfo(consumerInfo);
900 
901             consumerInfo = new ConsumerInfo();
902             consumerInfo.setDestination(new ActiveMQQueue(">"));
903             consumerInfo.setNoLocal(true);
904             consumerInfo.setClientId(brokerClientID);
905             consumerInfo.setConsumerId(idGenerator.generateId());
906             consumerInfo.setStarted(true);
907             client.consumeConsumerInfo(consumerInfo);
908         }
909 
910         // now lets create a VM channel that the JMS client will use
911         // to connect to the embedded brokerConnector
912         URI localURI = createURI("vm", remoteLocation);
913         TransportChannel localChannel = TransportChannelProvider.create(getWireFormat(), localURI);
914         ensureVmServerIsAvailable(localChannel, brokerConnector);
915         return localChannel;
916     }
917 
918     /**
919      * Creates the clientID for the multicast client (used to dispatch local
920      * messages over a multicast bus)
921      */
922     protected String createMulticastClientID() {
923         return idGenerator.generateId();
924     }
925 
926     protected URI createURI(String protocol, URI uri) throws JMSException {
927         try {
928             return new URI(protocol, uri.getRawSchemeSpecificPart(), uri.getFragment());
929         }
930         catch (URISyntaxException e) {
931             JMSException jmsEx = new JMSException("the URL string is badly formated:", e.getMessage());
932             jmsEx.setLinkedException(e);
933             throw jmsEx;
934 
935         }
936     }
937 
938     protected URI createURI(String uri) throws JMSException {
939         try {
940             if (uri == null) {
941                 throw new JMSException("The connection URI must be specified!");
942             }
943             return new URI(uri);
944         }
945         catch (URISyntaxException e) {
946             e.printStackTrace();
947             JMSException jmsEx = new JMSException("the URL string is badly formated:", e.getMessage());
948             jmsEx.setLinkedException(e);
949             throw jmsEx;
950 
951         }
952     }
953 
954     /**
955      * Called when a connection is closed so that we can shut down any embedded brokers cleanly
956      *
957      * @param connection
958      */
959     synchronized void onConnectionClose(ActiveMQConnection connection) throws JMSException {
960         if (--connectionCount <= 0) {
961             // close any broker if we've got one
962             stop();
963         }
964 
965     }
966 
967     synchronized void onConnectionCreate(ActiveMQConnection connection) {
968         ++connectionCount;
969     }
970     
971 }