Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/service/DeadLetterPolicy.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.activemq.service;
20  import javax.jms.JMSException;
21  import javax.jms.DeliveryMode;
22  import org.apache.commons.logging.*;
23  import org.activemq.broker.BrokerContainer;
24  import org.activemq.broker.Broker;
25  import org.activemq.message.ActiveMQDestination;
26  import org.activemq.message.ActiveMQMessage;
27  import org.activemq.message.ActiveMQQueue;
28  import org.activemq.store.PersistenceAdapter;
29  import org.activemq.util.IdGenerator;
30  
31  /**
32   * Determines how messages are stored in a dead letter queue
33   * 
34   * @version $Revision: 1.1.1.1 $
35   */
36  public class DeadLetterPolicy {
37      /**
38       * Prefix used by dead letter queues
39       */
40      public static final String DEAD_LETTER_PREFIX = "org.activemq.deadletter.";
41      private static final String DEFAULT_DEAD_LETTER_NAME = "DLQ";    
42      private static final Log log = LogFactory.getLog(DeadLetterPolicy.class);
43      private Broker broker;
44      private String deadLetterPrefix = DEAD_LETTER_PREFIX;
45      private String deadLetterName = DEFAULT_DEAD_LETTER_NAME;
46      private boolean deadLetterEnabled = true;
47      private boolean deadLetterPerDestinationName = true;
48      private boolean storeNonPersistentMessages = true;
49      private boolean noTopicConsumerEnabled = true;
50      private boolean allowDuplicates = false;
51      private boolean useDatabaseLocking = false;
52      private long deadLetterQueueTTL = 0L;
53      private long deadLetterTopicTTL = 0L;
54      private IdGenerator idGenerator = new IdGenerator();
55     
56      /**
57       * Construct a dead letter policy
58       * 
59       * @param broker
60       */
61      public DeadLetterPolicy(Broker broker) {
62          this.broker = broker;
63      }
64      
65      public DeadLetterPolicy(BrokerContainer brokerContainer) {
66        this(brokerContainer.getBroker());
67      }
68  
69      /**
70       * Default constructor
71       */
72      public DeadLetterPolicy() {
73      }
74  
75      /**
76       * @return Returns the broker.
77       */
78      public Broker getBroker() {
79          return broker;
80      }
81  
82      /**
83       * @param broker The broker to set.
84       */
85      public void setBroker(Broker broker) {
86          this.broker = broker;
87      }
88  
89      /**
90       * @return Returns the deadLetterEnabled.
91       */
92      public boolean isDeadLetterEnabled() {
93          return deadLetterEnabled;
94      }
95  
96      /**
97       * @param deadLetterEnabled The deadLetterEnabled to set.
98       */
99      public void setDeadLetterEnabled(boolean deadLetterEnabled) {
100         this.deadLetterEnabled = deadLetterEnabled;
101     }
102 
103     /**
104      * @return Returns the deadLetterPerDestinationName.
105      */
106     public boolean isDeadLetterPerDestinationName() {
107         return deadLetterPerDestinationName;
108     }
109 
110     /**
111      * @param deadLetterPerDestinationName The deadLetterPerDestinationName to set.
112      */
113     public void setDeadLetterPerDestinationName(boolean deadLetterPerDestinationName) {
114         this.deadLetterPerDestinationName = deadLetterPerDestinationName;
115     }
116 
117     /**
118      * @return Returns the deadLetterName.
119      */
120     public String getDeadLetterName() {
121         return deadLetterName;
122     }
123 
124     /**
125      * @param deadLetterName The deadLetterName to set.
126      */
127     public void setDeadLetterName(String deadLetterName) {
128         this.deadLetterName = deadLetterName;
129     }
130 
131     /**
132      * @return Returns the deadLetterPrefix.
133      */
134     public String getDeadLetterPrefix() {
135         return deadLetterPrefix;
136     }
137 
138     /**
139      * @param deadLetterPrefix The deadLetterPrefix to set.
140      */
141     public void setDeadLetterPrefix(String deadLetterPrefix) {
142         this.deadLetterPrefix = deadLetterPrefix;
143     }
144 
145     /**
146      * @return Returns the storeNonPersistentMessages.
147      */
148     public boolean isStoreNonPersistentMessages() {
149         return storeNonPersistentMessages;
150     }
151 
152     /**
153      * @param storeNonPersistentMessages The storeNonPersistentMessages to set.
154      */
155     public void setStoreNonPersistentMessages(boolean storeNonPersistentMessages) {
156         this.storeNonPersistentMessages = storeNonPersistentMessages;
157     }
158     
159     /**
160      * @return Returns the noTopicConsumerEnabled.
161      */
162     public boolean isNoTopicConsumerEnabled() {
163         return noTopicConsumerEnabled;
164     }
165     /**
166      * @param noTopicConsumerEnabled The noTopicConsumerEnabled to set.
167      */
168     public void setNoTopicConsumerEnabled(boolean noTopicConsumerEnabled) {
169         this.noTopicConsumerEnabled = noTopicConsumerEnabled;
170     }
171     
172     /**
173    * @return Returns the allowDuplicates.
174    */
175   public boolean isAllowDuplicates() {
176     return allowDuplicates;
177   }
178   /**
179    * @param allowDuplicates The allowDuplicates to set.
180    */
181   public void setAllowDuplicates(boolean allowDuplicates) {
182     this.allowDuplicates = allowDuplicates;
183   }
184   /**
185    * @return Returns the useDatabaseLocking.
186    */
187   public boolean isUseDatabaseLocking() {
188     return useDatabaseLocking;
189   }  
190   /**
191    * @param useDatabaseLocking The useDatabaseLocking to set.
192    */
193   public void setUseDatabaseLocking(boolean useDatabaseLocking) {
194     this.useDatabaseLocking = useDatabaseLocking;
195   }
196   /**
197    * @param deadLetterQueueTTL The deadLetterQueueTTL to set.
198    */
199   public void setDeadLetterQueueTTL(long deadLetterQueueTTL) {
200     this.deadLetterQueueTTL = deadLetterQueueTTL;
201   }
202   /**
203    * @param deadLetterTopicTTL The deadLetterTopicTTL to set.
204    */
205   public void setDeadLetterTopicTTL(long deadLetterTopicTTL) {
206     this.deadLetterTopicTTL = deadLetterTopicTTL;
207   }
208   /**
209      * Get the name of the DLQ from the destination provided
210      * @param destination
211      * @return the name of the DLQ for this Destination
212      */
213     public String getDeadLetterNameFromDestination(ActiveMQDestination destination){
214         String deadLetterName = deadLetterPrefix;
215         if (deadLetterPerDestinationName) {
216             deadLetterName += destination.getPhysicalName();
217         }
218         else {
219             deadLetterName += this.deadLetterName;
220         }
221         return deadLetterName;
222     }
223 
224     /**
225      * Send a message to a dead letter queue
226      * 
227      * @param message
228      * @throws JMSException
229      */
230     public void sendToDeadLetter(ActiveMQMessage message) {
231         if (deadLetterEnabled && message != null && (message.isPersistent() || storeNonPersistentMessages) && !message.isDispatchedFromDLQ()) {
232             if (broker != null) {
233               // process duplicates
234               if (!isAllowDuplicates()) {
235                   PersistenceAdapter persistenceAdapter = getBroker().getPersistenceAdapter();
236                   // make sure no previous dead letter was already sent
237                   if (persistenceAdapter!=null
238                       && message.getJMSMessageIdentity()!=null
239               && message.getJMSMessageIdentity().getSequenceNumber()!=null
240               && persistenceAdapter.deadLetterAlreadySent(((Long)message.getJMSMessageIdentity().getSequenceNumber()).longValue(), isUseDatabaseLocking())) {
241                     if (log.isDebugEnabled()) log.debug("Dead letter has been already sent for this message: " + message.getJMSMessageID());
242                     return;
243                   }
244               }
245 
246                 // send a dead letter message
247                 String dlqName = getDeadLetterNameFromDestination(message.getJMSActiveMQDestination());
248                 try {
249                 ActiveMQMessage deadMessage = createDeadLetterMessage(dlqName, message);
250                   broker.sendToDeadLetterQueue(dlqName, deadMessage);
251                   if (log.isDebugEnabled()) log.debug("Passed message: " + deadMessage + " to DLQ: " + dlqName);
252                 } catch (JMSException e) {
253                   log.warn("Failed to send message to dead letter due to: " + e, e);
254                 }
255             }
256             else {
257                 log.warn("Broker is not initialized - cannot add to DLQ: " + message);
258             }
259         }else if (log.isDebugEnabled()){
260             log.debug("DLQ not storing message: " + message);
261         }
262     }
263     
264     protected ActiveMQMessage createDeadLetterMessage(String dlqName, ActiveMQMessage message) throws JMSException {
265       // make a shallow copy of the orginal message
266       ActiveMQMessage deadMessage = message.shallowCopy();
267       
268         // generate a new producer and message ID
269         String id = idGenerator.generateId();
270         String producerKey = IdGenerator.getSeedFromId(id);
271         long seq = IdGenerator.getCountFromId(id);
272         deadMessage.setProducerKey(producerKey);
273         deadMessage.setJMSMessageID(id);
274         deadMessage.setSequenceNumber(seq);
275         deadMessage.getJMSMessageIdentity().setMessageID(id);
276         deadMessage.getJMSMessageIdentity().setSequenceNumber(new Long(seq));
277 
278         ActiveMQQueue destination = new ActiveMQQueue(dlqName);
279         deadMessage.setJMSDestination(destination);
280         deadMessage.setDispatchedFromDLQ(true);
281         
282         // set the expiration of the dead letter message
283         long expiration = 0L;
284         long timeStamp = System.currentTimeMillis();
285         if (message.getJMSActiveMQDestination().isTopic()) {
286           if (deadLetterTopicTTL > 0) {
287             expiration = deadLetterTopicTTL + timeStamp;
288           }
289         } else {
290           if (deadLetterQueueTTL > 0) {
291             expiration = deadLetterQueueTTL + timeStamp;
292           }
293         }
294         deadMessage.setJMSExpiration(expiration);
295         deadMessage.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
296         
297         return deadMessage;
298     }
299 }