Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/ActiveMQSessionExecutor.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.activemq;
20  
21  import java.util.Iterator;
22  import java.util.List;
23  
24  import javax.jms.JMSException;
25  
26  import org.activemq.io.util.MemoryBoundedQueue;
27  import org.activemq.message.ActiveMQMessage;
28  
29  /**
30   * A utility class used by the Session for dispatching messages asynchronously to consumers
31   *
32   * @version $Revision: 1.1.1.1 $
33   * @see javax.jms.Session
34   */
35  public class ActiveMQSessionExecutor implements Runnable {
36      private ActiveMQSession session;
37      private MemoryBoundedQueue messageQueue;
38      private boolean closed;
39      private Thread runner;
40      private boolean dispatchedBySessionPool;
41      private boolean optimizedMessageDispatch;
42  
43      ActiveMQSessionExecutor(ActiveMQSession session, MemoryBoundedQueue queue) {
44          this.session = session;
45          this.messageQueue = queue;
46      }
47  
48      void setDispatchedBySessionPool(boolean value) {
49          dispatchedBySessionPool = value;
50      }
51      
52      /**
53       * @return Returns the optimizedMessageDispatch.
54       */
55      boolean isOptimizedMessageDispatch() {
56          return optimizedMessageDispatch;
57      }
58      /**
59       * @param optimizedMessageDispatch The optimizedMessageDispatch to set.
60       */
61      void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
62          this.optimizedMessageDispatch = optimizedMessageDispatch;
63      }
64  
65      void execute(ActiveMQMessage message) {
66          if (optimizedMessageDispatch && !dispatchedBySessionPool){
67              dispatch(message);
68          }else {
69              messageQueue.enqueue(message);
70          }
71         
72      }
73  
74      void executeFirst(ActiveMQMessage message) {
75          messageQueue.enqueueFirstNoBlock(message);
76      }
77  
78      boolean hasUncomsumedMessages() {
79          return !messageQueue.isEmpty();
80      }
81  
82      List getUnconsumedMessages() {
83        return messageQueue.getContents();
84      }
85      
86      /**
87       * implementation of Runnable
88       */
89      public void run() {
90          while (!closed && !dispatchedBySessionPool) {
91              ActiveMQMessage message = null;
92              try {
93                  message = (ActiveMQMessage) messageQueue.dequeue(100);
94              }
95              catch (InterruptedException ie) {
96              }
97              if (!closed) {
98                  if (message != null) {
99                      if (!dispatchedBySessionPool) {
100                         dispatch(message);
101                     }
102                     else {
103                         messageQueue.enqueueFirstNoBlock(message);
104                     }
105                 }
106             }
107         }
108     }
109     
110     void dispatch(ActiveMQMessage message){
111         for (Iterator i = this.session.consumers.iterator(); i.hasNext();) {
112             ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
113             if (message.isConsumerTarget(consumer.getConsumerNumber())) {
114                 try {
115                     consumer.processMessage(message.shallowCopy());
116                 }
117                 catch (JMSException e) {
118                     this.session.connection.handleAsyncException(e);
119                 }
120             }
121         }
122     }
123 
124     synchronized void start() {
125         messageQueue.start();
126         if (runner == null && (!dispatchedBySessionPool || optimizedMessageDispatch)) {
127             runner = new Thread(this, "JmsSessionDispatcher: " + session.getSessionId());
128             runner.setPriority(Thread.MAX_PRIORITY);
129             //runner.setDaemon(true);
130             runner.start();
131         }
132     }
133 
134     synchronized void stop() {
135         messageQueue.stop();
136     }
137 
138     synchronized void close() {
139         closed = true;
140         messageQueue.close();
141     }
142 
143     void clear() {
144         messageQueue.clear();
145     }
146 
147     ActiveMQMessage dequeueNoWait() {
148         try {
149             return (ActiveMQMessage) messageQueue.dequeueNoWait();
150         }
151         catch (InterruptedException ie) {
152             return null;
153         }
154     }
155     
156     protected void clearMessagesInProgress(){
157         messageQueue.clear();
158     }
159     
160     
161 }