Source code: org/activemq/store/vm/VMTransactionStore.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activemq.store.vm;
19
20 import java.util.ArrayList;
21 import java.util.Iterator;
22
23 import javax.jms.JMSException;
24 import javax.transaction.xa.XAException;
25
26 import org.activemq.message.ActiveMQMessage;
27 import org.activemq.message.ActiveMQXid;
28 import org.activemq.message.MessageAck;
29 import org.activemq.store.MessageStore;
30 import org.activemq.store.ProxyMessageStore;
31 import org.activemq.store.ProxyTopicMessageStore;
32 import org.activemq.store.TopicMessageStore;
33 import org.activemq.store.TransactionStore;
34
35 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
36
37 /**
38 * Provides a TransactionStore implementation that can create transaction aware
39 * MessageStore objects from non transaction aware MessageStore objects.
40 *
41 * @version $Revision: 1.1.1.1 $
42 */
43 public class VMTransactionStore implements TransactionStore {
44
45 ConcurrentHashMap inflightTransactions = new ConcurrentHashMap();
46
47 ConcurrentHashMap preparedTransactions = new ConcurrentHashMap();
48
49 private boolean doingRecover;
50
51 public static class Tx {
52 private ArrayList messages = new ArrayList();
53
54 private ArrayList acks = new ArrayList();
55
56 public void add(AddMessageCommand msg) {
57 messages.add(msg);
58 }
59
60 public void add(RemoveMessageCommand ack) {
61 acks.add(ack);
62 }
63
64 public ActiveMQMessage[] getMessages() {
65 ActiveMQMessage rc[] = new ActiveMQMessage[messages.size()];
66 int count=0;
67 for (Iterator iter = messages.iterator(); iter.hasNext();) {
68 AddMessageCommand cmd = (AddMessageCommand) iter.next();
69 rc[count++] = cmd.getMessage();
70 }
71 return rc;
72 }
73
74 public MessageAck[] getAcks() {
75 MessageAck rc[] = new MessageAck[acks.size()];
76 int count=0;
77 for (Iterator iter = acks.iterator(); iter.hasNext();) {
78 RemoveMessageCommand cmd = (RemoveMessageCommand) iter.next();
79 rc[count++] = cmd.getMessageAck();
80 }
81 return rc;
82 }
83
84 /**
85 * @throws JMSException
86 */
87 public void commit() throws XAException {
88 try {
89 // Do all the message adds.
90 for (Iterator iter = messages.iterator(); iter.hasNext();) {
91 AddMessageCommand cmd = (AddMessageCommand) iter.next();
92 cmd.run();
93 }
94 // And removes..
95 for (Iterator iter = acks.iterator(); iter.hasNext();) {
96 RemoveMessageCommand cmd = (RemoveMessageCommand) iter.next();
97 cmd.run();
98 }
99 } catch ( JMSException e) {
100 throw (XAException)new XAException(XAException.XAER_RMFAIL).initCause(e);
101 }
102 }
103 }
104
105 public interface AddMessageCommand {
106 ActiveMQMessage getMessage();
107 void run() throws JMSException;
108 }
109
110 public interface RemoveMessageCommand {
111 MessageAck getMessageAck();
112 void run() throws JMSException;
113 }
114
115 public MessageStore proxy(MessageStore messageStore) {
116 return new ProxyMessageStore(messageStore) {
117 public void addMessage(final ActiveMQMessage message) throws JMSException {
118 VMTransactionStore.this.addMessage(getDelegate(), message);
119 }
120
121 public void removeMessage(final MessageAck ack) throws JMSException {
122 VMTransactionStore.this.removeMessage(getDelegate(), ack);
123 }
124 };
125 }
126
127 public TopicMessageStore proxy(TopicMessageStore messageStore) {
128 return new ProxyTopicMessageStore(messageStore) {
129 public void addMessage(final ActiveMQMessage message) throws JMSException {
130 VMTransactionStore.this.addMessage(getDelegate(), message);
131 }
132 public void removeMessage(final MessageAck ack) throws JMSException {
133 VMTransactionStore.this.removeMessage(getDelegate(), ack);
134 }
135 };
136 }
137
138 /**
139 * @see org.activemq.store.TransactionStore#prepare(org.activemq.service.Transaction)
140 */
141 public void prepare(Object txid) {
142 Tx tx = (Tx) inflightTransactions.remove(txid);
143 if (tx == null)
144 return;
145 preparedTransactions.put(txid, tx);
146 }
147
148 public Tx getTx(Object txid) {
149 Tx tx = (Tx) inflightTransactions.get(txid);
150 if (tx == null) {
151 tx = new Tx();
152 inflightTransactions.put(txid, tx);
153 }
154 return tx;
155 }
156
157 /**
158 * @throws XAException
159 * @see org.activemq.store.TransactionStore#commit(org.activemq.service.Transaction)
160 */
161 public void commit(Object txid, boolean wasPrepared) throws XAException {
162
163 Tx tx;
164 if( wasPrepared ) {
165 tx = (Tx) preparedTransactions.remove(txid);
166 } else {
167 tx = (Tx) inflightTransactions.remove(txid);
168 }
169
170 if( tx == null )
171 return;
172 tx.commit();
173
174 }
175
176 /**
177 * @see org.activemq.store.TransactionStore#rollback(org.activemq.service.Transaction)
178 */
179 public void rollback(Object txid) {
180 preparedTransactions.remove(txid);
181 inflightTransactions.remove(txid);
182 }
183
184 public void start() throws JMSException {
185 }
186
187 public void stop() throws JMSException {
188 }
189
190 synchronized public void recover(RecoveryListener listener) throws XAException {
191
192 // All the inflight transactions get rolled back..
193 inflightTransactions.clear();
194 this.doingRecover = true;
195 try {
196 for (Iterator iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
197 Object txid = (Object) iter.next();
198 try {
199 Tx tx = (Tx) preparedTransactions.get(txid);
200 listener.recover((ActiveMQXid) txid, tx.getMessages(), tx.getAcks());
201 } catch (JMSException e) {
202 throw (XAException) new XAException("Recovery of a transaction failed:").initCause(e);
203 }
204 }
205 } finally {
206 this.doingRecover = false;
207 }
208 }
209
210 /**
211 * @param message
212 * @throws JMSException
213 */
214 void addMessage(final MessageStore destination, final ActiveMQMessage message) throws JMSException {
215
216 if( doingRecover )
217 return;
218
219 if (message.isPartOfTransaction()) {
220 Tx tx = getTx(message.getTransactionId());
221 tx.add(new AddMessageCommand() {
222 public ActiveMQMessage getMessage() {
223 return message;
224 }
225 public void run() throws JMSException {
226 destination.addMessage(message);
227 }
228 });
229 } else {
230 destination.addMessage(message);
231 }
232 }
233
234 /**
235 * @param ack
236 * @throws JMSException
237 */
238 private void removeMessage(final MessageStore destination,final MessageAck ack) throws JMSException {
239 if( doingRecover )
240 return;
241
242 if (ack.isPartOfTransaction()) {
243 Tx tx = getTx(ack.getTransactionId());
244 tx.add(new RemoveMessageCommand() {
245 public MessageAck getMessageAck() {
246 return ack;
247 }
248 public void run() throws JMSException {
249 destination.removeMessage(ack);
250 }
251 });
252 } else {
253 destination.removeMessage(ack);
254 }
255 }
256
257 }