Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: com/presumo/jms/router/RouterAdapter.java


1   /**
2    * This file is part of Presumo.
3    *
4    * Presumo is free software; you can redistribute it and/or modify
5    * it under the terms of the GNU General Public License as published by
6    * the Free Software Foundation; either version 2 of the License, or
7    * (at your option) any later version.
8    *
9    * Presumo is distributed in the hope that it will be useful,
10   * but WITHOUT ANY WARRANTY; without even the implied warranty of
11   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12   * GNU General Public License for more details.
13   *
14   * You should have received a copy of the GNU General Public License
15   * along with Presumo; if not, write to the Free Software
16   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17   *
18   *
19   * Copyright 2001 Dan Greff
20   */
21  package com.presumo.jms.router;
22  
23  import com.presumo.jms.message.AckHelper;
24  import com.presumo.jms.message.JmsMessage;
25  import com.presumo.jms.plugin.MessageQueue;
26  
27  import java.io.IOException;
28  
29  import com.presumo.jms.resources.Resources;
30  import com.presumo.util.log.Logger;
31  import com.presumo.util.log.LoggerFactory;
32  
33  /**
34   * Class to represent the functionality of starting stoping and closing
35   * a thread that is doing routing.  This handles all of the multithreading
36   * logic for the router, and was pulled out to an abstract base class to
37   * simplify the actual Router implementation.
38   *
39   * WARNING: If you change ANYTHING in this class unit test!!!!! 
40   */
41  public abstract class RouterAdapter implements Runnable
42  {
43  
44      /////////////////////////////////////////////////////////////////////////
45     // Private Instance Variables                                          //
46    /////////////////////////////////////////////////////////////////////////
47    private volatile boolean closed;
48    private volatile boolean stopRouting = true;
49    private int              batchSize;
50    private final Object     startStopLock = new String("startStopLock");
51    private final Object     routerLock    = new String("routerLock");
52    private Thread           routingThread;
53    private final String     threadName;
54    private MessageQueue     inbox;
55  
56      /////////////////////////////////////////////////////////////////////////
57     // Constructors                                                        //
58    /////////////////////////////////////////////////////////////////////////
59  
60    public RouterAdapter(MessageQueue queue, int batchSize, String threadName) 
61    { 
62      super(); 
63      this.batchSize = batchSize;
64      this.inbox = queue;
65      this.threadName = threadName;
66    }
67  
68      /////////////////////////////////////////////////////////////////////////
69     // Public Methods                                                      //
70    /////////////////////////////////////////////////////////////////////////
71  
72    /**
73     * Implementation of runnable.  The method will remain in the loop until
74     * closeRouter() is called.  This loop will call routeMessages() while
75     * there are messages to be routed.  If there are no messages to be routed
76     * the thread will wait() on the inbox until there are messages.
77     */
78    public void run() 
79    {
80      logger.entry(threadName + ": run");
81      synchronized (startStopLock) {
82        
83        for(;;) {
84          
85          if (stopRouting == true)
86            break;
87  
88          routeMessages(batchSize);
89  
90          synchronized (inbox) {
91            while (stopRouting == false && inbox.size() == 0) {
92              try { inbox.wait(3000); } catch (InterruptedException ie) {}
93              if (stopRouting == false && inbox.size() == 0)
94                timerTick();
95            }
96          }
97        }
98      }
99  
100     logger.exit(threadName + ": run");
101   }
102 
103     
104     /////////////////////////////////////////////////////////////////////////
105    // Public Methods                                                      //
106   /////////////////////////////////////////////////////////////////////////
107 
108   public void setMessageQueue(MessageQueue queue)
109   {
110     logger.entry("setMessageQueue", queue);
111 
112     synchronized(routerLock) {
113       if (stopRouting == true) throw new 
114         IllegalStateException("Attempt to change message queue on running router");
115       
116       synchronized (inbox) {
117         inbox = queue;
118       }
119     }
120     logger.exit("setMessageQueue");
121   }
122   
123   /**
124    *
125    */
126   public void closeRouter() 
127   {
128     logger.entry(threadName + ": closeRouter");
129     
130     synchronized (routerLock) {
131       if (closed) return;
132       stopRouter();
133       closed = true;
134     } 
135     
136     logger.exit(threadName + ": closeRouter");
137   }
138 
139 
140   /**
141    * Starts the delivery of asynchronous messages.
142    */
143   public void startRouter()  
144   {
145     logger.entry(threadName + ": startRouter");
146     
147     synchronized (routerLock) {
148       
149       if (closed) 
150         throw new IllegalStateException("start called on a closed router");
151       if (stopRouting == false) return;
152 
153       stopRouting = false;
154       routingThread = new Thread(this, threadName);
155       routingThread.start();
156 
157     }
158     
159     logger.exit(threadName + ": startRouter");
160   }
161 
162   /**
163    * Stops the delivery of asynchronous messages.
164    */
165   public void stopRouter() 
166   {
167     logger.entry(threadName + ": stopRouter");
168     
169     synchronized (routerLock) {
170       
171       if (closed) throw new IllegalStateException("stop() called on a closed connection");
172       if (stopRouting == true) return;
173       
174       // bump the thread out of its inbox.wait 
175       synchronized(inbox) {
176         stopRouting = true; 
177         inbox.notifyAll();
178       }
179       
180       try {
181         routingThread.join();
182         routingThread = null;
183       } catch (InterruptedException ie) {
184         ie.printStackTrace();  
185       }
186     }
187     logger.exit(threadName + ": stopRouter");
188   }
189 
190   /**
191    * The size of batches which the router processes messages is a potential
192    * touch point.  Consequently I'm making it configurable on the fly.
193    */
194   public void setBatchSize(int batchSize)
195   {
196     this.batchSize = batchSize;
197   }
198 
199     //////////////////////////////////////////////////////////////////////////
200    // Protected Methods                                                    //
201   //////////////////////////////////////////////////////////////////////////
202 
203   /**
204    * Subclasses of this adapter implement this method to add the final 
205    * piece of functionality.  The thread embedded in this class will
206    * call this method when there are messages on the inbox that need
207    * to be routed.
208    *
209    * @param number Indicates a batch interval in terms of the number of 
210    *               messages routeMessages() should try to router before
211    *               returning.
212    */
213   protected abstract void routeMessages(int number); 
214 
215   /**
216    * Called when the routing thread has done nothing for a while
217    */
218   protected void timerTick()
219   {
220   }
221   
222   /**
223    * @return The next message off of the queue.  The queue maintains FIFO.
224    */
225   protected final JmsMessage [] getNext(int batchsize) throws IOException
226   {
227     JmsMessage [] msgs = null;
228     synchronized (inbox) {
229       msgs = inbox.getNext(batchsize);
230     }
231     return msgs;
232   }
233 
234   protected final int queueSize()
235   {
236     synchronized (inbox) {
237       return inbox.size();
238     }
239   }
240 
241   /**
242    * Puts a message on the queue and notifies the routing thread.
243    */
244   protected final void queueMessage(JmsMessage msg) throws IOException
245   {
246     if (inbox.isPersistent()) {
247       AckHelper ah = msg.getAckHelper();
248       if (ah != null) {
249         ah.setMessageQueue(inbox);
250       }
251     }
252 
253     synchronized (inbox) {
254       inbox.push(msg);
255       inbox.notifyAll();
256     }
257 
258   }
259 
260   /**
261   protected final void queueMessageFront(JmsMessage msg) throws IOException
262   {
263     // TODO::
264     synchronized (inbox) {
265       inbox.push(msg);
266       inbox.notifyAll();
267     }
268 
269   }
270   **/
271 
272   /**
273    * Puts an array of messages on the queue and notifies the routing thread.
274    */
275   protected final void queueMessages(JmsMessage [] msgs) throws IOException
276   {
277     if (inbox.isPersistent()) {
278       for (int i=0; i < msgs.length; ++i) {
279         AckHelper ah = msgs[i].getAckHelper();
280         if (ah != null) {
281           ah.setMessageQueue(inbox);
282         }
283       }
284     }
285 
286     synchronized (inbox) {
287       inbox.push(msgs);
288       inbox.notifyAll();
289     }
290 
291   }
292 
293   ////////////////////////////// Misc  stuff ////////////////////////////////
294   private static Logger logger =
295      LoggerFactory.getLogger(RouterAdapter.class, Resources.getBundle());
296   ///////////////////////////////////////////////////////////////////////////     
297 }
298 
299 
300 
301 
302 
303 
304 
305 
306 
307 
308 
309