Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/service/impl/XATransactionCommand.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activemq.service.impl;
19  
20  import java.util.Map;
21  
22  import javax.transaction.xa.XAException;
23  import javax.transaction.xa.XAResource;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.activemq.message.ActiveMQXid;
28  import org.activemq.store.TransactionStore;
29  
30  /**
31   * @version $Revision: 1.1.1.1 $
32   */
33  public class XATransactionCommand extends AbstractTransaction {
34      private static final Log log = LogFactory.getLog(TransactionManagerImpl.class);
35  
36      private ActiveMQXid xid;
37      private transient Map xaTxs;
38      private transient TransactionStore transactionStore;
39  
40      public XATransactionCommand(ActiveMQXid xid, Map xaTxs, TransactionStore transactionStore) {
41          this.xid = xid;
42          this.xaTxs = xaTxs;
43          this.transactionStore = transactionStore;
44      }
45  
46  
47      /**
48       * Called after the transaction command has been recovered from disk
49       *
50       * @param xaTxs
51       * @param preparedTransactions
52       */
53      public void initialise(Map xaTxs, TransactionStore preparedTransactions) {
54          this.xaTxs = xaTxs;
55          this.transactionStore = preparedTransactions;
56      }
57  
58      public void commit(boolean onePhase) throws XAException {
59        if(log.isDebugEnabled())
60          log.debug("XA Transaction commit: "+xid);
61  
62          switch (getState()) {
63              case START_STATE:
64                  // 1 phase commit, no work done.
65                  checkForPreparedState(onePhase);
66                  setStateFinished();
67                  break;
68              case IN_USE_STATE:
69                  // 1 phase commit, work done.
70                  checkForPreparedState(onePhase);
71                  doPrePrepare();
72                  setStateFinished();
73                  transactionStore.commit(getTransactionId(), false);
74                  doPostCommit();
75                  break;
76              case PREPARED_STATE:
77                  // 2 phase commit, work done.
78                  // We would record commit here.
79                  setStateFinished();
80                  transactionStore.commit(getTransactionId(), true);
81                  doPostCommit();
82                  break;
83              default:
84                  illegalStateTransition("commit");
85          }
86      }
87  
88      private void illegalStateTransition(String callName) throws XAException {
89          XAException xae = new XAException("Cannot call " + callName + " now.");
90          xae.errorCode = XAException.XAER_PROTO;
91          throw xae;
92      }
93  
94      private void checkForPreparedState(boolean onePhase) throws XAException {
95          if (!onePhase) {
96              XAException xae = new XAException("Cannot do 2 phase commit if the transaction has not been prepared.");
97              xae.errorCode = XAException.XAER_PROTO;
98              throw xae;
99          }
100     }
101 
102     private void doPrePrepare() throws XAException {
103         try {
104             prePrepare();
105         } catch (XAException e) {
106             throw e;
107         } catch (Throwable e) {
108             log.warn("PRE-PREPARE FAILED: ", e);
109             rollback();
110             XAException xae = new XAException("PRE-PREPARE FAILED: Transaction rolled back.");
111             xae.errorCode = XAException.XA_RBOTHER;
112             xae.initCause(e);
113             throw xae;
114         }
115     }
116 
117     private void doPostCommit() throws XAException {
118         try {
119             postCommit();
120         }
121         catch (Throwable e) {
122             // I guess this could happen.  Post commit task failed
123             // to execute properly.
124             log.warn("POST COMMIT FAILED: ", e);
125             XAException xae = new XAException("POST COMMIT FAILED");
126             xae.errorCode = XAException.XAER_RMERR;
127             xae.initCause(e);
128             throw xae;
129         }
130     }
131 
132     public void rollback() throws XAException {
133       
134       if(log.isDebugEnabled())
135         log.debug("XA Transaction rollback: "+xid);
136 
137         switch (getState()) {
138             case START_STATE:
139                 // 1 phase rollback no work done.
140                 setStateFinished();
141                 break;
142             case IN_USE_STATE:
143                 // 1 phase rollback work done.
144                 setStateFinished();
145                 transactionStore.rollback(getTransactionId());
146                 doPostRollback();
147                 break;
148             case PREPARED_STATE:
149                 // 2 phase rollback work done.
150                 setStateFinished();
151                 transactionStore.rollback(getTransactionId());
152                 doPostRollback();
153                 break;
154         }
155 
156     }
157 
158     private void doPostRollback() throws XAException {
159         try {
160             postRollback();
161         }
162         catch (Throwable e) {
163             // I guess this could happen.  Post commit task failed
164             // to execute properly.
165             log.warn("POST ROLLBACK FAILED: ", e);
166             XAException xae = new XAException("POST ROLLBACK FAILED");
167             xae.errorCode = XAException.XAER_RMERR;
168             xae.initCause(e);
169             throw xae;
170         }
171     }
172 
173     public int prepare() throws XAException {
174       if(log.isDebugEnabled())
175         log.debug("XA Transaction prepare: "+xid);
176       
177         switch (getState()) {
178             case START_STATE:
179                 // No work done.. no commit/rollback needed.
180                 setStateFinished();
181                 return XAResource.XA_RDONLY;
182             case IN_USE_STATE:
183                 // We would record prepare here.
184                 doPrePrepare();
185                 setState(AbstractTransaction.PREPARED_STATE);
186                 transactionStore.prepare(getTransactionId());
187                 return XAResource.XA_OK;
188             default :
189                 illegalStateTransition("prepare");
190                 return XAResource.XA_RDONLY;
191         }
192     }
193 
194     private void setStateFinished() {
195         setState(AbstractTransaction.FINISHED_STATE);
196         xaTxs.remove(xid);
197     }
198 
199     public boolean isXaTransacted() {
200         return true;
201     }
202 
203     public Object getTransactionId() {
204         return xid;
205     }
206 }