Source code: org/apache/geronimo/transaction/manager/TransactionImpl.java
1 /**
2 *
3 * Copyright 2003-2004 The Apache Software Foundation
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.geronimo.transaction.manager;
19
20 import java.util.ArrayList;
21 import java.util.IdentityHashMap;
22 import java.util.Iterator;
23 import java.util.LinkedList;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import javax.transaction.HeuristicMixedException;
28 import javax.transaction.HeuristicRollbackException;
29 import javax.transaction.RollbackException;
30 import javax.transaction.Status;
31 import javax.transaction.Synchronization;
32 import javax.transaction.SystemException;
33 import javax.transaction.Transaction;
34 import javax.transaction.xa.XAException;
35 import javax.transaction.xa.XAResource;
36 import javax.transaction.xa.Xid;
37
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40
41 /**
42 * Basic local transaction with support for multiple resources.
43 *
44 * @version $Rev: 128441 $ $Date: 2005-01-27 14:56:21 -0800 (Thu, 27 Jan 2005) $
45 */
46 public class TransactionImpl implements Transaction {
47 private static final Log log = LogFactory.getLog("Transaction");
48
49 private final XidFactory xidFactory;
50 private final Xid xid;
51 private final TransactionLog txnLog;
52 private final long timeout;
53 private final List syncList = new ArrayList(5);
54 private final LinkedList resourceManagers = new LinkedList();
55 private final IdentityHashMap activeXaResources = new IdentityHashMap(3);
56 private final IdentityHashMap suspendedXaResources = new IdentityHashMap(3);
57 private int status = Status.STATUS_NO_TRANSACTION;
58 private Object logMark;
59
/**
 * Starts a new transaction whose global id is freshly obtained from
 * <code>xidFactory</code>; everything else is delegated to the four-argument
 * constructor.
 *
 * @param xidFactory factory used for the global id and, later, for branch ids
 * @param txnLog log that records the transaction's progress
 * @param transactionTimeoutMilliseconds timeout relative to the current timer value
 * @throws SystemException if the begin record cannot be written
 */
TransactionImpl(XidFactory xidFactory, TransactionLog txnLog, long transactionTimeoutMilliseconds) throws SystemException {
    this(xidFactory.createXid(), xidFactory, txnLog, transactionTimeoutMilliseconds);
}
63
64 TransactionImpl(Xid xid, XidFactory xidFactory, TransactionLog txnLog, long transactionTimeoutMilliseconds) throws SystemException {
65 this.xidFactory = xidFactory;
66 this.txnLog = txnLog;
67 this.xid = xid;
68 this.timeout = transactionTimeoutMilliseconds + TransactionTimer.getCurrentTime();
69 try {
70 txnLog.begin(xid);
71 } catch (LogException e) {
72 status = Status.STATUS_MARKED_ROLLBACK;
73 SystemException ex = new SystemException("Error logging begin; transaction marked for roll back)");
74 ex.initCause(e);
75 throw ex;
76 }
77 status = Status.STATUS_ACTIVE;
78 }
79
/**
 * Rebuilds a transaction for an externally controlled transaction found
 * during recovery. The instance starts in STATUS_PREPARED, has no XidFactory,
 * and never times out.
 *
 * @param xid global id of the recovered transaction
 * @param txLog log that records the transaction's progress
 */
public TransactionImpl(Xid xid, TransactionLog txLog) {
    this.xid = xid;
    this.txnLog = txLog;
    this.xidFactory = null;
    //TODO is this a good idea?
    this.timeout = Long.MAX_VALUE;
    this.status = Status.STATUS_PREPARED;
}
89
90 public synchronized int getStatus() throws SystemException {
91 return status;
92 }
93
94 public synchronized void setRollbackOnly() throws IllegalStateException, SystemException {
95 switch (status) {
96 case Status.STATUS_ACTIVE:
97 case Status.STATUS_PREPARING:
98 status = Status.STATUS_MARKED_ROLLBACK;
99 break;
100 case Status.STATUS_MARKED_ROLLBACK:
101 case Status.STATUS_ROLLING_BACK:
102 // nothing to do
103 break;
104 default:
105 throw new IllegalStateException("Cannot set rollback only, status is " + getStateString(status));
106 }
107 }
108
109 public synchronized void registerSynchronization(Synchronization synch) throws IllegalStateException, RollbackException, SystemException {
110 if (synch == null) {
111 throw new IllegalArgumentException("Synchronization is null");
112 }
113 switch (status) {
114 case Status.STATUS_ACTIVE:
115 case Status.STATUS_PREPARING:
116 break;
117 case Status.STATUS_MARKED_ROLLBACK:
118 throw new RollbackException("Transaction is marked for rollback");
119 default:
120 throw new IllegalStateException("Status is " + getStateString(status));
121 }
122 syncList.add(synch);
123 }
124
125 public synchronized boolean enlistResource(XAResource xaRes) throws IllegalStateException, RollbackException, SystemException {
126 if (xaRes == null) {
127 throw new IllegalArgumentException("XAResource is null");
128 }
129 switch (status) {
130 case Status.STATUS_ACTIVE:
131 break;
132 case Status.STATUS_MARKED_ROLLBACK:
133 throw new RollbackException("Transaction is marked for rollback");
134 default:
135 throw new IllegalStateException("Status is " + getStateString(status));
136 }
137
138 if (activeXaResources.containsKey(xaRes)) {
139 throw new IllegalStateException("xaresource: " + xaRes + " is already enlisted!");
140 }
141
142 try {
143 TransactionBranch manager = (TransactionBranch) suspendedXaResources.remove(xaRes);
144 if (manager != null) {
145 //we know about this one, it was suspended
146 xaRes.start(manager.getBranchId(), XAResource.TMRESUME);
147 activeXaResources.put(xaRes, manager);
148 return true;
149 }
150 //it is not suspended.
151 for (Iterator i = resourceManagers.iterator(); i.hasNext();) {
152 manager = (TransactionBranch) i.next();
153 boolean sameRM;
154 //if the xares is already known, we must be resuming after a suspend.
155 if (xaRes == manager.getCommitter()) {
156 throw new IllegalStateException("xaRes " + xaRes + " is a committer but is not active or suspended");
157 }
158 //Otherwise, see if this is a new xares for the same resource manager
159 try {
160 sameRM = xaRes.isSameRM(manager.getCommitter());
161 } catch (XAException e) {
162 log.warn("Unexpected error checking for same RM", e);
163 continue;
164 }
165 if (sameRM) {
166 xaRes.start(manager.getBranchId(), XAResource.TMJOIN);
167 activeXaResources.put(xaRes, manager);
168 return true;
169 }
170 }
171 //we know nothing about this XAResource or resource manager
172 Xid branchId = xidFactory.createBranch(xid, resourceManagers.size() + 1);
173 xaRes.start(branchId, XAResource.TMNOFLAGS);
174 activeXaResources.put(xaRes, addBranchXid(xaRes, branchId));
175 return true;
176 } catch (XAException e) {
177 log.warn("Unable to enlist XAResource " + xaRes, e);
178 return false;
179 }
180 }
181
182 public synchronized boolean delistResource(XAResource xaRes, int flag) throws IllegalStateException, SystemException {
183 if (!(flag == XAResource.TMFAIL || flag == XAResource.TMSUCCESS || flag == XAResource.TMSUSPEND)) {
184 throw new IllegalStateException("invalid flag for delistResource: " + flag);
185 }
186 if (xaRes == null) {
187 throw new IllegalArgumentException("XAResource is null");
188 }
189 switch (status) {
190 case Status.STATUS_ACTIVE:
191 case Status.STATUS_MARKED_ROLLBACK:
192 break;
193 default:
194 throw new IllegalStateException("Status is " + getStateString(status));
195 }
196 TransactionBranch manager = (TransactionBranch) activeXaResources.remove(xaRes);
197 if (manager == null) {
198 if (flag == XAResource.TMSUSPEND) {
199 throw new IllegalStateException("trying to suspend an inactive xaresource: " + xaRes);
200 }
201 //not active, and we are not trying to suspend. We must be ending tx.
202 manager = (TransactionBranch) suspendedXaResources.remove(xaRes);
203 if (manager == null) {
204 throw new IllegalStateException("Resource not known to transaction: " + xaRes);
205 }
206 }
207
208 try {
209 xaRes.end(manager.getBranchId(), flag);
210 if (flag == XAResource.TMSUSPEND) {
211 suspendedXaResources.put(xaRes, manager);
212 }
213 return true;
214 } catch (XAException e) {
215 log.warn("Unable to delist XAResource " + xaRes, e);
216 return false;
217 }
218 }
219
220 //Transaction method, does 2pc
221 public void commit() throws HeuristicMixedException, HeuristicRollbackException, RollbackException, SecurityException, SystemException {
222 beforePrepare();
223
224 try {
225 boolean timedout = false;
226 if (TransactionTimer.getCurrentTime() > timeout)
227 {
228 status = Status.STATUS_MARKED_ROLLBACK;
229 timedout = true;
230 }
231
232 if (status == Status.STATUS_MARKED_ROLLBACK) {
233 rollbackResources(resourceManagers);
234 if(timedout)
235 {
236 throw new RollbackException("Transaction timout");
237 }
238 else
239 {
240 throw new RollbackException("Unable to commit");
241 }
242 }
243 synchronized (this) {
244 if (status == Status.STATUS_ACTIVE) {
245 if (this.resourceManagers.size() == 0) {
246 // nothing to commit
247 status = Status.STATUS_COMMITTED;
248 } else if (this.resourceManagers.size() == 1) {
249 // one-phase commit decision
250 status = Status.STATUS_COMMITTING;
251 } else {
252 // start prepare part of two-phase
253 status = Status.STATUS_PREPARING;
254 }
255 }
256 // resourceManagers is now immutable
257 }
258
259
260 // no-phase
261 if (resourceManagers.size() == 0) {
262 synchronized (this) {
263 status = Status.STATUS_COMMITTED;
264 }
265 return;
266 }
267
268 // one-phase
269 if (resourceManagers.size() == 1) {
270 TransactionBranch manager = (TransactionBranch) resourceManagers.getFirst();
271 try {
272 manager.getCommitter().commit(manager.getBranchId(), true);
273 synchronized (this) {
274 status = Status.STATUS_COMMITTED;
275 }
276 return;
277 } catch (XAException e) {
278 synchronized (this) {
279 status = Status.STATUS_ROLLEDBACK;
280 }
281 throw (RollbackException) new RollbackException("Error during one-phase commit").initCause(e);
282 }
283 }
284
285 // two-phase
286 boolean willCommit = internalPrepare();
287
288 // notify the RMs
289 if (willCommit) {
290 commitResources(resourceManagers);
291 } else {
292 rollbackResources(resourceManagers);
293 throw new RollbackException("Unable to commit");
294 }
295 } finally {
296 afterCompletion();
297 synchronized (this) {
298 status = Status.STATUS_NO_TRANSACTION;
299 }
300 }
301 }
302
303 //Used from XATerminator for first phase in a remotely controlled tx.
304 int prepare() throws SystemException, RollbackException {
305 beforePrepare();
306 int result = XAResource.XA_RDONLY;
307 try {
308 LinkedList rms;
309 synchronized (this) {
310 if (status == Status.STATUS_ACTIVE) {
311 if (resourceManagers.size() == 0) {
312 // nothing to commit
313 status = Status.STATUS_COMMITTED;
314 return result;
315 } else {
316 // start prepare part of two-phase
317 status = Status.STATUS_PREPARING;
318 }
319 }
320 // resourceManagers is now immutable
321 rms = resourceManagers;
322 }
323
324 boolean willCommit = internalPrepare();
325
326 // notify the RMs
327 if (willCommit) {
328 if (!rms.isEmpty()) {
329 result = XAResource.XA_OK;
330 }
331 } else {
332 rollbackResources(rms);
333 throw new RollbackException("Unable to commit");
334 }
335 } finally {
336 if (result == XAResource.XA_RDONLY) {
337 afterCompletion();
338 synchronized (this) {
339 status = Status.STATUS_NO_TRANSACTION;
340 }
341 }
342 }
343 return result;
344 }
345
346 //used from XATerminator for commit phase of non-readonly remotely controlled tx.
347 void preparedCommit() throws SystemException {
348 try {
349 commitResources(resourceManagers);
350 } finally {
351 afterCompletion();
352 synchronized (this) {
353 status = Status.STATUS_NO_TRANSACTION;
354 }
355 }
356 }
357
358 //helper method used by Transaction.commit and XATerminator prepare.
359 private void beforePrepare() {
360 synchronized (this) {
361 switch (status) {
362 case Status.STATUS_ACTIVE:
363 case Status.STATUS_MARKED_ROLLBACK:
364 break;
365 default:
366 throw new IllegalStateException("Status is " + getStateString(status));
367 }
368 }
369
370 beforeCompletion();
371 endResources();
372 }
373
374
375 //helper method used by Transaction.commit and XATerminator prepare.
376 private boolean internalPrepare() throws SystemException {
377
378 for (Iterator rms = resourceManagers.iterator(); rms.hasNext();) {
379 synchronized (this) {
380 if (status != Status.STATUS_PREPARING) {
381 // we were marked for rollback
382 break;
383 }
384 }
385 TransactionBranch manager = (TransactionBranch) rms.next();
386 try {
387 int vote = manager.getCommitter().prepare(manager.getBranchId());
388 if (vote == XAResource.XA_RDONLY) {
389 // we don't need to consider this RM any more
390 rms.remove();
391 }
392 } catch (XAException e) {
393 synchronized (this) {
394 status = Status.STATUS_MARKED_ROLLBACK;
395 //TODO document why this is true from the spec.
396 //XAException during prepare means we can assume resource is rolled back.
397 rms.remove();
398 break;
399 }
400 }
401 }
402
403
404 // decision time...
405 boolean willCommit;
406 synchronized (this) {
407 willCommit = (status != Status.STATUS_MARKED_ROLLBACK);
408 if (willCommit) {
409 status = Status.STATUS_PREPARED;
410 }
411 }
412 // log our decision
413 if (willCommit && !resourceManagers.isEmpty()) {
414 try {
415 logMark = txnLog.prepare(xid, resourceManagers);
416 } catch (LogException e) {
417 try {
418 rollbackResources(resourceManagers);
419 } catch (Exception se) {
420 log.error("Unable to rollback after failure to log prepare", se.getCause());
421 }
422 throw (SystemException) new SystemException("Error logging prepare; transaction was rolled back)").initCause(e);
423 }
424 }
425 return willCommit;
426 }
427
428 public void rollback() throws IllegalStateException, SystemException {
429 List rms;
430 synchronized (this) {
431 switch (status) {
432 case Status.STATUS_ACTIVE:
433 status = Status.STATUS_MARKED_ROLLBACK;
434 break;
435 case Status.STATUS_MARKED_ROLLBACK:
436 break;
437 default:
438 throw new IllegalStateException("Status is " + getStateString(status));
439 }
440 rms = resourceManagers;
441 }
442
443 beforeCompletion();
444 endResources();
445 try {
446 rollbackResources(rms);
447 //only write rollback record if we have already written prepare record.
448 if (logMark != null) {
449 try {
450 txnLog.rollback(xid, logMark);
451 } catch (LogException e) {
452 try {
453 rollbackResources(rms);
454 } catch (Exception se) {
455 log.error("Unable to rollback after failure to log decision", se.getCause());
456 }
457 throw (SystemException) new SystemException("Error logging rollback").initCause(e);
458 }
459 }
460 } finally {
461 afterCompletion();
462 synchronized (this) {
463 status = Status.STATUS_NO_TRANSACTION;
464 }
465 }
466 }
467
468 private void beforeCompletion() {
469 int i = 0;
470 while (true) {
471 Synchronization synch;
472 synchronized (this) {
473 if (i == syncList.size()) {
474 return;
475 }
476 synch = (Synchronization) syncList.get(i++);
477 }
478 try {
479 synch.beforeCompletion();
480 } catch (Exception e) {
481 log.warn("Unexpected exception from beforeCompletion; transaction will roll back", e);
482 synchronized (this) {
483 status = Status.STATUS_MARKED_ROLLBACK;
484 }
485 }
486 }
487 }
488
489 private void afterCompletion() {
490 // this does not synchronize because nothing can modify our state at this time
491 for (Iterator i = syncList.iterator(); i.hasNext();) {
492 Synchronization synch = (Synchronization) i.next();
493 try {
494 synch.afterCompletion(status);
495 } catch (Exception e) {
496 log.warn("Unexpected exception from afterCompletion; continuing", e);
497 continue;
498 }
499 }
500 }
501
502 private void endResources() {
503 endResources(activeXaResources);
504 endResources(suspendedXaResources);
505 }
506
507 private void endResources(IdentityHashMap resourceMap) {
508 while (true) {
509 XAResource xaRes;
510 TransactionBranch manager;
511 int flags;
512 synchronized (this) {
513 Set entrySet = resourceMap.entrySet();
514 if (entrySet.isEmpty()) {
515 return;
516 }
517 Map.Entry entry = (Map.Entry) entrySet.iterator().next();
518 xaRes = (XAResource) entry.getKey();
519 manager = (TransactionBranch) entry.getValue();
520 flags = (status == Status.STATUS_MARKED_ROLLBACK) ? XAResource.TMFAIL : XAResource.TMSUCCESS;
521 resourceMap.remove(xaRes);
522 }
523 try {
524 xaRes.end(manager.getBranchId(), flags);
525 } catch (XAException e) {
526 log.warn("Error ending association for XAResource " + xaRes + "; transaction will roll back", e);
527 synchronized (this) {
528 status = Status.STATUS_MARKED_ROLLBACK;
529 }
530 }
531 }
532 }
533
534 private void rollbackResources(List rms) throws SystemException {
535 SystemException cause = null;
536 synchronized (this) {
537 status = Status.STATUS_ROLLING_BACK;
538 }
539 for (Iterator i = rms.iterator(); i.hasNext();) {
540 TransactionBranch manager = (TransactionBranch) i.next();
541 try {
542 manager.getCommitter().rollback(manager.getBranchId());
543 } catch (XAException e) {
544 log.error("Unexpected exception rolling back " + manager.getCommitter() + "; continuing with rollback", e);
545 if (cause == null) {
546 cause = new SystemException(e.errorCode);
547 }
548 continue;
549 }
550 }
551 synchronized (this) {
552 status = Status.STATUS_ROLLEDBACK;
553 }
554 if (cause != null) {
555 throw cause;
556 }
557 }
558
559 private void commitResources(List rms) throws SystemException {
560 SystemException cause = null;
561 synchronized (this) {
562 status = Status.STATUS_COMMITTING;
563 }
564 for (Iterator i = rms.iterator(); i.hasNext();) {
565 TransactionBranch manager = (TransactionBranch) i.next();
566 try {
567 manager.getCommitter().commit(manager.getBranchId(), false);
568 } catch (XAException e) {
569 log.error("Unexpected exception committing" + manager.getCommitter() + "; continuing to commit other RMs", e);
570 if (cause == null) {
571 cause = new SystemException(e.errorCode);
572 }
573 continue;
574 }
575 }
576 try {
577 txnLog.commit(xid, logMark);
578 } catch (LogException e) {
579 log.error("Unexpected exception logging commit completion for xid " + xid, e);
580 throw (SystemException)new SystemException("Unexpected error logging commit completion for xid " + xid).initCause(e);
581 }
582 synchronized (this) {
583 status = Status.STATUS_COMMITTED;
584 }
585 if (cause != null) {
586 throw cause;
587 }
588 }
589
590 private static String getStateString(int status) {
591 switch (status) {
592 case Status.STATUS_ACTIVE:
593 return "STATUS_ACTIVE";
594 case Status.STATUS_PREPARING:
595 return "STATUS_PREPARING";
596 case Status.STATUS_PREPARED:
597 return "STATUS_PREPARED";
598 case Status.STATUS_MARKED_ROLLBACK:
599 return "STATUS_MARKED_ROLLBACK";
600 case Status.STATUS_ROLLING_BACK:
601 return "STATUS_ROLLING_BACK";
602 case Status.STATUS_COMMITTING:
603 return "STATUS_COMMITTING";
604 case Status.STATUS_COMMITTED:
605 return "STATUS_COMMITTED";
606 case Status.STATUS_ROLLEDBACK:
607 return "STATUS_ROLLEDBACK";
608 case Status.STATUS_NO_TRANSACTION:
609 return "STATUS_NO_TRANSACTION";
610 case Status.STATUS_UNKNOWN:
611 return "STATUS_UNKNOWN";
612 default:
613 throw new AssertionError();
614 }
615 }
616
617 public boolean equals(Object obj) {
618 if (obj instanceof TransactionImpl) {
619 TransactionImpl other = (TransactionImpl) obj;
620 return xid.equals(other.xid);
621 } else {
622 return false;
623 }
624 }
625
626 //when used from recovery, do not add manager to active or suspended resource maps.
627 // The xaresources have already been ended with TMSUCCESS.
628 public TransactionBranch addBranchXid(XAResource xaRes, Xid branchId) {
629 TransactionBranch manager = new TransactionBranch(xaRes, branchId);
630 resourceManagers.add(manager);
631 return manager;
632 }
633
634 private static class TransactionBranch implements TransactionBranchInfo {
635 private final NamedXAResource committer;
636 private final Xid branchId;
637
638 public TransactionBranch(XAResource xaRes, Xid branchId) {
639 committer = (NamedXAResource)xaRes;
640 this.branchId = branchId;
641 }
642
643 public NamedXAResource getCommitter() {
644 return committer;
645 }
646
647 public Xid getBranchId() {
648 return branchId;
649 }
650
651 public String getResourceName() {
652 return committer.getName();
653 }
654
655 public Xid getBranchXid() {
656 return branchId;
657 }
658 }
659
660
661 }