1 /*
2 * Copyright 2002-2008 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.springframework.jms.listener;
18
19 import javax.jms.Connection;
20 import javax.jms.Destination;
21 import javax.jms.JMSException;
22 import javax.jms.Message;
23 import javax.jms.MessageConsumer;
24 import javax.jms.Session;
25 import javax.jms.Topic;
26
27 import org.springframework.beans.factory.BeanNameAware;
28 import org.springframework.jms.connection.ConnectionFactoryUtils;
29 import org.springframework.jms.connection.JmsResourceHolder;
30 import org.springframework.jms.connection.SingleConnectionFactory;
31 import org.springframework.jms.support.JmsUtils;
32 import org.springframework.transaction.PlatformTransactionManager;
33 import org.springframework.transaction.TransactionStatus;
34 import org.springframework.transaction.support.DefaultTransactionDefinition;
35 import org.springframework.transaction.support.ResourceTransactionManager;
36 import org.springframework.transaction.support.TransactionSynchronizationManager;
37 import org.springframework.transaction.support.TransactionSynchronizationUtils;
38
39 /**
40 * Base class for listener container implementations which are based on polling.
41 * Provides support for listener handling based on {@link javax.jms.MessageConsumer},
42 * optionally participating in externally managed transactions.
43 *
44 * <p>This listener container variant is built for repeated polling attempts,
45 * each invoking the {@link #receiveAndExecute} method. The MessageConsumer used
 * may be reobtained for each attempt or cached in between attempts; this is up
47 * to the concrete implementation. The receive timeout for each attempt can be
48 * configured through the {@link #setReceiveTimeout "receiveTimeout"} property.
49 *
50 * <p>The underlying mechanism is based on standard JMS MessageConsumer handling,
51 * which is perfectly compatible with both native JMS and JMS in a J2EE environment.
52 * Neither the JMS <code>MessageConsumer.setMessageListener</code> facility
53 * nor the JMS ServerSessionPool facility is required. A further advantage
54 * of this approach is full control over the listening process, allowing for
 * custom scaling and throttling of concurrent message processing
56 * (which is up to concrete subclasses).
57 *
58 * <p>Message reception and listener execution can automatically be wrapped
59 * in transactions through passing a Spring
60 * {@link org.springframework.transaction.PlatformTransactionManager} into the
61 * {@link #setTransactionManager "transactionManager"} property. This will usually
62 * be a {@link org.springframework.transaction.jta.JtaTransactionManager} in a
 * J2EE environment, in combination with a JTA-aware JMS ConnectionFactory obtained
64 * from JNDI (check your J2EE server's documentation).
65 *
66 * <p>This base class does not assume any specific mechanism for asynchronous
67 * execution of polling invokers. Check out {@link DefaultMessageListenerContainer}
68 * for a concrete implementation which is based on Spring's
69 * {@link org.springframework.core.task.TaskExecutor} abstraction,
70 * including dynamic scaling of concurrent consumers and automatic self recovery.
71 *
72 * @author Juergen Hoeller
73 * @since 2.0.3
74 * @see #createListenerConsumer
75 * @see #receiveAndExecute
76 * @see #setTransactionManager
77 */
78 public abstract class AbstractPollingMessageListenerContainer extends AbstractMessageListenerContainer
79 implements BeanNameAware {
80
81 /**
82 * The default receive timeout: 1000 ms = 1 second.
83 */
84 public static final long DEFAULT_RECEIVE_TIMEOUT = 1000;
85
86
87 private final MessageListenerContainerResourceFactory transactionalResourceFactory =
88 new MessageListenerContainerResourceFactory();
89
90 private boolean sessionTransactedCalled = false;
91
92 private boolean pubSubNoLocal = false;
93
94 private PlatformTransactionManager transactionManager;
95
96 private DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
97
98 private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
99
100
101 public void setSessionTransacted(boolean sessionTransacted) {
102 super.setSessionTransacted(sessionTransacted);
103 this.sessionTransactedCalled = true;
104 }
105
106 /**
107 * Set whether to inhibit the delivery of messages published by its own connection.
108 * Default is "false".
109 * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic, String, boolean)
110 */
111 public void setPubSubNoLocal(boolean pubSubNoLocal) {
112 this.pubSubNoLocal = pubSubNoLocal;
113 }
114
115 /**
116 * Return whether to inhibit the delivery of messages published by its own connection.
117 */
118 protected boolean isPubSubNoLocal() {
119 return this.pubSubNoLocal;
120 }
121
122 /**
123 * Specify the Spring {@link org.springframework.transaction.PlatformTransactionManager}
124 * to use for transactional wrapping of message reception plus listener execution.
125 * <p>Default is none, not performing any transactional wrapping.
126 * If specified, this will usually be a Spring
127 * {@link org.springframework.transaction.jta.JtaTransactionManager} or one
128 * of its subclasses, in combination with a JTA-aware ConnectionFactory that
129 * this message listener container obtains its Connections from.
130 * <p><b>Note: Consider the use of local JMS transactions instead.</b>
131 * Simply switch the {@link #setSessionTransacted "sessionTransacted"} flag
132 * to "true" in order to use a locally transacted JMS Session for the entire
133 * receive processing, including any Session operations performed by a
134 * {@link SessionAwareMessageListener} (e.g. sending a response message).
135 * Alternatively, a {@link org.springframework.jms.connection.JmsTransactionManager}
136 * may be used for fully synchronized Spring transactions based on local JMS
137 * transactions. Check {@link AbstractMessageListenerContainer}'s javadoc for
138 * a discussion of transaction choices and message redelivery scenarios.
139 * @see org.springframework.transaction.jta.JtaTransactionManager
140 * @see org.springframework.jms.connection.JmsTransactionManager
141 */
142 public void setTransactionManager(PlatformTransactionManager transactionManager) {
143 this.transactionManager = transactionManager;
144 }
145
146 /**
147 * Return the Spring PlatformTransactionManager to use for transactional
148 * wrapping of message reception plus listener execution.
149 */
150 protected final PlatformTransactionManager getTransactionManager() {
151 return this.transactionManager;
152 }
153
154 /**
155 * Specify the transaction name to use for transactional wrapping.
156 * Default is the bean name of this listener container, if any.
157 * @see org.springframework.transaction.TransactionDefinition#getName()
158 */
159 public void setTransactionName(String transactionName) {
160 this.transactionDefinition.setName(transactionName);
161 }
162
163 /**
164 * Specify the transaction timeout to use for transactional wrapping, in <b>seconds</b>.
165 * Default is none, using the transaction manager's default timeout.
166 * @see org.springframework.transaction.TransactionDefinition#getTimeout()
167 * @see #setReceiveTimeout
168 */
169 public void setTransactionTimeout(int transactionTimeout) {
170 this.transactionDefinition.setTimeout(transactionTimeout);
171 }
172
173 /**
174 * Set the timeout to use for receive calls, in <b>milliseconds</b>.
175 * The default is 1000 ms, that is, 1 second.
176 * <p><b>NOTE:</b> This value needs to be smaller than the transaction
177 * timeout used by the transaction manager (in the appropriate unit,
178 * of course). -1 indicates no timeout at all; however, this is only
179 * feasible if not running within a transaction manager.
180 * @see javax.jms.MessageConsumer#receive(long)
181 * @see javax.jms.MessageConsumer#receive()
182 * @see #setTransactionTimeout
183 */
184 public void setReceiveTimeout(long receiveTimeout) {
185 this.receiveTimeout = receiveTimeout;
186 }
187
188
189 public void initialize() {
190 // Set sessionTransacted=true in case of a non-JTA transaction manager.
191 if (!this.sessionTransactedCalled &&
192 this.transactionManager instanceof ResourceTransactionManager &&
193 !TransactionSynchronizationUtils.sameResourceFactory(
194 (ResourceTransactionManager) this.transactionManager, getConnectionFactory())) {
195 super.setSessionTransacted(true);
196 }
197
198 // Use bean name as default transaction name.
199 if (this.transactionDefinition.getName() == null) {
200 this.transactionDefinition.setName(getBeanName());
201 }
202
203 // Proceed with superclass initialization.
204 super.initialize();
205 }
206
207
208 /**
209 * Create a MessageConsumer for the given JMS Session,
210 * registering a MessageListener for the specified listener.
211 * @param session the JMS Session to work on
212 * @return the MessageConsumer
213 * @throws javax.jms.JMSException if thrown by JMS methods
214 * @see #receiveAndExecute
215 */
216 protected MessageConsumer createListenerConsumer(Session session) throws JMSException {
217 Destination destination = getDestination();
218 if (destination == null) {
219 destination = resolveDestinationName(session, getDestinationName());
220 }
221 return createConsumer(session, destination);
222 }
223
224 /**
225 * Execute the listener for a message received from the given consumer,
226 * wrapping the entire operation in an external transaction if demanded.
227 * @param session the JMS Session to work on
228 * @param consumer the MessageConsumer to work on
229 * @return whether a message has been received
230 * @throws JMSException if thrown by JMS methods
231 * @see #doReceiveAndExecute
232 */
233 protected boolean receiveAndExecute(Object invoker, Session session, MessageConsumer consumer)
234 throws JMSException {
235
236 if (this.transactionManager != null) {
237 // Execute receive within transaction.
238 TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition);
239 boolean messageReceived = true;
240 try {
241 messageReceived = doReceiveAndExecute(invoker, session, consumer, status);
242 }
243 catch (JMSException ex) {
244 rollbackOnException(status, ex);
245 throw ex;
246 }
247 catch (RuntimeException ex) {
248 rollbackOnException(status, ex);
249 throw ex;
250 }
251 catch (Error err) {
252 rollbackOnException(status, err);
253 throw err;
254 }
255 this.transactionManager.commit(status);
256 return messageReceived;
257 }
258
259 else {
260 // Execute receive outside of transaction.
261 return doReceiveAndExecute(invoker, session, consumer, null);
262 }
263 }
264
265 /**
266 * Actually execute the listener for a message received from the given consumer,
267 * fetching all requires resources and invoking the listener.
268 * @param session the JMS Session to work on
269 * @param consumer the MessageConsumer to work on
270 * @param status the TransactionStatus (may be <code>null</code>)
271 * @return whether a message has been received
272 * @throws JMSException if thrown by JMS methods
273 * @see #doExecuteListener(javax.jms.Session, javax.jms.Message)
274 */
275 protected boolean doReceiveAndExecute(
276 Object invoker, Session session, MessageConsumer consumer, TransactionStatus status)
277 throws JMSException {
278
279 Connection conToClose = null;
280 Session sessionToClose = null;
281 MessageConsumer consumerToClose = null;
282 try {
283 Session sessionToUse = session;
284 boolean transactional = false;
285 if (sessionToUse == null) {
286 sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
287 getConnectionFactory(), this.transactionalResourceFactory, true);
288 transactional = (sessionToUse != null);
289 }
290 if (sessionToUse == null) {
291 Connection conToUse = null;
292 if (sharedConnectionEnabled()) {
293 conToUse = getSharedConnection();
294 }
295 else {
296 conToUse = createConnection();
297 conToClose = conToUse;
298 conToUse.start();
299 }
300 sessionToUse = createSession(conToUse);
301 sessionToClose = sessionToUse;
302 }
303 MessageConsumer consumerToUse = consumer;
304 if (consumerToUse == null) {
305 consumerToUse = createListenerConsumer(sessionToUse);
306 consumerToClose = consumerToUse;
307 }
308 Message message = receiveMessage(consumerToUse);
309 if (message != null) {
310 if (logger.isDebugEnabled()) {
311 logger.debug("Received message of type [" + message.getClass() + "] from consumer [" +
312 consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" +
313 sessionToUse + "]");
314 }
315 messageReceived(invoker, sessionToUse);
316 boolean exposeResource = (!transactional && isExposeListenerSession() &&
317 !TransactionSynchronizationManager.hasResource(getConnectionFactory()));
318 if (exposeResource) {
319 TransactionSynchronizationManager.bindResource(
320 getConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse));
321 }
322 try {
323 doExecuteListener(sessionToUse, message);
324 }
325 catch (Throwable ex) {
326 if (status != null) {
327 if (logger.isDebugEnabled()) {
328 logger.debug("Rolling back transaction because of listener exception thrown: " + ex);
329 }
330 status.setRollbackOnly();
331 }
332 handleListenerException(ex);
333 // Rethrow JMSException to indicate an infrastructure problem
334 // that may have to trigger recovery...
335 if (ex instanceof JMSException) {
336 throw (JMSException) ex;
337 }
338 }
339 finally {
340 if (exposeResource) {
341 TransactionSynchronizationManager.unbindResource(getConnectionFactory());
342 }
343 }
344 return true;
345 }
346 else {
347 if (logger.isTraceEnabled()) {
348 logger.trace("Consumer [" + consumerToUse + "] of " + (transactional ? "transactional " : "") +
349 "session [" + sessionToUse + "] did not receive a message");
350 }
351 noMessageReceived(invoker, sessionToUse);
352 return false;
353 }
354 }
355 finally {
356 JmsUtils.closeMessageConsumer(consumerToClose);
357 JmsUtils.closeSession(sessionToClose);
358 ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);
359 }
360 }
361
362 /**
363 * This implementation checks whether the Session is externally synchronized.
364 * In this case, the Session is not locally transacted, despite the listener
365 * container's "sessionTransacted" flag being set to "true".
366 * @see org.springframework.jms.connection.JmsResourceHolder
367 */
368 protected boolean isSessionLocallyTransacted(Session session) {
369 if (!super.isSessionLocallyTransacted(session)) {
370 return false;
371 }
372 JmsResourceHolder resourceHolder =
373 (JmsResourceHolder) TransactionSynchronizationManager.getResource(getConnectionFactory());
374 return (resourceHolder == null || resourceHolder instanceof LocallyExposedJmsResourceHolder ||
375 !resourceHolder.containsSession(session));
376 }
377
378 /**
379 * Perform a rollback, handling rollback exceptions properly.
380 * @param status object representing the transaction
381 * @param ex the thrown listener exception or error
382 */
383 private void rollbackOnException(TransactionStatus status, Throwable ex) {
384 logger.debug("Initiating transaction rollback on listener exception", ex);
385 try {
386 this.transactionManager.rollback(status);
387 }
388 catch (RuntimeException ex2) {
389 logger.error("Listener exception overridden by rollback exception", ex);
390 throw ex2;
391 }
392 catch (Error err) {
393 logger.error("Listener exception overridden by rollback error", ex);
394 throw err;
395 }
396 }
397
398 /**
399 * Receive a message from the given consumer.
400 * @param consumer the MessageConsumer to use
401 * @return the Message, or <code>null</code> if none
402 * @throws JMSException if thrown by JMS methods
403 */
404 protected Message receiveMessage(MessageConsumer consumer) throws JMSException {
405 return (this.receiveTimeout < 0 ? consumer.receive() : consumer.receive(this.receiveTimeout));
406 }
407
408 /**
409 * Template method that gets called right when a new message has been received,
410 * before attempting to process it. Allows subclasses to react to the event
411 * of an actual incoming message, for example adapting their consumer count.
412 * @param invoker the invoker object (passed through)
413 * @param session the receiving JMS Session
414 */
415 protected void messageReceived(Object invoker, Session session) {
416 }
417
418 /**
419 * Template method that gets called right <i>no</i> message has been received,
420 * before attempting to process it. Allows subclasses to react to the event
421 * of an actual incoming message, for example marking .
422 * @param invoker the invoker object (passed through)
423 * @param session the receiving JMS Session
424 */
425 protected void noMessageReceived(Object invoker, Session session) {
426 }
427
428
429 //-------------------------------------------------------------------------
430 // JMS 1.1 factory methods, potentially overridden for JMS 1.0.2
431 //-------------------------------------------------------------------------
432
433 /**
434 * Fetch an appropriate Connection from the given JmsResourceHolder.
435 * <p>This implementation accepts any JMS 1.1 Connection.
436 * @param holder the JmsResourceHolder
437 * @return an appropriate Connection fetched from the holder,
438 * or <code>null</code> if none found
439 */
440 protected Connection getConnection(JmsResourceHolder holder) {
441 return holder.getConnection();
442 }
443
444 /**
445 * Fetch an appropriate Session from the given JmsResourceHolder.
446 * <p>This implementation accepts any JMS 1.1 Session.
447 * @param holder the JmsResourceHolder
448 * @return an appropriate Session fetched from the holder,
449 * or <code>null</code> if none found
450 */
451 protected Session getSession(JmsResourceHolder holder) {
452 return holder.getSession();
453 }
454
455 /**
456 * Create a JMS MessageConsumer for the given Session and Destination.
457 * <p>This implementation uses JMS 1.1 API.
458 * @param session the JMS Session to create a MessageConsumer for
459 * @param destination the JMS Destination to create a MessageConsumer for
460 * @return the new JMS MessageConsumer
461 * @throws javax.jms.JMSException if thrown by JMS API methods
462 */
463 protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
464 // Only pass in the NoLocal flag in case of a Topic:
465 // Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException
466 // in case of the NoLocal flag being specified for a Queue.
467 if (isPubSubDomain()) {
468 if (isSubscriptionDurable() && destination instanceof Topic) {
469 return session.createDurableSubscriber(
470 (Topic) destination, getDurableSubscriptionName(), getMessageSelector(), isPubSubNoLocal());
471 }
472 else {
473 return session.createConsumer(destination, getMessageSelector(), isPubSubNoLocal());
474 }
475 }
476 else {
477 return session.createConsumer(destination, getMessageSelector());
478 }
479 }
480
481
482 /**
483 * ResourceFactory implementation that delegates to this listener container's protected callback methods.
484 */
485 private class MessageListenerContainerResourceFactory implements ConnectionFactoryUtils.ResourceFactory {
486
487 public Connection getConnection(JmsResourceHolder holder) {
488 return AbstractPollingMessageListenerContainer.this.getConnection(holder);
489 }
490
491 public Session getSession(JmsResourceHolder holder) {
492 return AbstractPollingMessageListenerContainer.this.getSession(holder);
493 }
494
495 public Connection createConnection() throws JMSException {
496 if (AbstractPollingMessageListenerContainer.this.sharedConnectionEnabled()) {
497 Connection sharedCon = AbstractPollingMessageListenerContainer.this.getSharedConnection();
498 return new SingleConnectionFactory(sharedCon).createConnection();
499 }
500 else {
501 return AbstractPollingMessageListenerContainer.this.createConnection();
502 }
503 }
504
505 public Session createSession(Connection con) throws JMSException {
506 return AbstractPollingMessageListenerContainer.this.createSession(con);
507 }
508
509 public boolean isSynchedLocalTransactionAllowed() {
510 return AbstractPollingMessageListenerContainer.this.isSessionTransacted();
511 }
512 }
513
514 }