Source code: org/activemq/service/DeadLetterPolicy.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.activemq.service;
20 import javax.jms.JMSException;
21 import javax.jms.DeliveryMode;
22 import org.apache.commons.logging.*;
23 import org.activemq.broker.BrokerContainer;
24 import org.activemq.broker.Broker;
25 import org.activemq.message.ActiveMQDestination;
26 import org.activemq.message.ActiveMQMessage;
27 import org.activemq.message.ActiveMQQueue;
28 import org.activemq.store.PersistenceAdapter;
29 import org.activemq.util.IdGenerator;
30
31 /**
32 * Determines how messages are stored in a dead letter queue
33 *
34 * @version $Revision: 1.1.1.1 $
35 */
36 public class DeadLetterPolicy {
37 /**
38 * Prefix used by dead letter queues
39 */
40 public static final String DEAD_LETTER_PREFIX = "org.activemq.deadletter.";
41 private static final String DEFAULT_DEAD_LETTER_NAME = "DLQ";
42 private static final Log log = LogFactory.getLog(DeadLetterPolicy.class);
43 private Broker broker;
44 private String deadLetterPrefix = DEAD_LETTER_PREFIX;
45 private String deadLetterName = DEFAULT_DEAD_LETTER_NAME;
46 private boolean deadLetterEnabled = true;
47 private boolean deadLetterPerDestinationName = true;
48 private boolean storeNonPersistentMessages = true;
49 private boolean noTopicConsumerEnabled = true;
50 private boolean allowDuplicates = false;
51 private boolean useDatabaseLocking = false;
52 private long deadLetterQueueTTL = 0L;
53 private long deadLetterTopicTTL = 0L;
54 private IdGenerator idGenerator = new IdGenerator();
55
56 /**
57 * Construct a dead letter policy
58 *
59 * @param broker
60 */
61 public DeadLetterPolicy(Broker broker) {
62 this.broker = broker;
63 }
64
65 public DeadLetterPolicy(BrokerContainer brokerContainer) {
66 this(brokerContainer.getBroker());
67 }
68
69 /**
70 * Default constructor
71 */
72 public DeadLetterPolicy() {
73 }
74
75 /**
76 * @return Returns the broker.
77 */
78 public Broker getBroker() {
79 return broker;
80 }
81
82 /**
83 * @param broker The broker to set.
84 */
85 public void setBroker(Broker broker) {
86 this.broker = broker;
87 }
88
89 /**
90 * @return Returns the deadLetterEnabled.
91 */
92 public boolean isDeadLetterEnabled() {
93 return deadLetterEnabled;
94 }
95
96 /**
97 * @param deadLetterEnabled The deadLetterEnabled to set.
98 */
99 public void setDeadLetterEnabled(boolean deadLetterEnabled) {
100 this.deadLetterEnabled = deadLetterEnabled;
101 }
102
103 /**
104 * @return Returns the deadLetterPerDestinationName.
105 */
106 public boolean isDeadLetterPerDestinationName() {
107 return deadLetterPerDestinationName;
108 }
109
110 /**
111 * @param deadLetterPerDestinationName The deadLetterPerDestinationName to set.
112 */
113 public void setDeadLetterPerDestinationName(boolean deadLetterPerDestinationName) {
114 this.deadLetterPerDestinationName = deadLetterPerDestinationName;
115 }
116
117 /**
118 * @return Returns the deadLetterName.
119 */
120 public String getDeadLetterName() {
121 return deadLetterName;
122 }
123
124 /**
125 * @param deadLetterName The deadLetterName to set.
126 */
127 public void setDeadLetterName(String deadLetterName) {
128 this.deadLetterName = deadLetterName;
129 }
130
131 /**
132 * @return Returns the deadLetterPrefix.
133 */
134 public String getDeadLetterPrefix() {
135 return deadLetterPrefix;
136 }
137
138 /**
139 * @param deadLetterPrefix The deadLetterPrefix to set.
140 */
141 public void setDeadLetterPrefix(String deadLetterPrefix) {
142 this.deadLetterPrefix = deadLetterPrefix;
143 }
144
145 /**
146 * @return Returns the storeNonPersistentMessages.
147 */
148 public boolean isStoreNonPersistentMessages() {
149 return storeNonPersistentMessages;
150 }
151
152 /**
153 * @param storeNonPersistentMessages The storeNonPersistentMessages to set.
154 */
155 public void setStoreNonPersistentMessages(boolean storeNonPersistentMessages) {
156 this.storeNonPersistentMessages = storeNonPersistentMessages;
157 }
158
159 /**
160 * @return Returns the noTopicConsumerEnabled.
161 */
162 public boolean isNoTopicConsumerEnabled() {
163 return noTopicConsumerEnabled;
164 }
165 /**
166 * @param noTopicConsumerEnabled The noTopicConsumerEnabled to set.
167 */
168 public void setNoTopicConsumerEnabled(boolean noTopicConsumerEnabled) {
169 this.noTopicConsumerEnabled = noTopicConsumerEnabled;
170 }
171
172 /**
173 * @return Returns the allowDuplicates.
174 */
175 public boolean isAllowDuplicates() {
176 return allowDuplicates;
177 }
178 /**
179 * @param allowDuplicates The allowDuplicates to set.
180 */
181 public void setAllowDuplicates(boolean allowDuplicates) {
182 this.allowDuplicates = allowDuplicates;
183 }
184 /**
185 * @return Returns the useDatabaseLocking.
186 */
187 public boolean isUseDatabaseLocking() {
188 return useDatabaseLocking;
189 }
190 /**
191 * @param useDatabaseLocking The useDatabaseLocking to set.
192 */
193 public void setUseDatabaseLocking(boolean useDatabaseLocking) {
194 this.useDatabaseLocking = useDatabaseLocking;
195 }
196 /**
197 * @param deadLetterQueueTTL The deadLetterQueueTTL to set.
198 */
199 public void setDeadLetterQueueTTL(long deadLetterQueueTTL) {
200 this.deadLetterQueueTTL = deadLetterQueueTTL;
201 }
202 /**
203 * @param deadLetterTopicTTL The deadLetterTopicTTL to set.
204 */
205 public void setDeadLetterTopicTTL(long deadLetterTopicTTL) {
206 this.deadLetterTopicTTL = deadLetterTopicTTL;
207 }
208 /**
209 * Get the name of the DLQ from the destination provided
210 * @param destination
211 * @return the name of the DLQ for this Destination
212 */
213 public String getDeadLetterNameFromDestination(ActiveMQDestination destination){
214 String deadLetterName = deadLetterPrefix;
215 if (deadLetterPerDestinationName) {
216 deadLetterName += destination.getPhysicalName();
217 }
218 else {
219 deadLetterName += this.deadLetterName;
220 }
221 return deadLetterName;
222 }
223
224 /**
225 * Send a message to a dead letter queue
226 *
227 * @param message
228 * @throws JMSException
229 */
230 public void sendToDeadLetter(ActiveMQMessage message) {
231 if (deadLetterEnabled && message != null && (message.isPersistent() || storeNonPersistentMessages) && !message.isDispatchedFromDLQ()) {
232 if (broker != null) {
233 // process duplicates
234 if (!isAllowDuplicates()) {
235 PersistenceAdapter persistenceAdapter = getBroker().getPersistenceAdapter();
236 // make sure no previous dead letter was already sent
237 if (persistenceAdapter!=null
238 && message.getJMSMessageIdentity()!=null
239 && message.getJMSMessageIdentity().getSequenceNumber()!=null
240 && persistenceAdapter.deadLetterAlreadySent(((Long)message.getJMSMessageIdentity().getSequenceNumber()).longValue(), isUseDatabaseLocking())) {
241 if (log.isDebugEnabled()) log.debug("Dead letter has been already sent for this message: " + message.getJMSMessageID());
242 return;
243 }
244 }
245
246 // send a dead letter message
247 String dlqName = getDeadLetterNameFromDestination(message.getJMSActiveMQDestination());
248 try {
249 ActiveMQMessage deadMessage = createDeadLetterMessage(dlqName, message);
250 broker.sendToDeadLetterQueue(dlqName, deadMessage);
251 if (log.isDebugEnabled()) log.debug("Passed message: " + deadMessage + " to DLQ: " + dlqName);
252 } catch (JMSException e) {
253 log.warn("Failed to send message to dead letter due to: " + e, e);
254 }
255 }
256 else {
257 log.warn("Broker is not initialized - cannot add to DLQ: " + message);
258 }
259 }else if (log.isDebugEnabled()){
260 log.debug("DLQ not storing message: " + message);
261 }
262 }
263
264 protected ActiveMQMessage createDeadLetterMessage(String dlqName, ActiveMQMessage message) throws JMSException {
265 // make a shallow copy of the orginal message
266 ActiveMQMessage deadMessage = message.shallowCopy();
267
268 // generate a new producer and message ID
269 String id = idGenerator.generateId();
270 String producerKey = IdGenerator.getSeedFromId(id);
271 long seq = IdGenerator.getCountFromId(id);
272 deadMessage.setProducerKey(producerKey);
273 deadMessage.setJMSMessageID(id);
274 deadMessage.setSequenceNumber(seq);
275 deadMessage.getJMSMessageIdentity().setMessageID(id);
276 deadMessage.getJMSMessageIdentity().setSequenceNumber(new Long(seq));
277
278 ActiveMQQueue destination = new ActiveMQQueue(dlqName);
279 deadMessage.setJMSDestination(destination);
280 deadMessage.setDispatchedFromDLQ(true);
281
282 // set the expiration of the dead letter message
283 long expiration = 0L;
284 long timeStamp = System.currentTimeMillis();
285 if (message.getJMSActiveMQDestination().isTopic()) {
286 if (deadLetterTopicTTL > 0) {
287 expiration = deadLetterTopicTTL + timeStamp;
288 }
289 } else {
290 if (deadLetterQueueTTL > 0) {
291 expiration = deadLetterQueueTTL + timeStamp;
292 }
293 }
294 deadMessage.setJMSExpiration(expiration);
295 deadMessage.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
296
297 return deadMessage;
298 }
299 }