1 /*
2 * JBoss, Home of Professional Open Source.
3 * Copyright 2006, Red Hat Middleware LLC, and individual contributors
4 * as indicated by the @author tags. See the copyright.txt file in the
5 * distribution for a full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */
22 package org.jboss.resource.adapter.jms.inflow;
23
24 import java.lang.reflect.Method;
25 import java.util.concurrent.atomic.AtomicBoolean;
26
27 import javax.jms.Connection;
28 import javax.jms.Destination;
29 import javax.jms.ExceptionListener;
30 import javax.jms.JMSException;
31 import javax.jms.Message;
32 import javax.jms.MessageListener;
33 import javax.jms.Queue;
34 import javax.jms.QueueConnection;
35 import javax.jms.QueueConnectionFactory;
36 import javax.jms.Topic;
37 import javax.jms.TopicConnection;
38 import javax.jms.TopicConnectionFactory;
39 import javax.jms.XAQueueConnectionFactory;
40 import javax.jms.XATopicConnectionFactory;
41 import javax.management.Notification;
42 import javax.naming.Context;
43 import javax.resource.ResourceException;
44 import javax.resource.spi.endpoint.MessageEndpointFactory;
45 import javax.resource.spi.work.Work;
46 import javax.resource.spi.work.WorkManager;
47 import javax.transaction.TransactionManager;
48
49 import org.jboss.jms.jndi.JMSProviderAdapter;
50 import org.jboss.logging.Logger;
51 import org.jboss.mx.util.JBossNotificationBroadcasterSupport;
52 import org.jboss.resource.adapter.jms.JmsResourceAdapter;
53 import org.jboss.tm.TransactionManagerLocator;
54 import org.jboss.util.Strings;
55 import org.jboss.util.naming.Util;
56
57 /**
58 * A generic jms Activation.
59 *
60 * @author <a href="adrian@jboss.com">Adrian Brock</a>
61 * @version $Revision: 71554 $
62 */
63 public class JmsActivation implements ExceptionListener
64 {
65 /** The log */
66 private static final Logger log = Logger.getLogger(JmsActivation.class);
67
68 /** Notification sent before connectioning */
69 private static final String CONNECTING_NOTIFICATION = "org.jboss.ejb.plugins.jms.CONNECTING";
70
71 /** Notification sent after connection */
72 private static final String CONNECTED_NOTIFICATION = "org.jboss.ejb.plugins.jms.CONNECTED";
73
74 /** Notification sent before disconnection */
75 private static final String DISCONNECTING_NOTIFICATION = "org.jboss.ejb.plugins.jms.DISCONNECTING";
76
77 /** Notification sent before disconnected */
78 private static final String DISCONNECTED_NOTIFICATION = "org.jboss.ejb.plugins.jms.DISCONNECTED";
79
80 /** Notification sent at connection failure */
81 private static final String FAILURE_NOTIFICATION = "org.jboss.ejb.plugins.jms.FAILURE";
82
83 /** The onMessage method */
84 public static final Method ONMESSAGE;
85
86 /** The resource adapter */
87 protected JmsResourceAdapter ra;
88
89 /** The activation spec */
90 protected JmsActivationSpec spec;
91
92 /** The message endpoint factory */
93 protected MessageEndpointFactory endpointFactory;
94
95 /** The notification emitter */
96 protected JBossNotificationBroadcasterSupport emitter;
97
98 /** Whether delivery is active */
99 protected AtomicBoolean deliveryActive = new AtomicBoolean(false);
100
101 // Whether we are in the failure recovery loop
102 private AtomicBoolean inFailure = new AtomicBoolean(false);
103
104 /** The jms provider adapter */
105 protected JMSProviderAdapter adapter;
106
107 /** The destination */
108 protected Destination destination;
109
110 /** The connection */
111 protected Connection connection;
112
113 /** The server session pool */
114 protected JmsServerSessionPool pool;
115
116 /** Is the delivery transacted */
117 protected boolean isDeliveryTransacted;
118
119 /** The DLQ handler */
120 protected DLQHandler dlqHandler;
121
122 /** The TransactionManager */
123 protected TransactionManager tm;
124
125
// Resolve MessageListener.onMessage(Message) once, when the class is initialized.
static
{
   Method onMessage;
   try
   {
      onMessage = MessageListener.class.getMethod("onMessage", Message.class);
   }
   catch (Exception e)
   {
      // Any lookup failure is rethrown unchecked, wrapping the cause
      throw new RuntimeException(e);
   }
   ONMESSAGE = onMessage;
}
137
138 public JmsActivation(JmsResourceAdapter ra, MessageEndpointFactory endpointFactory, JmsActivationSpec spec) throws ResourceException
139 {
140 this.ra = ra;
141 this.endpointFactory = endpointFactory;
142 this.spec = spec;
143 try
144 {
145 this.isDeliveryTransacted = endpointFactory.isDeliveryTransacted(ONMESSAGE);
146 }
147 catch (Exception e)
148 {
149 throw new ResourceException(e);
150 }
151 if (endpointFactory instanceof JBossNotificationBroadcasterSupport)
152 emitter = (JBossNotificationBroadcasterSupport) endpointFactory;
153 }
154
155 /**
156 * @return the activation spec
157 */
158 public JmsActivationSpec getActivationSpec()
159 {
160 return spec;
161 }
162
163 /**
164 * @return the message endpoint factory
165 */
166 public MessageEndpointFactory getMessageEndpointFactory()
167 {
168 return endpointFactory;
169 }
170
171 /**
172 * @return whether delivery is transacted
173 */
174 public boolean isDeliveryTransacted()
175 {
176 return isDeliveryTransacted;
177 }
178
179 /**
180 * @return the work manager
181 */
182 public WorkManager getWorkManager()
183 {
184 return ra.getWorkManager();
185 }
186
187 public TransactionManager getTransactionManager()
188 {
189 if (tm == null)
190 tm = TransactionManagerLocator.locateTransactionManager();
191 return tm;
192 }
193
194 /**
195 * @return the connection
196 */
197 public Connection getConnection()
198 {
199 return connection;
200 }
201
202 /**
203 * @return the destination
204 */
205 public Destination getDestination()
206 {
207 return destination;
208 }
209
210 /**
211 * @return the provider adapter
212 */
213 public JMSProviderAdapter getProviderAdapter()
214 {
215 return adapter;
216 }
217
218 /**
219 * @return the dlq handler
220 */
221 public DLQHandler getDLQHandler()
222 {
223 return dlqHandler;
224 }
225
226 /**
227 * Start the activation
228 *
229 * @throws ResourceException for any error
230 */
231 public void start() throws ResourceException
232 {
233 deliveryActive.set(true);
234 ra.getWorkManager().scheduleWork(new SetupActivation());
235 }
236
237 /**
238 * Stop the activation
239 */
240 public void stop()
241 {
242 deliveryActive.set(false);
243 teardown();
244 }
245
246 /**
247 * Handles any failure by trying to reconnect
248 *
249 * @param failure the reason for the failure
250 */
251 public void handleFailure(Throwable failure)
252 {
253 log.warn("Failure in jms activation " + spec, failure);
254 int reconnectCount = 0;
255
256 // Only enter the failure loop once
257 if (inFailure.getAndSet(true))
258 return;
259 try
260 {
261 while (deliveryActive.get() && reconnectCount < spec.getReconnectAttempts())
262 {
263 teardown();
264
265 sendNotification(FAILURE_NOTIFICATION, failure);
266
267 try
268 {
269 Thread.sleep(spec.getReconnectIntervalLong());
270 }
271 catch (InterruptedException e)
272 {
273 log.debug("Interrupted trying to reconnect " + spec, e);
274 break;
275 }
276
277 log.info("Attempting to reconnect " + spec);
278 try
279 {
280 setup();
281 log.info("Reconnected with messaging provider.");
282 break;
283 }
284 catch (Throwable t)
285 {
286 log.error("Unable to reconnect " + spec, t);
287 }
288 ++reconnectCount;
289 }
290 }
291 finally
292 {
293 // Leaving failure recovery loop
294 inFailure.set(false);
295 }
296 }
297
298 public void onException(JMSException exception)
299 {
300 handleFailure(exception);
301 }
302
303 public String toString()
304 {
305 StringBuffer buffer = new StringBuffer();
306 buffer.append(Strings.defaultToString(this)).append('(');
307 buffer.append("spec=").append(Strings.defaultToString(spec));
308 buffer.append(" mepf=").append(Strings.defaultToString(endpointFactory));
309 buffer.append(" active=").append(deliveryActive.get());
310 if (destination != null)
311 buffer.append(" destination=").append(destination);
312 if (connection != null)
313 buffer.append(" connection=").append(connection);
314 if (pool != null)
315 buffer.append(" pool=").append(Strings.defaultToString(pool));
316 if (dlqHandler != null)
317 buffer.append(" dlq=").append(Strings.defaultToString(dlqHandler));
318 buffer.append(" transacted=").append(isDeliveryTransacted);
319 buffer.append(')');
320 return buffer.toString();
321 }
322
323 /**
324 * Setup the activation
325 *
326 * @throws Exception for any error
327 */
328 protected void setup() throws Exception
329 {
330 log.debug("Setting up " + spec);
331
332 sendNotification(CONNECTING_NOTIFICATION, null);
333
334 setupJMSProviderAdapter();
335 Context ctx = adapter.getInitialContext();
336 log.debug("Using context " + ctx.getEnvironment() + " for " + spec);
337 try
338 {
339 setupDLQ(ctx);
340 setupDestination(ctx);
341 setupConnection(ctx);
342 }
343 finally
344 {
345 ctx.close();
346 }
347 setupSessionPool();
348
349 log.debug("Setup complete " + this);
350
351 sendNotification(CONNECTED_NOTIFICATION, null);
352 }
353
354 /**
355 * Teardown the activation
356 */
357 protected void teardown()
358 {
359 log.debug("Tearing down " + spec);
360
361 sendNotification(DISCONNECTING_NOTIFICATION, null);
362
363 teardownSessionPool();
364 teardownConnection();
365 teardownDestination();
366 teardownDLQ();
367
368 log.debug("Tearing down complete " + this);
369
370 sendNotification(DISCONNECTED_NOTIFICATION, null);
371 }
372
373 /**
374 * Get the jms provider
375 *
376 * @throws Exception for any error
377 */
378 protected void setupJMSProviderAdapter() throws Exception
379 {
380 String providerAdapterJNDI = spec.getProviderAdapterJNDI();
381 if (providerAdapterJNDI.startsWith("java:") == false)
382 providerAdapterJNDI = "java:" + providerAdapterJNDI;
383
384 log.debug("Retrieving the jms provider adapter " + providerAdapterJNDI + " for " + this);
385 adapter = (JMSProviderAdapter) Util.lookup(providerAdapterJNDI, JMSProviderAdapter.class);
386 log.debug("Using jms provider adapter " + adapter + " for " + this);
387 }
388
389 /**
390 * Setup the DLQ
391 *
392 * @param ctx the naming context
393 * @throws Exception for any error
394 */
395 protected void setupDLQ(Context ctx) throws Exception
396 {
397 if (spec.isUseDLQ())
398 {
399 Class<?> clazz = Thread.currentThread().getContextClassLoader().loadClass(spec.getDLQHandler());
400 dlqHandler = (DLQHandler) clazz.newInstance();
401 dlqHandler.setup(this, ctx);
402 }
403
404 log.debug("Setup DLQ " + this);
405 }
406
407 /**
408 * Teardown the DLQ
409 */
410 protected void teardownDLQ()
411 {
412 log.debug("Removing DLQ " + this);
413 try
414 {
415 if (dlqHandler != null)
416 dlqHandler.teardown();
417 }
418 catch (Throwable t)
419 {
420 log.debug("Error tearing down the DLQ " + dlqHandler, t);
421 }
422 dlqHandler = null;
423 }
424
425 /**
426 * Setup the Destination
427 *
428 * @param ctx the naming context
429 * @throws Exception for any error
430 */
431 protected void setupDestination(Context ctx) throws Exception
432 {
433 Class<?> destinationType;
434 if (spec.isTopic())
435 destinationType = Topic.class;
436 else
437 destinationType = Queue.class;
438
439 String destinationName = spec.getDestination();
440 log.debug("Retrieving destination " + destinationName + " of type " + destinationType.getName());
441 destination = (Destination) Util.lookup(ctx, destinationName, destinationType);
442 log.debug("Got destination " + destination + " from " + destinationName);
443 }
444
445 /**
446 * Teardown the destination
447 */
448 protected void teardownDestination()
449 {
450 destination = null;
451 }
452
453 /**
454 * Setup the Connection
455 *
456 * @param ctx the naming context
457 * @throws Exception for any error
458 */
459 protected void setupConnection(Context ctx) throws Exception
460 {
461 log.debug("setup connection " + this);
462
463 String user = spec.getUser();
464 String pass = spec.getPassword();
465 String clientID = spec.getClientId();
466 if (spec.isTopic())
467 connection = setupTopicConnection(ctx, user, pass, clientID);
468 else
469 connection = setupQueueConnection(ctx, user, pass, clientID);
470
471 log.debug("established connection " + this);
472 }
473
474 /**
475 * Setup a Queue Connection
476 *
477 * @param ctx the naming context
478 * @param user the user
479 * @param pass the password
480 * @param clientID the client id
481 * @return the connection
482 * @throws Exception for any error
483 */
484 protected QueueConnection setupQueueConnection(Context ctx, String user, String pass, String clientID) throws Exception
485 {
486 String queueFactoryRef = adapter.getQueueFactoryRef();
487 log.debug("Attempting to lookup queue connection factory " + queueFactoryRef);
488 QueueConnectionFactory qcf = (QueueConnectionFactory) Util.lookup(ctx, queueFactoryRef, QueueConnectionFactory.class);
489 log.debug("Got queue connection factory " + qcf + " from " + queueFactoryRef);
490 log.debug("Attempting to create queue connection with user " + user);
491 QueueConnection result;
492 if (qcf instanceof XAQueueConnectionFactory && isDeliveryTransacted)
493 {
494 XAQueueConnectionFactory xaqcf = (XAQueueConnectionFactory) qcf;
495 if (user != null)
496 result = xaqcf.createXAQueueConnection(user, pass);
497 else
498 result = xaqcf.createXAQueueConnection();
499 }
500 else
501 {
502 if (user != null)
503 result = qcf.createQueueConnection(user, pass);
504 else
505 result = qcf.createQueueConnection();
506 }
507 try
508 {
509 if (clientID != null)
510 result.setClientID(clientID);
511 result.setExceptionListener(this);
512 log.debug("Using queue connection " + result);
513 return result;
514 }
515 catch (Throwable t)
516 {
517 try
518 {
519 result.close();
520 }
521 catch (Exception e)
522 {
523 log.trace("Ignored error closing connection", e);
524 }
525 if (t instanceof Exception)
526 throw (Exception) t;
527 throw new RuntimeException("Error configuring connection", t);
528 }
529 }
530
531 /**
532 * Setup a Topic Connection
533 *
534 * @param ctx the naming context
535 * @param user the user
536 * @param pass the password
537 * @param clientID the client id
538 * @return the connection
539 * @throws Exception for any error
540 */
541 protected TopicConnection setupTopicConnection(Context ctx, String user, String pass, String clientID) throws Exception
542 {
543 String topicFactoryRef = adapter.getTopicFactoryRef();
544 log.debug("Attempting to lookup topic connection factory " + topicFactoryRef);
545 TopicConnectionFactory tcf = (TopicConnectionFactory) Util.lookup(ctx, topicFactoryRef, TopicConnectionFactory.class);
546 log.debug("Got topic connection factory " + tcf + " from " + topicFactoryRef);
547 log.debug("Attempting to create topic connection with user " + user);
548 TopicConnection result;
549 if (tcf instanceof XATopicConnectionFactory && isDeliveryTransacted)
550 {
551 XATopicConnectionFactory xatcf = (XATopicConnectionFactory) tcf;
552 if (user != null)
553 result = xatcf.createXATopicConnection(user, pass);
554 else
555 result = xatcf.createXATopicConnection();
556 }
557 else
558 {
559 if (user != null)
560 result = tcf.createTopicConnection(user, pass);
561 else
562 result = tcf.createTopicConnection();
563 }
564 try
565 {
566 if (clientID != null)
567 result.setClientID(clientID);
568 result.setExceptionListener(this);
569 log.debug("Using topic connection " + result);
570 return result;
571 }
572 catch (Throwable t)
573 {
574 try
575 {
576 result.close();
577 }
578 catch (Exception e)
579 {
580 log.trace("Ignored error closing connection", e);
581 }
582 if (t instanceof Exception)
583 throw (Exception) t;
584 throw new RuntimeException("Error configuring connection", t);
585 }
586 }
587
588 /**
589 * Teardown the connection
590 */
591 protected void teardownConnection()
592 {
593 try
594 {
595 if (connection != null)
596 {
597 log.debug("Closing the " + connection);
598 connection.close();
599 }
600 }
601 catch (Throwable t)
602 {
603 log.debug("Error closing the connection " + connection, t);
604 }
605 connection = null;
606 }
607
608 /**
609 * Setup the server session pool
610 *
611 * @throws Exception for any error
612 */
613 protected void setupSessionPool() throws Exception
614 {
615 pool = new JmsServerSessionPool(this);
616 log.debug("Created session pool " + pool);
617
618 log.debug("Starting session pool " + pool);
619 pool.start();
620 log.debug("Started session pool " + pool);
621
622 log.debug("Starting delivery " + connection);
623 connection.start();
624 log.debug("Started delivery " + connection);
625 }
626
627 /**
628 * Teardown the server session pool
629 */
630 protected void teardownSessionPool()
631 {
632 try
633 {
634 if (connection != null)
635 {
636 log.debug("Stopping delivery " + connection);
637 connection.stop();
638 }
639 }
640 catch (Throwable t)
641 {
642 log.debug("Error stopping delivery " + connection, t);
643 }
644
645 try
646 {
647 if (pool != null)
648 {
649 log.debug("Stopping the session pool " + pool);
650 pool.stop();
651 }
652 }
653 catch (Throwable t)
654 {
655 log.debug("Error clearing the pool " + pool, t);
656 }
657 pool = null;
658 }
659
660 /**
661 * Notify of an event
662 *
663 * @param event the event
664 * @param userData any user data, e.g. the exception on a failure
665 */
666 protected void sendNotification(String event, Object userData)
667 {
668 if (emitter == null)
669 return;
670
671 try
672 {
673 Notification notif = new Notification(event, spec, emitter.nextNotificationSequenceNumber());
674 notif.setUserData(userData);
675 emitter.sendNotification(notif);
676 }
677 catch (Throwable t)
678 {
679 log.warn("Error sending notification: " + event, t);
680 }
681 }
682
683 /**
684 * Handles the setup
685 */
686 private class SetupActivation implements Work
687 {
688 public void run()
689 {
690 try
691 {
692 setup();
693 }
694 catch (Throwable t)
695 {
696 handleFailure(t);
697 }
698 }
699
700 public void release()
701 {
702 }
703 }
704 }