Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/service/impl/DurableQueueMessageContainer.java


1   /**
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activemq.service.impl;
19  
20  import javax.jms.JMSException;
21  
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.activemq.message.ActiveMQMessage;
25  import org.activemq.message.MessageAck;
26  import org.activemq.service.DeadLetterPolicy;
27  import org.activemq.service.MessageContainerAdmin;
28  import org.activemq.service.MessageIdentity;
29  import org.activemq.service.QueueList;
30  import org.activemq.service.QueueListEntry;
31  import org.activemq.service.QueueMessageContainer;
32  import org.activemq.service.TransactionManager;
33  import org.activemq.service.TransactionTask;
34  import org.activemq.store.MessageStore;
35  import org.activemq.store.PersistenceAdapter;
36  import org.activemq.store.RecoveryListener;
37  
38  /**
39   * A default implementation of a Durable Queue based
40   * {@link org.activemq.service.MessageContainer}
41   * which acts as an adapter between the {@link org.activemq.service.MessageContainerManager}
42   * requirements and those of the persistent {@link MessageStore} implementations.
43   *
44   * @version $Revision: 1.1.1.1 $
45   */
46  public class DurableQueueMessageContainer implements QueueMessageContainer, MessageContainerAdmin {
47      private static final Log log = LogFactory.getLog(DurableQueueMessageContainer.class);
48  
49      private MessageStore messageStore;
50      private String destinationName;
51      private boolean deadLetterQueue;
52  
53      /**
54       * messages to be delivered
55       */
56      private QueueList messagesToBeDelivered;
57      /**
58       * messages that have been delivered but not acknowledged
59       */
60      private QueueList deliveredMessages;
61  
62      public DurableQueueMessageContainer(PersistenceAdapter persistenceAdapter, MessageStore messageStore, String destinationName) {
63          this(persistenceAdapter, messageStore, destinationName, new DefaultQueueList(), new DefaultQueueList());
64      }
65  
66      public DurableQueueMessageContainer(PersistenceAdapter persistenceAdapter, MessageStore messageStore, String destinationName, QueueList messagesToBeDelivered, QueueList deliveredMessages) {
67          this.messageStore = messageStore;
68          this.destinationName = destinationName;
69          this.messagesToBeDelivered = messagesToBeDelivered;
70          this.deliveredMessages = deliveredMessages;
71          this.deadLetterQueue = destinationName.startsWith(DeadLetterPolicy.DEAD_LETTER_PREFIX);
72      }
73  
74      public String getDestinationName() {
75          return destinationName;
76      }
77  
78      public void addMessage(ActiveMQMessage message) throws JMSException {
79          messageStore.addMessage(message);
80          final MessageIdentity answer = message.getJMSMessageIdentity();
81          
82          // If there is no transaction.. then this executes directly.
83          TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){
84              public void execute() throws Throwable {
85                  synchronized( this ) {
86                    messagesToBeDelivered.add(answer);
87                  }
88              }
89          });      
90      }
91  
92      public synchronized void delete(final MessageIdentity messageID, MessageAck ack) throws JMSException {
93          
94          messageStore.removeMessage(ack);          
95  
96          TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){
97              public void execute() throws Throwable {
98                doDelete(messageID);
99              }
100         });      
101         
102     }
103 
104     /**
105      * @param messageID
106      * @param storedIdentity
107      * @throws JMSException
108      */
109     private void doDelete(MessageIdentity messageID) throws JMSException {
110         MessageIdentity storedIdentity=null;
111         synchronized( this ) {
112           QueueListEntry entry = deliveredMessages.getFirstEntry();
113           while (entry != null) {
114               MessageIdentity identity = (MessageIdentity) entry.getElement();
115               if (messageID.equals(identity)) {
116                   deliveredMessages.remove(entry);
117                   storedIdentity=identity;
118                   break;
119               }
120               entry = deliveredMessages.getNextEntry(entry);
121           }
122           
123           if (storedIdentity==null) {
124               // maybe the messages have not been delivered yet
125               // as we are recovering from a previous transaction log
126               entry = messagesToBeDelivered.getFirstEntry();
127               while (entry != null) {
128                   MessageIdentity identity = (MessageIdentity) entry.getElement();
129                   if (messageID.equals(identity)) {                      
130                         messagesToBeDelivered.remove(entry);
131                       storedIdentity=identity;
132                       break;
133                   }
134                   entry = messagesToBeDelivered.getNextEntry(entry);
135               }
136           }
137       }
138       
139         if (storedIdentity==null) {
140             log.error("Attempt to acknowledge unknown messageID: " + messageID);
141         } else {
142         }
143     }
144 
145     public ActiveMQMessage getMessage(MessageIdentity messageID) throws JMSException {
146         return messageStore.getMessage(messageID);
147     }
148 
149 
150     public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException {
151         /** TODO: make more optimal implementation */
152         return getMessage(messageIdentity) != null;
153     }
154 
155     /**
156      * Does nothing since when we receive an acknowledgement on a queue
157      * we can delete the message
158      *
159      * @param messageIdentity
160      */
161     public void registerMessageInterest(MessageIdentity messageIdentity) {
162     }
163 
164     /**
165      * Does nothing since when we receive an acknowledgement on a queue
166      * we can delete the message
167      *
168      * @param messageIdentity
169      * @param ack
170      */
171     public void unregisterMessageInterest(MessageIdentity ack) {
172     }
173 
174     public ActiveMQMessage poll() throws JMSException {
175         ActiveMQMessage message = null;
176         MessageIdentity messageIdentity = null;
177         synchronized (this) {
178             messageIdentity = (MessageIdentity) messagesToBeDelivered.removeFirst();
179             if (messageIdentity != null) {
180                 deliveredMessages.add(messageIdentity);
181             }
182         }
183         if (messageIdentity != null) {
184             message = messageStore.getMessage(messageIdentity);
185         }
186         return message;
187     }
188 
189     public ActiveMQMessage peekNext(MessageIdentity messageID) throws JMSException {
190       ActiveMQMessage answer = null;
191       MessageIdentity identity = null;
192       synchronized( this ) {
193             if (messageID == null) {
194               identity = (MessageIdentity) messagesToBeDelivered.getFirst();
195             }
196             else {
197                 int index = messagesToBeDelivered.indexOf(messageID);
198                 if (index >= 0 && (index + 1) < messagesToBeDelivered.size()) {
199                   identity = (MessageIdentity) messagesToBeDelivered.get(index + 1);
200                 }
201             }
202             
203       }
204         if (identity != null) {
205             answer = messageStore.getMessage(identity);
206         }
207         return answer;
208     }
209 
210 
211     public synchronized void returnMessage(MessageIdentity messageIdentity) throws JMSException {
212         boolean result = deliveredMessages.remove(messageIdentity);
213         messagesToBeDelivered.addFirst(messageIdentity);
214     }
215 
216     /**
217      * called to reset dispatch pointers if a new Message Consumer joins
218      *
219      * @throws javax.jms.JMSException
220      */
221     public synchronized void reset() throws JMSException {
222         //new Message Consumer - move all filtered/undispatched messages to front of queue
223         int count = 0;
224         MessageIdentity messageIdentity = (MessageIdentity) deliveredMessages.removeFirst();
225         while (messageIdentity != null) {
226             messagesToBeDelivered.add(count++, messageIdentity);
227             messageIdentity = (MessageIdentity) deliveredMessages.removeFirst();
228         }
229     }
230 
231     public synchronized void start() throws JMSException {
232         final QueueMessageContainer container = this;
233         messageStore.start();
234         messageStore.recover(new RecoveryListener() {
235             public void recoverMessage(MessageIdentity messageIdentity) throws JMSException {
236                 DurableQueueMessageContainer.this.recoverMessageToBeDelivered(messageIdentity);
237             }
238         });
239     }
240 
241     public synchronized void recoverMessageToBeDelivered(MessageIdentity messageIdentity) throws JMSException {
242         messagesToBeDelivered.add(messageIdentity);
243     }
244 
245     public void stop() throws JMSException {
246         messageStore.stop();
247     }
248 
249     /**
250      * @see org.activemq.service.MessageContainer#getMessageContainerAdmin()
251      */
252     public MessageContainerAdmin getMessageContainerAdmin() {
253         return this;
254     }
255 
256     /**
257      * @see org.activemq.service.MessageContainerAdmin#empty()
258      */
259     public void empty() throws JMSException {
260         messageStore.removeAllMessages();
261     }
262 
263     /**
264      * @see org.activemq.service.QueueMessageContainer#isDeadLetterQueue()
265      */
266     public boolean isDeadLetterQueue() {
267         return deadLetterQueue;
268     }
269 
270     /**
271      * @see org.activemq.service.QueueMessageContainer#setDeadLetterQueue(boolean)
272      */
273     public void setDeadLetterQueue(boolean value) {
274         deadLetterQueue = value;
275         
276     }
277     
278 }