Source code: com/presumo/jms/router/Router.java
1 /**
2 * This file is part of Presumo.
3 *
4 * Presumo is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * Presumo is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with Presumo; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 *
18 * Copyright (c) 2001, 2002 Dan Greff
19 */
20 package com.presumo.jms.router;
21
22 import com.presumo.jms.message.JmsMessage;
23 import com.presumo.jms.plugin.MessageQueue;
24 import com.presumo.jms.resources.Resources;
25 import com.presumo.jms.selector.JmsOperand;
26 import com.presumo.jms.selector.Parser;
27
28 import com.presumo.util.log.Logger;
29 import com.presumo.util.log.LoggerFactory;
30
31 import java.io.IOException;
32 import java.util.ArrayList;
33 import javax.jms.ExceptionListener;
34 import javax.jms.DeliveryMode;
35 import javax.jms.JMSException;
36
37
38 /**
39 * The main routing functionality. The Router maintains a list of
40 * <code>RoutingTarget</code>'s, and for every message it is asked
41 * to route, it simply hands it to each routing target asking. The
42 * RoutingTarget itself determines if it needs the message and keeps
43 * it if so.
44 * <p>
45 * The Router is also responsible for maintaining the filters.
46 *
47 * @author Dan Greff
48 */
49 public final class Router extends RouterAdapter
50 {
51 /** Local instance of the parser singleton for convienance. **/
52 private final Parser parser;
53
54 /** All possible routing targets to route messages to **/
55 private RoutingTarget [] targets = new RoutingTarget[0];
56
57 /** Number of routing targets **/
58 private int numOfTargets;
59
60 /** Synchronization lock **/
61 private final Object targetChangeLock = "JVM_Router_Target_change_lock";
62
63 /** Data structure of all exception listeners **/
64 private final ArrayList eListeners = new ArrayList();
65
66 /** Number of messages routed by this instance **/
67 private long msgsRouted;
68
69 /** Name of the router **/
70 private String name = "router";
71
72 /////////////////////////////////////////////////////////////////////////
73 // Constructors //
74 /////////////////////////////////////////////////////////////////////////
75
76 public Router(MessageQueue queue)
77 {
78 super(queue, 100, "JVM Router");
79 parser = Parser.getInstance();
80 }
81
82 /////////////////////////////////////////////////////////////////////////
83 // Public Methods //
84 /////////////////////////////////////////////////////////////////////////
85
86 public final String getName()
87 {
88 return name;
89 }
90
91 public final void setName(String value)
92 {
93 name = value;
94 }
95
96 /**
97 * Add the given target to this instances' list of
98 * <code>RoutingTarget</code>'s to route to.
99 */
100 public final void addTarget(RoutingTarget target)
101 {
102 synchronized (targetChangeLock) {
103 logger.entry("addTarget: " + target.toString());
104
105 boolean targetAdded = false;
106 for (int i=0; i < targets.length; ++i) {
107 if (targets[i] == null) {
108 targets[i] = target;
109 target.setTargetID(i);
110 targetAdded = true;
111 break;
112 }
113 }
114
115 if (! targetAdded) {
116 int newSize = targets.length + 1;
117 int targetID = targets.length;
118 RoutingTarget [] tmp = new RoutingTarget[newSize];
119 System.arraycopy(targets, 0, tmp, 0, targets.length);
120 tmp[targetID] = target;
121 targets = tmp;
122 target.setTargetID(targetID);
123 }
124
125 ++numOfTargets;
126 recalculateFilters(true);
127
128 if (numOfTargets == 1)
129 this.startRouter();
130
131 logger.exit("addTarget");
132 }
133 }
134
135 /**
136 *
137 */
138 public final void removeTarget(RoutingTarget target)
139 {
140 synchronized (targetChangeLock)
141 {
142 logger.entry("removeTarget");
143
144 for (int i=0; i < targets.length; ++i) {
145 if (target.equals(targets[i])) {
146 targets[i] = null;
147 --numOfTargets;
148 recalculateFilters(false);
149 }
150 }
151
152 if (numOfTargets == 0) {
153 this.stopRouter();
154 }
155 logger.exit("removeTarget");
156 }
157 }
158
159 /**
160 *
161 */
162 public final void recalculateFilters(boolean filterAdded)
163 {
164 synchronized (targetChangeLock) {
165 logger.entry("recalculateFilters");
166
167 int i,j, targetLoc;
168
169 for (i=0; i < targets.length; ++i) {
170 RoutingTarget t = targets[i];
171 if (t != null && t.needsFilterUpdates()) {
172 if (numOfTargets == 1) {
173 try {
174 t.setRemoteRoutingFilter(parser.parseFilter("false"), false);
175 } catch (javax.jms.InvalidSelectorException ise) {}
176 break;
177 }
178 JmsOperand [] allFilters = new JmsOperand[numOfTargets-1];
179
180 for (j=0, targetLoc=0; targetLoc < targets.length; ++targetLoc) {
181
182 // Don't include the current or null filters.
183 if (targetLoc != i && targets[targetLoc] != null) {
184 allFilters[j] = targets[targetLoc].getRoutingFilter();
185 ++j;
186 }
187 }
188 JmsOperand joinedFilter = parser.orTogether(allFilters);
189 t.setRemoteRoutingFilter(joinedFilter, filterAdded);
190 }
191 } // end for(i
192
193 logger.exit("recalculateFilters");
194 } // end synchronized
195 }
196
197 public final void routeMessage(JmsMessage msg) throws IOException
198 {
199 ++msgsRouted;
200 queueMessage(msg);
201 }
202
203 public final void routeMessages(JmsMessage [] msgs) throws IOException
204 {
205 msgsRouted += msgs.length;
206 queueMessages(msgs);
207 }
208
209 public final void addExceptionListener(ExceptionListener listener)
210 {
211 synchronized (eListeners) {
212 eListeners.add(listener);
213 }
214 }
215
216 public final void removeExceptionListener(ExceptionListener listener)
217 {
218 synchronized (eListeners) {
219 eListeners.remove(listener);
220 }
221 }
222
223
224 /////////////////////////////////////////////////////////////////////////
225 // Protected Methods //
226 /////////////////////////////////////////////////////////////////////////
227
228 /**
229 *
230 *
231 */
232 protected final void routeMessages(int batchsize)
233 {
234
235 try {
236 JmsMessage [] msgs = getNext(batchsize); // Possible Disk I/O
237
238 if (msgs == null) return;
239 int length = msgs.length;
240
241 synchronized(targetChangeLock)
242 {
243 parser.obtainLock();
244 int targetLength = targets.length;
245
246 for (int i=0; i < length; i++)
247 {
248 JmsMessage message = msgs[i]; // save on array accesses
249 // TODO:: message.setDeleteBlock();
250 parser.resetEvaluateOnce(); // @see Parser
251
252 // For each target...
253 for (int j=0; j < targetLength; j++)
254 {
255 RoutingTarget t = targets[j];
256
257 if ( t != null) {
258 message = t.takeMessage(message);
259 if (message == null)
260 break;
261 }
262 }
263 // TODO:: message.removeDeleteBlock();
264 }
265 }
266 parser.releaseLock();
267 } catch (IOException ioe) {
268 // TODO:: handle resynch scenerio
269 }
270 }
271
272 /////////////////////////////////////////////////////////////////////////
273 // Private Methods //
274 /////////////////////////////////////////////////////////////////////////
275
276 /**
277 * Convienancs function to report an exception to all registered
278 * ExceptionListeners.
279 */
280 private void reportException(Exception e)
281 {
282 JMSException jmsex;
283 if (e instanceof JMSException) {
284 jmsex = (JMSException) e;
285 } else {
286 jmsex = new JMSException("An exception occurred in the Router: " +
287 e.toString());
288 jmsex.setLinkedException(e);
289 }
290 synchronized (eListeners) {
291 if (eListeners.size() == 0) {
292 jmsex.printStackTrace();
293 } else {
294 for (int i=0; i < eListeners.size(); ++i)
295 ( (ExceptionListener) eListeners.get(i)).onException(jmsex);
296 }
297 }
298 }
299
300 ////////////////////////////// Misc stuff ////////////////////////////////
301 private static Logger logger =
302 LoggerFactory.getLogger(Router.class, Resources.getBundle());
303 ///////////////////////////////////////////////////////////////////////////
304 }