Save This Page
Home » spring-framework-2.5.6-with-dependencies » org.springframework » jms » listener » [javadoc | source]
    1   /*
    2    * Copyright 2002-2008 the original author or authors.
    3    *
    4    * Licensed under the Apache License, Version 2.0 (the "License");
    5    * you may not use this file except in compliance with the License.
    6    * You may obtain a copy of the License at
    7    *
    8    *      http://www.apache.org/licenses/LICENSE-2.0
    9    *
   10    * Unless required by applicable law or agreed to in writing, software
   11    * distributed under the License is distributed on an "AS IS" BASIS,
   12    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   13    * See the License for the specific language governing permissions and
   14    * limitations under the License.
   15    */
   16   
   17   package org.springframework.jms.listener;
   18   
   19   import javax.jms.Connection;
   20   import javax.jms.Destination;
   21   import javax.jms.JMSException;
   22   import javax.jms.Message;
   23   import javax.jms.MessageConsumer;
   24   import javax.jms.Session;
   25   import javax.jms.Topic;
   26   
   27   import org.springframework.beans.factory.BeanNameAware;
   28   import org.springframework.jms.connection.ConnectionFactoryUtils;
   29   import org.springframework.jms.connection.JmsResourceHolder;
   30   import org.springframework.jms.connection.SingleConnectionFactory;
   31   import org.springframework.jms.support.JmsUtils;
   32   import org.springframework.transaction.PlatformTransactionManager;
   33   import org.springframework.transaction.TransactionStatus;
   34   import org.springframework.transaction.support.DefaultTransactionDefinition;
   35   import org.springframework.transaction.support.ResourceTransactionManager;
   36   import org.springframework.transaction.support.TransactionSynchronizationManager;
   37   import org.springframework.transaction.support.TransactionSynchronizationUtils;
   38   
   39   /**
   40    * Base class for listener container implementations which are based on polling.
   41    * Provides support for listener handling based on {@link javax.jms.MessageConsumer},
   42    * optionally participating in externally managed transactions.
   43    *
   44    * <p>This listener container variant is built for repeated polling attempts,
   45    * each invoking the {@link #receiveAndExecute} method. The MessageConsumer used
   46    * may be reobtained fo reach attempt or cached inbetween attempts; this is up
   47    * to the concrete implementation. The receive timeout for each attempt can be
   48    * configured through the {@link #setReceiveTimeout "receiveTimeout"} property.
   49    *
   50    * <p>The underlying mechanism is based on standard JMS MessageConsumer handling,
   51    * which is perfectly compatible with both native JMS and JMS in a J2EE environment.
   52    * Neither the JMS <code>MessageConsumer.setMessageListener</code> facility
   53    * nor the JMS ServerSessionPool facility is required. A further advantage
   54    * of this approach is full control over the listening process, allowing for
   55    * custom scaling and throttling and of concurrent message processing
   56    * (which is up to concrete subclasses).
   57    *
   58    * <p>Message reception and listener execution can automatically be wrapped
   59    * in transactions through passing a Spring
   60    * {@link org.springframework.transaction.PlatformTransactionManager} into the
   61    * {@link #setTransactionManager "transactionManager"} property. This will usually
   62    * be a {@link org.springframework.transaction.jta.JtaTransactionManager} in a
   63    * J2EE enviroment, in combination with a JTA-aware JMS ConnectionFactory obtained
   64    * from JNDI (check your J2EE server's documentation).
   65    *
   66    * <p>This base class does not assume any specific mechanism for asynchronous
   67    * execution of polling invokers. Check out {@link DefaultMessageListenerContainer}
   68    * for a concrete implementation which is based on Spring's
   69    * {@link org.springframework.core.task.TaskExecutor} abstraction,
   70    * including dynamic scaling of concurrent consumers and automatic self recovery.
   71    *
   72    * @author Juergen Hoeller
   73    * @since 2.0.3
   74    * @see #createListenerConsumer
   75    * @see #receiveAndExecute
   76    * @see #setTransactionManager
   77    */
   78   public abstract class AbstractPollingMessageListenerContainer extends AbstractMessageListenerContainer
   79   		implements BeanNameAware {
   80   
   81   	/**
   82   	 * The default receive timeout: 1000 ms = 1 second.
   83   	 */
   84   	public static final long DEFAULT_RECEIVE_TIMEOUT = 1000;
   85   
   86   
   87   	private final MessageListenerContainerResourceFactory transactionalResourceFactory =
   88   			new MessageListenerContainerResourceFactory();
   89   
   90   	private boolean sessionTransactedCalled = false;
   91   
   92   	private boolean pubSubNoLocal = false;
   93   
   94   	private PlatformTransactionManager transactionManager;
   95   
   96   	private DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
   97   
   98   	private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
   99   
  100   
  101   	public void setSessionTransacted(boolean sessionTransacted) {
  102   		super.setSessionTransacted(sessionTransacted);
  103   		this.sessionTransactedCalled = true;
  104   	}
  105   
  106   	/**
  107   	 * Set whether to inhibit the delivery of messages published by its own connection.
  108   	 * Default is "false".
  109   	 * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic, String, boolean)
  110   	 */
  111   	public void setPubSubNoLocal(boolean pubSubNoLocal) {
  112   		this.pubSubNoLocal = pubSubNoLocal;
  113   	}
  114   
  115   	/**
  116   	 * Return whether to inhibit the delivery of messages published by its own connection.
  117   	 */
  118   	protected boolean isPubSubNoLocal() {
  119   		return this.pubSubNoLocal;
  120   	}
  121   
  122   	/**
  123   	 * Specify the Spring {@link org.springframework.transaction.PlatformTransactionManager}
  124   	 * to use for transactional wrapping of message reception plus listener execution.
  125   	 * <p>Default is none, not performing any transactional wrapping.
  126   	 * If specified, this will usually be a Spring
  127   	 * {@link org.springframework.transaction.jta.JtaTransactionManager} or one
  128   	 * of its subclasses, in combination with a JTA-aware ConnectionFactory that
  129   	 * this message listener container obtains its Connections from.
  130   	 * <p><b>Note: Consider the use of local JMS transactions instead.</b>
  131   	 * Simply switch the {@link #setSessionTransacted "sessionTransacted"} flag
  132   	 * to "true" in order to use a locally transacted JMS Session for the entire
  133   	 * receive processing, including any Session operations performed by a
  134   	 * {@link SessionAwareMessageListener} (e.g. sending a response message).
  135   	 * Alternatively, a {@link org.springframework.jms.connection.JmsTransactionManager}
  136   	 * may be used for fully synchronized Spring transactions based on local JMS
  137   	 * transactions. Check {@link AbstractMessageListenerContainer}'s javadoc for
  138   	 * a discussion of transaction choices and message redelivery scenarios.
  139   	 * @see org.springframework.transaction.jta.JtaTransactionManager
  140   	 * @see org.springframework.jms.connection.JmsTransactionManager
  141   	 */
  142   	public void setTransactionManager(PlatformTransactionManager transactionManager) {
  143   		this.transactionManager = transactionManager;
  144   	}
  145   
  146   	/**
  147   	 * Return the Spring PlatformTransactionManager to use for transactional
  148   	 * wrapping of message reception plus listener execution.
  149   	 */
  150   	protected final PlatformTransactionManager getTransactionManager() {
  151   		return this.transactionManager;
  152   	}
  153   
  154   	/**
  155   	 * Specify the transaction name to use for transactional wrapping.
  156   	 * Default is the bean name of this listener container, if any.
  157   	 * @see org.springframework.transaction.TransactionDefinition#getName()
  158   	 */
  159   	public void setTransactionName(String transactionName) {
  160   		this.transactionDefinition.setName(transactionName);
  161   	}
  162   
  163   	/**
  164   	 * Specify the transaction timeout to use for transactional wrapping, in <b>seconds</b>.
  165   	 * Default is none, using the transaction manager's default timeout.
  166   	 * @see org.springframework.transaction.TransactionDefinition#getTimeout()
  167   	 * @see #setReceiveTimeout
  168   	 */
  169   	public void setTransactionTimeout(int transactionTimeout) {
  170   		this.transactionDefinition.setTimeout(transactionTimeout);
  171   	}
  172   
  173   	/**
  174   	 * Set the timeout to use for receive calls, in <b>milliseconds</b>.
  175   	 * The default is 1000 ms, that is, 1 second.
  176   	 * <p><b>NOTE:</b> This value needs to be smaller than the transaction
  177   	 * timeout used by the transaction manager (in the appropriate unit,
  178   	 * of course). -1 indicates no timeout at all; however, this is only
  179   	 * feasible if not running within a transaction manager.
  180   	 * @see javax.jms.MessageConsumer#receive(long)
  181   	 * @see javax.jms.MessageConsumer#receive()
  182   	 * @see #setTransactionTimeout
  183   	 */
  184   	public void setReceiveTimeout(long receiveTimeout) {
  185   		this.receiveTimeout = receiveTimeout;
  186   	}
  187   
  188   
  189   	public void initialize() {
  190   		// Set sessionTransacted=true in case of a non-JTA transaction manager.
  191   		if (!this.sessionTransactedCalled &&
  192   				this.transactionManager instanceof ResourceTransactionManager &&
  193   				!TransactionSynchronizationUtils.sameResourceFactory(
  194   						(ResourceTransactionManager) this.transactionManager, getConnectionFactory())) {
  195   			super.setSessionTransacted(true);
  196   		}
  197   
  198   		// Use bean name as default transaction name.
  199   		if (this.transactionDefinition.getName() == null) {
  200   			this.transactionDefinition.setName(getBeanName());
  201   		}
  202   
  203   		// Proceed with superclass initialization.
  204   		super.initialize();
  205   	}
  206   
  207   
  208   	/**
  209   	 * Create a MessageConsumer for the given JMS Session,
  210   	 * registering a MessageListener for the specified listener.
  211   	 * @param session the JMS Session to work on
  212   	 * @return the MessageConsumer
  213   	 * @throws javax.jms.JMSException if thrown by JMS methods
  214   	 * @see #receiveAndExecute
  215   	 */
  216   	protected MessageConsumer createListenerConsumer(Session session) throws JMSException {
  217   		Destination destination = getDestination();
  218   		if (destination == null) {
  219   			destination = resolveDestinationName(session, getDestinationName());
  220   		}
  221   		return createConsumer(session, destination);
  222   	}
  223   
  224   	/**
  225   	 * Execute the listener for a message received from the given consumer,
  226   	 * wrapping the entire operation in an external transaction if demanded.
  227   	 * @param session the JMS Session to work on
  228   	 * @param consumer the MessageConsumer to work on
  229   	 * @return whether a message has been received
  230   	 * @throws JMSException if thrown by JMS methods
  231   	 * @see #doReceiveAndExecute
  232   	 */
  233   	protected boolean receiveAndExecute(Object invoker, Session session, MessageConsumer consumer)
  234   			throws JMSException {
  235   
  236   		if (this.transactionManager != null) {
  237   			// Execute receive within transaction.
  238   			TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition);
  239   			boolean messageReceived = true;
  240   			try {
  241   				messageReceived = doReceiveAndExecute(invoker, session, consumer, status);
  242   			}
  243   			catch (JMSException ex) {
  244   				rollbackOnException(status, ex);
  245   				throw ex;
  246   			}
  247   			catch (RuntimeException ex) {
  248   				rollbackOnException(status, ex);
  249   				throw ex;
  250   			}
  251   			catch (Error err) {
  252   				rollbackOnException(status, err);
  253   				throw err;
  254   			}
  255   			this.transactionManager.commit(status);
  256   			return messageReceived;
  257   		}
  258   
  259   		else {
  260   			// Execute receive outside of transaction.
  261   			return doReceiveAndExecute(invoker, session, consumer, null);
  262   		}
  263   	}
  264   
  265   	/**
  266   	 * Actually execute the listener for a message received from the given consumer,
  267   	 * fetching all requires resources and invoking the listener.
  268   	 * @param session the JMS Session to work on
  269   	 * @param consumer the MessageConsumer to work on
  270   	 * @param status the TransactionStatus (may be <code>null</code>)
  271   	 * @return whether a message has been received
  272   	 * @throws JMSException if thrown by JMS methods
  273   	 * @see #doExecuteListener(javax.jms.Session, javax.jms.Message)
  274   	 */
  275   	protected boolean doReceiveAndExecute(
  276   			Object invoker, Session session, MessageConsumer consumer, TransactionStatus status)
  277   			throws JMSException {
  278   
  279   		Connection conToClose = null;
  280   		Session sessionToClose = null;
  281   		MessageConsumer consumerToClose = null;
  282   		try {
  283   			Session sessionToUse = session;
  284   			boolean transactional = false;
  285   			if (sessionToUse == null) {
  286   				sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
  287   						getConnectionFactory(), this.transactionalResourceFactory, true);
  288   				transactional = (sessionToUse != null);
  289   			}
  290   			if (sessionToUse == null) {
  291   				Connection conToUse = null;
  292   				if (sharedConnectionEnabled()) {
  293   					conToUse = getSharedConnection();
  294   				}
  295   				else {
  296   					conToUse = createConnection();
  297   					conToClose = conToUse;
  298   					conToUse.start();
  299   				}
  300   				sessionToUse = createSession(conToUse);
  301   				sessionToClose = sessionToUse;
  302   			}
  303   			MessageConsumer consumerToUse = consumer;
  304   			if (consumerToUse == null) {
  305   				consumerToUse = createListenerConsumer(sessionToUse);
  306   				consumerToClose = consumerToUse;
  307   			}
  308   			Message message = receiveMessage(consumerToUse);
  309   			if (message != null) {
  310   				if (logger.isDebugEnabled()) {
  311   					logger.debug("Received message of type [" + message.getClass() + "] from consumer [" +
  312   							consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" +
  313   							sessionToUse + "]");
  314   				}
  315   				messageReceived(invoker, sessionToUse);
  316   				boolean exposeResource = (!transactional && isExposeListenerSession() &&
  317   						!TransactionSynchronizationManager.hasResource(getConnectionFactory()));
  318   				if (exposeResource) {
  319   					TransactionSynchronizationManager.bindResource(
  320   							getConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse));
  321   				}
  322   				try {
  323   					doExecuteListener(sessionToUse, message);
  324   				}
  325   				catch (Throwable ex) {
  326   					if (status != null) {
  327   						if (logger.isDebugEnabled()) {
  328   							logger.debug("Rolling back transaction because of listener exception thrown: " + ex);
  329   						}
  330   						status.setRollbackOnly();
  331   					}
  332   					handleListenerException(ex);
  333   					// Rethrow JMSException to indicate an infrastructure problem
  334   					// that may have to trigger recovery...
  335   					if (ex instanceof JMSException) {
  336   						throw (JMSException) ex;
  337   					}
  338   				}
  339   				finally {
  340   					if (exposeResource) {
  341   						TransactionSynchronizationManager.unbindResource(getConnectionFactory());
  342   					}
  343   				}
  344   				return true;
  345   			}
  346   			else {
  347   				if (logger.isTraceEnabled()) {
  348   					logger.trace("Consumer [" + consumerToUse + "] of " + (transactional ? "transactional " : "") +
  349   							"session [" + sessionToUse + "] did not receive a message");
  350   				}
  351   				noMessageReceived(invoker, sessionToUse);
  352   				return false;
  353   			}
  354   		}
  355   		finally {
  356   			JmsUtils.closeMessageConsumer(consumerToClose);
  357   			JmsUtils.closeSession(sessionToClose);
  358   			ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);
  359   		}
  360   	}
  361   
  362   	/**
  363   	 * This implementation checks whether the Session is externally synchronized.
  364   	 * In this case, the Session is not locally transacted, despite the listener
  365   	 * container's "sessionTransacted" flag being set to "true".
  366   	 * @see org.springframework.jms.connection.JmsResourceHolder
  367   	 */
  368   	protected boolean isSessionLocallyTransacted(Session session) {
  369   		if (!super.isSessionLocallyTransacted(session)) {
  370   			return false;
  371   		}
  372   		JmsResourceHolder resourceHolder =
  373   				(JmsResourceHolder) TransactionSynchronizationManager.getResource(getConnectionFactory());
  374   		return (resourceHolder == null || resourceHolder instanceof LocallyExposedJmsResourceHolder ||
  375   				!resourceHolder.containsSession(session));
  376   	}
  377   
  378   	/**
  379   	 * Perform a rollback, handling rollback exceptions properly.
  380   	 * @param status object representing the transaction
  381   	 * @param ex the thrown listener exception or error
  382   	 */
  383   	private void rollbackOnException(TransactionStatus status, Throwable ex) {
  384   		logger.debug("Initiating transaction rollback on listener exception", ex);
  385   		try {
  386   			this.transactionManager.rollback(status);
  387   		}
  388   		catch (RuntimeException ex2) {
  389   			logger.error("Listener exception overridden by rollback exception", ex);
  390   			throw ex2;
  391   		}
  392   		catch (Error err) {
  393   			logger.error("Listener exception overridden by rollback error", ex);
  394   			throw err;
  395   		}
  396   	}
  397   
  398   	/**
  399   	 * Receive a message from the given consumer.
  400   	 * @param consumer the MessageConsumer to use
  401   	 * @return the Message, or <code>null</code> if none
  402   	 * @throws JMSException if thrown by JMS methods
  403   	 */
  404   	protected Message receiveMessage(MessageConsumer consumer) throws JMSException {
  405   		return (this.receiveTimeout < 0 ? consumer.receive() : consumer.receive(this.receiveTimeout));
  406   	}
  407   
  408   	/**
  409   	 * Template method that gets called right when a new message has been received,
  410   	 * before attempting to process it. Allows subclasses to react to the event
  411   	 * of an actual incoming message, for example adapting their consumer count.
  412   	 * @param invoker the invoker object (passed through)
  413   	 * @param session the receiving JMS Session
  414   	 */
  415   	protected void messageReceived(Object invoker, Session session) {
  416   	}
  417   
  418   	/**
  419   	 * Template method that gets called right <i>no</i> message has been received,
  420   	 * before attempting to process it. Allows subclasses to react to the event
  421   	 * of an actual incoming message, for example marking .
  422   	 * @param invoker the invoker object (passed through)
  423   	 * @param session the receiving JMS Session
  424   	 */
  425   	protected void noMessageReceived(Object invoker, Session session) {
  426   	}
  427   
  428   
  429   	//-------------------------------------------------------------------------
  430   	// JMS 1.1 factory methods, potentially overridden for JMS 1.0.2
  431   	//-------------------------------------------------------------------------
  432   
  433   	/**
  434   	 * Fetch an appropriate Connection from the given JmsResourceHolder.
  435   	 * <p>This implementation accepts any JMS 1.1 Connection.
  436   	 * @param holder the JmsResourceHolder
  437   	 * @return an appropriate Connection fetched from the holder,
  438   	 * or <code>null</code> if none found
  439   	 */
  440   	protected Connection getConnection(JmsResourceHolder holder) {
  441   		return holder.getConnection();
  442   	}
  443   
  444   	/**
  445   	 * Fetch an appropriate Session from the given JmsResourceHolder.
  446   	 * <p>This implementation accepts any JMS 1.1 Session.
  447   	 * @param holder the JmsResourceHolder
  448   	 * @return an appropriate Session fetched from the holder,
  449   	 * or <code>null</code> if none found
  450   	 */
  451   	protected Session getSession(JmsResourceHolder holder) {
  452   		return holder.getSession();
  453   	}
  454   
  455   	/**
  456   	 * Create a JMS MessageConsumer for the given Session and Destination.
  457   	 * <p>This implementation uses JMS 1.1 API.
  458   	 * @param session the JMS Session to create a MessageConsumer for
  459   	 * @param destination the JMS Destination to create a MessageConsumer for
  460   	 * @return the new JMS MessageConsumer
  461   	 * @throws javax.jms.JMSException if thrown by JMS API methods
  462   	 */
  463   	protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
  464   		// Only pass in the NoLocal flag in case of a Topic:
  465   		// Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException
  466   		// in case of the NoLocal flag being specified for a Queue.
  467   		if (isPubSubDomain()) {
  468   			if (isSubscriptionDurable() && destination instanceof Topic) {
  469   				return session.createDurableSubscriber(
  470   						(Topic) destination, getDurableSubscriptionName(), getMessageSelector(), isPubSubNoLocal());
  471   			}
  472   			else {
  473   				return session.createConsumer(destination, getMessageSelector(), isPubSubNoLocal());
  474   			}
  475   		}
  476   		else {
  477   			return session.createConsumer(destination, getMessageSelector());
  478   		}
  479   	}
  480   
  481   
  482   	/**
  483   	 * ResourceFactory implementation that delegates to this listener container's protected callback methods.
  484   	 */
  485   	private class MessageListenerContainerResourceFactory implements ConnectionFactoryUtils.ResourceFactory {
  486   
  487   		public Connection getConnection(JmsResourceHolder holder) {
  488   			return AbstractPollingMessageListenerContainer.this.getConnection(holder);
  489   		}
  490   
  491   		public Session getSession(JmsResourceHolder holder) {
  492   			return AbstractPollingMessageListenerContainer.this.getSession(holder);
  493   		}
  494   
  495   		public Connection createConnection() throws JMSException {
  496   			if (AbstractPollingMessageListenerContainer.this.sharedConnectionEnabled()) {
  497   				Connection sharedCon = AbstractPollingMessageListenerContainer.this.getSharedConnection();
  498   				return new SingleConnectionFactory(sharedCon).createConnection();
  499   			}
  500   			else {
  501   				return AbstractPollingMessageListenerContainer.this.createConnection();
  502   			}
  503   		}
  504   
  505   		public Session createSession(Connection con) throws JMSException {
  506   			return AbstractPollingMessageListenerContainer.this.createSession(con);
  507   		}
  508   
  509   		public boolean isSynchedLocalTransactionAllowed() {
  510   			return AbstractPollingMessageListenerContainer.this.isSessionTransacted();
  511   		}
  512   	}
  513   
  514   }

Save This Page
Home » spring-framework-2.5.6-with-dependencies » org.springframework » jms » listener » [javadoc | source]