Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/ra/ActiveMQResourceAdapter.java


1   /**
2    *
3    * Copyright 2004 Hiram Chirino
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  package org.activemq.ra;
19  
20  import java.util.HashMap;
21  
22  import javax.jms.Connection;
23  import javax.jms.JMSException;
24  import javax.jms.XAConnection;
25  import javax.jms.XASession;
26  import javax.resource.NotSupportedException;
27  import javax.resource.ResourceException;
28  import javax.resource.spi.ActivationSpec;
29  import javax.resource.spi.BootstrapContext;
30  import javax.resource.spi.ResourceAdapter;
31  import javax.resource.spi.ResourceAdapterInternalException;
32  import javax.resource.spi.endpoint.MessageEndpointFactory;
33  import javax.transaction.xa.XAResource;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.activemq.ActiveMQConnection;
38  import org.activemq.ActiveMQConnectionFactory;
39  import org.activemq.XmlConfigHelper;
40  import org.activemq.broker.BrokerContainer;
41  import org.activemq.broker.BrokerContainerFactory;
42  import org.activemq.broker.BrokerContext;
43  import org.activemq.util.IdGenerator;
44  
45  /**
46   * Knows how to connect to one ActiveMQ server. It can then activate endpoints
47   * and deliver messages to those enpoints using the connection configure in the
48   * resource adapter. <p/>Must override equals and hashCode (JCA spec 16.4)
49   * 
50   * @version $Revision: 1.2 $
51   */
52  public class ActiveMQResourceAdapter implements ResourceAdapter {
53      private static final Log log = LogFactory.getLog(ActiveMQResourceAdapter.class);
54  
55      private static final String ASF_ENDPOINT_WORKER_TYPE = "asf";
56  
57      private static final String POLLING_ENDPOINT_WORKER_TYPE = "polling";
58  
59      private BootstrapContext bootstrapContext;
60  
61      private HashMap endpointWorkers = new HashMap();
62  
63      final private ActiveMQConnectionRequestInfo info = new ActiveMQConnectionRequestInfo();
64  
65      private String endpointWorkerType = ASF_ENDPOINT_WORKER_TYPE;
66  
67      private ActiveMQConnectionFactory connectionFactory;
68  
69      private BrokerContainer container;
70      
71      private Boolean useEmbeddedBroker;
72      private String brokerXmlConfig;
73  
74      private HashMap connectionFactoryMap = new HashMap(1);
75  
76      public ActiveMQResourceAdapter() {
77      }
78  
79      /**
80       * @see javax.resource.spi.ResourceAdapter#start(javax.resource.spi.BootstrapContext)
81       */
82      public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException {
83          this.bootstrapContext = bootstrapContext;
84          if (isUseEmbeddedBroker() != null && isUseEmbeddedBroker().booleanValue()) {
85              createBroker();
86          }
87      }
88  
89      private void createBroker() throws ResourceAdapterInternalException {
90          try {
91              BrokerContainerFactory brokerContainerFactory = XmlConfigHelper.createBrokerContainerFactory(getBrokerXmlConfig());
92  
93              IdGenerator idgen = new IdGenerator();
94              container = brokerContainerFactory.createBrokerContainer(idgen.generateId(), BrokerContext.getInstance());
95              container.start();
96              connectionFactory = new ActiveMQConnectionFactory(container, getServerUrl());
97          } catch (JMSException e) {
98              log.error(e.toString(), e);
99              throw new ResourceAdapterInternalException("Failed to startup an embedded broker", e);
100         }
101     }
102 
103     /**
104      * Return a connection using the default connection request info from the RAR
105      * deployment.
106      */
107     public ActiveMQConnection makeConnection() throws JMSException {
108         return makeConnection(info);
109     }
110 
111     /**
112      * Return a connection using a specific connection request info.
113      */
114     public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo crInfo) throws JMSException {
115 
116         ActiveMQConnectionFactory connectionFactory = getConnectionFactory(crInfo);
117 
118         String userName = info.getUserName();
119         String password = info.getPassword();
120         ActiveMQConnection physicalConnection = (ActiveMQConnection) connectionFactory.createConnection(userName, password);
121 
122         String clientId = info.getClientid();
123         if (clientId != null) {
124             physicalConnection.setClientID(clientId);
125         }
126         return physicalConnection;
127     }
128 
129     /**
130      * @param activationSpec
131      */
132     public ActiveMQConnection makeConnection(ActiveMQActivationSpec activationSpec) throws JMSException {
133         //use the default RA connection request for info
134         ActiveMQConnectionFactory connectionFactory = getConnectionFactory(info);
135         String userName = defaultValue(activationSpec.getUserName(), info.getUserName());
136         String password = defaultValue(activationSpec.getPassword(), info.getPassword());
137         ActiveMQConnection physicalConnection = (ActiveMQConnection) connectionFactory.createConnection(userName, password);
138         if (activationSpec.isDurableSubscription()) {
139             physicalConnection.setClientID(activationSpec.getClientId());
140         }
141         return physicalConnection;
142     }
143 
144     /**
145      * Returns a connection factory given a connection configuration.
146      * The implementation of this method treats the factories as singletons
147      * only creating one factory for a given set of configuration data.
148      */
149     private ActiveMQConnectionFactory getConnectionFactory(ActiveMQConnectionRequestInfo crInfo) {
150         //use adapter default if none provided
151         if(crInfo == null) {
152             crInfo = info;
153         }
154 
155         if(!(connectionFactoryMap.containsKey(crInfo))) {
156             //slightly possible the factory can be set twice here
157             //but highly unlikely and no real functional impact
158             //other than an extra reference
159             synchronized(connectionFactoryMap) {
160                 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(crInfo.getServerUrl());
161                 connectionFactoryMap.put(crInfo, factory);
162                 return factory;
163             }
164         }
165         return (ActiveMQConnectionFactory)connectionFactoryMap.get(crInfo);
166     }
167 
168     private String defaultValue(String value, String defaultValue) {
169         if (value != null)
170             return value;
171         return defaultValue;
172     }
173 
174     /**
175      * @see javax.resource.spi.ResourceAdapter#stop()
176      */
177     public void stop() {
178       while (endpointWorkers.size() > 0) {
179         ActiveMQEndpointActivationKey key = (ActiveMQEndpointActivationKey) endpointWorkers.keySet().iterator().next();
180         endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec());
181       }
182         stopBroker();
183         this.bootstrapContext = null;
184     }
185 
186     private void stopBroker() {
187         if (container != null) {
188             try {
189                 container.stop();
190             } catch (JMSException e) {
191                 log.warn("Exception while stopping the broker container", e);
192             }
193         }
194     }
195 
196     /**
197      * @return
198      */
199     public BootstrapContext getBootstrapContext() {
200         return bootstrapContext;
201     }
202 
203     /**
204      * @see javax.resource.spi.ResourceAdapter#endpointActivation(javax.resource.spi.endpoint.MessageEndpointFactory,
205      *      javax.resource.spi.ActivationSpec)
206      */
207     public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec)
208             throws ResourceException {
209 
210         // spec section 5.3.3
211         if (activationSpec.getResourceAdapter() != this) {
212             throw new ResourceException("Activation spec not initialized with this ResourceAdapter instance");
213         }
214 
215         if (activationSpec.getClass().equals(ActiveMQActivationSpec.class)) {
216 
217             ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory,
218                     (ActiveMQActivationSpec) activationSpec);
219             // This is weird.. the same endpoint activated twice.. must be a
220             // container error.
221             if (endpointWorkers.containsKey(key)) {
222                 throw new IllegalStateException("Endpoint previously activated");
223             }
224 
225             ActiveMQBaseEndpointWorker worker;
226             if (POLLING_ENDPOINT_WORKER_TYPE.equals(getEndpointWorkerType())) {
227                 worker = new ActiveMQPollingEndpointWorker(this, key);
228             } else if (ASF_ENDPOINT_WORKER_TYPE.equals(getEndpointWorkerType())) {
229                 worker = new ActiveMQAsfEndpointWorker(this, key);
230             } else {
231                 throw new NotSupportedException("That type of EndpointWorkerType is not supported: "
232                         + getEndpointWorkerType());
233             }
234 
235             endpointWorkers.put(key, worker);
236             worker.start();
237 
238         } else {
239             throw new NotSupportedException("That type of ActicationSpec not supported: " + activationSpec.getClass());
240         }
241 
242     }
243 
244     /**
245      * @see javax.resource.spi.ResourceAdapter#endpointDeactivation(javax.resource.spi.endpoint.MessageEndpointFactory,
246      *      javax.resource.spi.ActivationSpec)
247      */
248     public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) {
249 
250         if (activationSpec.getClass().equals(ActiveMQActivationSpec.class)) {
251             ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory,
252                     (ActiveMQActivationSpec) activationSpec);
253             ActiveMQBaseEndpointWorker worker = (ActiveMQBaseEndpointWorker) endpointWorkers.remove(key);
254             if (worker == null) {
255                 // This is weird.. that endpoint was not activated.. oh well..
256                 // this method
257                 // does not throw exceptions so just return.
258                 return;
259             }
260             try {
261                 worker.stop();
262             } catch (InterruptedException e) {
263                 // We interrupted.. we won't throw an exception but will stop
264                 // waiting for the worker
265                 // to stop.. we tried our best. Keep trying to interrupt the
266                 // thread.
267                 Thread.currentThread().interrupt();
268             }
269 
270         }
271 
272     }
273 
274     /**
275      * We only connect to one resource manager per ResourceAdapter instance, so
276      * any ActivationSpec will return the same XAResource.
277      * 
278      * @see javax.resource.spi.ResourceAdapter#getXAResources(javax.resource.spi.ActivationSpec[])
279      */
280     public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException {
281         Connection connection = null;
282         try {
283             connection = makeConnection();
284             if (connection instanceof XAConnection) {
285                 XASession session = ((XAConnection) connection).createXASession();
286                 XAResource xaResource = session.getXAResource();
287                 return new XAResource[] { xaResource };
288             } else {
289                 return new XAResource[] {};
290             }
291         } catch (JMSException e) {
292             throw new ResourceException(e);
293         } finally {
294             try {
295                 connection.close();
296             } catch (Throwable ignore) {
297             }
298         }
299     }
300 
301     // ///////////////////////////////////////////////////////////////////////
302     //
303     // Java Bean getters and setters for this ResourceAdapter class.
304     //
305     // ///////////////////////////////////////////////////////////////////////
306 
307     /**
308      * @return
309      */
310     public String getClientid() {
311         return emptyToNull(info.getClientid());
312     }
313 
314     /**
315      * @return
316      */
317     public String getPassword() {
318         return emptyToNull(info.getPassword());
319     }
320 
321     /**
322      * @return
323      */
324     public String getServerUrl() {
325         return info.getServerUrl();
326     }
327 
328     /**
329      * @return
330      */
331     public String getUserName() {
332         return emptyToNull(info.getUserName());
333     }
334 
335     /**
336      * @param clientid
337      */
338     public void setClientid(String clientid) {
339         info.setClientid(clientid);
340     }
341 
342     /**
343      * @param password
344      */
345     public void setPassword(String password) {
346         info.setPassword(password);
347     }
348 
349     /**
350      * @param url
351      */
352     public void setServerUrl(String url) {
353         info.setServerUrl(url);
354     }
355 
356     /**
357      * @param userid
358      */
359     public void setUserName(String userid) {
360         info.setUserName(userid);
361     }
362 
363     /**
364      * @return Returns the endpointWorkerType.
365      */
366     public String getEndpointWorkerType() {
367         return endpointWorkerType;
368     }
369 
370     /**
371      * @param endpointWorkerType
372      *            The endpointWorkerType to set.
373      */
374     public void setEndpointWorkerType(String endpointWorkerType) {
375         this.endpointWorkerType = endpointWorkerType.toLowerCase();
376     }
377 
378     public String getBrokerXmlConfig() {
379         return brokerXmlConfig;
380     }
381 
382     /**
383      * Sets the <a href="http://activemq.org/Xml+Configuration">XML
384      * configuration file </a> used to configure the ActiveMQ broker via Spring
385      * if using embedded mode.
386      * 
387      * @param brokerXmlConfig
388      *            is the filename which is assumed to be on the classpath unless
389      *            a URL is specified. So a value of <code>foo/bar.xml</code>
390      *            would be assumed to be on the classpath whereas
391      *            <code>file:dir/file.xml</code> would use the file system.
392      *            Any valid URL string is supported.
393      * @see #setUseEmbeddedBroker(Boolean)
394      */
395     public void setBrokerXmlConfig(String brokerXmlConfig) {
396         this.brokerXmlConfig=brokerXmlConfig;
397     }
398 
399     public Boolean isUseEmbeddedBroker() {
400         return useEmbeddedBroker;
401     }
402 
403     public void setUseEmbeddedBroker(Boolean useEmbeddedBroker) {
404         this.useEmbeddedBroker = useEmbeddedBroker;
405     }
406 
407     /**
408      * @return Returns the info.
409      */
410     public ActiveMQConnectionRequestInfo getInfo() {
411         return info;
412     }
413 
414     public boolean equals(Object o) {
415         if (this == o) {
416             return true;
417         }
418         if (!(o instanceof ActiveMQResourceAdapter)) {
419             return false;
420         }
421 
422         final ActiveMQResourceAdapter activeMQResourceAdapter = (ActiveMQResourceAdapter) o;
423 
424         if (!endpointWorkerType.equals(activeMQResourceAdapter.endpointWorkerType)) {
425             return false;
426         }
427         if (!info.equals(activeMQResourceAdapter.info)) {
428             return false;
429         }
430         if ( notEqual(useEmbeddedBroker, activeMQResourceAdapter.useEmbeddedBroker) ) {
431             return false;
432         }
433         if ( notEqual(brokerXmlConfig, activeMQResourceAdapter.brokerXmlConfig) ) {
434             return false;
435         }
436 
437         return true;
438     }
439     
440     private boolean notEqual(Object o1, Object o2) {
441         return (o1 == null ^ o2 == null) || (o1 != null && !o1.equals(o2));
442     }
443 
444 
445     public int hashCode() {
446         int result;
447         result = info.hashCode();
448         result = 29 * result + endpointWorkerType.hashCode();
449         if (useEmbeddedBroker != null && useEmbeddedBroker.booleanValue()) {
450             result = result * 29 + 1;
451         }
452         if( brokerXmlConfig !=null ) {
453             result ^= brokerXmlConfig.hashCode();
454         }
455         return result;
456     }
457 
458     private String emptyToNull(String value) {
459         if (value == null || value.length() == 0) {
460             return null;
461         }
462         return value;
463     }
464 
465     public Boolean getUseEmbeddedBroker() {
466         return useEmbeddedBroker;
467     }
468 
469     public Boolean getUseInboundSession() {
470         return info.getUseInboundSession();
471     }
472 
473     public void setUseInboundSession(Boolean useInboundSession) {
474         info.setUseInboundSession(useInboundSession);
475     }
476 
477 }