Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/ra/ActiveMQManagedConnection.java


1   /**
2    *
3    * Copyright 2004 Hiram Chirino
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  package org.activemq.ra;
19  
20  import java.io.PrintWriter;
21  import java.util.ArrayList;
22  import java.util.Iterator;
23  
24  import javax.jms.Connection;
25  import javax.jms.ExceptionListener;
26  import javax.jms.JMSException;
27  import javax.resource.ResourceException;
28  import javax.resource.spi.ConnectionEvent;
29  import javax.resource.spi.ConnectionEventListener;
30  import javax.resource.spi.ConnectionRequestInfo;
31  import javax.resource.spi.LocalTransaction;
32  import javax.resource.spi.ManagedConnection;
33  import javax.resource.spi.ManagedConnectionMetaData;
34  import javax.security.auth.Subject;
35  import javax.transaction.xa.XAResource;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.activemq.ActiveMQConnection;
40  import org.activemq.LocalTransactionEventListener;
41  import org.activemq.TransactionContext;
42  
43  /**
44   * ActiveMQManagedConnection maps to real physical connection to the
45   * server.  Since a ManagedConnection has to provide a transaction
46   * managment interface to the physical connection, and sessions
47   * are the objects implement transaction managment interfaces in
48   * the JMS API, this object also maps to a singe physical JMS session.
49   * <p/>
50   * The side-effect is that JMS connection the application gets
51   * will allways create the same session object.  This is good if
52   * running in an app server since the sessions are elisted in the
53   * context transaction.  This is bad if used outside of an app
54   * server since the user may be trying to create 2 different
55   * sessions to coordinate 2 different uow.
56   *
57   * @version $Revision: 1.1.1.1 $
58   */
59  public class ActiveMQManagedConnection implements ManagedConnection, ExceptionListener { // TODO: , DissociatableManagedConnection {
60  
61      private static final Log log = LogFactory.getLog(ActiveMQManagedConnection.class);
62  
63      private PrintWriter logWriter;
64  
65      private final ActiveMQConnection physicalConnection;
66      private final TransactionContext transactionContext;
67      private final ArrayList proxyConnections = new ArrayList();
68      private final ArrayList listeners = new ArrayList();
69      private final LocalAndXATransaction localAndXATransaction;
70      
71      private Subject subject;
72      private ActiveMQConnectionRequestInfo info;
73      private boolean destoryed;
74  
75      public ActiveMQManagedConnection(Subject subject, ActiveMQConnection physicalConnection, ActiveMQConnectionRequestInfo info) throws ResourceException {
76          try {
77              this.subject = subject;
78              this.info = info;
79              this.physicalConnection = physicalConnection;
80              this.transactionContext = new TransactionContext(physicalConnection);
81              
82              this.localAndXATransaction = new LocalAndXATransaction(transactionContext) {
83                  public void setInManagedTx(boolean inManagedTx) throws JMSException {
84                      super.setInManagedTx(inManagedTx);                    
85                      Iterator iterator = proxyConnections.iterator();
86                      while (iterator.hasNext()) {
87                          JMSConnectionProxy proxy = (JMSConnectionProxy) iterator.next();
88                          proxy.setUseSharedTxContext(inManagedTx);
89                      }                    
90                  }
91              };
92              
93              this.transactionContext.setLocalTransactionEventListener( new LocalTransactionEventListener() {
94                  public void beginEvent() {
95                      fireBeginEvent();
96                  }
97                  public void commitEvent() {
98                      fireCommitEvent();
99                  }
100                 public void rollbackEvent() {
101                     fireRollbackEvent();
102                 }
103             });
104                         
105             physicalConnection.setExceptionListener(this);
106     } catch (JMSException e) {
107             throw new ResourceException("Could not create a new connection: "+e.getMessage(), e);
108         }          
109     }
110     
111     public boolean isInManagedTx() {
112         return localAndXATransaction.isInManagedTx();
113     }
114     
115     static public boolean matches(Object x, Object y) {
116         if (x == null ^ y == null) {
117             return false;
118         }
119         if (x != null && !x.equals(y)) {
120             return false;
121         }
122         return true;
123     }
124 
125     public void associate(Subject subject, ActiveMQConnectionRequestInfo info) throws JMSException {
126 
127         // Do we need to change the associated userid/password
128         if( !matches(info.getUserName(), this.info.getUserName()) || !matches(info.getPassword(), this.info.getPassword()) ) {
129             ((ActiveMQConnection)physicalConnection).changeUserInfo(info.getUserName(), info.getPassword());
130         }
131         
132         // Do we need to set the clientId?
133         if( info.getClientid()!=null && info.getClientid().length()>0 ) 
134             physicalConnection.setClientID(info.getClientid());
135 
136         this.subject = subject;
137         this.info = info;
138     }
139 
140     public Connection getPhysicalConnection() {
141         return physicalConnection;
142     }
143     
144     private void fireBeginEvent() {
145         ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this,
146                 ConnectionEvent.LOCAL_TRANSACTION_STARTED);
147         Iterator iterator = listeners.iterator();
148         while (iterator.hasNext()) {
149             ConnectionEventListener l = (ConnectionEventListener) iterator.next();
150             l.localTransactionStarted(event);
151         }
152     }
153 
154     private void fireCommitEvent() {
155         ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this,
156                 ConnectionEvent.LOCAL_TRANSACTION_COMMITTED);
157         Iterator iterator = listeners.iterator();
158         while (iterator.hasNext()) {
159             ConnectionEventListener l = (ConnectionEventListener) iterator.next();
160             l.localTransactionCommitted(event);
161         }
162     }
163 
164     private void fireRollbackEvent() {
165         ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this,
166                 ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK);
167         Iterator iterator = listeners.iterator();
168         while (iterator.hasNext()) {
169             ConnectionEventListener l = (ConnectionEventListener) iterator.next();
170             l.localTransactionRolledback(event);
171         }
172     }
173 
174     private void fireCloseEvent(JMSConnectionProxy proxy) {
175         ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this,
176                 ConnectionEvent.CONNECTION_CLOSED);
177         event.setConnectionHandle(proxy);
178         
179         Iterator iterator = listeners.iterator();
180         while (iterator.hasNext()) {
181             ConnectionEventListener l = (ConnectionEventListener) iterator.next();
182             l.connectionClosed(event);
183         }
184     }
185 
186     private void fireErrorOccurredEvent(Exception error) {
187         ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this,
188                 ConnectionEvent.CONNECTION_ERROR_OCCURRED, error);
189         Iterator iterator = listeners.iterator();
190         while (iterator.hasNext()) {
191             ConnectionEventListener l = (ConnectionEventListener) iterator.next();
192             l.connectionErrorOccurred(event);
193         }
194     }
195 
196     /**
197      * @see javax.resource.spi.ManagedConnection#getConnection(javax.security.auth.Subject,
198      *      javax.resource.spi.ConnectionRequestInfo)
199      */
200     public Object getConnection(Subject subject, ConnectionRequestInfo info)
201             throws ResourceException {
202         JMSConnectionProxy proxy = new JMSConnectionProxy(this);
203         proxyConnections.add(proxy);
204         return proxy;
205     }
206 
207     private boolean isDestroyed() {
208         return destoryed;
209     }
210     
211     /**
212      * Close down the physical connection to the server.
213      *
214      * @see javax.resource.spi.ManagedConnection#destroy()
215      */
216     public void destroy() throws ResourceException {
217         // Have we allready been destroyed??
218         if (isDestroyed()) {
219             return;
220         }
221 
222         cleanup();
223 
224         try {
225             physicalConnection.close();
226             destoryed = true;
227         } catch (JMSException e) {
228             log.info("Error occured during close of a JMS connection.", e);
229         }
230     }
231 
232     /**
233      * Cleans up all proxy handles attached to this physical connection so that
234      * they cannot be used anymore.
235      * 
236      * @see javax.resource.spi.ManagedConnection#cleanup()
237      */
238     public void cleanup() throws ResourceException {
239       
240         // Have we allready been destroyed??
241         if (isDestroyed()) {
242             return;
243         }
244 
245         Iterator iterator = proxyConnections.iterator();
246         while (iterator.hasNext()) {
247             JMSConnectionProxy proxy = (JMSConnectionProxy) iterator.next();
248             proxy.cleanup();
249         }
250         proxyConnections.clear();
251 
252         try {
253             ((ActiveMQConnection)physicalConnection).cleanup();
254         } catch (JMSException e) {
255             throw new ResourceException("Could cleanup the ActiveMQ connection: "+e, e);
256         }
257             
258     }
259 
260     /**
261      * @see javax.resource.spi.ManagedConnection#associateConnection(java.lang.Object)
262      */
263     public void associateConnection(Object connection) throws ResourceException {
264         throw new ResourceException("Not supported.");
265     }
266 
267     /**
268      * @see javax.resource.spi.ManagedConnection#addConnectionEventListener(javax.resource.spi.ConnectionEventListener)
269      */
270     public void addConnectionEventListener(ConnectionEventListener listener) {
271         listeners.add(listener);
272     }
273 
274     /**
275      * @see javax.resource.spi.ManagedConnection#removeConnectionEventListener(javax.resource.spi.ConnectionEventListener)
276      */
277     public void removeConnectionEventListener(ConnectionEventListener listener) {
278         listeners.remove(listener);
279     }
280 
281     /**
282      * @see javax.resource.spi.ManagedConnection#getXAResource()
283      */
284     public XAResource getXAResource() throws ResourceException {
285         return localAndXATransaction;
286     }
287 
288     /**
289      * @see javax.resource.spi.ManagedConnection#getLocalTransaction()
290      */
291     public LocalTransaction getLocalTransaction() throws ResourceException {
292         return localAndXATransaction;
293     }
294 
295     /**
296      * @see javax.resource.spi.ManagedConnection#getMetaData()
297      */
298     public ManagedConnectionMetaData getMetaData() throws ResourceException {
299         return new ManagedConnectionMetaData() {
300 
301             public String getEISProductName() throws ResourceException {
302                 if (physicalConnection == null) {
303                     throw new ResourceException("Not connected.");
304                 }
305                 try {
306                     return physicalConnection.getMetaData().getJMSProviderName();
307                 }
308                 catch (JMSException e) {
309                     throw new ResourceException("Error accessing provider.", e);
310                 }
311             }
312 
313             public String getEISProductVersion() throws ResourceException {
314                 if (physicalConnection == null) {
315                     throw new ResourceException("Not connected.");
316                 }
317                 try {
318                     return physicalConnection.getMetaData().getProviderVersion();
319                 }
320                 catch (JMSException e) {
321                     throw new ResourceException("Error accessing provider.", e);
322                 }
323             }
324 
325             public int getMaxConnections() throws ResourceException {
326                 if (physicalConnection == null) {
327                     throw new ResourceException("Not connected.");
328                 }
329                 return Integer.MAX_VALUE;
330             }
331 
332             public String getUserName() throws ResourceException {
333                 if (physicalConnection == null) {
334                     throw new ResourceException("Not connected.");
335                 }
336                 try {
337                     return physicalConnection.getClientID();
338                 }
339                 catch (JMSException e) {
340                     throw new ResourceException("Error accessing provider.", e);
341                 }
342             }
343         };
344     }
345 
346     /**
347      * @see javax.resource.spi.ManagedConnection#setLogWriter(java.io.PrintWriter)
348      */
349     public void setLogWriter(PrintWriter logWriter) throws ResourceException {
350         this.logWriter = logWriter;
351     }
352 
353     /**
354      * @see javax.resource.spi.ManagedConnection#getLogWriter()
355      */
356     public PrintWriter getLogWriter() throws ResourceException {
357         return logWriter;
358     }
359 
360     /**
361      * @param subject
362      * @param info
363      * @return
364      */
365     public boolean matches(Subject subject, ConnectionRequestInfo info) {
366 
367         // Check to see if it is our info class
368         if (info == null) {
369             return false;
370         }
371         if (info.getClass() != ActiveMQConnectionRequestInfo.class) {
372             return false;
373         }
374 
375         // Do the subjects match?
376         if (subject == null ^ this.subject == null) {
377             return false;
378         }
379         if (subject != null && !subject.equals(this.subject)) {
380             return false;
381         }
382 
383         // Does the info match?
384         return info.equals(this.info);
385     }
386 
387     /**
388      * When a proxy is closed this cleans up the proxy and notifys the
389      * ConnectionEventListeners that a connection closed.
390      *
391      * @param proxy
392      */
393     public void proxyClosedEvent(JMSConnectionProxy proxy) {
394         proxyConnections.remove(proxy);
395         proxy.cleanup();
396         fireCloseEvent(proxy);
397     }
398 
399     public void onException(JMSException e) {
400         log.warn("Connection failed: "+e);
401         log.debug("Cause: ", e);
402         
403         // Let any active proxy connections know that exception occured.
404         for (Iterator iter = proxyConnections.iterator(); iter.hasNext();) {
405             JMSConnectionProxy proxy = (JMSConnectionProxy) iter.next();
406             proxy.onException(e);
407         }
408         // Let the container know that the error occured.
409         fireErrorOccurredEvent(e);
410     }
411 
412     /**
413      * @return Returns the transactionContext.
414      */
415     public TransactionContext getTransactionContext() {
416         return transactionContext;
417     }
418 
419 }