1 /***************************************
2 * *
3 * JBoss: The OpenSource J2EE WebOS *
4 * *
5 * Distributable under LGPL license. *
6 * See terms of license at gnu.org. *
7 * *
8 ***************************************/
9
10 package org.jboss.ejb.plugins.jms;
11
12 import java.util.Hashtable;
13 import java.util.HashMap;
14 import java.util.Map;
15 import java.util.Enumeration;
16 import java.util.Iterator;
17
18 import javax.naming.InitialContext;
19 import javax.naming.Context;
20 import javax.jms.Session;
21 import javax.jms.QueueConnection;
22 import javax.jms.QueueConnectionFactory;
23 import javax.jms.QueueSession;
24 import javax.jms.QueueSender;
25 import javax.jms.Queue;
26 import javax.jms.Message;
27 import javax.jms.JMSException;
28 import javax.transaction.Status;
29 import javax.transaction.Synchronization;
30 import javax.transaction.Transaction;
31
32 import org.w3c.dom.Element;
33
34 import org.jboss.logging.Logger;
35 import org.jboss.deployment.DeploymentException;
36 import org.jboss.metadata.MetaData;
37 import org.jboss.jms.jndi.JMSProviderAdapter;
38 import org.jboss.system.ServiceMBeanSupport;
39
40 /**
41 * Places redeliveded messages on a Dead Letter Queue.
42 *
43 *<p>
44 *The Dead Letter Queue handler is used to not set JBoss in an endles loop
45 * when a message is resent on and on due to transaction rollback for
46 * message receipt.
47 *
48 * <p>
49 * It sends message to a dead letter queue (configurable, defaults to
50 * queue/DLQ) when the message has been resent a configurable amount of times,
51 * defaults to 10.
52 *
53 * <p>
54 * The handler is configured through the element MDBConfig in
55 * container-invoker-conf.
56 *
57 * <p>
58 * The JMS property JBOSS_ORIG_DESTINATION in the resent message is set
59 * to the name of the original destination (Destionation.toString()).
60 *
61 * <p>
62 * The JMS property JBOSS_ORIG_MESSAGEID in the resent message is set
63 * to the id of the original message.
64 *
65 * Created: Thu Aug 23 21:17:26 2001
66 *
67 * @version <tt>$Revision: 1.11.2.9 $</tt>
68 * @author ???
69 * @author <a href="mailto:jason@planet57.com">Jason Dillon</a>
70 */
71 public class DLQHandler
72 extends ServiceMBeanSupport
73 {
74 /** JMS property name holding original destination. */
75 public static final String JBOSS_ORIG_DESTINATION ="JBOSS_ORIG_DESTINATION";
76
77 /** JMS property name holding original JMS message id. */
78 public static final String JBOSS_ORIG_MESSAGEID="JBOSS_ORIG_MESSAGEID";
79
80 /** Properties copied from org.jboss.mq.SpyMessage */
81 private static final String JMS_JBOSS_REDELIVERY_COUNT = "JMS_JBOSS_REDELIVERY_COUNT";
82 private static final String JMS_JBOSS_REDELIVERY_LIMIT = "JMS_JBOSS_REDELIVERY_LIMIT";
83
84 // Configuratable stuff
85
86 /**
87 * Destination to send dead letters to.
88 *
89 * <p>
90 * Defaults to <em>queue/DLQ</em>, configurable through
91 * <tt>DestinationQueue</tt> element.
92 */
93 private String destinationJNDI = "queue/DLQ";
94
95 /**
96 * Maximum times a message is alowed to be resent.
97 *
98 * <p>Defaults to <em>10</em>, configurable through
99 * <tt>MaxTimesRedelivered</tt> element.
100 */
101 private int maxResent = 10;
102
103 /**
104 * Time to live for the message.
105 *
106 * <p>
107 * Defaults to <em>{@link Message#DEFAULT_TIME_TO_LIVE}</em>,
108 * configurable through the <tt>TimeToLive</tt> element.
109 */
110 private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
111
112 // May become configurable
113
114 /** Delivery mode for message, Message.DEFAULT_DELIVERY_MODE. */
115 private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
116
117 /** Priority for the message, Message.DEFAULT_PRIORITY */
118 private int priority = Message.DEFAULT_PRIORITY;
119
120 /** The dlq user for the connection */
121 private String dlqUser;
122
123 /** The dlq password for the connection */
124 private String dlqPass;
125
126 // Private stuff
127 private QueueConnection connection;
128 private Queue dlq;
129 private JMSProviderAdapter providerAdapter;
130 private Hashtable resentBuffer = new Hashtable();
131
132 public DLQHandler(final JMSProviderAdapter providerAdapter)
133 {
134 this.providerAdapter = providerAdapter;
135 }
136
137 //--- Service
138
139 /**
140 * Initalize the service.
141 *
142 * @throws Exception Service failed to initalize.
143 */
144 protected void createService() throws Exception
145 {
146 Context ctx = providerAdapter.getInitialContext();
147
148 try {
149 String factoryName = providerAdapter.getQueueFactoryRef();
150 QueueConnectionFactory factory = (QueueConnectionFactory)
151 ctx.lookup(factoryName);
152 log.debug("Using factory: " + factory);
153
154 if (dlqUser == null)
155 connection = factory.createQueueConnection();
156 else
157 connection = factory.createQueueConnection(dlqUser, dlqPass);
158 log.debug("Created connection: " + connection);
159
160 dlq = (Queue)ctx.lookup(destinationJNDI);
161 log.debug("Using Queue: " + dlq);
162 }
163 catch (Exception e)
164 {
165 if (e instanceof JMSException)
166 throw e;
167 else
168 {
169 JMSException x = new JMSException("Error creating the dlq connection: " + e.getMessage());
170 x.setLinkedException(e);
171 throw x;
172 }
173 }
174 finally {
175 ctx.close();
176 }
177 }
178
179 protected void destroyService() throws Exception
180 {
181 // Help the GC
182 if (connection != null)
183 connection.close();
184 connection = null;
185 dlq = null;
186 providerAdapter = null;
187 }
188
189 //--- Logic
190
191 /**
192 * Check if a message has been redelivered to many times.
193 *
194 * If message has been redelivered to many times, send it to the
195 * dead letter queue (default to queue/DLQ).
196 *
197 * @return true if message is handled (i.e resent), false if not.
198 */
199 public boolean handleRedeliveredMessage(final Message msg, final Transaction tx)
200 {
201 boolean handled = false;
202 int max = this.maxResent;
203 String id = null;
204 boolean jbossmq = true;
205 int count = 0;
206
207 try
208 {
209
210 if (msg.propertyExists(JMS_JBOSS_REDELIVERY_LIMIT))
211 max = msg.getIntProperty(JMS_JBOSS_REDELIVERY_LIMIT);
212
213 if (msg.propertyExists(JMS_JBOSS_REDELIVERY_COUNT))
214 count = msg.getIntProperty(JMS_JBOSS_REDELIVERY_COUNT);
215 else
216 {
217 id = msg.getJMSMessageID();
218 if (id == null)
219 {
220 // if we can't get the id we are basically fucked
221 log.error("Message id is null, can't handle message");
222 return false;
223 }
224 count = incrementResentCount(id);
225 jbossmq = false;
226 }
227
228 if (count > max)
229 {
230 id = msg.getJMSMessageID();
231 log.warn("Message resent too many times; sending it to DLQ; message id=" + id);
232
233 sendMessage(msg);
234 deleteFromBuffer(id);
235
236 handled = true;
237 }
238 else if (jbossmq == false && tx != null)
239 {
240 // Register a synchronization to remove the buffer entry
241 // should the transaction commit
242 DLQSynchronization synch = new DLQSynchronization(id);
243 try
244 {
245 tx.registerSynchronization(synch);
246 }
247 catch (Exception e)
248 {
249 log.warn("Error registering DlQ Synchronization with transaction " + tx, e);
250 }
251 }
252 }
253 catch (JMSException e)
254 {
255 // If we can't send it ahead, we do not dare to just drop it...or?
256 log.error("Could not send message to Dead Letter Queue", e);
257 }
258
259 return handled;
260 }
261
262 /**
263 * Send message to the configured dead letter queue, defaults to queue/DLQ.
264 */
265 protected void sendMessage(Message msg) throws JMSException
266 {
267 boolean trace = log.isTraceEnabled();
268
269 QueueSession session = null;
270 QueueSender sender = null;
271
272 try
273 {
274 msg = makeWritable(msg, trace); // Don't know yet if we are gona clone or not
275
276 // Set the properties
277 msg.setStringProperty(JBOSS_ORIG_MESSAGEID,
278 msg.getJMSMessageID());
279 msg.setStringProperty(JBOSS_ORIG_DESTINATION,
280 msg.getJMSDestination().toString());
281
282 session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
283 sender = session.createSender(dlq);
284 if (trace) {
285 log.trace("Sending message to DLQ; destination=" +
286 dlq + ", session=" + session + ", sender=" + sender);
287 }
288
289 sender.send(msg, deliveryMode, priority, timeToLive);
290
291 if (trace) {
292 log.trace("Message sent.");
293 }
294
295 }
296 finally
297 {
298 try
299 {
300 if (sender != null) sender.close();
301 if (session != null) session.close();
302 }
303 catch(Exception e)
304 {
305 log.warn("Failed to close sender or session; ignoring", e);
306 }
307 }
308 }
309
310 /**
311 * Increment the counter for the specific JMS message id.
312 *
313 * @return the new counter value.
314 */
315 protected int incrementResentCount(String id)
316 {
317 BufferEntry entry = null;
318 boolean trace = log.isTraceEnabled();
319 if(!resentBuffer.containsKey(id))
320 {
321 if (trace)
322 log.trace("Making new entry for id " + id);
323 entry = new BufferEntry();
324 entry.id = id;
325 entry.count = 1;
326 resentBuffer.put(id,entry);
327 } else
328 {
329 entry = (BufferEntry)resentBuffer.get(id);
330 entry.count++;
331 if (trace)
332 log.trace("Incremented old entry for id " + id + " count " + entry.count);
333 }
334 return entry.count;
335 }
336
337 /**
338 * Delete the entry in the message counter buffer for specifyed JMS id.
339 */
340 protected void deleteFromBuffer(String id)
341 {
342 resentBuffer.remove(id);
343 }
344
345 /**
346 * Make the Message properties writable.
347 *
348 * @return the writable message.
349 */
350 protected Message makeWritable(Message msg, boolean trace) throws JMSException
351 {
352 HashMap tmp = new HashMap();
353
354 // Save properties
355 for (Enumeration en=msg.getPropertyNames(); en.hasMoreElements();)
356 {
357 String key = (String)en.nextElement();
358 tmp.put(key, msg.getObjectProperty(key));
359 }
360
361 // Make them writable
362 msg.clearProperties();
363
364 Iterator i = tmp.entrySet().iterator();
365 while (i.hasNext())
366 {
367 Map.Entry me = (Map.Entry)i.next();
368 String key = (String) me.getKey();
369 try
370 {
371 msg.setObjectProperty(key, me.getValue());
372 }
373 catch (JMSException ignored)
374 {
375 if (trace)
376 log.trace("Could not copy message property " + key, ignored);
377 }
378 }
379
380 return msg;
381 }
382
383 /**
384 * Takes an MDBConfig Element
385 */
386 public void importXml(final Element element) throws DeploymentException
387 {
388 destinationJNDI = MetaData.getElementContent
389 (MetaData.getUniqueChild(element, "DestinationQueue"));
390
391 try
392 {
393 String mr = MetaData.getElementContent
394 (MetaData.getUniqueChild(element, "MaxTimesRedelivered"));
395 maxResent = Integer.parseInt(mr);
396 }
397 catch (Exception ignore) {}
398
399 try {
400 String ttl = MetaData.getElementContent
401 (MetaData.getUniqueChild(element, "TimeToLive"));
402 timeToLive = Long.parseLong(ttl);
403
404 if (timeToLive < 0) {
405 log.warn("Invalid TimeToLive: " + timeToLive + "; using default");
406 timeToLive = Message.DEFAULT_TIME_TO_LIVE;
407 }
408 }
409 catch (Exception ignore) {}
410
411 dlqUser = MetaData.getElementContent(MetaData.getOptionalChild(element, "DLQUser"));
412 dlqPass = MetaData.getElementContent(MetaData.getOptionalChild(element, "DLQPassword"));
413 }
414
415 public String toString()
416 {
417 return super.toString() +
418 "{ destinationJNDI=" + destinationJNDI +
419 ", maxResent=" + maxResent +
420 ", timeToLive=" + timeToLive +
421 " }";
422 }
423
424 private static class BufferEntry
425 {
426 int count;
427 String id;
428 }
429
430 /**
431 * Remove a redelivered message from the DLQ's buffer when it is acknowledged
432 */
433 protected class DLQSynchronization
434 implements Synchronization
435 {
436 /** The message id */
437 String id;
438
439 public DLQSynchronization(String id)
440 {
441 this.id = id;
442 }
443
444 public void beforeCompletion()
445 {
446 }
447
448 /**
449 * Forget the message when the transaction commits
450 */
451 public void afterCompletion(int status)
452 {
453 if (status == Status.STATUS_COMMITTED)
454 deleteFromBuffer(id);
455 }
456 }
457 }