Source code: org/activemq/service/impl/XATransactionCommand.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activemq.service.impl;
19
20 import java.util.Map;
21
22 import javax.transaction.xa.XAException;
23 import javax.transaction.xa.XAResource;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.activemq.message.ActiveMQXid;
28 import org.activemq.store.TransactionStore;
29
30 /**
31 * @version $Revision: 1.1.1.1 $
32 */
33 public class XATransactionCommand extends AbstractTransaction {
34 private static final Log log = LogFactory.getLog(TransactionManagerImpl.class);
35
36 private ActiveMQXid xid;
37 private transient Map xaTxs;
38 private transient TransactionStore transactionStore;
39
40 public XATransactionCommand(ActiveMQXid xid, Map xaTxs, TransactionStore transactionStore) {
41 this.xid = xid;
42 this.xaTxs = xaTxs;
43 this.transactionStore = transactionStore;
44 }
45
46
47 /**
48 * Called after the transaction command has been recovered from disk
49 *
50 * @param xaTxs
51 * @param preparedTransactions
52 */
53 public void initialise(Map xaTxs, TransactionStore preparedTransactions) {
54 this.xaTxs = xaTxs;
55 this.transactionStore = preparedTransactions;
56 }
57
58 public void commit(boolean onePhase) throws XAException {
59 if(log.isDebugEnabled())
60 log.debug("XA Transaction commit: "+xid);
61
62 switch (getState()) {
63 case START_STATE:
64 // 1 phase commit, no work done.
65 checkForPreparedState(onePhase);
66 setStateFinished();
67 break;
68 case IN_USE_STATE:
69 // 1 phase commit, work done.
70 checkForPreparedState(onePhase);
71 doPrePrepare();
72 setStateFinished();
73 transactionStore.commit(getTransactionId(), false);
74 doPostCommit();
75 break;
76 case PREPARED_STATE:
77 // 2 phase commit, work done.
78 // We would record commit here.
79 setStateFinished();
80 transactionStore.commit(getTransactionId(), true);
81 doPostCommit();
82 break;
83 default:
84 illegalStateTransition("commit");
85 }
86 }
87
88 private void illegalStateTransition(String callName) throws XAException {
89 XAException xae = new XAException("Cannot call " + callName + " now.");
90 xae.errorCode = XAException.XAER_PROTO;
91 throw xae;
92 }
93
94 private void checkForPreparedState(boolean onePhase) throws XAException {
95 if (!onePhase) {
96 XAException xae = new XAException("Cannot do 2 phase commit if the transaction has not been prepared.");
97 xae.errorCode = XAException.XAER_PROTO;
98 throw xae;
99 }
100 }
101
102 private void doPrePrepare() throws XAException {
103 try {
104 prePrepare();
105 } catch (XAException e) {
106 throw e;
107 } catch (Throwable e) {
108 log.warn("PRE-PREPARE FAILED: ", e);
109 rollback();
110 XAException xae = new XAException("PRE-PREPARE FAILED: Transaction rolled back.");
111 xae.errorCode = XAException.XA_RBOTHER;
112 xae.initCause(e);
113 throw xae;
114 }
115 }
116
117 private void doPostCommit() throws XAException {
118 try {
119 postCommit();
120 }
121 catch (Throwable e) {
122 // I guess this could happen. Post commit task failed
123 // to execute properly.
124 log.warn("POST COMMIT FAILED: ", e);
125 XAException xae = new XAException("POST COMMIT FAILED");
126 xae.errorCode = XAException.XAER_RMERR;
127 xae.initCause(e);
128 throw xae;
129 }
130 }
131
132 public void rollback() throws XAException {
133
134 if(log.isDebugEnabled())
135 log.debug("XA Transaction rollback: "+xid);
136
137 switch (getState()) {
138 case START_STATE:
139 // 1 phase rollback no work done.
140 setStateFinished();
141 break;
142 case IN_USE_STATE:
143 // 1 phase rollback work done.
144 setStateFinished();
145 transactionStore.rollback(getTransactionId());
146 doPostRollback();
147 break;
148 case PREPARED_STATE:
149 // 2 phase rollback work done.
150 setStateFinished();
151 transactionStore.rollback(getTransactionId());
152 doPostRollback();
153 break;
154 }
155
156 }
157
158 private void doPostRollback() throws XAException {
159 try {
160 postRollback();
161 }
162 catch (Throwable e) {
163 // I guess this could happen. Post commit task failed
164 // to execute properly.
165 log.warn("POST ROLLBACK FAILED: ", e);
166 XAException xae = new XAException("POST ROLLBACK FAILED");
167 xae.errorCode = XAException.XAER_RMERR;
168 xae.initCause(e);
169 throw xae;
170 }
171 }
172
173 public int prepare() throws XAException {
174 if(log.isDebugEnabled())
175 log.debug("XA Transaction prepare: "+xid);
176
177 switch (getState()) {
178 case START_STATE:
179 // No work done.. no commit/rollback needed.
180 setStateFinished();
181 return XAResource.XA_RDONLY;
182 case IN_USE_STATE:
183 // We would record prepare here.
184 doPrePrepare();
185 setState(AbstractTransaction.PREPARED_STATE);
186 transactionStore.prepare(getTransactionId());
187 return XAResource.XA_OK;
188 default :
189 illegalStateTransition("prepare");
190 return XAResource.XA_RDONLY;
191 }
192 }
193
194 private void setStateFinished() {
195 setState(AbstractTransaction.FINISHED_STATE);
196 xaTxs.remove(xid);
197 }
198
199 public boolean isXaTransacted() {
200 return true;
201 }
202
203 public Object getTransactionId() {
204 return xid;
205 }
206 }