Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: com/presumo/jms/router/Router.java


1   /**
2    * This file is part of Presumo.
3    *
4    * Presumo is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU General Public License as published by
6    * the Free Software Foundation; either version 2 of the License, or
7    * (at your option) any later version.
8    *
9    * Presumo is distributed in the hope that it will be useful,
10   * but WITHOUT ANY WARRANTY; without even the implied warranty of
11   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12   * GNU General Public License for more details.
13   *
14   * You should have received a copy of the GNU General Public License
15   * along with Presumo; if not, write to the Free Software
16   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17   *
18   * Copyright (c) 2001, 2002 Dan Greff
19   */
20  package com.presumo.jms.router;
21  
22  import com.presumo.jms.message.JmsMessage;
23  import com.presumo.jms.plugin.MessageQueue;
24  import com.presumo.jms.resources.Resources;
25  import com.presumo.jms.selector.JmsOperand;
26  import com.presumo.jms.selector.Parser;
27  
28  import com.presumo.util.log.Logger;
29  import com.presumo.util.log.LoggerFactory;
30  
31  import java.io.IOException;
32  import java.util.ArrayList;
33  import javax.jms.ExceptionListener;
34  import javax.jms.DeliveryMode;
35  import javax.jms.JMSException;
36  
37  
38  /**
39   * The main routing functionality.  The Router maintains a list of
40   * <code>RoutingTarget</code>'s, and for every message it is asked 
41   * to route,  it simply hands it to each routing target asking.  The 
42   * RoutingTarget itself determines if it needs the message and keeps 
43   * it if so.
44   * <p>
45   * The Router is also responsible for maintaining the filters.
46   *
47   * @author Dan Greff
48   */
49  public final class Router extends RouterAdapter
50  {
51    /** Local instance of the parser singleton for convienance. **/
52    private final Parser parser;
53    
54    /** All possible routing targets to route messages to **/
55    private RoutingTarget [] targets = new RoutingTarget[0];
56  
57    /** Number of routing targets **/
58    private int numOfTargets;
59  
60    /** Synchronization lock  **/
61    private final Object targetChangeLock = "JVM_Router_Target_change_lock";
62  
63    /** Data structure of all exception listeners **/
64    private final ArrayList eListeners = new ArrayList();
65  
66    /** Number of messages routed by this instance **/
67    private long msgsRouted;
68    
69    /** Name of the router **/
70    private String name = "router";
71    
72      /////////////////////////////////////////////////////////////////////////
73     // Constructors                                                        //
74    /////////////////////////////////////////////////////////////////////////
75  
76    public Router(MessageQueue queue)
77    {
78      super(queue, 100, "JVM Router");
79      parser = Parser.getInstance();
80    }
81  
82      /////////////////////////////////////////////////////////////////////////
83     // Public Methods                                                      //
84    /////////////////////////////////////////////////////////////////////////
85    
86    public final String getName()
87    {
88      return name;
89    }
90  
91    public final void setName(String value)
92    {
93      name = value;
94    }
95  
96    /**
97     * Add the given target to this instances' list of 
98     * <code>RoutingTarget</code>'s to route to.
99     */
100   public final void addTarget(RoutingTarget target)
101   {
102     synchronized (targetChangeLock) {
103       logger.entry("addTarget: " + target.toString());
104       
105       boolean targetAdded = false;
106       for (int i=0; i < targets.length; ++i) {
107         if (targets[i] == null) {
108           targets[i] = target;
109           target.setTargetID(i);
110           targetAdded = true;
111           break;
112         }
113       }
114     
115       if (! targetAdded) {
116         int newSize = targets.length + 1;
117         int targetID = targets.length;
118         RoutingTarget [] tmp = new RoutingTarget[newSize];
119         System.arraycopy(targets, 0, tmp, 0, targets.length);
120         tmp[targetID] = target;
121         targets = tmp;
122         target.setTargetID(targetID);
123       }
124       
125       ++numOfTargets;
126       recalculateFilters(true);
127       
128       if (numOfTargets == 1)
129         this.startRouter();
130         
131       logger.exit("addTarget");
132     }    
133   }
134   
135   /**
136    *
137    */
138   public final void removeTarget(RoutingTarget target)
139   {
140     synchronized (targetChangeLock)
141     {
142       logger.entry("removeTarget");
143       
144       for (int i=0; i < targets.length; ++i) {
145         if (target.equals(targets[i])) {
146           targets[i] = null;
147           --numOfTargets;
148           recalculateFilters(false);
149         }
150       }
151 
152       if (numOfTargets == 0) {
153         this.stopRouter();
154       }
155       logger.exit("removeTarget");
156     }
157   }
158   
159   /**
160    *
161    */
162   public final void recalculateFilters(boolean filterAdded)
163   {    
164     synchronized (targetChangeLock) {
165       logger.entry("recalculateFilters");
166 
167       int i,j, targetLoc;
168 
169       for (i=0; i < targets.length; ++i) {
170         RoutingTarget t = targets[i];
171         if (t != null && t.needsFilterUpdates()) {
172           if (numOfTargets == 1) {
173              try { 
174                t.setRemoteRoutingFilter(parser.parseFilter("false"), false);
175              } catch (javax.jms.InvalidSelectorException ise) {}
176              break;
177           }
178           JmsOperand [] allFilters = new JmsOperand[numOfTargets-1];
179           
180           for (j=0, targetLoc=0; targetLoc < targets.length; ++targetLoc) {
181  
182             // Don't include the current or null filters.
183             if (targetLoc != i && targets[targetLoc] != null) { 
184               allFilters[j] = targets[targetLoc].getRoutingFilter();
185               ++j;
186             }
187           }
188           JmsOperand joinedFilter = parser.orTogether(allFilters);
189           t.setRemoteRoutingFilter(joinedFilter, filterAdded);
190         }
191       } // end for(i
192       
193       logger.exit("recalculateFilters");
194     } // end synchronized
195   }
196 
197   public final void routeMessage(JmsMessage msg) throws IOException
198   {
199     ++msgsRouted;
200     queueMessage(msg);
201   }
202   
203   public final void routeMessages(JmsMessage [] msgs) throws IOException
204   {
205     msgsRouted += msgs.length;
206     queueMessages(msgs);
207   }
208   
209   public final void addExceptionListener(ExceptionListener listener)
210   {
211     synchronized (eListeners) {
212       eListeners.add(listener); 
213     }
214   }
215   
216   public final void removeExceptionListener(ExceptionListener listener)
217   {
218     synchronized (eListeners) {
219       eListeners.remove(listener);
220     }
221   }
222   
223 
224     /////////////////////////////////////////////////////////////////////////
225    // Protected Methods                                                   //
226   /////////////////////////////////////////////////////////////////////////
227   
228   /**
229    *
230    *
231    */
232   protected final void routeMessages(int batchsize)
233   {
234     
235     try {
236       JmsMessage [] msgs = getNext(batchsize); // Possible Disk I/O
237       
238       if (msgs == null) return;
239       int length = msgs.length;
240 
241       synchronized(targetChangeLock)
242       {
243         parser.obtainLock();
244         int targetLength = targets.length;
245         
246         for (int i=0; i < length; i++)
247         {        
248           JmsMessage message = msgs[i];        // save on array accesses
249           // TODO:: message.setDeleteBlock();
250           parser.resetEvaluateOnce();          // @see Parser       
251           
252           // For each target...
253           for (int j=0; j < targetLength; j++)
254           {
255             RoutingTarget t = targets[j];
256             
257             if ( t != null) {
258               message = t.takeMessage(message);
259               if (message == null)
260                 break;
261             }
262           }
263           // TODO:: message.removeDeleteBlock();
264         }
265       }
266       parser.releaseLock();
267     } catch (IOException ioe) {
268       // TODO:: handle resynch scenerio
269     } 
270   }
271 
272     /////////////////////////////////////////////////////////////////////////
273    // Private Methods                                                     //
274   /////////////////////////////////////////////////////////////////////////
275   
276   /**
277    * Convienancs function to report an exception to all registered
278    * ExceptionListeners.
279    */
280   private void reportException(Exception e) 
281   {
282     JMSException jmsex;
283     if (e instanceof JMSException) {
284       jmsex = (JMSException) e;
285     } else {
286       jmsex = new JMSException("An exception occurred in the Router: " + 
287                                e.toString());
288       jmsex.setLinkedException(e);
289     }
290     synchronized (eListeners) {
291       if (eListeners.size() == 0) {
292         jmsex.printStackTrace();
293       } else {
294         for (int i=0; i < eListeners.size(); ++i)
295           ( (ExceptionListener) eListeners.get(i)).onException(jmsex);
296       }
297     }
298   }
299  
300   ////////////////////////////// Misc  stuff ////////////////////////////////
301   private static Logger logger =
302           LoggerFactory.getLogger(Router.class, Resources.getBundle());
303   ///////////////////////////////////////////////////////////////////////////    
304 }