| Method from org.jboss.ejb.plugins.jms.DLQHandler Detail: |
/**
 * Looks up the queue connection factory and the dead letter queue through
 * the provider adapter's initial context and opens the connection used to
 * send messages to the DLQ.
 *
 * The connection is created with dlqUser/dlqPass when a user is
 * configured, otherwise with the factory's default credentials.
 *
 * @throws Exception a JMSException is rethrown as is; any other failure is
 *         wrapped in a JMSException that links the original exception
 */
protected void createService() throws Exception {
   Context ctx = providerAdapter.getInitialContext();
   try {
      String factoryName = providerAdapter.getQueueFactoryRef();
      QueueConnectionFactory factory = (QueueConnectionFactory)
         ctx.lookup(factoryName);
      log.debug("Using factory: " + factory);
      if (dlqUser == null)
         connection = factory.createQueueConnection();
      else
         connection = factory.createQueueConnection(dlqUser, dlqPass);
      log.debug("Created connection: " + connection);
      dlq = (Queue)ctx.lookup(destinationJNDI);
      log.debug("Using Queue: " + dlq);
   }
   catch (Exception e)
   {
      // The connection may already be open when a later step (the queue
      // lookup) fails; close it so a failed start does not leak it.
      if (connection != null)
      {
         try
         {
            connection.close();
         }
         catch (Exception ignored)
         {
            // The original failure is the one worth reporting
            log.debug("Failed to close dlq connection after creation error: " + ignored);
         }
         connection = null;
      }
      if (e instanceof JMSException)
         throw e;
      else
      {
         JMSException x = new JMSException("Error creating the dlq connection: " + e.getMessage());
         x.setLinkedException(e);
         throw x;
      }
   }
   finally {
      ctx.close();
   }
}
|
/**
 * Removes the resent-counter entry stored under the given JMS message id.
 *
 * @param id the JMS message id whose buffer entry is removed
 */
protected void deleteFromBuffer(String id)
{
   this.resentBuffer.remove(id);
}
Delete the entry in the message counter buffer for the specified JMS id. |
/**
 * Closes the DLQ connection, if one is open, and drops the references to
 * the connection, the queue and the provider adapter.
 *
 * The references are cleared even when closing the connection fails, so
 * no stale connection is kept after a failed shutdown.
 *
 * @throws Exception if closing the connection fails
 */
protected void destroyService() throws Exception {
   try
   {
      if (connection != null)
         connection.close();
   }
   finally
   {
      // Help the GC
      connection = null;
      dlq = null;
      providerAdapter = null;
   }
}
|
/**
 * Checks whether a redelivered message has exceeded its redelivery limit
 * and, if so, sends it to the dead letter queue.
 *
 * The limit is maxResent unless the message carries the
 * JMS_JBOSS_REDELIVERY_LIMIT property. The count is read from the
 * JMS_JBOSS_REDELIVERY_COUNT property when present; otherwise it is
 * tracked in the resent buffer under the JMS message id.
 *
 * @param msg the redelivered message
 * @param tx the current transaction, may be null; when the count is
 *        tracked in the buffer and the limit is not yet exceeded, a
 *        DLQSynchronization for the message id is registered with it
 * @return true if the message was sent to the dead letter queue; false
 *         otherwise, including when the message id is unavailable or a
 *         JMSException occurred
 */
public boolean handleRedeliveredMessage(Message msg,
                                        Transaction tx) {
   boolean handled = false;
   int max = this.maxResent;
   String id = null;
   boolean jbossmq = true;
   int count = 0;
   try
   {
      if (msg.propertyExists(JMS_JBOSS_REDELIVERY_LIMIT))
         max = msg.getIntProperty(JMS_JBOSS_REDELIVERY_LIMIT);
      if (msg.propertyExists(JMS_JBOSS_REDELIVERY_COUNT))
         count = msg.getIntProperty(JMS_JBOSS_REDELIVERY_COUNT);
      else
      {
         id = msg.getJMSMessageID();
         if (id == null)
         {
            // Without an id the redelivery count cannot be tracked
            log.error("Message id is null, can't handle message");
            return false;
         }
         count = incrementResentCount(id);
         jbossmq = false;
      }
      if (count > max)
      {
         id = msg.getJMSMessageID();
         log.warn("Message resent too many times; sending it to DLQ; message id=" + id);
         sendMessage(msg);
         // On the property-based path the id has not been null-checked.
         // A buffer that rejects null keys would throw here after the
         // message was already sent, so skip the removal for a null id.
         if (id != null)
            deleteFromBuffer(id);
         handled = true;
      }
      else if (jbossmq == false && tx != null)
      {
         // Register a synchronization to remove the buffer entry
         // should the transaction commit
         DLQSynchronization synch = new DLQSynchronization(id);
         try
         {
            tx.registerSynchronization(synch);
         }
         catch (Exception e)
         {
            log.warn("Error registering DlQ Synchronization with transaction " + tx, e);
         }
      }
   }
   catch (JMSException e)
   {
      // If we can't send it ahead, we do not dare to just drop it...or?
      log.error("Could not send message to Dead Letter Queue", e);
   }
   return handled;
}
Check if a message has been redelivered too many times.
If the message has been redelivered too many times, send it to the
dead letter queue (defaults to queue/DLQ). |
/**
 * Reads the DLQ settings from the given configuration element.
 *
 * DestinationQueue is read first and its lookup failure propagates.
 * MaxTimesRedelivered and TimeToLive are best effort: when they cannot be
 * read or parsed the current values are kept. A negative TimeToLive is
 * replaced by Message.DEFAULT_TIME_TO_LIVE. DLQUser and DLQPassword are
 * read as optional children.
 *
 * @param element the configuration element holding the DLQ settings
 * @throws DeploymentException declared by the signature; in the visible
 *         code only the DestinationQueue, DLQUser and DLQPassword reads
 *         are outside a catch-all and could propagate it
 */
public void importXml(Element element) throws DeploymentException {
   destinationJNDI = MetaData.getElementContent
      (MetaData.getUniqueChild(element, "DestinationQueue"));
   try
   {
      String mr = MetaData.getElementContent
         (MetaData.getUniqueChild(element, "MaxTimesRedelivered"));
      maxResent = Integer.parseInt(mr);
   }
   catch (NumberFormatException e)
   {
      // A value was found but is not a number: make the bad config visible
      log.warn("Invalid MaxTimesRedelivered; using default " + maxResent + ": " + e);
   }
   catch (Exception e)
   {
      // Best effort: the setting could not be read, keep the default
      log.debug("MaxTimesRedelivered not read; using default " + maxResent + ": " + e);
   }
   try {
      String ttl = MetaData.getElementContent
         (MetaData.getUniqueChild(element, "TimeToLive"));
      timeToLive = Long.parseLong(ttl);
      if (timeToLive < 0) {
         log.warn("Invalid TimeToLive: " + timeToLive + "; using default");
         timeToLive = Message.DEFAULT_TIME_TO_LIVE;
      }
   }
   catch (NumberFormatException e)
   {
      // A value was found but is not a number: make the bad config visible
      log.warn("Invalid TimeToLive; using default " + timeToLive + ": " + e);
   }
   catch (Exception e)
   {
      // Best effort: the setting could not be read, keep the default
      log.debug("TimeToLive not read; using default " + timeToLive + ": " + e);
   }
   dlqUser = MetaData.getElementContent(MetaData.getOptionalChild(element, "DLQUser"));
   dlqPass = MetaData.getElementContent(MetaData.getOptionalChild(element, "DLQPassword"));
}
Takes an MDBConfig Element |
/**
 * Bumps the resent counter kept for the given JMS message id, creating the
 * buffer entry with a count of 1 when the id has not been seen before.
 *
 * @param id the JMS message id
 * @return the counter value after this call
 */
protected int incrementResentCount(String id)
{
   final boolean traceEnabled = log.isTraceEnabled();
   BufferEntry counter;
   if (resentBuffer.containsKey(id))
   {
      // Known id: bump the existing counter
      counter = (BufferEntry) resentBuffer.get(id);
      counter.count++;
      if (traceEnabled)
      {
         log.trace("Incremented old entry for id " + id + " count " + counter.count);
      }
   }
   else
   {
      // First sighting of this id: start counting at one
      if (traceEnabled)
      {
         log.trace("Making new entry for id " + id);
      }
      counter = new BufferEntry();
      counter.id = id;
      counter.count = 1;
      resentBuffer.put(id, counter);
   }
   return counter.count;
}
Increment the counter for the specific JMS message id. |
/**
 * Makes the properties of a message writable by saving them, clearing the
 * message's properties and setting the saved values again.
 *
 * A property that cannot be set again is skipped; the failure is only
 * logged when trace is enabled.
 *
 * @param msg the message whose properties are rewritten in place
 * @param trace whether trace logging is enabled
 * @return the same message instance that was passed in
 * @throws JMSException if the properties cannot be read or cleared
 */
protected Message makeWritable(Message msg,
                               boolean trace) throws JMSException
{
   // Snapshot the current properties
   HashMap saved = new HashMap();
   Enumeration names = msg.getPropertyNames();
   while (names.hasMoreElements())
   {
      String name = (String) names.nextElement();
      saved.put(name, msg.getObjectProperty(name));
   }

   // Clearing switches the property section to writable
   msg.clearProperties();

   // Restore what was saved
   for (Iterator it = saved.entrySet().iterator(); it.hasNext();)
   {
      Map.Entry property = (Map.Entry) it.next();
      String name = (String) property.getKey();
      try
      {
         msg.setObjectProperty(name, property.getValue());
      }
      catch (JMSException ignored)
      {
         if (trace)
         {
            log.trace("Could not copy message property " + name, ignored);
         }
      }
   }
   return msg;
}
Make the Message properties writable. |
/**
 * Sends the message to the configured dead letter queue.
 *
 * The message properties are made writable first, then the original
 * message id and, when available, the original destination are recorded
 * as string properties. The message is sent on a fresh non-transacted,
 * auto-acknowledge session with the configured delivery mode, priority
 * and time to live. The sender and session are always closed afterwards;
 * close failures are logged and ignored.
 *
 * @param msg the message to send to the dead letter queue
 * @throws JMSException if preparing or sending the message fails
 */
protected void sendMessage(Message msg) throws JMSException {
   boolean trace = log.isTraceEnabled();
   QueueSession session = null;
   QueueSender sender = null;
   try
   {
      msg = makeWritable(msg, trace); // Don't know yet if we are gona clone or not
      // Set the properties
      msg.setStringProperty(JBOSS_ORIG_MESSAGEID,
                            msg.getJMSMessageID());
      // The destination header may be unset; calling toString() on it
      // unconditionally would fail with a NullPointerException.
      Object origDestination = msg.getJMSDestination();
      if (origDestination != null)
      {
         msg.setStringProperty(JBOSS_ORIG_DESTINATION,
                               origDestination.toString());
      }
      session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
      sender = session.createSender(dlq);
      if (trace) {
         log.trace("Sending message to DLQ; destination=" +
                   dlq + ", session=" + session + ", sender=" + sender);
      }
      sender.send(msg, deliveryMode, priority, timeToLive);
      if (trace) {
         log.trace("Message sent.");
      }
   }
   finally
   {
      // Close each resource on its own so that a failing sender close
      // does not prevent the session from being closed.
      try
      {
         if (sender != null) sender.close();
      }
      catch(Exception e)
      {
         log.warn("Failed to close sender or session; ignoring", e);
      }
      try
      {
         if (session != null) session.close();
      }
      catch(Exception e)
      {
         log.warn("Failed to close sender or session; ignoring", e);
      }
   }
}
Send message to the configured dead letter queue, defaults to queue/DLQ. |
/**
 * Returns the superclass string form followed by the destination JNDI
 * name, the maximum resent count and the time to live.
 *
 * @return a description of this handler's configuration
 */
public String toString()
{
   StringBuffer text = new StringBuffer();
   text.append(super.toString());
   text.append("{ destinationJNDI=").append(destinationJNDI);
   text.append(", maxResent=").append(maxResent);
   text.append(", timeToLive=").append(timeToLive);
   text.append(" }");
   return text.toString();
}
|