Source code: com/presumo/jms/router/RouterAdapter.java
1 /**
2 * This file is part of Presumo.
3 *
4 * Presumo is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * Presumo is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with Presumo; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 *
18 *
19 * Copyright 2001 Dan Greff
20 */
21 package com.presumo.jms.router;
22
23 import com.presumo.jms.message.AckHelper;
24 import com.presumo.jms.message.JmsMessage;
25 import com.presumo.jms.plugin.MessageQueue;
26
27 import java.io.IOException;
28
29 import com.presumo.jms.resources.Resources;
30 import com.presumo.util.log.Logger;
31 import com.presumo.util.log.LoggerFactory;
32
33 /**
34 * Class to represent the functionality of starting stoping and closing
35 * a thread that is doing routing. This handles all of the multithreading
36 * logic for the router, and was pulled out to an abstract base class to
37 * simplify the actual Router implementation.
38 *
39 * WARNING: If you change ANYTHING in this class unit test!!!!!
40 */
41 public abstract class RouterAdapter implements Runnable
42 {
43
44 /////////////////////////////////////////////////////////////////////////
45 // Private Instance Variables //
46 /////////////////////////////////////////////////////////////////////////
47 private volatile boolean closed;
48 private volatile boolean stopRouting = true;
49 private int batchSize;
50 private final Object startStopLock = new String("startStopLock");
51 private final Object routerLock = new String("routerLock");
52 private Thread routingThread;
53 private final String threadName;
54 private MessageQueue inbox;
55
56 /////////////////////////////////////////////////////////////////////////
57 // Constructors //
58 /////////////////////////////////////////////////////////////////////////
59
60 public RouterAdapter(MessageQueue queue, int batchSize, String threadName)
61 {
62 super();
63 this.batchSize = batchSize;
64 this.inbox = queue;
65 this.threadName = threadName;
66 }
67
68 /////////////////////////////////////////////////////////////////////////
69 // Public Methods //
70 /////////////////////////////////////////////////////////////////////////
71
72 /**
73 * Implementation of runnable. The method will remain in the loop until
74 * closeRouter() is called. This loop will call routeMessages() while
75 * there are messages to be routed. If there are no messages to be routed
76 * the thread will wait() on the inbox until there are messages.
77 */
78 public void run()
79 {
80 logger.entry(threadName + ": run");
81 synchronized (startStopLock) {
82
83 for(;;) {
84
85 if (stopRouting == true)
86 break;
87
88 routeMessages(batchSize);
89
90 synchronized (inbox) {
91 while (stopRouting == false && inbox.size() == 0) {
92 try { inbox.wait(3000); } catch (InterruptedException ie) {}
93 if (stopRouting == false && inbox.size() == 0)
94 timerTick();
95 }
96 }
97 }
98 }
99
100 logger.exit(threadName + ": run");
101 }
102
103
104 /////////////////////////////////////////////////////////////////////////
105 // Public Methods //
106 /////////////////////////////////////////////////////////////////////////
107
108 public void setMessageQueue(MessageQueue queue)
109 {
110 logger.entry("setMessageQueue", queue);
111
112 synchronized(routerLock) {
113 if (stopRouting == true) throw new
114 IllegalStateException("Attempt to change message queue on running router");
115
116 synchronized (inbox) {
117 inbox = queue;
118 }
119 }
120 logger.exit("setMessageQueue");
121 }
122
123 /**
124 *
125 */
126 public void closeRouter()
127 {
128 logger.entry(threadName + ": closeRouter");
129
130 synchronized (routerLock) {
131 if (closed) return;
132 stopRouter();
133 closed = true;
134 }
135
136 logger.exit(threadName + ": closeRouter");
137 }
138
139
140 /**
141 * Starts the delivery of asynchronous messages.
142 */
143 public void startRouter()
144 {
145 logger.entry(threadName + ": startRouter");
146
147 synchronized (routerLock) {
148
149 if (closed)
150 throw new IllegalStateException("start called on a closed router");
151 if (stopRouting == false) return;
152
153 stopRouting = false;
154 routingThread = new Thread(this, threadName);
155 routingThread.start();
156
157 }
158
159 logger.exit(threadName + ": startRouter");
160 }
161
162 /**
163 * Stops the delivery of asynchronous messages.
164 */
165 public void stopRouter()
166 {
167 logger.entry(threadName + ": stopRouter");
168
169 synchronized (routerLock) {
170
171 if (closed) throw new IllegalStateException("stop() called on a closed connection");
172 if (stopRouting == true) return;
173
174 // bump the thread out of its inbox.wait
175 synchronized(inbox) {
176 stopRouting = true;
177 inbox.notifyAll();
178 }
179
180 try {
181 routingThread.join();
182 routingThread = null;
183 } catch (InterruptedException ie) {
184 ie.printStackTrace();
185 }
186 }
187 logger.exit(threadName + ": stopRouter");
188 }
189
190 /**
191 * The size of batches which the router processes messages is a potential
192 * touch point. Consequently I'm making it configurable on the fly.
193 */
194 public void setBatchSize(int batchSize)
195 {
196 this.batchSize = batchSize;
197 }
198
199 //////////////////////////////////////////////////////////////////////////
200 // Protected Methods //
201 //////////////////////////////////////////////////////////////////////////
202
203 /**
204 * Subclasses of this adapter implement this method to add the final
205 * piece of functionality. The thread embedded in this class will
206 * call this method when there are messages on the inbox that need
207 * to be routed.
208 *
209 * @param number Indicates a batch interval in terms of the number of
210 * messages routeMessages() should try to router before
211 * returning.
212 */
213 protected abstract void routeMessages(int number);
214
215 /**
216 * Called when the routing thread has done nothing for a while
217 */
218 protected void timerTick()
219 {
220 }
221
222 /**
223 * @return The next message off of the queue. The queue maintains FIFO.
224 */
225 protected final JmsMessage [] getNext(int batchsize) throws IOException
226 {
227 JmsMessage [] msgs = null;
228 synchronized (inbox) {
229 msgs = inbox.getNext(batchsize);
230 }
231 return msgs;
232 }
233
234 protected final int queueSize()
235 {
236 synchronized (inbox) {
237 return inbox.size();
238 }
239 }
240
241 /**
242 * Puts a message on the queue and notifies the routing thread.
243 */
244 protected final void queueMessage(JmsMessage msg) throws IOException
245 {
246 if (inbox.isPersistent()) {
247 AckHelper ah = msg.getAckHelper();
248 if (ah != null) {
249 ah.setMessageQueue(inbox);
250 }
251 }
252
253 synchronized (inbox) {
254 inbox.push(msg);
255 inbox.notifyAll();
256 }
257
258 }
259
260 /**
261 protected final void queueMessageFront(JmsMessage msg) throws IOException
262 {
263 // TODO::
264 synchronized (inbox) {
265 inbox.push(msg);
266 inbox.notifyAll();
267 }
268
269 }
270 **/
271
272 /**
273 * Puts an array of messages on the queue and notifies the routing thread.
274 */
275 protected final void queueMessages(JmsMessage [] msgs) throws IOException
276 {
277 if (inbox.isPersistent()) {
278 for (int i=0; i < msgs.length; ++i) {
279 AckHelper ah = msgs[i].getAckHelper();
280 if (ah != null) {
281 ah.setMessageQueue(inbox);
282 }
283 }
284 }
285
286 synchronized (inbox) {
287 inbox.push(msgs);
288 inbox.notifyAll();
289 }
290
291 }
292
293 ////////////////////////////// Misc stuff ////////////////////////////////
294 private static Logger logger =
295 LoggerFactory.getLogger(RouterAdapter.class, Resources.getBundle());
296 ///////////////////////////////////////////////////////////////////////////
297 }
298
299
300
301
302
303
304
305
306
307
308
309