Source code: org/activemq/service/impl/DurableQueueMessageContainer.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activemq.service.impl;
19
20 import javax.jms.JMSException;
21
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.activemq.message.ActiveMQMessage;
25 import org.activemq.message.MessageAck;
26 import org.activemq.service.DeadLetterPolicy;
27 import org.activemq.service.MessageContainerAdmin;
28 import org.activemq.service.MessageIdentity;
29 import org.activemq.service.QueueList;
30 import org.activemq.service.QueueListEntry;
31 import org.activemq.service.QueueMessageContainer;
32 import org.activemq.service.TransactionManager;
33 import org.activemq.service.TransactionTask;
34 import org.activemq.store.MessageStore;
35 import org.activemq.store.PersistenceAdapter;
36 import org.activemq.store.RecoveryListener;
37
38 /**
39 * A default implementation of a Durable Queue based
40 * {@link org.activemq.service.MessageContainer}
41 * which acts as an adapter between the {@link org.activemq.service.MessageContainerManager}
42 * requirements and those of the persistent {@link MessageStore} implementations.
43 *
44 * @version $Revision: 1.1.1.1 $
45 */
46 public class DurableQueueMessageContainer implements QueueMessageContainer, MessageContainerAdmin {
47 private static final Log log = LogFactory.getLog(DurableQueueMessageContainer.class);
48
49 private MessageStore messageStore;
50 private String destinationName;
51 private boolean deadLetterQueue;
52
53 /**
54 * messages to be delivered
55 */
56 private QueueList messagesToBeDelivered;
57 /**
58 * messages that have been delivered but not acknowledged
59 */
60 private QueueList deliveredMessages;
61
62 public DurableQueueMessageContainer(PersistenceAdapter persistenceAdapter, MessageStore messageStore, String destinationName) {
63 this(persistenceAdapter, messageStore, destinationName, new DefaultQueueList(), new DefaultQueueList());
64 }
65
66 public DurableQueueMessageContainer(PersistenceAdapter persistenceAdapter, MessageStore messageStore, String destinationName, QueueList messagesToBeDelivered, QueueList deliveredMessages) {
67 this.messageStore = messageStore;
68 this.destinationName = destinationName;
69 this.messagesToBeDelivered = messagesToBeDelivered;
70 this.deliveredMessages = deliveredMessages;
71 this.deadLetterQueue = destinationName.startsWith(DeadLetterPolicy.DEAD_LETTER_PREFIX);
72 }
73
74 public String getDestinationName() {
75 return destinationName;
76 }
77
78 public void addMessage(ActiveMQMessage message) throws JMSException {
79 messageStore.addMessage(message);
80 final MessageIdentity answer = message.getJMSMessageIdentity();
81
82 // If there is no transaction.. then this executes directly.
83 TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){
84 public void execute() throws Throwable {
85 synchronized( this ) {
86 messagesToBeDelivered.add(answer);
87 }
88 }
89 });
90 }
91
92 public synchronized void delete(final MessageIdentity messageID, MessageAck ack) throws JMSException {
93
94 messageStore.removeMessage(ack);
95
96 TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){
97 public void execute() throws Throwable {
98 doDelete(messageID);
99 }
100 });
101
102 }
103
104 /**
105 * @param messageID
106 * @param storedIdentity
107 * @throws JMSException
108 */
109 private void doDelete(MessageIdentity messageID) throws JMSException {
110 MessageIdentity storedIdentity=null;
111 synchronized( this ) {
112 QueueListEntry entry = deliveredMessages.getFirstEntry();
113 while (entry != null) {
114 MessageIdentity identity = (MessageIdentity) entry.getElement();
115 if (messageID.equals(identity)) {
116 deliveredMessages.remove(entry);
117 storedIdentity=identity;
118 break;
119 }
120 entry = deliveredMessages.getNextEntry(entry);
121 }
122
123 if (storedIdentity==null) {
124 // maybe the messages have not been delivered yet
125 // as we are recovering from a previous transaction log
126 entry = messagesToBeDelivered.getFirstEntry();
127 while (entry != null) {
128 MessageIdentity identity = (MessageIdentity) entry.getElement();
129 if (messageID.equals(identity)) {
130 messagesToBeDelivered.remove(entry);
131 storedIdentity=identity;
132 break;
133 }
134 entry = messagesToBeDelivered.getNextEntry(entry);
135 }
136 }
137 }
138
139 if (storedIdentity==null) {
140 log.error("Attempt to acknowledge unknown messageID: " + messageID);
141 } else {
142 }
143 }
144
145 public ActiveMQMessage getMessage(MessageIdentity messageID) throws JMSException {
146 return messageStore.getMessage(messageID);
147 }
148
149
150 public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException {
151 /** TODO: make more optimal implementation */
152 return getMessage(messageIdentity) != null;
153 }
154
155 /**
156 * Does nothing since when we receive an acknowledgement on a queue
157 * we can delete the message
158 *
159 * @param messageIdentity
160 */
161 public void registerMessageInterest(MessageIdentity messageIdentity) {
162 }
163
164 /**
165 * Does nothing since when we receive an acknowledgement on a queue
166 * we can delete the message
167 *
168 * @param messageIdentity
169 * @param ack
170 */
171 public void unregisterMessageInterest(MessageIdentity ack) {
172 }
173
174 public ActiveMQMessage poll() throws JMSException {
175 ActiveMQMessage message = null;
176 MessageIdentity messageIdentity = null;
177 synchronized (this) {
178 messageIdentity = (MessageIdentity) messagesToBeDelivered.removeFirst();
179 if (messageIdentity != null) {
180 deliveredMessages.add(messageIdentity);
181 }
182 }
183 if (messageIdentity != null) {
184 message = messageStore.getMessage(messageIdentity);
185 }
186 return message;
187 }
188
189 public ActiveMQMessage peekNext(MessageIdentity messageID) throws JMSException {
190 ActiveMQMessage answer = null;
191 MessageIdentity identity = null;
192 synchronized( this ) {
193 if (messageID == null) {
194 identity = (MessageIdentity) messagesToBeDelivered.getFirst();
195 }
196 else {
197 int index = messagesToBeDelivered.indexOf(messageID);
198 if (index >= 0 && (index + 1) < messagesToBeDelivered.size()) {
199 identity = (MessageIdentity) messagesToBeDelivered.get(index + 1);
200 }
201 }
202
203 }
204 if (identity != null) {
205 answer = messageStore.getMessage(identity);
206 }
207 return answer;
208 }
209
210
211 public synchronized void returnMessage(MessageIdentity messageIdentity) throws JMSException {
212 boolean result = deliveredMessages.remove(messageIdentity);
213 messagesToBeDelivered.addFirst(messageIdentity);
214 }
215
216 /**
217 * called to reset dispatch pointers if a new Message Consumer joins
218 *
219 * @throws javax.jms.JMSException
220 */
221 public synchronized void reset() throws JMSException {
222 //new Message Consumer - move all filtered/undispatched messages to front of queue
223 int count = 0;
224 MessageIdentity messageIdentity = (MessageIdentity) deliveredMessages.removeFirst();
225 while (messageIdentity != null) {
226 messagesToBeDelivered.add(count++, messageIdentity);
227 messageIdentity = (MessageIdentity) deliveredMessages.removeFirst();
228 }
229 }
230
231 public synchronized void start() throws JMSException {
232 final QueueMessageContainer container = this;
233 messageStore.start();
234 messageStore.recover(new RecoveryListener() {
235 public void recoverMessage(MessageIdentity messageIdentity) throws JMSException {
236 DurableQueueMessageContainer.this.recoverMessageToBeDelivered(messageIdentity);
237 }
238 });
239 }
240
241 public synchronized void recoverMessageToBeDelivered(MessageIdentity messageIdentity) throws JMSException {
242 messagesToBeDelivered.add(messageIdentity);
243 }
244
245 public void stop() throws JMSException {
246 messageStore.stop();
247 }
248
249 /**
250 * @see org.activemq.service.MessageContainer#getMessageContainerAdmin()
251 */
252 public MessageContainerAdmin getMessageContainerAdmin() {
253 return this;
254 }
255
256 /**
257 * @see org.activemq.service.MessageContainerAdmin#empty()
258 */
259 public void empty() throws JMSException {
260 messageStore.removeAllMessages();
261 }
262
263 /**
264 * @see org.activemq.service.QueueMessageContainer#isDeadLetterQueue()
265 */
266 public boolean isDeadLetterQueue() {
267 return deadLetterQueue;
268 }
269
270 /**
271 * @see org.activemq.service.QueueMessageContainer#setDeadLetterQueue(boolean)
272 */
273 public void setDeadLetterQueue(boolean value) {
274 deadLetterQueue = value;
275
276 }
277
278 }