Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/store/vm/VMTransactionStore.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activemq.store.vm;
19  
20  import java.util.ArrayList;
21  import java.util.Iterator;
22  
23  import javax.jms.JMSException;
24  import javax.transaction.xa.XAException;
25  
26  import org.activemq.message.ActiveMQMessage;
27  import org.activemq.message.ActiveMQXid;
28  import org.activemq.message.MessageAck;
29  import org.activemq.store.MessageStore;
30  import org.activemq.store.ProxyMessageStore;
31  import org.activemq.store.ProxyTopicMessageStore;
32  import org.activemq.store.TopicMessageStore;
33  import org.activemq.store.TransactionStore;
34  
35  import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
36  
37  /**
38   * Provides a TransactionStore implementation that can create transaction aware
39   * MessageStore objects from non transaction aware MessageStore objects.
40   * 
41   * @version $Revision: 1.1.1.1 $
42   */
43  public class VMTransactionStore implements TransactionStore {
44  
45      ConcurrentHashMap inflightTransactions = new ConcurrentHashMap();
46  
47      ConcurrentHashMap preparedTransactions = new ConcurrentHashMap();
48  
49      private boolean doingRecover;
50  
51      public static class Tx {
52          private ArrayList messages = new ArrayList();
53  
54          private ArrayList acks = new ArrayList();
55  
56          public void add(AddMessageCommand msg) {
57              messages.add(msg);
58          }
59  
60          public void add(RemoveMessageCommand ack) {
61              acks.add(ack);
62          }
63  
64          public ActiveMQMessage[] getMessages() {
65              ActiveMQMessage rc[] = new ActiveMQMessage[messages.size()];
66              int count=0;
67              for (Iterator iter = messages.iterator(); iter.hasNext();) {
68                  AddMessageCommand cmd = (AddMessageCommand) iter.next();
69                  rc[count++] = cmd.getMessage(); 
70              }
71              return rc;
72          }
73  
74          public MessageAck[] getAcks() {
75              MessageAck rc[] = new MessageAck[acks.size()];
76              int count=0;
77              for (Iterator iter = acks.iterator(); iter.hasNext();) {
78                  RemoveMessageCommand cmd = (RemoveMessageCommand) iter.next();
79                  rc[count++] = cmd.getMessageAck(); 
80              }
81              return rc;
82          }
83  
84          /**
85           * @throws JMSException
86           */
87          public void commit() throws XAException {
88              try {
89                // Do all the message adds.            
90                for (Iterator iter = messages.iterator(); iter.hasNext();) {
91                    AddMessageCommand cmd = (AddMessageCommand) iter.next();
92                    cmd.run();
93                }
94                // And removes..
95                for (Iterator iter = acks.iterator(); iter.hasNext();) {
96                    RemoveMessageCommand cmd = (RemoveMessageCommand) iter.next();
97                    cmd.run();
98                }
99              } catch ( JMSException e) {
100                 throw (XAException)new XAException(XAException.XAER_RMFAIL).initCause(e);
101             }
102         }
103     }
104 
105     public interface AddMessageCommand {
106         ActiveMQMessage getMessage();
107         void run() throws JMSException;
108     }
109 
110     public interface RemoveMessageCommand {
111         MessageAck getMessageAck();
112         void run() throws JMSException;
113     }
114 
115     public MessageStore proxy(MessageStore messageStore) {
116       return new ProxyMessageStore(messageStore) {
117           public void addMessage(final ActiveMQMessage message) throws JMSException {
118               VMTransactionStore.this.addMessage(getDelegate(), message);
119           }
120   
121           public void removeMessage(final MessageAck ack) throws JMSException {
122               VMTransactionStore.this.removeMessage(getDelegate(), ack);
123           }
124       };
125     }
126 
127     public TopicMessageStore proxy(TopicMessageStore messageStore) {
128       return new ProxyTopicMessageStore(messageStore) {
129           public void addMessage(final ActiveMQMessage message) throws JMSException {
130               VMTransactionStore.this.addMessage(getDelegate(), message);
131           }
132           public void removeMessage(final MessageAck ack) throws JMSException {
133               VMTransactionStore.this.removeMessage(getDelegate(), ack);
134           }
135       };
136     }
137 
138     /**
139      * @see org.activemq.store.TransactionStore#prepare(org.activemq.service.Transaction)
140      */
141     public void prepare(Object txid) {
142         Tx tx = (Tx) inflightTransactions.remove(txid);
143         if (tx == null)
144             return;
145         preparedTransactions.put(txid, tx);
146     }
147 
148     public Tx getTx(Object txid) {
149         Tx tx = (Tx) inflightTransactions.get(txid);
150         if (tx == null) {
151             tx = new Tx();
152             inflightTransactions.put(txid, tx);
153         }
154         return tx;
155     }
156 
157     /**
158      * @throws XAException
159      * @see org.activemq.store.TransactionStore#commit(org.activemq.service.Transaction)
160      */
161     public void commit(Object txid, boolean wasPrepared) throws XAException {
162         
163         Tx tx;
164         if( wasPrepared ) {
165             tx = (Tx) preparedTransactions.remove(txid);
166         } else {
167             tx = (Tx) inflightTransactions.remove(txid);
168         }
169         
170         if( tx == null )
171             return;
172         tx.commit();
173         
174     }
175 
176     /**
177      * @see org.activemq.store.TransactionStore#rollback(org.activemq.service.Transaction)
178      */
179     public void rollback(Object txid) {
180         preparedTransactions.remove(txid);
181         inflightTransactions.remove(txid);
182     }
183 
184     public void start() throws JMSException {
185     }
186 
187     public void stop() throws JMSException {
188     }
189 
190     synchronized public void recover(RecoveryListener listener) throws XAException {
191 
192         // All the inflight transactions get rolled back..
193         inflightTransactions.clear();        
194         this.doingRecover = true;
195         try {
196           for (Iterator iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
197               Object txid = (Object) iter.next();
198               try {
199                   Tx tx = (Tx) preparedTransactions.get(txid);
200                   listener.recover((ActiveMQXid) txid, tx.getMessages(), tx.getAcks());
201               } catch (JMSException e) {
202                   throw (XAException) new XAException("Recovery of a transaction failed:").initCause(e);
203               }
204           }
205         } finally {
206             this.doingRecover = false;
207         }
208     }
209 
210     /**
211      * @param message
212      * @throws JMSException
213      */
214     void addMessage(final MessageStore destination, final ActiveMQMessage message) throws JMSException {
215         
216         if( doingRecover )
217             return;
218         
219         if (message.isPartOfTransaction()) {
220             Tx tx = getTx(message.getTransactionId());
221             tx.add(new AddMessageCommand() {
222                 public ActiveMQMessage getMessage() {
223                     return message;
224                 }
225                 public void run() throws JMSException {
226                     destination.addMessage(message);
227                 }
228             });
229         } else {
230             destination.addMessage(message);
231         }
232     }
233     
234     /**
235      * @param ack
236      * @throws JMSException
237      */
238     private void removeMessage(final MessageStore destination,final MessageAck ack) throws JMSException {
239         if( doingRecover )
240             return;
241 
242         if (ack.isPartOfTransaction()) {
243             Tx tx = getTx(ack.getTransactionId());
244             tx.add(new RemoveMessageCommand() {
245                 public MessageAck getMessageAck() {
246                     return ack;
247                 }
248                 public void run() throws JMSException {
249                     destination.removeMessage(ack);
250                 }
251             });
252         } else {
253             destination.removeMessage(ack);
254         }
255     }
256     
257 }