Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/TransactionContext.java


1   /**
2    *
3    * Copyright 2004 Protique Ltd
4    * Copyright 2004 Hiram Chirino
5    *
6    * Licensed under the Apache License, Version 2.0 (the "License");
7    * you may not use this file except in compliance with the License.
8    * You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   *
18   **/
19  package org.activemq;
20  
21  import java.util.ArrayList;
22  
23  import javax.jms.JMSException;
24  import javax.jms.TransactionInProgressException;
25  import javax.jms.TransactionRolledBackException;
26  import javax.transaction.xa.XAException;
27  import javax.transaction.xa.XAResource;
28  import javax.transaction.xa.Xid;
29  
30  import org.activemq.message.ActiveMQXid;
31  import org.activemq.message.IntResponseReceipt;
32  import org.activemq.message.ResponseReceipt;
33  import org.activemq.message.TransactionInfo;
34  import org.activemq.message.XATransactionInfo;
35  import org.activemq.util.IdGenerator;
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  
39  /**
40   * A TransactionContext provides the means to control a JMS transaction.  It provides
41   * a local transaction interface and also an XAResource interface.
42   * 
43   * <p/>
44   * An application server controls the transactional assignment of an XASession
45   * by obtaining its XAResource. It uses the XAResource to assign the session
46   * to a transaction, prepare and commit work on the transaction, and so on.
47   * <p/>
48   * An XAResource provides some fairly sophisticated facilities for
49   * interleaving work on multiple transactions, recovering a list of
50   * transactions in progress, and so on. A JTA aware JMS provider must fully
51   * implement this functionality. This could be done by using the services of a
52   * database that supports XA, or a JMS provider may choose to implement this
53   * functionality from scratch.
54   * <p/>
55   *
56   * @version $Revision: 1.1.1.1 $
57   * @see javax.jms.Session
58   * @see javax.jms.QueueSession
59   * @see javax.jms.TopicSession
60   * @see javax.jms.XASession
61   */
62  public class TransactionContext implements XAResource {
63      
64      private static final Log log = LogFactory.getLog(TransactionContext.class);
65      
66      private final ActiveMQConnection connection;
67      private final ArrayList sessions = new ArrayList(2);
68      private final IdGenerator localTransactionIdGenerator = new IdGenerator();
69  
70      // To track XA transactions.
71      private Xid associatedXid;
72      private ActiveMQXid activeXid;
73  
74      // To track local transactions.
75      private String localTransactionId;
76  
77      private LocalTransactionEventListener localTransactionEventListener;
78      
79      public TransactionContext(ActiveMQConnection connection) {
80          this.connection = connection;        
81      }
82      
83      public boolean isInXATransaction() {
84          return associatedXid!=null;
85      }
86      
87      public boolean isInLocalTransaction() {
88          return localTransactionId!=null;
89      }
90      
91      /**
92       * @return Returns the localTransactionEventListener.
93       */
94      public LocalTransactionEventListener getLocalTransactionEventListener() {
95          return localTransactionEventListener;
96      }
97  
98      /**
99       * Used by the resource adapter to listen to transaction events.
100      * 
101      * @param localTransactionEventListener The localTransactionEventListener to set.
102      */
103     public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) {
104         this.localTransactionEventListener = localTransactionEventListener;
105     }
106 
107     /////////////////////////////////////////////////////////////
108     //
109     // Methods that interface with the session
110     //
111     /////////////////////////////////////////////////////////////
112     public void addSession(ActiveMQSession session) {
113         sessions.add(session);
114     }
115     public void removeSession(ActiveMQSession session) {
116         sessions.remove(session);
117     }
118     
119     private void postRollback() {
120         int size = sessions.size();        
121         for(int i=0; i < size; i++ ){
122             ((ActiveMQSession)sessions.get(i)).redeliverUnacknowledgedMessages(true);
123         }
124     }
125 
126     private void postCommit() {
127         int size = sessions.size();        
128         for(int i=0; i < size; i++ ){
129             ((ActiveMQSession)sessions.get(i)).clearDeliveredMessages();
130         }
131     }
132 
133     public Object getTransactionId() {
134         if( localTransactionId!=null )
135             return localTransactionId;
136         return activeXid;
137     }
138         
139     /////////////////////////////////////////////////////////////
140     //
141     // Local transaction interface.
142     //
143     /////////////////////////////////////////////////////////////        
144     
145     /**
146      * Start a local transaction.
147      */
148     public void begin() throws JMSException {        
149         if( associatedXid!=null ) 
150             throw new TransactionInProgressException("Cannot start local transction.  XA transaction is allready in progress.");        
151         
152         if( localTransactionId==null ) {         
153             this.localTransactionId = localTransactionIdGenerator.generateId();
154             TransactionInfo info = new TransactionInfo();
155             info.setTransactionId((String)localTransactionId);
156             info.setType(TransactionInfo.START);
157             this.connection.asyncSendPacket(info);
158             
159             // Notify the listener that the tx was started.
160             if (localTransactionEventListener != null) {
161                 localTransactionEventListener.beginEvent();
162             }
163             if( log.isDebugEnabled() )
164                 log.debug("Started local transaction: "+localTransactionId);
165         }
166     }
167     
168     /**
169      * Rolls back any messages done in this transaction and releases any locks currently held.
170      * 
171      * @throws JMSException if the JMS provider fails to roll back the transaction due to some internal error.
172      * @throws javax.jms.IllegalStateException if the method is not called by a transacted session.
173      */
174     public void rollback() throws JMSException {
175         if( associatedXid!=null ) 
176             throw new TransactionInProgressException("Cannot rollback() if an XA transaction is allready in progress ");
177         
178         if( localTransactionId!=null ) {            
179             TransactionInfo info = new TransactionInfo();
180             info.setTransactionId((String)localTransactionId);
181             info.setType(TransactionInfo.ROLLBACK);
182             //before we send, update the current transaction id
183             this.localTransactionId = null;
184             this.connection.asyncSendPacket(info);
185             // Notify the listener that the tx was rolled back
186             if (localTransactionEventListener != null) {
187                 localTransactionEventListener.rollbackEvent();
188             }
189             if( log.isDebugEnabled() )
190                 log.debug("Rolledback local transaction: "+localTransactionId);
191         }
192         postRollback();
193     }
194         
195     /**
196      * Commits all messages done in this transaction and releases any locks currently held.
197      * 
198      * @throws JMSException if the JMS provider fails to commit the transaction due to some internal error.
199      * @throws TransactionRolledBackException if the transaction is rolled back due to some internal error during
200      * commit.
201      * @throws javax.jms.IllegalStateException if the method is not called by a transacted session.
202      */
203     public void commit() throws JMSException {
204         if( associatedXid!=null ) 
205             throw new TransactionInProgressException("Cannot commit() if an XA transaction is allready in progress ");
206 
207         // Only send commit if the transaction was started.
208         if (localTransactionId!=null) {
209             TransactionInfo info = new TransactionInfo();
210             info.setTransactionId((String)localTransactionId);
211             info.setType(TransactionInfo.COMMIT);
212             //before we send, update the current transaction id
213             this.localTransactionId = null;
214             // Notify the listener that the tx was commited back
215             this.connection.syncSendPacket(info);
216             if (localTransactionEventListener != null) {
217                 localTransactionEventListener.commitEvent();
218             }
219             if( log.isDebugEnabled() )
220                 log.debug("Committed local transaction: "+localTransactionId);
221         }
222         postCommit();
223     }
224     
225     /////////////////////////////////////////////////////////////
226     //
227     // XAResource Implementation
228     //
229     /////////////////////////////////////////////////////////////
230     /**
231      * Associates a transaction with the resource.
232      */
233     public void start(Xid xid, int flags) throws XAException {
234         if( localTransactionId!=null ) 
235             throw new XAException(XAException.XAER_PROTO);
236         
237         // Are we allready associated?
238         if (associatedXid != null) {
239             throw new XAException(XAException.XAER_PROTO);
240         }
241 
242         if ((flags & TMJOIN) == TMJOIN) {
243             // TODO: verify that the server has seen the xid
244         }
245         if ((flags & TMJOIN) == TMRESUME) {
246             // TODO: verify that the xid was suspended.
247         }
248 
249         // associate
250         setXid(xid);
251 
252     }
253 
254     public void end(Xid xid, int flags) throws XAException {
255         if( localTransactionId!=null ) 
256             throw new XAException(XAException.XAER_PROTO);
257 
258         if ((flags & TMSUSPEND) == TMSUSPEND) {
259             // You can only suspend the associated xid.
260             if (associatedXid == null || !ActiveMQXid.equals(associatedXid,xid)) {
261                 throw new XAException(XAException.XAER_PROTO);
262             }
263 
264             //TODO: we may want to put the xid in a suspended list.
265             setXid(null);
266         } else if ((flags & TMFAIL) == TMFAIL) {
267             //TODO: We need to rollback the transaction??
268             setXid(null);
269         } else if ((flags & TMSUCCESS) == TMSUCCESS) {
270             //set to null if this is the current xid.
271             //otherwise this could be an asynchronous success call
272             if (ActiveMQXid.equals(associatedXid,xid)) {
273                 setXid(null);
274             }
275         } else {
276             throw new XAException(XAException.XAER_INVAL);
277         }
278         if( log.isDebugEnabled() )
279             log.debug("Ended XA transaction: "+activeXid);
280 
281     }
282 
283     public int prepare(Xid xid) throws XAException {
284 
285         // We allow interleaving multiple transactions, so
286         // we don't limit prepare to the associated xid.
287         ActiveMQXid x;
288         //THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been called first
289         if (ActiveMQXid.equals(associatedXid,xid)) {
290             throw new XAException(XAException.XAER_PROTO);
291         } else {
292             //TODO cache the known xids so we don't keep recreating this one??
293             x = new ActiveMQXid(xid);
294         }
295 
296         XATransactionInfo info = new XATransactionInfo();
297         info.setXid(x);
298         info.setType(XATransactionInfo.PRE_COMMIT);
299 
300         try {
301             if( log.isDebugEnabled() )
302                 log.debug("Preparing XA transaction: "+x);
303             
304             // Find out if the server wants to commit or rollback.
305             IntResponseReceipt receipt = (IntResponseReceipt) this.connection.syncSendRequest(info);
306             return receipt.getResult();
307         } catch (JMSException e) {
308             throw toXAException(e);
309         }
310     }
311 
312     public void rollback(Xid xid) throws XAException {
313 
314         // We allow interleaving multiple transactions, so
315         // we don't limit rollback to the associated xid.
316         ActiveMQXid x;
317         if (ActiveMQXid.equals(associatedXid,xid)) {
318             //I think this can happen even without an end(xid) call.  Need to check spec.
319             x = activeXid;
320         } else {
321             x = new ActiveMQXid(xid);
322         }
323 
324         XATransactionInfo info = new XATransactionInfo();
325         info.setXid(x);
326         info.setType(XATransactionInfo.ROLLBACK);
327 
328         try {
329             if( log.isDebugEnabled() )
330                 log.debug("Rollingback XA transaction: "+x);
331             
332             // Let the server know that the tx is rollback.
333             this.connection.syncSendPacket(info);
334         } catch (JMSException e) {
335             throw toXAException(e);
336         }
337         
338         postRollback();
339     }
340 
341     // XAResource interface
342     public void commit(Xid xid, boolean onePhase) throws XAException {
343 
344         // We allow interleaving multiple transactions, so
345         // we don't limit commit to the associated xid.
346         ActiveMQXid x;
347         if (ActiveMQXid.equals(associatedXid,xid)) {
348             //should never happen, end(xid,TMSUCCESS) must have been previously called
349             throw new XAException(XAException.XAER_PROTO);
350         } else {
351             x = new ActiveMQXid(xid);
352         }
353 
354         XATransactionInfo info = new XATransactionInfo();
355         info.setXid(x);
356         info.setType(onePhase ? XATransactionInfo.COMMIT_ONE_PHASE : XATransactionInfo.COMMIT);
357 
358         try {
359             if( log.isDebugEnabled() )
360                 log.debug("Committing XA transaction: "+x);
361             
362             // Notify the server that the tx was commited back
363             this.connection.syncSendPacket(info);
364         } catch (JMSException e) {
365             throw toXAException(e);
366         }
367 
368         postCommit();
369     }
370 
371 
372     public void forget(Xid xid) throws XAException {
373 
374         // We allow interleaving multiple transactions, so
375         // we don't limit forget to the associated xid.
376         ActiveMQXid x;
377         if (ActiveMQXid.equals(associatedXid,xid)) {
378             //TODO determine if this can happen... I think not.
379             x = activeXid;
380         } else {
381             x = new ActiveMQXid(xid);
382         }
383 
384         XATransactionInfo info = new XATransactionInfo();
385         info.setXid(x);
386         info.setType(XATransactionInfo.FORGET);
387 
388         try {
389             if( log.isDebugEnabled() )
390                 log.debug("Forgetting XA transaction: "+x);
391             
392             // Tell the server to forget the transaction.
393             this.connection.syncSendPacket(info);
394         } catch (JMSException e) {
395             throw toXAException(e);
396         }
397     }
398 
399     public boolean isSameRM(XAResource xaResource) throws XAException {
400         if (xaResource == null) {
401             return false;
402         }
403         if (!(xaResource instanceof TransactionContext)) {
404             return false;
405         }
406         TransactionContext xar = (TransactionContext) xaResource;
407         try {
408             return getResourceManagerId().equals(xar.getResourceManagerId());
409         } catch (Throwable e) {
410             throw (XAException)new XAException("Could not get resource manager id.").initCause(e);
411         }
412     }
413 
414 
415     public Xid[] recover(int flag) throws XAException {
416 
417         XATransactionInfo info = new XATransactionInfo();
418         info.setType(XATransactionInfo.XA_RECOVER);
419 
420         try {
421             ResponseReceipt receipt = (ResponseReceipt) this.connection.syncSendRequest(info);
422             return (ActiveMQXid[]) receipt.getResult();
423         } catch (JMSException e) {
424             throw toXAException(e);
425         }
426     }
427 
428     public int getTransactionTimeout() throws XAException {
429 
430         XATransactionInfo info = new XATransactionInfo();
431         info.setType(XATransactionInfo.GET_TX_TIMEOUT);
432 
433         try {
434             // get the tx timeout that was set.
435             IntResponseReceipt receipt = (IntResponseReceipt) this.connection.syncSendRequest(info);
436             return receipt.getResult();
437         } catch (JMSException e) {
438             throw toXAException(e);
439         }
440     }
441 
442     public boolean setTransactionTimeout(int seconds) throws XAException {
443 
444         XATransactionInfo info = new XATransactionInfo();
445         info.setType(XATransactionInfo.SET_TX_TIMEOUT);
446         info.setTransactionTimeout(seconds);
447 
448         try {
449             // Setup the new tx timeout
450             this.connection.asyncSendPacket(info);
451             return true;
452         } catch (JMSException e) {
453             throw toXAException(e);
454         }
455     }
456     
457     /////////////////////////////////////////////////////////////
458     //
459     // Helper methods.
460     //
461     /////////////////////////////////////////////////////////////
462     private String getResourceManagerId() throws JMSException {
463         return this.connection.getResourceManagerId();
464     }
465     
466     private void setXid(Xid xid) throws XAException {
467         if (xid != null) {
468             // associate
469             associatedXid = xid;
470             activeXid = new ActiveMQXid(xid);
471             
472             XATransactionInfo info = new XATransactionInfo();
473             info.setXid(activeXid);
474             info.setType(XATransactionInfo.START);
475             try {
476                 this.connection.asyncSendPacket(info);
477                 if( log.isDebugEnabled() )
478                     log.debug("Started XA transaction: "+activeXid);
479             } catch (JMSException e) {
480                 throw toXAException(e);
481             }
482             
483         } else {
484             
485             if( activeXid!=null ) {
486                 XATransactionInfo info = new XATransactionInfo();
487                 info.setXid(activeXid);
488                 info.setType(XATransactionInfo.END);
489                 try {
490                     this.connection.syncSendPacket(info);
491                     if( log.isDebugEnabled() )
492                         log.debug("Ended XA transaction: "+activeXid);
493                 } catch (JMSException e) {
494                     throw toXAException(e);
495                 }
496             }
497             
498             // dis-associate
499             associatedXid = null;
500             activeXid = null;
501         }
502     }
503 
504     /**
505      * Converts a JMSException from the server to an XAException.
506      * if the JMSException contained a linked XAException that is
507      * returned instead.
508      *
509      * @param e
510      * @return
511      */
512     private XAException toXAException(JMSException e) {
513         if (e.getCause() != null && e.getCause() instanceof XAException) {
514             XAException original = (XAException) e.getCause();
515             XAException xae = new XAException(original.getMessage());
516             xae.errorCode = original.errorCode;
517             xae.initCause(original);
518             return xae;
519         }
520 
521         XAException xae = new XAException(e.getMessage());
522         xae.errorCode = XAException.XAER_RMFAIL;
523         xae.initCause(e);
524         return xae;
525     }
526 
527     public ActiveMQConnection getConnection() {
528         return connection;
529     }
530 
531 }