Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/apache/geronimo/transaction/manager/TransactionImpl.java


1   /**
2    *
3    * Copyright 2003-2004 The Apache Software Foundation
4    *
5    *  Licensed under the Apache License, Version 2.0 (the "License");
6    *  you may not use this file except in compliance with the License.
7    *  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   *  Unless required by applicable law or agreed to in writing, software
12   *  distributed under the License is distributed on an "AS IS" BASIS,
13   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   *  See the License for the specific language governing permissions and
15   *  limitations under the License.
16   */
17  
18  package org.apache.geronimo.transaction.manager;
19  
20  import java.util.ArrayList;
21  import java.util.IdentityHashMap;
22  import java.util.Iterator;
23  import java.util.LinkedList;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Set;
27  import javax.transaction.HeuristicMixedException;
28  import javax.transaction.HeuristicRollbackException;
29  import javax.transaction.RollbackException;
30  import javax.transaction.Status;
31  import javax.transaction.Synchronization;
32  import javax.transaction.SystemException;
33  import javax.transaction.Transaction;
34  import javax.transaction.xa.XAException;
35  import javax.transaction.xa.XAResource;
36  import javax.transaction.xa.Xid;
37  
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  
41  /**
42   * Basic local transaction with support for multiple resources.
43   *
44   * @version $Rev: 128441 $ $Date: 2005-01-27 14:56:21 -0800 (Thu, 27 Jan 2005) $
45   */
46  public class TransactionImpl implements Transaction {
47      private static final Log log = LogFactory.getLog("Transaction");
48  
49      private final XidFactory xidFactory;
50      private final Xid xid;
51      private final TransactionLog txnLog;
52      private final long timeout;
53      private final List syncList = new ArrayList(5);
54      private final LinkedList resourceManagers = new LinkedList();
55      private final IdentityHashMap activeXaResources = new IdentityHashMap(3);
56      private final IdentityHashMap suspendedXaResources = new IdentityHashMap(3);
57      private int status = Status.STATUS_NO_TRANSACTION;
58      private Object logMark;
59  
60      TransactionImpl(XidFactory xidFactory, TransactionLog txnLog, long transactionTimeoutMilliseconds) throws SystemException {
61          this(xidFactory.createXid(), xidFactory, txnLog, transactionTimeoutMilliseconds);
62      }
63  
64      TransactionImpl(Xid xid, XidFactory xidFactory, TransactionLog txnLog, long transactionTimeoutMilliseconds) throws SystemException {
65          this.xidFactory = xidFactory;
66          this.txnLog = txnLog;
67          this.xid = xid;
68          this.timeout = transactionTimeoutMilliseconds + TransactionTimer.getCurrentTime();
69          try {
70              txnLog.begin(xid);
71          } catch (LogException e) {
72              status = Status.STATUS_MARKED_ROLLBACK;
73              SystemException ex = new SystemException("Error logging begin; transaction marked for roll back)");
74              ex.initCause(e);
75              throw ex;
76          }
77          status = Status.STATUS_ACTIVE;
78      }
79  
80      //reconstruct a tx for an external tx found in recovery
81      public TransactionImpl(Xid xid, TransactionLog txLog) {
82          this.xidFactory = null;
83          this.txnLog = txLog;
84          this.xid = xid;
85          status = Status.STATUS_PREPARED;
86          //TODO is this a good idea?
87          this.timeout = Long.MAX_VALUE;
88      }
89  
90      public synchronized int getStatus() throws SystemException {
91          return status;
92      }
93  
94      public synchronized void setRollbackOnly() throws IllegalStateException, SystemException {
95          switch (status) {
96              case Status.STATUS_ACTIVE:
97              case Status.STATUS_PREPARING:
98                  status = Status.STATUS_MARKED_ROLLBACK;
99                  break;
100             case Status.STATUS_MARKED_ROLLBACK:
101             case Status.STATUS_ROLLING_BACK:
102                 // nothing to do
103                 break;
104             default:
105                 throw new IllegalStateException("Cannot set rollback only, status is " + getStateString(status));
106         }
107     }
108 
109     public synchronized void registerSynchronization(Synchronization synch) throws IllegalStateException, RollbackException, SystemException {
110         if (synch == null) {
111             throw new IllegalArgumentException("Synchronization is null");
112         }
113         switch (status) {
114             case Status.STATUS_ACTIVE:
115             case Status.STATUS_PREPARING:
116                 break;
117             case Status.STATUS_MARKED_ROLLBACK:
118                 throw new RollbackException("Transaction is marked for rollback");
119             default:
120                 throw new IllegalStateException("Status is " + getStateString(status));
121         }
122         syncList.add(synch);
123     }
124 
125     public synchronized boolean enlistResource(XAResource xaRes) throws IllegalStateException, RollbackException, SystemException {
126         if (xaRes == null) {
127             throw new IllegalArgumentException("XAResource is null");
128         }
129         switch (status) {
130             case Status.STATUS_ACTIVE:
131                 break;
132             case Status.STATUS_MARKED_ROLLBACK:
133                 throw new RollbackException("Transaction is marked for rollback");
134             default:
135                 throw new IllegalStateException("Status is " + getStateString(status));
136         }
137 
138         if (activeXaResources.containsKey(xaRes)) {
139             throw new IllegalStateException("xaresource: " + xaRes + " is already enlisted!");
140         }
141 
142         try {
143             TransactionBranch manager = (TransactionBranch) suspendedXaResources.remove(xaRes);
144             if (manager != null) {
145                 //we know about this one, it was suspended
146                 xaRes.start(manager.getBranchId(), XAResource.TMRESUME);
147                 activeXaResources.put(xaRes, manager);
148                 return true;
149             }
150             //it is not suspended.
151             for (Iterator i = resourceManagers.iterator(); i.hasNext();) {
152                 manager = (TransactionBranch) i.next();
153                 boolean sameRM;
154                 //if the xares is already known, we must be resuming after a suspend.
155                 if (xaRes == manager.getCommitter()) {
156                     throw new IllegalStateException("xaRes " + xaRes + " is a committer but is not active or suspended");
157                 }
158                 //Otherwise, see if this is a new xares for the same resource manager
159                 try {
160                     sameRM = xaRes.isSameRM(manager.getCommitter());
161                 } catch (XAException e) {
162                     log.warn("Unexpected error checking for same RM", e);
163                     continue;
164                 }
165                 if (sameRM) {
166                     xaRes.start(manager.getBranchId(), XAResource.TMJOIN);
167                     activeXaResources.put(xaRes, manager);
168                     return true;
169                 }
170             }
171             //we know nothing about this XAResource or resource manager
172             Xid branchId = xidFactory.createBranch(xid, resourceManagers.size() + 1);
173             xaRes.start(branchId, XAResource.TMNOFLAGS);
174             activeXaResources.put(xaRes, addBranchXid(xaRes,  branchId));
175             return true;
176         } catch (XAException e) {
177             log.warn("Unable to enlist XAResource " + xaRes, e);
178             return false;
179         }
180     }
181 
182     public synchronized boolean delistResource(XAResource xaRes, int flag) throws IllegalStateException, SystemException {
183         if (!(flag == XAResource.TMFAIL || flag == XAResource.TMSUCCESS || flag == XAResource.TMSUSPEND)) {
184             throw new IllegalStateException("invalid flag for delistResource: " + flag);
185         }
186         if (xaRes == null) {
187             throw new IllegalArgumentException("XAResource is null");
188         }
189         switch (status) {
190             case Status.STATUS_ACTIVE:
191             case Status.STATUS_MARKED_ROLLBACK:
192                 break;
193             default:
194                 throw new IllegalStateException("Status is " + getStateString(status));
195         }
196         TransactionBranch manager = (TransactionBranch) activeXaResources.remove(xaRes);
197         if (manager == null) {
198             if (flag == XAResource.TMSUSPEND) {
199                 throw new IllegalStateException("trying to suspend an inactive xaresource: " + xaRes);
200             }
201             //not active, and we are not trying to suspend.  We must be ending tx.
202             manager = (TransactionBranch) suspendedXaResources.remove(xaRes);
203             if (manager == null) {
204                 throw new IllegalStateException("Resource not known to transaction: " + xaRes);
205             }
206         }
207 
208         try {
209             xaRes.end(manager.getBranchId(), flag);
210             if (flag == XAResource.TMSUSPEND) {
211                 suspendedXaResources.put(xaRes, manager);
212             }
213             return true;
214         } catch (XAException e) {
215             log.warn("Unable to delist XAResource " + xaRes, e);
216             return false;
217         }
218     }
219 
220     //Transaction method, does 2pc
221     public void commit() throws HeuristicMixedException, HeuristicRollbackException, RollbackException, SecurityException, SystemException {
222         beforePrepare();
223 
224         try {
225                       boolean timedout = false;
226                       if (TransactionTimer.getCurrentTime() > timeout)
227                       {
228                           status = Status.STATUS_MARKED_ROLLBACK;
229                           timedout = true;
230                       }
231 
232             if (status == Status.STATUS_MARKED_ROLLBACK) {
233                 rollbackResources(resourceManagers);
234                               if(timedout)
235                               {
236                                   throw new RollbackException("Transaction timout");
237                               }
238                               else
239                               {
240                                   throw new RollbackException("Unable to commit");
241                               }
242             }
243             synchronized (this) {
244                 if (status == Status.STATUS_ACTIVE) {
245                     if (this.resourceManagers.size() == 0) {
246                         // nothing to commit
247                         status = Status.STATUS_COMMITTED;
248                     } else if (this.resourceManagers.size() == 1) {
249                         // one-phase commit decision
250                         status = Status.STATUS_COMMITTING;
251                     } else {
252                         // start prepare part of two-phase
253                         status = Status.STATUS_PREPARING;
254                     }
255                 }
256                 // resourceManagers is now immutable
257             }
258 
259 
260             // no-phase
261             if (resourceManagers.size() == 0) {
262                 synchronized (this) {
263                     status = Status.STATUS_COMMITTED;
264                 }
265                 return;
266             }
267 
268             // one-phase
269             if (resourceManagers.size() == 1) {
270                 TransactionBranch manager = (TransactionBranch) resourceManagers.getFirst();
271                 try {
272                     manager.getCommitter().commit(manager.getBranchId(), true);
273                     synchronized (this) {
274                         status = Status.STATUS_COMMITTED;
275                     }
276                     return;
277                 } catch (XAException e) {
278                     synchronized (this) {
279                         status = Status.STATUS_ROLLEDBACK;
280                     }
281                     throw (RollbackException) new RollbackException("Error during one-phase commit").initCause(e);
282                 }
283             }
284 
285             // two-phase
286             boolean willCommit = internalPrepare();
287 
288             // notify the RMs
289             if (willCommit) {
290                 commitResources(resourceManagers);
291             } else {
292                 rollbackResources(resourceManagers);
293                 throw new RollbackException("Unable to commit");
294             }
295         } finally {
296             afterCompletion();
297             synchronized (this) {
298                 status = Status.STATUS_NO_TRANSACTION;
299             }
300         }
301     }
302 
303     //Used from XATerminator for first phase in a remotely controlled tx.
304     int prepare() throws SystemException, RollbackException {
305         beforePrepare();
306         int result = XAResource.XA_RDONLY;
307         try {
308             LinkedList rms;
309             synchronized (this) {
310                 if (status == Status.STATUS_ACTIVE) {
311                     if (resourceManagers.size() == 0) {
312                         // nothing to commit
313                         status = Status.STATUS_COMMITTED;
314                         return result;
315                     } else {
316                         // start prepare part of two-phase
317                         status = Status.STATUS_PREPARING;
318                     }
319                 }
320                 // resourceManagers is now immutable
321                 rms = resourceManagers;
322             }
323 
324             boolean willCommit = internalPrepare();
325 
326             // notify the RMs
327             if (willCommit) {
328                 if (!rms.isEmpty()) {
329                     result = XAResource.XA_OK;
330                 }
331             } else {
332                 rollbackResources(rms);
333                 throw new RollbackException("Unable to commit");
334             }
335         } finally {
336             if (result == XAResource.XA_RDONLY) {
337                 afterCompletion();
338                 synchronized (this) {
339                     status = Status.STATUS_NO_TRANSACTION;
340                 }
341             }
342         }
343         return result;
344     }
345 
346     //used from XATerminator for commit phase of non-readonly remotely controlled tx.
347     void preparedCommit() throws SystemException {
348         try {
349             commitResources(resourceManagers);
350         } finally {
351             afterCompletion();
352             synchronized (this) {
353                 status = Status.STATUS_NO_TRANSACTION;
354             }
355         }
356     }
357 
358     //helper method used by Transaction.commit and XATerminator prepare.
359     private void beforePrepare() {
360         synchronized (this) {
361             switch (status) {
362                 case Status.STATUS_ACTIVE:
363                 case Status.STATUS_MARKED_ROLLBACK:
364                     break;
365                 default:
366                     throw new IllegalStateException("Status is " + getStateString(status));
367             }
368         }
369 
370         beforeCompletion();
371         endResources();
372     }
373 
374 
375     //helper method used by Transaction.commit and XATerminator prepare.
376     private boolean internalPrepare() throws SystemException {
377       
378         for (Iterator rms = resourceManagers.iterator(); rms.hasNext();) {
379             synchronized (this) {
380                 if (status != Status.STATUS_PREPARING) {
381                     // we were marked for rollback
382                     break;
383                 }
384             }
385             TransactionBranch manager = (TransactionBranch) rms.next();
386             try {
387                 int vote = manager.getCommitter().prepare(manager.getBranchId());
388                 if (vote == XAResource.XA_RDONLY) {
389                     // we don't need to consider this RM any more
390                     rms.remove();
391                 }
392             } catch (XAException e) {
393                 synchronized (this) {
394                     status = Status.STATUS_MARKED_ROLLBACK;
395                     //TODO document why this is true from the spec.
396                     //XAException during prepare means we can assume resource is rolled back.
397                     rms.remove();
398                     break;
399                 }
400             }
401         }
402 
403         
404         // decision time...
405         boolean willCommit;
406         synchronized (this) {
407             willCommit = (status != Status.STATUS_MARKED_ROLLBACK);
408             if (willCommit) {
409                 status = Status.STATUS_PREPARED;
410             }
411         }
412         // log our decision
413         if (willCommit && !resourceManagers.isEmpty()) {
414             try {
415                 logMark = txnLog.prepare(xid, resourceManagers);
416             } catch (LogException e) {
417                 try {
418                     rollbackResources(resourceManagers);
419                 } catch (Exception se) {
420                     log.error("Unable to rollback after failure to log prepare", se.getCause());
421                 }
422                 throw (SystemException) new SystemException("Error logging prepare; transaction was rolled back)").initCause(e);
423             }
424         }
425         return willCommit;
426     }
427 
428     public void rollback() throws IllegalStateException, SystemException {
429         List rms;
430         synchronized (this) {
431             switch (status) {
432                 case Status.STATUS_ACTIVE:
433                     status = Status.STATUS_MARKED_ROLLBACK;
434                     break;
435                 case Status.STATUS_MARKED_ROLLBACK:
436                     break;
437                 default:
438                     throw new IllegalStateException("Status is " + getStateString(status));
439             }
440             rms = resourceManagers;
441         }
442 
443         beforeCompletion();
444         endResources();
445         try {
446             rollbackResources(rms);
447             //only write rollback record if we have already written prepare record.
448             if (logMark != null) {
449                 try {
450                     txnLog.rollback(xid, logMark);
451                 } catch (LogException e) {
452                     try {
453                         rollbackResources(rms);
454                     } catch (Exception se) {
455                         log.error("Unable to rollback after failure to log decision", se.getCause());
456                     }
457                     throw (SystemException) new SystemException("Error logging rollback").initCause(e);
458                 }
459             }
460         } finally {
461             afterCompletion();
462             synchronized (this) {
463                 status = Status.STATUS_NO_TRANSACTION;
464             }
465         }
466     }
467 
468     private void beforeCompletion() {
469         int i = 0;
470         while (true) {
471             Synchronization synch;
472             synchronized (this) {
473                 if (i == syncList.size()) {
474                     return;
475                 }
476                 synch = (Synchronization) syncList.get(i++);
477             }
478             try {
479                 synch.beforeCompletion();
480             } catch (Exception e) {
481                 log.warn("Unexpected exception from beforeCompletion; transaction will roll back", e);
482                 synchronized (this) {
483                     status = Status.STATUS_MARKED_ROLLBACK;
484                 }
485             }
486         }
487     }
488 
489     private void afterCompletion() {
490         // this does not synchronize because nothing can modify our state at this time
491         for (Iterator i = syncList.iterator(); i.hasNext();) {
492             Synchronization synch = (Synchronization) i.next();
493             try {
494                 synch.afterCompletion(status);
495             } catch (Exception e) {
496                 log.warn("Unexpected exception from afterCompletion; continuing", e);
497                 continue;
498             }
499         }
500     }
501 
502     private void endResources() {
503         endResources(activeXaResources);
504         endResources(suspendedXaResources);
505     }
506 
507     private void endResources(IdentityHashMap resourceMap) {
508         while (true) {
509             XAResource xaRes;
510             TransactionBranch manager;
511             int flags;
512             synchronized (this) {
513                 Set entrySet = resourceMap.entrySet();
514                 if (entrySet.isEmpty()) {
515                     return;
516                 }
517                 Map.Entry entry = (Map.Entry) entrySet.iterator().next();
518                 xaRes = (XAResource) entry.getKey();
519                 manager = (TransactionBranch) entry.getValue();
520                 flags = (status == Status.STATUS_MARKED_ROLLBACK) ? XAResource.TMFAIL : XAResource.TMSUCCESS;
521                 resourceMap.remove(xaRes);
522             }
523             try {
524                 xaRes.end(manager.getBranchId(), flags);
525             } catch (XAException e) {
526                 log.warn("Error ending association for XAResource " + xaRes + "; transaction will roll back", e);
527                 synchronized (this) {
528                     status = Status.STATUS_MARKED_ROLLBACK;
529                 }
530             }
531         }
532     }
533 
534     private void rollbackResources(List rms) throws SystemException {
535         SystemException cause = null;
536         synchronized (this) {
537             status = Status.STATUS_ROLLING_BACK;
538         }
539         for (Iterator i = rms.iterator(); i.hasNext();) {
540             TransactionBranch manager = (TransactionBranch) i.next();
541             try {
542                 manager.getCommitter().rollback(manager.getBranchId());
543             } catch (XAException e) {
544                 log.error("Unexpected exception rolling back " + manager.getCommitter() + "; continuing with rollback", e);
545                 if (cause == null) {
546                     cause = new SystemException(e.errorCode);
547                 }
548                 continue;
549             }
550         }
551         synchronized (this) {
552             status = Status.STATUS_ROLLEDBACK;
553         }
554         if (cause != null) {
555             throw cause;
556         }
557     }
558 
559     private void commitResources(List rms) throws SystemException {
560         SystemException cause = null;
561         synchronized (this) {
562             status = Status.STATUS_COMMITTING;
563         }
564         for (Iterator i = rms.iterator(); i.hasNext();) {
565             TransactionBranch manager = (TransactionBranch) i.next();
566             try {
567                 manager.getCommitter().commit(manager.getBranchId(), false);
568             } catch (XAException e) {
569                 log.error("Unexpected exception committing" + manager.getCommitter() + "; continuing to commit other RMs", e);
570                 if (cause == null) {
571                     cause = new SystemException(e.errorCode);
572                 }
573                 continue;
574             }
575         }
576         try {
577             txnLog.commit(xid, logMark);
578         } catch (LogException e) {
579             log.error("Unexpected exception logging commit completion for xid " + xid, e);
580             throw (SystemException)new SystemException("Unexpected error logging commit completion for xid " + xid).initCause(e);
581         }
582         synchronized (this) {
583             status = Status.STATUS_COMMITTED;
584         }
585         if (cause != null) {
586             throw cause;
587         }
588     }
589 
590     private static String getStateString(int status) {
591         switch (status) {
592             case Status.STATUS_ACTIVE:
593                 return "STATUS_ACTIVE";
594             case Status.STATUS_PREPARING:
595                 return "STATUS_PREPARING";
596             case Status.STATUS_PREPARED:
597                 return "STATUS_PREPARED";
598             case Status.STATUS_MARKED_ROLLBACK:
599                 return "STATUS_MARKED_ROLLBACK";
600             case Status.STATUS_ROLLING_BACK:
601                 return "STATUS_ROLLING_BACK";
602             case Status.STATUS_COMMITTING:
603                 return "STATUS_COMMITTING";
604             case Status.STATUS_COMMITTED:
605                 return "STATUS_COMMITTED";
606             case Status.STATUS_ROLLEDBACK:
607                 return "STATUS_ROLLEDBACK";
608             case Status.STATUS_NO_TRANSACTION:
609                 return "STATUS_NO_TRANSACTION";
610             case Status.STATUS_UNKNOWN:
611                 return "STATUS_UNKNOWN";
612             default:
613                 throw new AssertionError();
614         }
615     }
616 
617     public boolean equals(Object obj) {
618         if (obj instanceof TransactionImpl) {
619             TransactionImpl other = (TransactionImpl) obj;
620             return xid.equals(other.xid);
621         } else {
622             return false;
623         }
624     }
625 
626     //when used from recovery, do not add manager to active or suspended resource maps.
627     // The xaresources have already been ended with TMSUCCESS.
628     public TransactionBranch addBranchXid(XAResource xaRes, Xid branchId) {
629         TransactionBranch manager = new TransactionBranch(xaRes, branchId);
630         resourceManagers.add(manager);
631         return manager;
632     }
633 
634     private static class TransactionBranch implements TransactionBranchInfo {
635         private final NamedXAResource committer;
636         private final Xid branchId;
637 
638         public TransactionBranch(XAResource xaRes, Xid branchId) {
639             committer = (NamedXAResource)xaRes;
640             this.branchId = branchId;
641         }
642 
643         public NamedXAResource getCommitter() {
644             return committer;
645         }
646 
647         public Xid getBranchId() {
648             return branchId;
649         }
650 
651         public String getResourceName() {
652             return committer.getName();
653         }
654 
655         public Xid getBranchXid() {
656             return branchId;
657         }
658     }
659 
660 
661 }