Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/util/connection/ServerConnectionFactory.java


1   /**
2    *
3    * Copyright 2004 Protique Ltd
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  package org.activemq.util.connection;
19  
20  import org.activemq.ActiveMQConnectionFactory;
21  import org.activemq.ActiveMQConnection;
22  import org.mr.api.jms.MantaTopicConnectionFactory;
23  import org.mr.api.jms.MantaQueueConnectionFactory;
24  
25  import org.apache.jmeter.util.JMeterUtils;
26  
27  import javax.jms.Connection;
28  import javax.jms.Session;
29  import javax.jms.JMSException;
30  import javax.jms.Topic;
31  import javax.jms.Queue;
32  import javax.jms.ConnectionFactory;
33  import javax.jms.TopicConnectionFactory;
34  import javax.jms.QueueConnectionFactory;
35  import javax.jms.TopicSession;
36  import javax.jms.QueueSession;
37  import javax.jms.Destination;
38  import javax.jms.TopicConnection;
39  import javax.jms.QueueConnection;
40  
41  import javax.naming.Context;
42  import javax.naming.InitialContext;
43  import javax.naming.NamingException;
44  
45  import java.lang.reflect.Constructor;
46  import java.lang.reflect.InvocationTargetException;
47  import java.util.Properties;
48  
49  /**
50   * Provides static methods for creating Session and Destination objects.
51   */
52  public class ServerConnectionFactory {
53  
54      public static final String SONICMQ_SERVER = JMeterUtils.getResString("sonicmq_server");
55      public static final String TIBCOMQ_SERVER = JMeterUtils.getResString("tibcomq_server");
56      public static final String JBOSSMQ_SERVER = JMeterUtils.getResString("jbossmq_server");
57      public static final String OPENJMS_SERVER = JMeterUtils.getResString("openjms_server");
58      public static final String JORAM_SERVER  = JMeterUtils.getResString("joram_server");
59      public static final String JORAM_CONNECTION_FACTORY = JMeterUtils.getResString("joram_connection_factory");
60      public static final String JORAM_USERNAME = JMeterUtils.getResString("joram_username");
61      public static final String JORAM_PASSWORD = JMeterUtils.getResString("joram_password");
62      public static final String JORAM_NAMING_PORT = JMeterUtils.getResString("joram_naming_port");
63      public static final String MANTARAY_SERVER = JMeterUtils.getResString("mantaray_server");
64  
65      // For testing within IntelliJ running main()
66      /*
67      public static final String SONICMQ_SERVER = "Sonic Server";
68      public static final String TIBCOMQ_SERVER = "Tibco Server";
69      public static final String JBOSSMQ_SERVER = "JbossMQ Server";
70      public static final String OPENJMS_SERVER = "OpenJMS Server";
71      public static final String ACTIVEMQ_SERVER = "ActiveMQ Server";
72      public static final String JORAM_SERVER = "Joram Server";
73      public static final String JORAM_CONNECTION_FACTORY = "!cf";
74      public static final String JORAM_USERNAME = "root";
75      public static final String JORAM_PASSWORD = "root";
76      public static final String JORAM_NAMING_PORT = "16400";
77      public static final String MANTARAY_SERVER = "Mantaray";
78      */
79  
80      public static final String SONICMQ_TOPIC = "progress.message.jclient.TopicConnectionFactory";
81      public static final String SONICMQ_QUEUE = "progress.message.jclient.QueueConnectionFactory";
82      public static final String TIBCOMQ_TOPIC = "com.tibco.tibjms.TibjmsTopicConnectionFactory";
83      public static final String TIBCOMQ_QUEUE = "com.tibco.tibjms.TibjmsQueueConnectionFactory";
84      public static final String NAMING_CONTEXT = "org.jnp.interfaces.NamingContextFactory";
85      public static final String JNP_INTERFACES = "org.jnp.interfaces";
86      public static final String OPENJMS_NAMING_CONTEXT = "org.exolab.jms.jndi.InitialContextFactory";
87      public static final String OPENJMS_TOPIC = "TcpTopicConnectionFactory";
88      public static final String OPENJMS_QUEUE = "TcpQueueConnectionFactory";
89      public static final String JORAM_NAMING_CONTEXT = "fr.dyade.aaa.jndi2.client.NamingContextFactory";
90      public static final String JORAM_TOPIC = "TopicConnectionFactory";
91      public static final String JORAM_QUEUE = "QueueConnectionFactory";
92      public static final String NAMING_HOST = "java.naming.factory.host";
93      public static final String NAMING_PORT = "java.naming.factory.post";
94  
95      public static Topic topicContext;
96  
97      private static int mantarayProducerPortCount = 0;
98      private static int mantarayConsumerPortCount = 0;
99  
100     /**
101      * Closes the connection passed through the parameter
102      *
103      * @param connection - Connection object to be closed.
104      * @param session    - Session object to be closed.
105      * @throws JMSException
106      */
107     public static void close(Connection connection, Session session) throws JMSException {
108         session.close();
109         connection.close();
110     }
111 
112     /**
113      * Dynamically creates a Connection object based on the type of broker.
114      *
115      * @param url            - location of the broker.
116      * @param mqServer       - type of broker that is running.
117      * @param isTopic        - type of message domain.
118      * @param embeddedBroker - specified is the broker is embedded.
119      * @return
120      * @throws JMSException
121      */
122     public static Connection createConnectionFactory(String url,
123                                                      String mqServer,
124                                                      boolean isTopic,
125                                                      boolean embeddedBroker) throws JMSException {
126        if (SONICMQ_SERVER.equals(mqServer)) {
127             //Creates a Connection object for a SONIC MQ server.
128             if (isTopic) {
129                 return createConnectionFactory(url, SONICMQ_TOPIC);
130             } else {
131                 return createConnectionFactory(url, SONICMQ_QUEUE);
132             }
133         } else if (TIBCOMQ_SERVER.equals(mqServer)) {
134             //Creates a Connection object for a TIBCO MQ server.
135             if (isTopic) {
136                 return createConnectionFactory(url, TIBCOMQ_TOPIC);
137             } else {
138                 return createConnectionFactory(url, TIBCOMQ_QUEUE);
139             }
140         } else if (JBOSSMQ_SERVER.equals(mqServer)) {
141             //Creates a Connection object for a JBoss MQ server.
142             try {
143                 InitialContext context = getInitialContext(url, JBOSSMQ_SERVER);
144                 ConnectionFactory factory = (ConnectionFactory) context.lookup("ConnectionFactory");
145                 context.close();
146 
147                 return factory.createConnection();
148 
149             } catch (NamingException e) {
150                 throw new JMSException("Error creating InitialContext ", e.toString());
151             }
152         } else if (OPENJMS_SERVER.equals(mqServer)) {
153             //Creates a Connection object for a OpenJMS server.
154             try {
155                 Context context = getInitialContext(url, OPENJMS_SERVER);
156                 if (isTopic) {
157                     TopicConnectionFactory factory = (TopicConnectionFactory)
158                             context.lookup(OPENJMS_TOPIC);
159                     context.close();
160 
161                     return factory.createTopicConnection();
162 
163                 } else {
164                     QueueConnectionFactory factory = (QueueConnectionFactory)
165                             context.lookup(OPENJMS_QUEUE);
166                     context.close();
167 
168                     return factory.createQueueConnection();
169 
170                 }
171             } catch (NamingException e) {
172                 throw new JMSException("Error creating InitialContext ", e.toString());
173             }
174         } else if (JORAM_SERVER.equals(mqServer)) {
175             //Creates a Connection object for a JORAM server.
176             try {
177                 Context ictx = getInitialContext(url, JORAM_SERVER);
178                 ConnectionFactory cf = (ConnectionFactory) ictx.lookup(JORAM_CONNECTION_FACTORY);
179                 ictx.close();
180                 Connection cnx = cf.createConnection(JORAM_USERNAME, JORAM_PASSWORD);
181 
182                 return cnx;
183 
184             } catch (NamingException e) {
185                 throw new JMSException("Error creating InitialContext ", e.toString());
186             }
187         } else if (MANTARAY_SERVER.equals(mqServer)) {
188             //Creates a Connection object for a Mantaray.
189             System.setProperty("mantaHome",url);
190 
191             if (isTopic) {
192                 TopicConnectionFactory factory = (TopicConnectionFactory) new MantaTopicConnectionFactory();
193 
194                 return factory.createTopicConnection();
195 
196             } else {
197                 QueueConnectionFactory factory = (QueueConnectionFactory) new MantaQueueConnectionFactory();
198 
199                 return factory.createQueueConnection();
200 
201             }
202         } else {
203             //Used to create a session from the default MQ server ActiveMQConnectionFactory.
204             ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
205 
206             if (embeddedBroker) {
207                 factory.setUseEmbeddedBroker(true);
208             }
209 
210             factory.setTurboBoost(true);
211             ActiveMQConnection c = (ActiveMQConnection) factory.createConnection();
212 
213             c.getPrefetchPolicy().setQueuePrefetch(1000);
214             c.getPrefetchPolicy().setQueueBrowserPrefetch(1000);
215             c.getPrefetchPolicy().setTopicPrefetch(1000);
216             c.getPrefetchPolicy().setDurableTopicPrefetch(1000);
217 
218             return c;
219 
220         }
221 
222     }
223 
224     /**
225      * Creates a Destination object through Session using subject.
226      *
227      * @param session  - Session used to create the Destination.
228      * @param subject  - the subject of the Destination to be created.
229      * @param mqServer - ype of broker that is running.
230      * @param url      - location of the broker.
231      * @param isTopic  - specified is the broker is embedded.
232      * @return
233      * @throws JMSException
234      */
235     public static Destination createDestination(Session session,
236                                                 String subject,
237                                                 String url,
238                                                 String mqServer,
239                                                 boolean isTopic) throws JMSException {
240         if (JBOSSMQ_SERVER.equals(mqServer)) {
241             try {
242                 if (isTopic) {
243                     return (Topic) getInitialContext(url, JBOSSMQ_SERVER).lookup("topic/" + subject);
244                 } else {
245                     return (Queue) getInitialContext(url, JBOSSMQ_SERVER).lookup("queue/" + subject);
246                 }
247             } catch (NamingException e) {
248                 throw new JMSException("Error on lookup for Queue " + subject, e.toString());
249             }
250         } else if (OPENJMS_SERVER.equals(mqServer)) {
251             if (isTopic) {
252                 return ((TopicSession) session).createTopic(subject);
253             } else {
254                 return ((QueueSession) session).createQueue(subject);
255             }
256         } else if (JORAM_SERVER.equals(mqServer)) {
257             try {
258                 if (isTopic) {
259                     return (Topic) getInitialContext(url, JORAM_SERVER).lookup(subject);
260                 } else {
261                     return (Queue) getInitialContext(url, JORAM_SERVER).lookup(subject);
262                 }
263             } catch (NamingException e) {
264                 throw new JMSException("Error on lookup for Queue " + subject, e.toString());
265             }
266         } else {
267             if (isTopic) {
268                 return session.createTopic(subject);
269             } else {
270                 return session.createQueue(subject);
271             }
272         }
273     }
274 
275     /**
276      * Creates a Session object.
277      *
278      * @param connection - Connection object where the session will be created from.
279      * @return
280      * @throws JMSException
281      */
282     public static Session createSession(Connection connection,
283                                         boolean isTransacted,
284                                         String mqServer,
285                                         boolean isTopic) throws JMSException {
286         if (OPENJMS_SERVER.equals(mqServer) || MANTARAY_SERVER.equals(mqServer)) {
287             if (isTransacted) {
288                 if (isTopic) {
289                     TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.SESSION_TRANSACTED);
290 
291                     return ((Session) session);
292 
293                 } else {
294                     QueueSession session = ((QueueConnection) connection).createQueueSession(false, Session.SESSION_TRANSACTED);
295 
296                     return ((Session) session);
297 
298                 }
299             } else {
300                 if (isTopic) {
301                     TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
302 
303                     return ((Session) session);
304 
305                 } else {
306                     QueueSession session = ((QueueConnection) connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
307 
308                     return ((Session) session);
309 
310                 }
311             }
312         } else {
313             // check when to use Transacted or Non-Transacted type.
314             if (isTransacted) {
315                 return connection.createSession(true, Session.SESSION_TRANSACTED);
316             } else {
317                 return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
318             }
319         }
320     }
321 
322     /**
323      * Dynamically creates a ConnectionFactory object depending on the MQ Factory class.
324      *
325      * @param url              - location of the broker.
326      * @param connFactoryClass - fully qualified name of connection factory to be initialized.
327      * @return
328      * @throws JMSException
329      */
330     public static Connection createConnectionFactory(String url, String connFactoryClass) throws JMSException {
331         Class classObject;
332         Constructor constructor;
333         Class[] classParameter = {url.getClass()};
334         Object[] constArgs = {url};
335 
336         try {
337             classObject = Class.forName(connFactoryClass);
338             constructor = classObject.getConstructor(classParameter);
339             ConnectionFactory factory = (ConnectionFactory) constructor.newInstance(constArgs);
340 
341             return factory.createConnection();
342 
343         } catch (ClassNotFoundException e) {
344             throw new JMSException("Unable to find class ", e.toString());
345         } catch (NoSuchMethodException e) {
346             throw new JMSException("No such getConstructor(Class[] class) method found ", e.toString());
347         } catch (InstantiationException e) {
348             throw new JMSException("Unable to instantiate class ", e.toString());
349         } catch (IllegalAccessException e) {
350             throw new JMSException("Unable to instantiate class ", e.toString());
351         } catch (InvocationTargetException e) {
352             throw new JMSException("Unable to instantiate class ", e.toString());
353         }
354     }
355 
356     /**
357      * Creates an InitialContext object which contains the information of the broker.
358      * This is used if the broker uses JNDI.
359      *
360      * @param url - location of the broker.
361      * @return
362      * @throws JMSException
363      */
364     public static InitialContext getInitialContext(String url, String mqServer) throws JMSException {
365         Properties properties = new Properties();
366 
367         if (JBOSSMQ_SERVER.equals(mqServer)) {
368             //Creates a Context oject for JBOSS MQ server
369             properties.put(Context.INITIAL_CONTEXT_FACTORY, NAMING_CONTEXT);
370             properties.put(Context.URL_PKG_PREFIXES, JNP_INTERFACES);
371             properties.put(Context.PROVIDER_URL, url);
372 
373         } else if (OPENJMS_SERVER.equals(mqServer)) {
374             //Creates a Context object for OPENJMS server
375             properties.put(Context.INITIAL_CONTEXT_FACTORY, OPENJMS_NAMING_CONTEXT);
376             properties.put(Context.PROVIDER_URL, url);
377 
378         } else if (JORAM_SERVER.equals(mqServer)) {
379             //Creates a Context object for JORAM server
380             //The JNDI's host is set to be the same as with the Joram broker
381             properties.put(Context.INITIAL_CONTEXT_FACTORY, JORAM_NAMING_CONTEXT);
382             properties.put(NAMING_HOST, getHost(url));
383             properties.put(NAMING_PORT, JORAM_NAMING_PORT);
384 
385         }
386 
387         try {
388             return new InitialContext(properties);
389         } catch (NamingException e) {
390             throw new JMSException("Error creating InitialContext ", e.toString());
391         }
392     }
393 
394     /**
395      * Returns the host part of the URL.
396      *
397      * @param url - location of the broker.
398      * @return host
399      */
400     private static String getHost(String url) {
401         return url.substring(url.lastIndexOf("/") + 1, url.lastIndexOf(":"));
402     }
403 
404 }