Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/store/journal/JournalTransactionStore.java


1   /** 
2    * 
3    * Copyright 2004 Hiram Chirino
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.activemq.store.journal;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Iterator;
24  
25  import javax.jms.JMSException;
26  import javax.transaction.xa.XAException;
27  
28  import org.activeio.journal.RecordLocation;
29  import org.activemq.message.ActiveMQMessage;
30  import org.activemq.message.ActiveMQXid;
31  import org.activemq.message.MessageAck;
32  import org.activemq.store.TransactionStore;
33  import org.apache.derby.iapi.store.raw.xact.TransactionId;
34  
35  import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
36  
37  /**
38   */
39  public class JournalTransactionStore implements TransactionStore {
40  
41      private final JournalPersistenceAdapter peristenceAdapter;
42      ConcurrentHashMap inflightTransactions = new ConcurrentHashMap();
43      ConcurrentHashMap preparedTransactions = new ConcurrentHashMap();
44      
45      public static class TxOperation {
46          
47          static final byte ADD_OPERATION_TYPE       = 0;
48          static final byte REMOVE_OPERATION_TYPE    = 1;
49          static final byte ACK_OPERATION_TYPE       = 3;
50          
51          public byte operationType;
52          public JournalMessageStore store;
53          public Object data;
54          
55          public TxOperation(byte operationType, JournalMessageStore store, Object data) {
56              this.operationType=operationType;
57              this.store=store;
58              this.data=data;
59          }
60          
61      }
62      /**
63       * Operations
64       * @version $Revision: 1.3 $
65       */
66      public static class Tx {
67  
68          private final RecordLocation location;
69          private ArrayList operations = new ArrayList();
70  
71          public Tx(RecordLocation location) {
72              this.location=location;
73          }
74  
75          public void add(JournalMessageStore store, ActiveMQMessage msg) {
76              operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg));
77          }
78  
79          public void add(JournalMessageStore store, MessageAck ack) {
80              operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack));
81          }
82  
83          public void add(JournalTopicMessageStore store, JournalAck ack) {
84              operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack));
85          }
86          
87          public ActiveMQMessage[] getMessages() {
88              ArrayList list = new ArrayList();
89              for (Iterator iter = operations.iterator(); iter.hasNext();) {
90                  TxOperation op = (TxOperation) iter.next();
91                  if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) {
92                      list.add(op.data);
93                  }
94              }
95              ActiveMQMessage rc[] = new ActiveMQMessage[list.size()];
96              list.toArray(rc);
97              return rc;
98          }
99  
100         public MessageAck[] getAcks() {
101             ArrayList list = new ArrayList();
102             for (Iterator iter = operations.iterator(); iter.hasNext();) {
103                 TxOperation op = (TxOperation) iter.next();
104                 if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) {
105                     list.add(op.data);
106                 }
107             }
108             MessageAck rc[] = new MessageAck[list.size()];
109             list.toArray(rc);
110             return rc;
111         }
112 
113         public ArrayList getOperations() {
114             return operations;
115         }
116 
117     }
118 
119     public interface AddMessageCommand {
120         ActiveMQMessage getMessage();
121 
122         void run() throws IOException;
123     }
124 
125     public interface RemoveMessageCommand {
126         MessageAck getMessageAck();
127 
128         void run() throws IOException;
129     }
130 
131     public JournalTransactionStore(JournalPersistenceAdapter adapter) {
132         this.peristenceAdapter = adapter;
133     }
134 
135     /**
136      * @throws XAException 
137      * @throws IOException
138      * @see org.activemq.store.TransactionStore#prepare(TransactionId)
139      */
140     public void prepare(Object txid) throws XAException {
141         Tx tx = (Tx) inflightTransactions.remove(txid);
142         if (tx == null)
143             return;
144         peristenceAdapter.writeTxCommand(new TxCommand(TxCommand.XA_PREPARE, txid, false), true);
145         preparedTransactions.put(txid, tx);
146     }
147     
148     /**
149      * @throws IOException
150      * @see org.activemq.store.TransactionStore#prepare(TransactionId)
151      */
152     public void replayPrepare(Object txid) throws IOException {
153         Tx tx = (Tx) inflightTransactions.remove(txid);
154         if (tx == null)
155             return;
156         preparedTransactions.put(txid, tx);
157     }
158 
159     public Tx getTx(Object txid, RecordLocation location) {
160         Tx tx = (Tx) inflightTransactions.get(txid);
161         if (tx == null) {
162             tx = new Tx(location);
163             inflightTransactions.put(txid, tx);
164         }
165         return tx;
166     }
167 
168     /**
169      * @throws XAException 
170      * @throws XAException
171      * @see org.activemq.store.TransactionStore#commit(org.activemq.service.Transaction)
172      */
173     public void commit(Object txid, boolean wasPrepared) throws XAException  {
174         Tx tx;
175         if (wasPrepared) {
176             tx = (Tx) preparedTransactions.remove(txid);
177         } else {
178             tx = (Tx) inflightTransactions.remove(txid);
179         }
180 
181         if (tx == null)
182             return;
183 
184         if (txid.getClass() == ActiveMQXid.class ) {
185             peristenceAdapter.writeTxCommand(new TxCommand(TxCommand.XA_COMMIT, txid, wasPrepared),
186                     true);
187         } else {
188             peristenceAdapter.writeTxCommand(new TxCommand(TxCommand.LOCAL_COMMIT, txid, wasPrepared),
189                     true);
190         }
191     }
192 
193     /**
194      * @throws XAException
195      * @see org.activemq.store.TransactionStore#commit(org.activemq.service.Transaction)
196      */
197     public Tx replayCommit(Object txid, boolean wasPrepared) throws IOException {
198         if (wasPrepared) {
199             return (Tx) preparedTransactions.remove(txid);
200         } else {
201             return (Tx) inflightTransactions.remove(txid);
202         }
203     }
204 
205     /**
206      * @throws XAException 
207      * @throws IOException
208      * @see org.activemq.store.TransactionStore#rollback(TransactionId)
209      */
210     public void rollback(Object txid) throws XAException {
211 
212         Tx tx = (Tx) inflightTransactions.remove(txid);
213         if (tx != null)
214             tx = (Tx) preparedTransactions.remove(txid);
215 
216         if (tx != null) {
217             if (txid.getClass() == ActiveMQXid.class ) {
218                 peristenceAdapter.writeTxCommand(new TxCommand(TxCommand.XA_ROLLBACK, txid, false),
219                         true);
220             } else {
221                 peristenceAdapter.writeTxCommand(new TxCommand(TxCommand.LOCAL_ROLLBACK, txid, false),
222                         true);
223             }
224         }
225 
226     }
227 
228     /**
229      * @throws IOException
230      * @see org.activemq.store.TransactionStore#rollback(TransactionId)
231      */
232     public void replayRollback(Object txid) throws IOException {
233         Tx tx = (Tx) inflightTransactions.remove(txid);
234         if (tx != null)
235             tx = (Tx) preparedTransactions.remove(txid);
236     }
237         
238     synchronized public void recover(RecoveryListener listener) throws XAException {
239         // All the inflight transactions get rolled back..
240         inflightTransactions.clear();
241         try {
242             for (Iterator iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
243                 Object txid = (Object) iter.next();
244                 Tx tx = (Tx) preparedTransactions.get(txid);
245                 try {
246                     listener.recover((ActiveMQXid) txid,tx.getMessages(), tx.getAcks());
247                 } catch (JMSException e) {
248                     throw (XAException)new XAException().initCause(e);
249                 }
250             }
251         } finally {
252         }
253     }
254 
255     /**
256      * @param message
257      * @throws IOException
258      */
259     void addMessage(JournalMessageStore store, ActiveMQMessage message, RecordLocation location) {
260         Tx tx = getTx(message.getTransactionId(), location);
261         tx.add(store, message);
262     }
263 
264     /**
265      * @param ack
266      * @throws IOException
267      */
268     public void removeMessage(JournalMessageStore store, MessageAck ack, RecordLocation location) {
269         Tx tx = getTx(ack.getTransactionId(), location);
270         tx.add(store, ack);
271     }
272     
273     
274     public void acknowledge(JournalTopicMessageStore store, JournalAck ack, RecordLocation location) {
275         Tx tx = getTx(ack.getTransactionId(), location);
276         tx.add(store, ack);
277     }
278 
279 
280     public RecordLocation checkpoint() throws IOException {
281         
282         // Nothing really to checkpoint.. since, we don't
283         // checkpoint tx operations in to long term store until they are committed.
284 
285         // But we keep track of the first location of an operation
286         // that was associated with an active tx. The journal can not
287         // roll over active tx records.        
288         RecordLocation rc = null;
289         for (Iterator iter = inflightTransactions.values().iterator(); iter.hasNext();) {
290             Tx tx = (Tx) iter.next();
291             RecordLocation location = tx.location;
292             if (rc == null || rc.compareTo(location) < 0) {
293                 rc = location;
294             }
295         }
296         for (Iterator iter = preparedTransactions.values().iterator(); iter.hasNext();) {
297             Tx tx = (Tx) iter.next();
298             RecordLocation location = tx.location;
299             if (rc == null || rc.compareTo(location) < 0) {
300                 rc = location;
301             }
302         }
303         return rc;
304     }
305 
306     public void start() throws JMSException {
307     }
308 
309     public void stop() throws JMSException {
310     }
311 
312 
313 }