Source code: org/activemq/ActiveMQSessionExecutor.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.activemq;
20
21 import java.util.Iterator;
22 import java.util.List;
23
24 import javax.jms.JMSException;
25
26 import org.activemq.io.util.MemoryBoundedQueue;
27 import org.activemq.message.ActiveMQMessage;
28
29 /**
30 * A utility class used by the Session for dispatching messages asynchronously to consumers
31 *
32 * @version $Revision: 1.1.1.1 $
33 * @see javax.jms.Session
34 */
35 public class ActiveMQSessionExecutor implements Runnable {
36 private ActiveMQSession session;
37 private MemoryBoundedQueue messageQueue;
38 private boolean closed;
39 private Thread runner;
40 private boolean dispatchedBySessionPool;
41 private boolean optimizedMessageDispatch;
42
43 ActiveMQSessionExecutor(ActiveMQSession session, MemoryBoundedQueue queue) {
44 this.session = session;
45 this.messageQueue = queue;
46 }
47
48 void setDispatchedBySessionPool(boolean value) {
49 dispatchedBySessionPool = value;
50 }
51
52 /**
53 * @return Returns the optimizedMessageDispatch.
54 */
55 boolean isOptimizedMessageDispatch() {
56 return optimizedMessageDispatch;
57 }
58 /**
59 * @param optimizedMessageDispatch The optimizedMessageDispatch to set.
60 */
61 void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
62 this.optimizedMessageDispatch = optimizedMessageDispatch;
63 }
64
65 void execute(ActiveMQMessage message) {
66 if (optimizedMessageDispatch && !dispatchedBySessionPool){
67 dispatch(message);
68 }else {
69 messageQueue.enqueue(message);
70 }
71
72 }
73
74 void executeFirst(ActiveMQMessage message) {
75 messageQueue.enqueueFirstNoBlock(message);
76 }
77
78 boolean hasUncomsumedMessages() {
79 return !messageQueue.isEmpty();
80 }
81
82 List getUnconsumedMessages() {
83 return messageQueue.getContents();
84 }
85
86 /**
87 * implementation of Runnable
88 */
89 public void run() {
90 while (!closed && !dispatchedBySessionPool) {
91 ActiveMQMessage message = null;
92 try {
93 message = (ActiveMQMessage) messageQueue.dequeue(100);
94 }
95 catch (InterruptedException ie) {
96 }
97 if (!closed) {
98 if (message != null) {
99 if (!dispatchedBySessionPool) {
100 dispatch(message);
101 }
102 else {
103 messageQueue.enqueueFirstNoBlock(message);
104 }
105 }
106 }
107 }
108 }
109
110 void dispatch(ActiveMQMessage message){
111 for (Iterator i = this.session.consumers.iterator(); i.hasNext();) {
112 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
113 if (message.isConsumerTarget(consumer.getConsumerNumber())) {
114 try {
115 consumer.processMessage(message.shallowCopy());
116 }
117 catch (JMSException e) {
118 this.session.connection.handleAsyncException(e);
119 }
120 }
121 }
122 }
123
124 synchronized void start() {
125 messageQueue.start();
126 if (runner == null && (!dispatchedBySessionPool || optimizedMessageDispatch)) {
127 runner = new Thread(this, "JmsSessionDispatcher: " + session.getSessionId());
128 runner.setPriority(Thread.MAX_PRIORITY);
129 //runner.setDaemon(true);
130 runner.start();
131 }
132 }
133
134 synchronized void stop() {
135 messageQueue.stop();
136 }
137
138 synchronized void close() {
139 closed = true;
140 messageQueue.close();
141 }
142
143 void clear() {
144 messageQueue.clear();
145 }
146
147 ActiveMQMessage dequeueNoWait() {
148 try {
149 return (ActiveMQMessage) messageQueue.dequeueNoWait();
150 }
151 catch (InterruptedException ie) {
152 return null;
153 }
154 }
155
156 protected void clearMessagesInProgress(){
157 messageQueue.clear();
158 }
159
160
161 }