Source code: org/activemq/store/journal/JournalTransactionStore.java
1 /**
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.activemq.store.journal;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Iterator;
24
25 import javax.jms.JMSException;
26 import javax.transaction.xa.XAException;
27
28 import org.activeio.journal.RecordLocation;
29 import org.activemq.message.ActiveMQMessage;
30 import org.activemq.message.ActiveMQXid;
31 import org.activemq.message.MessageAck;
32 import org.activemq.store.TransactionStore;
33 import org.apache.derby.iapi.store.raw.xact.TransactionId;
34
35 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
36
37 /**
38 */
39 public class JournalTransactionStore implements TransactionStore {
40
41 private final JournalPersistenceAdapter peristenceAdapter;
42 ConcurrentHashMap inflightTransactions = new ConcurrentHashMap();
43 ConcurrentHashMap preparedTransactions = new ConcurrentHashMap();
44
45 public static class TxOperation {
46
47 static final byte ADD_OPERATION_TYPE = 0;
48 static final byte REMOVE_OPERATION_TYPE = 1;
49 static final byte ACK_OPERATION_TYPE = 3;
50
51 public byte operationType;
52 public JournalMessageStore store;
53 public Object data;
54
55 public TxOperation(byte operationType, JournalMessageStore store, Object data) {
56 this.operationType=operationType;
57 this.store=store;
58 this.data=data;
59 }
60
61 }
62 /**
63 * Operations
64 * @version $Revision: 1.3 $
65 */
66 public static class Tx {
67
68 private final RecordLocation location;
69 private ArrayList operations = new ArrayList();
70
71 public Tx(RecordLocation location) {
72 this.location=location;
73 }
74
75 public void add(JournalMessageStore store, ActiveMQMessage msg) {
76 operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg));
77 }
78
79 public void add(JournalMessageStore store, MessageAck ack) {
80 operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack));
81 }
82
83 public void add(JournalTopicMessageStore store, JournalAck ack) {
84 operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack));
85 }
86
87 public ActiveMQMessage[] getMessages() {
88 ArrayList list = new ArrayList();
89 for (Iterator iter = operations.iterator(); iter.hasNext();) {
90 TxOperation op = (TxOperation) iter.next();
91 if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) {
92 list.add(op.data);
93 }
94 }
95 ActiveMQMessage rc[] = new ActiveMQMessage[list.size()];
96 list.toArray(rc);
97 return rc;
98 }
99
100 public MessageAck[] getAcks() {
101 ArrayList list = new ArrayList();
102 for (Iterator iter = operations.iterator(); iter.hasNext();) {
103 TxOperation op = (TxOperation) iter.next();
104 if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) {
105 list.add(op.data);
106 }
107 }
108 MessageAck rc[] = new MessageAck[list.size()];
109 list.toArray(rc);
110 return rc;
111 }
112
113 public ArrayList getOperations() {
114 return operations;
115 }
116
117 }
118
119 public interface AddMessageCommand {
120 ActiveMQMessage getMessage();
121
122 void run() throws IOException;
123 }
124
125 public interface RemoveMessageCommand {
126 MessageAck getMessageAck();
127
128 void run() throws IOException;
129 }
130
131 public JournalTransactionStore(JournalPersistenceAdapter adapter) {
132 this.peristenceAdapter = adapter;
133 }
134
135 /**
136 * @throws XAException
137 * @throws IOException
138 * @see org.activemq.store.TransactionStore#prepare(TransactionId)
139 */
140 public void prepare(Object txid) throws XAException {
141 Tx tx = (Tx) inflightTransactions.remove(txid);
142 if (tx == null)
143 return;
144 peristenceAdapter.writeTxCommand(new TxCommand(TxCommand.XA_PREPARE, txid, false), true);
145 preparedTransactions.put(txid, tx);
146 }
147
148 /**
149 * @throws IOException
150 * @see org.activemq.store.TransactionStore#prepare(TransactionId)
151 */
152 public void replayPrepare(Object txid) throws IOException {
153 Tx tx = (Tx) inflightTransactions.remove(txid);
154 if (tx == null)
155 return;
156 preparedTransactions.put(txid, tx);
157 }
158
159 public Tx getTx(Object txid, RecordLocation location) {
160 Tx tx = (Tx) inflightTransactions.get(txid);
161 if (tx == null) {
162 tx = new Tx(location);
163 inflightTransactions.put(txid, tx);
164 }
165 return tx;
166 }
167
168 /**
169 * @throws XAException
170 * @throws XAException
171 * @see org.activemq.store.TransactionStore#commit(org.activemq.service.Transaction)
172 */
173 public void commit(Object txid, boolean wasPrepared) throws XAException {
174 Tx tx;
175 if (wasPrepared) {
176 tx = (Tx) preparedTransactions.remove(txid);
177 } else {
178 tx = (Tx) inflightTransactions.remove(txid);
179 }
180
181 if (tx == null)
182 return;
183
184 if (txid.getClass() == ActiveMQXid.class ) {
185 peristenceAdapter.writeTxCommand(new TxCommand(TxCommand.XA_COMMIT, txid, wasPrepared),
186 true);
187 } else {
188 peristenceAdapter.writeTxCommand(new TxCommand(TxCommand.LOCAL_COMMIT, txid, wasPrepared),
189 true);
190 }
191 }
192
193 /**
194 * @throws XAException
195 * @see org.activemq.store.TransactionStore#commit(org.activemq.service.Transaction)
196 */
197 public Tx replayCommit(Object txid, boolean wasPrepared) throws IOException {
198 if (wasPrepared) {
199 return (Tx) preparedTransactions.remove(txid);
200 } else {
201 return (Tx) inflightTransactions.remove(txid);
202 }
203 }
204
205 /**
206 * @throws XAException
207 * @throws IOException
208 * @see org.activemq.store.TransactionStore#rollback(TransactionId)
209 */
210 public void rollback(Object txid) throws XAException {
211
212 Tx tx = (Tx) inflightTransactions.remove(txid);
213 if (tx != null)
214 tx = (Tx) preparedTransactions.remove(txid);
215
216 if (tx != null) {
217 if (txid.getClass() == ActiveMQXid.class ) {
218 peristenceAdapter.writeTxCommand(new TxCommand(TxCommand.XA_ROLLBACK, txid, false),
219 true);
220 } else {
221 peristenceAdapter.writeTxCommand(new TxCommand(TxCommand.LOCAL_ROLLBACK, txid, false),
222 true);
223 }
224 }
225
226 }
227
228 /**
229 * @throws IOException
230 * @see org.activemq.store.TransactionStore#rollback(TransactionId)
231 */
232 public void replayRollback(Object txid) throws IOException {
233 Tx tx = (Tx) inflightTransactions.remove(txid);
234 if (tx != null)
235 tx = (Tx) preparedTransactions.remove(txid);
236 }
237
238 synchronized public void recover(RecoveryListener listener) throws XAException {
239 // All the inflight transactions get rolled back..
240 inflightTransactions.clear();
241 try {
242 for (Iterator iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
243 Object txid = (Object) iter.next();
244 Tx tx = (Tx) preparedTransactions.get(txid);
245 try {
246 listener.recover((ActiveMQXid) txid,tx.getMessages(), tx.getAcks());
247 } catch (JMSException e) {
248 throw (XAException)new XAException().initCause(e);
249 }
250 }
251 } finally {
252 }
253 }
254
255 /**
256 * @param message
257 * @throws IOException
258 */
259 void addMessage(JournalMessageStore store, ActiveMQMessage message, RecordLocation location) {
260 Tx tx = getTx(message.getTransactionId(), location);
261 tx.add(store, message);
262 }
263
264 /**
265 * @param ack
266 * @throws IOException
267 */
268 public void removeMessage(JournalMessageStore store, MessageAck ack, RecordLocation location) {
269 Tx tx = getTx(ack.getTransactionId(), location);
270 tx.add(store, ack);
271 }
272
273
274 public void acknowledge(JournalTopicMessageStore store, JournalAck ack, RecordLocation location) {
275 Tx tx = getTx(ack.getTransactionId(), location);
276 tx.add(store, ack);
277 }
278
279
280 public RecordLocation checkpoint() throws IOException {
281
282 // Nothing really to checkpoint.. since, we don't
283 // checkpoint tx operations in to long term store until they are committed.
284
285 // But we keep track of the first location of an operation
286 // that was associated with an active tx. The journal can not
287 // roll over active tx records.
288 RecordLocation rc = null;
289 for (Iterator iter = inflightTransactions.values().iterator(); iter.hasNext();) {
290 Tx tx = (Tx) iter.next();
291 RecordLocation location = tx.location;
292 if (rc == null || rc.compareTo(location) < 0) {
293 rc = location;
294 }
295 }
296 for (Iterator iter = preparedTransactions.values().iterator(); iter.hasNext();) {
297 Tx tx = (Tx) iter.next();
298 RecordLocation location = tx.location;
299 if (rc == null || rc.compareTo(location) < 0) {
300 rc = location;
301 }
302 }
303 return rc;
304 }
305
306 public void start() throws JMSException {
307 }
308
309 public void stop() throws JMSException {
310 }
311
312
313 }