Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/store/journal/JournalMessageStore.java


1   /** 
2    * 
3    * Copyright 2004 Hiram Chirino
4    * Copyright 2004 Protique Ltd
5    * 
6    * Licensed under the Apache License, Version 2.0 (the "License"); 
7    * you may not use this file except in compliance with the License. 
8    * You may obtain a copy of the License at 
9    * 
10   * http://www.apache.org/licenses/LICENSE-2.0
11   * 
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS, 
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
15   * See the License for the specific language governing permissions and 
16   * limitations under the License. 
17   * 
18   **/
19  package org.activemq.store.journal;
20  
21  import java.util.ArrayList;
22  import java.util.Collections;
23  import java.util.HashSet;
24  import java.util.Iterator;
25  import java.util.LinkedHashMap;
26  
27  import javax.jms.JMSException;
28  
29  import org.activeio.journal.RecordLocation;
30  import org.activemq.message.ActiveMQMessage;
31  import org.activemq.message.MessageAck;
32  import org.activemq.service.MessageIdentity;
33  import org.activemq.service.Transaction;
34  import org.activemq.service.TransactionManager;
35  import org.activemq.service.TransactionTask;
36  import org.activemq.store.MessageStore;
37  import org.activemq.store.RecoveryListener;
38  import org.activemq.store.cache.CacheMessageStore;
39  import org.activemq.store.cache.CacheMessageStoreAware;
40  import org.activemq.util.Callback;
41  import org.activemq.util.TransactionTemplate;
42  import org.apache.commons.logging.Log;
43  import org.apache.commons.logging.LogFactory;
44  
45  /**
46   * A MessageStore that uses a Journal to store it's messages.
47   * 
48   * @version $Revision: 1.1 $
49   */
50  public class JournalMessageStore implements MessageStore, CacheMessageStoreAware {
51  
52      private static final Log log = LogFactory.getLog(JournalMessageStore.class);
53      protected final JournalPersistenceAdapter peristenceAdapter;
54      protected final MessageStore longTermStore;
55      protected final String destinationName;
56      protected final TransactionTemplate transactionTemplate;
57  
58      private LinkedHashMap addedMessageIds = new LinkedHashMap();
59      private ArrayList removedMessageLocations = new ArrayList();
60      protected HashSet inFlightTxLocations = new HashSet();   
61      protected RecordLocation lastLocation;
62  
63      /** A MessageStore that we can use to retreive messages quickly. */
64      private MessageStore cacheMessageStore = this;
65  
66      protected final JournalTransactionStore transactionStore;
67  
68      private LinkedHashMap cpAddedMessageIds;
69      
70      int removedFromJournal;
71  
72      public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, String destinationName) {
73          this.peristenceAdapter = adapter;
74          this.transactionStore = this.peristenceAdapter.getTransactionStore();
75          this.longTermStore = checkpointStore;
76          this.destinationName = destinationName;
77          this.transactionTemplate = new TransactionTemplate(adapter);
78      }
79  
80      /**
81       * Not synchronized since the Journal has better throughput if you increase
82       * the number of conncurrent writes that it is doing.
83       */
84      public void addMessage(final ActiveMQMessage message) throws JMSException {
85          final boolean debug = log.isDebugEnabled();
86          final RecordLocation location = peristenceAdapter.writePacket(destinationName, message, message.isReceiptRequired());
87          if( !TransactionManager.isCurrentTransaction() ) {
88              if( debug ) 
89                  log.debug("Journalled message add: "+message.getJMSMessageID()+" at "+location);
90               addMessage(message, location);
91          } else {
92              if( debug )
93                  log.debug("Journalled in flight message add: "+message.getJMSMessageID()+" at "+location);            
94              synchronized (this) {
95                  inFlightTxLocations.add(location);
96              }
97              final Transaction tx = TransactionManager.getContexTransaction(); 
98              transactionStore.addMessage(this, message, location);
99              tx.addPostCommitTask(new TransactionTask() {
100                 public void execute() throws Throwable {
101                     if( debug ) 
102                         log.debug("In flight message add commit: "+message.getJMSMessageID()+" at "+location);                        
103                     synchronized (JournalMessageStore.this) {
104                         inFlightTxLocations.remove(location);
105                         addMessage(message, location);
106                     }
107                 }
108             });
109             tx.addPostRollbackTask(new TransactionTask(){
110                 public void execute() throws Throwable {
111                     if( debug ) 
112                         log.debug("In flight message add rollback: "+message.getJMSMessageID()+" at "+location);                        
113                     // TODO Auto-generated method stub
114                     synchronized (JournalMessageStore.this) {
115                         inFlightTxLocations.remove(location);
116                     }
117                 }
118             });
119         }
120     }
121     
122     /**
123      * @param message
124      * @param location
125      */
126     private void addMessage(final ActiveMQMessage message, final RecordLocation location) {
127         synchronized (this) {
128             lastLocation=location;
129             MessageIdentity id = message.getJMSMessageIdentity();
130             addedMessageIds.put(id, location);
131         }
132     }
133 
134     /**
135      */
136     public void removeMessage(final MessageAck ack) throws JMSException {
137 
138         final boolean debug = log.isDebugEnabled();
139         final RecordLocation location = peristenceAdapter.writePacket(destinationName, ack, ack.isReceiptRequired());
140         if( !TransactionManager.isCurrentTransaction() ) {
141             if( debug ) 
142                 log.debug("Journalled message remove: "+ack.getMessageID()+" at "+location);            
143             removeMessage(ack, location);
144         } else {
145             if( debug ) 
146                 log.debug("Journalled in flight message remove: "+ack.getMessageID()+" at "+location);
147             
148             synchronized( this ) {
149                 inFlightTxLocations.add(location);
150             }
151             final Transaction tx = TransactionManager.getContexTransaction(); 
152             transactionStore.removeMessage(this, ack, location);
153             tx.addPostCommitTask(new TransactionTask(){
154                 public void execute() throws Throwable {
155                     if( debug ) 
156                         log.debug("In flight message remove commit: "+ack.getMessageID()+" at "+location);
157 
158                     synchronized (JournalMessageStore.this) {                        
159                         inFlightTxLocations.remove(location);
160                         removeMessage(ack, location);
161                     }
162                 }
163             });
164             tx.addPostRollbackTask(new TransactionTask(){
165                 public void execute() throws Throwable {
166                     // TODO Auto-generated method stub
167                     if( debug ) 
168                         log.debug("In flight message remove rollback: "+ack.getMessageID()+" at "+location);
169                     synchronized (JournalMessageStore.this) {
170                         inFlightTxLocations.remove(location);
171                     }
172                 }
173             });
174         }
175     }
176 
177     /**
178      * @param ack
179      * @param location
180      */
181     private void removeMessage(final MessageAck ack, final RecordLocation location) {
182         synchronized (this) {
183             lastLocation=location;
184             MessageIdentity id = ack.getMessageIdentity();
185             RecordLocation msgLocation = (RecordLocation) addedMessageIds.remove(id);
186             if (msgLocation == null) {
187                 removedMessageLocations.add(ack);
188             } else {
189                 removedFromJournal++;
190             }
191         }
192     }
193 
194     /**
195      * @return
196      * @throws JMSException
197      */
198     public RecordLocation checkpoint() throws JMSException {
199 
200         RecordLocation rc;
201         final ArrayList cpRemovedMessageLocations;
202         final ArrayList cpActiveJournalLocations;
203 
204         // swap out the message hash maps..
205         synchronized (this) {
206             cpAddedMessageIds = this.addedMessageIds;
207             cpRemovedMessageLocations = this.removedMessageLocations;
208 
209             this.inFlightTxLocations.removeAll(this.removedMessageLocations);
210             this.inFlightTxLocations.removeAll(this.addedMessageIds.values());            
211             cpActiveJournalLocations=new ArrayList(inFlightTxLocations);
212             
213             this.addedMessageIds = new LinkedHashMap();
214             this.removedMessageLocations = new ArrayList();            
215             log.debug("removedFromJournal="+removedFromJournal);
216             removedFromJournal=0;
217         }
218         
219         final boolean debug = log.isDebugEnabled();
220         if( debug ) 
221             log.debug("Checkpoint: "+destinationName);
222         
223         
224         final int messagesAdded[]=new int[]{0};
225         final int messagesRemoved[]=new int[]{0};
226 
227         transactionTemplate.run(new Callback() {
228             public void execute() throws Throwable {
229 
230                 // Checkpoint the added messages.
231                 Iterator iterator = cpAddedMessageIds.keySet().iterator();
232                 while (iterator.hasNext()) {
233                     MessageIdentity identity = (MessageIdentity) iterator.next();
234                     if( debug ) 
235                         log.debug("Adding: "+identity.getMessageID());
236                     ActiveMQMessage msg = getCacheMessage(identity);
237                     // Pull it out of the journal if we have to.
238                     if (msg == null) {
239                         RecordLocation location = (RecordLocation) cpAddedMessageIds.get(identity);
240                         msg = (ActiveMQMessage) peristenceAdapter.readPacket((RecordLocation) location);
241                     }
242                     if( msg != null ) {
243                         try {
244                             longTermStore.addMessage(msg);
245                             messagesAdded[0]++;
246                         } catch (Throwable e) {
247                             log.warn("Message could not be added to long term store: " + e.getMessage(), e);
248                         }
249                     } else {
250                         log.warn("Journal could not reload message: " + identity);                        
251                     }
252                 }
253 
254                 // Checkpoint the removed messages.
255                 iterator = cpRemovedMessageLocations.iterator();
256                 while (iterator.hasNext()) {
257                     try {
258                         MessageAck ack = (MessageAck) iterator.next();
259                         if( debug ) 
260                             log.debug("Removing: "+ack.getMessageID());
261                         longTermStore.removeMessage(ack);
262                         messagesRemoved[0]++;
263                     } catch (Throwable e) {
264                         log.debug("Message could not be removed from long term store: " + e.getMessage(), e);
265                     }
266                 }
267             }
268 
269         });
270 
271         log.debug("Added "+messagesAdded[0]+" message(s) and removed "+messagesRemoved[0]+" message(s). removedFromJournal="+removedFromJournal);
272         synchronized (this) {
273             cpAddedMessageIds = null;
274         }
275         
276         Collections.sort(cpActiveJournalLocations);
277         if( debug ) 
278             log.debug("In flight journal locations: "+cpActiveJournalLocations);
279         
280         if( cpActiveJournalLocations.size() > 0 ) {
281             return (RecordLocation) cpActiveJournalLocations.get(0);
282         } else {
283             return lastLocation;
284         }
285     }
286 
287     private ActiveMQMessage getCacheMessage(MessageIdentity identity) throws JMSException {
288         return cacheMessageStore.getMessage(identity);
289     }
290 
291     /**
292      * 
293      */
294     public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException {
295         ActiveMQMessage answer = null;
296 
297         Object location;
298         synchronized (this) {
299             location = addedMessageIds.get(identity);
300             if( location==null && cpAddedMessageIds!=null )
301                 location = cpAddedMessageIds.get(identity);
302         }
303         
304         // Do we have a still have it in the journal?
305         if (location != null ) {
306             try {
307                 answer = (ActiveMQMessage)peristenceAdapter.readPacket((RecordLocation) location);
308                 if (answer != null)
309                     return answer;
310             } catch (Throwable e) {
311                 // We could have had an async checkpoint and thus we cannot read that location anymore,
312                 // but now the message should be in the long term store.
313             }
314         }
315 
316         // If all else fails try the long term message store.
317         return longTermStore.getMessage(identity);
318     }
319 
320     /**
321      * Replays the checkpointStore first as those messages are the oldest ones,
322      * then messages are replayed from the transaction log and then the cache is
323      * updated.
324      * 
325      * @param listener
326      * @throws JMSException
327      */
328     public void recover(final RecoveryListener listener) throws JMSException {
329         peristenceAdapter.checkpoint(true);
330         longTermStore.recover(listener);
331     }
332 
333     public void start() throws JMSException {
334         longTermStore.start();
335     }
336 
337     public void stop() throws JMSException {
338         longTermStore.stop();
339     }
340 
341     /**
342      * @return Returns the longTermStore.
343      */
344     public MessageStore getLongTermMessageStore() {
345         return longTermStore;
346     }
347 
348     /**
349      * @see org.activemq.store.cache.CacheMessageStoreAware#setCacheMessageStore(org.activemq.store.cache.CacheMessageStore)
350      */
351     public void setCacheMessageStore(CacheMessageStore store) {
352         cacheMessageStore = store;
353         // Propagate the setCacheMessageStore method call to the longTermStore
354         // if possible.
355         if (longTermStore instanceof CacheMessageStoreAware) {
356             ((CacheMessageStoreAware) longTermStore).setCacheMessageStore(store);
357         }
358     }
359 
360     /**
361      * @see org.activemq.store.MessageStore#removeAllMessages()
362      */
363     public void removeAllMessages() throws JMSException {
364         peristenceAdapter.checkpoint(true);
365         longTermStore.removeAllMessages();
366     }
367 
368     public void replayAddMessage(ActiveMQMessage msg) {
369         try {
370             // Only add the message if it has not already been added.
371             ActiveMQMessage t = longTermStore.getMessage(msg.getJMSMessageIdentity());
372             if( t==null ) {
373                 longTermStore.addMessage(msg);
374             }
375         }
376         catch (Throwable e) {
377             log.debug("Could not replay add for message '" + msg.getJMSMessageIdentity().getMessageID() + "'.  Message may have already been added. reason: " + e);
378         }
379     }
380 
381     public void replayRemoveMessage(MessageAck ack) {
382         try {
383             // Only remove the message if it has not already been removed.
384             ActiveMQMessage t = longTermStore.getMessage(ack.getMessageIdentity());
385             if( t!=null ) {
386                 longTermStore.removeMessage(ack);
387             }
388         }
389         catch (Throwable e) {
390             log.debug("Could not replay acknowledge for message '" + ack.getMessageID() + "'.  Message may have already been acknowledged. reason: " + e);
391         }
392     }
393 
394 }