Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/ActiveMQMessageConsumer.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activemq;
19  
20  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
21  
22  import java.util.LinkedList;
23  
24  import javax.jms.IllegalStateException;
25  import javax.jms.InvalidDestinationException;
26  import javax.jms.JMSException;
27  import javax.jms.Message;
28  import javax.jms.MessageConsumer;
29  import javax.jms.MessageListener;
30  
31  import org.activemq.io.util.MemoryBoundedQueue;
32  import org.activemq.management.JMSConsumerStatsImpl;
33  import org.activemq.management.StatsCapable;
34  import org.activemq.management.StatsImpl;
35  import org.activemq.message.ActiveMQDestination;
36  import org.activemq.message.ActiveMQMessage;
37  import org.activemq.selector.SelectorParser;
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  
41  /**
42   * A client uses a <CODE>MessageConsumer</CODE> object to receive messages from a destination. A <CODE>
43   * MessageConsumer</CODE> object is created by passing a <CODE>Destination</CODE> object to a message-consumer
44   * creation method supplied by a session.
45   * <P>
46   * <CODE>MessageConsumer</CODE> is the parent interface for all message consumers.
47   * <P>
48   * A message consumer can be created with a message selector. A message selector allows the client to restrict the
49   * messages delivered to the message consumer to those that match the selector.
50   * <P>
51   * A client may either synchronously receive a message consumer's messages or have the consumer asynchronously deliver
52   * them as they arrive.
53   * <P>
54   * For synchronous receipt, a client can request the next message from a message consumer using one of its <CODE>
55   * receive</CODE> methods. There are several variations of <CODE>receive</CODE> that allow a client to poll or wait
56   * for the next message.
57   * <P>
58   * For asynchronous delivery, a client can register a <CODE>MessageListener</CODE> object with a message consumer. As
59   * messages arrive at the message consumer, it delivers them by calling the <CODE>MessageListener</CODE>'s<CODE>
60   * onMessage</CODE> method.
61   * <P>
62   * It is a client programming error for a <CODE>MessageListener</CODE> to throw an exception.
63   *
64   * @version $Revision: 1.1.1.1 $
65   * @see javax.jms.MessageConsumer
66   * @see javax.jms.QueueReceiver
67   * @see javax.jms.TopicSubscriber
68   * @see javax.jms.Session
69   */
70  public class ActiveMQMessageConsumer implements MessageConsumer, StatsCapable, Closeable {
71      private static final Log log = LogFactory.getLog(ActiveMQMessageConsumer.class);
72      protected ActiveMQSession session;
73      protected String consumerIdentifier;
74      protected MemoryBoundedQueue messageQueue;
75      protected String messageSelector;
76      private MessageListener messageListener;
77      protected String consumerName;
78      protected ActiveMQDestination destination;
79      private boolean closed;
80      protected int consumerNumber;
81      protected int prefetchNumber;
82      protected long startTime;
83      protected boolean noLocal;
84      protected boolean browser;
85      private Thread accessThread;
86      private Object messageListenerGuard;
87      private JMSConsumerStatsImpl stats;
88      
89      private SynchronizedBoolean running = new SynchronizedBoolean(true);
90      private LinkedList stoppedQueue=new LinkedList(); 
91      /**
92       * Create a MessageConsumer
93       *
94       * @param theSession
95       * @param dest
96       * @param name
97       * @param selector
98       * @param cnum
99       * @param prefetch
100      * @param noLocalValue
101      * @param browserValue
102      * @throws JMSException
103      */
104     protected ActiveMQMessageConsumer(ActiveMQSession theSession, ActiveMQDestination dest, String name,
105                                       String selector, int cnum, int prefetch, boolean noLocalValue, boolean browserValue) throws JMSException {
106         if (dest == null) {
107             throw new InvalidDestinationException("Do not understand a null destination");
108         }
109         if (dest.isTemporary() && theSession.connection.isJ2EEcompliant() && !theSession.isInternalSession()) {
110             //validate that the destination comes from this Connection
111             String physicalName = dest.getPhysicalName();
112             if (physicalName == null) {
113                 throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
114             }
115             String clientID = theSession.connection.getInitializedClientID();
116             if (physicalName.indexOf(clientID) < 0) {
117                 throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
118             }
119             if (dest.isDeleted()) {
120                 throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
121             }
122         }
123         dest.incrementConsumerCounter();
124         if (selector != null) {
125             selector = selector.trim();
126             if (selector.length() > 0) {
127                 // Validate that the selector
128                 new SelectorParser().parse(selector);
129             }
130         }
131         this.session = theSession;
132         this.destination = dest;
133         this.consumerName = name;
134         this.messageSelector = selector;
135 
136         this.consumerNumber = cnum;
137         this.prefetchNumber = prefetch;
138         this.noLocal = noLocalValue;
139         this.browser = browserValue;
140         this.consumerIdentifier = theSession.connection.getClientID() + "." + theSession.getSessionId() + "." + this.consumerNumber;
141         this.startTime = System.currentTimeMillis();
142         this.messageListenerGuard = new Object();
143         this.messageQueue = theSession.connection.getMemoryBoundedQueue(this.consumerIdentifier);
144         this.stats = new JMSConsumerStatsImpl(theSession.getSessionStats(), dest);
145         this.session.addConsumer(this);
146     }
147 
148     /**
149      * @return the memory used by the internal queue for this MessageConsumer
150      */
151     public long getLocalMemoryUsage() {
152         return this.messageQueue.getLocalMemoryUsedByThisQueue();
153     }
154 
155     /**
156      * @return the number of messages enqueued by this consumer awaiting dispatch
157      */
158     public int size() {
159         return this.messageQueue.size();
160     }
161 
162 
163     /**
164      * @return Stats for this MessageConsumer
165      */
166     public StatsImpl getStats() {
167         return stats;
168     }
169 
170     /**
171      * @return Stats for this MessageConsumer
172      */
173     public JMSConsumerStatsImpl getConsumerStats() {
174         return stats;
175     }
176 
177     /**
178      * @return pretty print of this consumer
179      */
180     public String toString() {
181         return "MessageConsumer: " + consumerIdentifier + "[" + consumerNumber + "]";
182     }
183 
184     /**
185      * @return Returns the prefetchNumber.
186      */
187     public int getPrefetchNumber() {
188         return prefetchNumber;
189     }
190 
191     /**
192      * @param prefetchNumber The prefetchNumber to set.
193      */
194     public void setPrefetchNumber(int prefetchNumber) {
195         this.prefetchNumber = prefetchNumber;
196     }
197 
198     /**
199      * Gets this message consumer's message selector expression.
200      *
201      * @return this message consumer's message selector, or null if no message selector exists for the message consumer
202      *         (that is, if the message selector was not set or was set to null or the empty string)
203      * @throws JMSException if the JMS provider fails to receive the next message due to some internal error.
204      */
205     public String getMessageSelector() throws JMSException {
206         checkClosed();
207         return this.messageSelector;
208     }
209 
210     /**
211      * Gets the message consumer's <CODE>MessageListener</CODE>.
212      *
213      * @return the listener for the message consumer, or null if no listener is set
214      * @throws JMSException if the JMS provider fails to get the message listener due to some internal error.
215      * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
216      */
217     public MessageListener getMessageListener() throws JMSException {
218         checkClosed();
219         return this.messageListener;
220     }
221 
222     /**
223      * Sets the message consumer's <CODE>MessageListener</CODE>.
224      * <P>
225      * Setting the message listener to null is the equivalent of unsetting the message listener for the message
226      * consumer.
227      * <P>
228      * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE> while messages are being consumed by an
229      * existing listener or the consumer is being used to consume messages synchronously is undefined.
230      *
231      * @param listener the listener to which the messages are to be delivered
232      * @throws JMSException if the JMS provider fails to receive the next message due to some internal error.
233      * @see javax.jms.MessageConsumer#getMessageListener()
234      */
235     public void setMessageListener(MessageListener listener) throws JMSException {
236         checkClosed();
237         synchronized (messageListenerGuard) {
238             this.messageListener = listener;
239         }
240         if (listener != null) {
241             session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_ASYNC);
242             //messages may already be enqueued
243             ActiveMQMessage msg = null;
244             try {
245                 while ((msg = (ActiveMQMessage)messageQueue.dequeueNoWait()) != null) {
246                     processMessage(msg);
247                 }
248             }
249             catch (InterruptedException ex) {
250                 JMSException jmsEx = new JMSException("Interrupted setting message listener");
251                 jmsEx.setLinkedException(ex);
252                 throw jmsEx;
253             }
254         }
255     }
256 
257     /**
258      * Receives the next message produced for this message consumer.
259      * <P>
260      * This call blocks indefinitely until a message is produced or until this message consumer is closed.
261      * <P>
262      * If this <CODE>receive</CODE> is done within a transaction, the consumer retains the message until the
263      * transaction commits.
264      *
265      * @return the next message produced for this message consumer, or null if this message consumer is concurrently
266      *         closed
267      * @throws JMSException
268      */
269     public Message receive() throws JMSException {
270         checkClosed();
271         session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_SYNC);
272         try {
273             this.accessThread = Thread.currentThread();
274             ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeue();
275             this.accessThread = null;
276             if (message != null) {
277                 boolean expired = message.isExpired();
278                 messageDelivered(message, true, expired);
279                 if (!expired) {
280                     message = message.shallowCopy();
281                 }
282                 else {
283                     message = (ActiveMQMessage) receiveNoWait(); //this will remove any other expired messages held in the queue
284                 }
285             }
286             if( message!=null && log.isDebugEnabled() ) {
287                 log.debug("Message received: "+message);
288             }            
289             return message;
290         }
291         catch (InterruptedException ioe) {
292             return null;
293         }
294     }
295 
296     /**
297      * Receives the next message that arrives within the specified timeout interval.
298      * <P>
299      * This call blocks until a message arrives, the timeout expires, or this message consumer is closed. A <CODE>
300      * timeout</CODE> of zero never expires, and the call blocks indefinitely.
301      *
302      * @param timeout the timeout value (in milliseconds)
303      * @return the next message produced for this message consumer, or null if the timeout expires or this message
304      *         consumer is concurrently closed
305      * @throws JMSException
306      */
307     public Message receive(long timeout) throws JMSException {
308         checkClosed();
309         session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_SYNC);
310         try {
311             if (timeout == 0) {
312                 return this.receive();
313             }
314             this.accessThread = Thread.currentThread();
315             ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeue(timeout);
316             this.accessThread = null;
317             if (message != null) {
318                 boolean expired = message.isExpired();
319                 messageDelivered(message, true, expired);
320                 if (!expired) {
321                     message = message.shallowCopy();
322                 }
323                 else {
324                     message = (ActiveMQMessage) receiveNoWait(); //this will remove any other expired messages held in the queue
325                 }
326             }
327             if( message!=null && log.isDebugEnabled() ) {
328                 log.debug("Message received: "+message);
329             }            
330             return message;
331         }
332         catch (InterruptedException ioe) {
333             return null;
334         }
335     }
336 
337     /**
338      * Receives the next message if one is immediately available.
339      *
340      * @return the next message produced for this message consumer, or null if one is not available
341      * @throws JMSException if the JMS provider fails to receive the next message due to some internal error.
342      */
343     public Message receiveNoWait() throws JMSException {
344         checkClosed();
345         session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_SYNC);
346         try {
347             ActiveMQMessage message = null;
348             //iterate through an scrub delivered but expired messages
349             while ((message = (ActiveMQMessage) messageQueue.dequeueNoWait()) != null) {
350                 boolean expired = message.isExpired();
351                 messageDelivered(message, true, expired);
352                 if (!expired) {
353                     if( message!=null && log.isDebugEnabled() ) {
354                         log.debug("Message received: "+message);
355                     }            
356                     return message.shallowCopy();
357                 }
358             }
359         }
360         catch (InterruptedException ioe) {
361             throw new JMSException("Queue is interrupted: " + ioe.getMessage());
362         }
363         return null;
364     }
365 
366     /**
367      * Closes the message consumer.
368      * <P>
369      * Since a provider may allocate some resources on behalf of a <CODE>MessageConsumer</CODE> outside the Java
370      * virtual machine, clients should close them when they are not needed. Relying on garbage collection to eventually
371      * reclaim these resources may not be timely enough.
372      * <P>
373      * This call blocks until a <CODE>receive</CODE> or message listener in progress has completed. A blocked message
374      * consumer <CODE>receive</CODE> call returns null when this message consumer is closed.
375      *
376      * @throws JMSException if the JMS provider fails to close the consumer due to some internal error.
377      */
378     public void close() throws JMSException {
379         try {
380             this.accessThread.interrupt();
381         }
382         catch (NullPointerException npe) {
383         }
384         catch (SecurityException se) {
385         }
386         if (destination != null) {
387             destination.decrementConsumerCounter();
388         }
389 
390         this.session.removeConsumer(this);
391         messageQueue.close();
392         closed = true;
393     }
394 
395     /**
396      * @return true if this is a durable topic subscriber
397      */
398     public boolean isDurableSubscriber() {
399         return this instanceof ActiveMQTopicSubscriber && consumerName != null && consumerName.length() > 0;
400     }
401     
402     /**
403      * @return true if this is a Transient Topic subscriber
404      */
405     public boolean isTransientSubscriber(){
406         return this.destination != null && destination.isTopic() && (consumerName == null || consumerName.length() ==0);
407     }
408 
409     /**
410      * @throws IllegalStateException
411      */
412     protected void checkClosed() throws IllegalStateException {
413         if (closed) {
414             throw new IllegalStateException("The Consumer is closed");
415         }
416     }
417 
418     /**
419      * Process a Message - passing either to the queue or message listener
420      *
421      * @param message
422      */
423     protected void processMessage(ActiveMQMessage message) {
424         if( !running.get() ) {
425             stoppedQueue.addLast(message);
426             return;
427         }
428         message.setConsumerIdentifer(this.consumerIdentifier);
429         MessageListener listener = null;
430         synchronized (messageListenerGuard) {
431             listener = this.messageListener;
432         }
433         boolean transacted = session.isTransacted();
434         try {
435             if (!closed) {
436                 if (message.getJMSActiveMQDestination() == null) {
437                     message.setJMSDestination(getDestination());
438                 }
439                 if (listener != null) {
440                     beforeMessageDelivered(message);
441                     boolean expired = message.isExpired();
442                     if (transacted) {
443                         afterMessageDelivered(message, true, expired, true);
444                     }
445                     if (!expired) {
446                         if( log.isDebugEnabled() ) {
447                             log.debug("Message delivered to message listener: "+message);
448                         }
449                         listener.onMessage(message.shallowCopy());
450                     }
451                     if (!transacted) {
452                         afterMessageDelivered(message, true, expired, true);
453                     }
454                 }
455                 else {
456                     this.messageQueue.enqueue(message);
457                 }
458             }
459             else {
460                 messageDelivered(message, false, false);
461             }
462         }
463         catch (Throwable e) {
464             log.warn("could not process message: " + message + ". Reason: " + e, e);
465             messageDelivered(message, false, false);
466         }
467     }
468 
469     /**
470      * @return Returns the consumerId.
471      */
472     protected String getConsumerIdentifier() {
473         return consumerIdentifier;
474     }
475 
476     /**
477      * @return the consumer name - used for durable consumers
478      */
479     protected String getConsumerName() {
480         return this.consumerName;
481     }
482 
483     /**
484      * Set the name of the Consumer - used for durable subscribers
485      *
486      * @param value
487      */
488     protected void setConsumerName(String value) {
489         this.consumerName = value;
490     }
491 
492     /**
493      * @return the locally unique Consumer Number
494      */
495     protected int getConsumerNumber() {
496         return this.consumerNumber;
497     }
498 
499     /**
500      * Set the locally unique consumer number
501      *
502      * @param value
503      */
504     protected void setConsumerNumber(int value) {
505         this.consumerNumber = value;
506     }
507 
508     /**
509      * @return true if this consumer does not accept locally produced messages
510      */
511     protected boolean isNoLocal() {
512         return this.noLocal;
513     }
514 
515     /**
516      * Retrive is a browser
517      *
518      * @return true if a browser
519      */
520     protected boolean isBrowser() {
521         return this.browser;
522     }
523 
524     /**
525      * Set true if only a Browser
526      *
527      * @param value
528      * @see ActiveMQQueueBrowser
529      */
530     protected void setBrowser(boolean value) {
531         this.browser = value;
532     }
533 
534     /**
535      * @return ActiveMQDestination
536      */
537     protected ActiveMQDestination getDestination() {
538         return this.destination;
539     }
540 
541     /**
542      * @return the startTime
543      */
544     protected long getStartTime() {
545         return startTime;
546     }
547 
548     protected void clearMessagesInProgress() {
549         messageQueue.clear();
550         stoppedQueue.clear();
551     }
552 
553     private void messageDelivered(ActiveMQMessage message, boolean messageRead, boolean messageExpired) {
554         afterMessageDelivered(message, messageRead, messageExpired, false);
555     }
556 
557     private void beforeMessageDelivered(ActiveMQMessage message) {
558         if (message == null) {
559             return;
560         }
561         boolean topic = destination != null && destination.isTopic();
562         message.setTransientConsumed((!isDurableSubscriber() || !message.isPersistent()) && topic);
563         this.session.beforeMessageDelivered(message);
564     }
565 
566     private void afterMessageDelivered(ActiveMQMessage message, boolean messageRead, boolean messageExpired, boolean beforeCalled) {
567         if (message == null) {
568             return;
569         }
570 
571         boolean consumed = browser ? false : messageRead;
572         ActiveMQDestination destination = message.getJMSActiveMQDestination();
573         boolean topic = destination != null && destination.isTopic();
574         message.setTransientConsumed((!isDurableSubscriber() || !message.isPersistent()) && topic);
575         this.session.afterMessageDelivered((isDurableSubscriber() || this.destination.isQueue()), message, consumed, messageExpired, beforeCalled);
576         if (messageRead) {
577             stats.onMessage(message);
578         }
579 
580     }
581 
582     public void start() {
583         running.set(true);
584         while( !stoppedQueue.isEmpty() ) {
585             ActiveMQMessage m = (ActiveMQMessage)stoppedQueue.removeFirst();
586             processMessage(m);
587         }
588     }
589 
590     synchronized public void stop() {
591         running.set(false);
592     }
593 }