Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/ematgine/utils/concurrent/TimeDaemon.java


1   /**
2    *  Ematgine server source file
3    *
4    *    Copyright (C) 2000-2001  <Mathieu Beauvais>
5    *
6    *    This program is free software; you can redistribute it and/or modify
7    *    it under the terms of the GNU General Public License as published by
8    *    the Free Software Foundation; either version 2 of the License, or
9    *    any later version.
10   *
11   *    This program is distributed in the hope that it will be useful,
12   *    but WITHOUT ANY WARRANTY; without even the implied warranty of
13   *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14   *    GNU General Public License for more details.
15  
16   *    You should have received a copy of the GNU General Public License
17   *    along with this program; if not, write to the Free Software
18   *    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19   *
20   *  Concurrent Versions System
21   *  $Id: TimeDaemon.java,v 1.2 2002/11/10 21:00:48 none Exp $
22   */
23  /*
24    File: At.java
25  
26    Originally written by Doug Lea and released into the public domain.
27    This may be used for any purposes whatsoever without acknowledgment.
28    Thanks for the assistance and support of Sun Microsystems Labs,
29    and everyone contributing, testing, and using this code.
30  
31    History:
32    Date       Who                What
33    29Aug1998  dl               created initial public version
34  */
35  
36  package org.ematgine.utils.concurrent;
37  import java.util.Comparator;
38  import java.util.Date;
39  
40  /**
41   * A general-purpose timer daemon, vaguely similar in functionality
42   * common system-level utilities like at (and the associated crond) in Unix.
43   * Objects of this class maintain a single thread and a task queue
44   * that may be used to execute Runnable commands in any of three modes --
45   * absolute (run at a given time), relative (run after a given delay),
46   * and periodic (cyclically run with a given delay).
47   * <p>
48   * All commands are executed by the single background thread. 
49   * The thread is not actually started until the first 
50   * request is encountered. Also, if the
51   * thread is stopped for any reason, one is started upon the next request.
52   * <p>
53   * If you would instead like commands run in their own threads, you can
54   * use as arguments Runnable commands that start their own threads
55   * (or perhaps wrap within ThreadedExecutors). 
56   * <p>
57   * You can also use multiple
58   * daemon objects, each using a different background thread. However,
59   * one of the reasons for using a time daemon is to pool together
60   * processing of infrequent tasks using a single background thread.
61   * <p>
62   * Background threads are created using a ThreadFactory. The
63   * default factory does <em>not</em>
64   * automatically <code>setDaemon</code> status.
65   *
66   * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
67   **/
68  
69  public class TimeDaemon extends ThreadFactoryUser  {
70  
71  
72    /** tasks are maintained in a standard priority queue **/
73    protected final Heap heap_ = new Heap(DefaultChannelCapacity.get());
74  
75  
76    protected static class TaskNode implements Comparable {
77      final Runnable command;   // The command to run
78      final long period;        // The cycle period, or -1 if not periodic
79      private long timeToRun_;  // The time to run command
80  
81      // Cancellation does not immediately remove node, it just
82      // sets up lazy deletion bit, so is thrown away when next 
83      // encountered in run loop
84  
85      private boolean cancelled_ = false;
86  
87      // Access to cancellation status and and run time needs sync 
88      // since they can be written and read in different threads
89  
90      synchronized void setCancelled() { cancelled_ = true; }
91      synchronized boolean getCancelled() { return cancelled_; }
92  
93      synchronized void setTimeToRun(long w) { timeToRun_ = w; }
94      synchronized long getTimeToRun() { return timeToRun_; }
95      
96      
97      public int compareTo(Object other) {
98        long a = getTimeToRun();
99        long b = ((TaskNode)(other)).getTimeToRun();
100       return (a < b)? -1 : ((a == b)? 0 : 1);
101     }
102 
103     TaskNode(long w, Runnable c, long p) {
104       timeToRun_ = w; command = c; period = p;
105     }
106 
107     TaskNode(long w, Runnable c) {
108       timeToRun_ = w; command = c; period = -1;
109     }
110   }
111 
112 
113   /** 
114    * Execute the given command at the given time.
115    * @param date -- the absolute time to run the command, expressed
116    * as a java.util.Date.
117    * @param command -- the command to run at the given time.
118    * @return taskID -- an opaque reference that can be used to cancel execution request
119    **/
120   public Object executeAt(Date date, Runnable command) {
121     TaskNode task = new TaskNode(date.getTime(), command); 
122     heap_.insert(task);
123     restart();
124     return task;
125   }
126 
127   /** 
128    * Excecute the given command after waiting for the given delay.
129    * @param millisecondsToDelay -- the number of milliseconds
130    * from now to run the command.
131    * @param command -- the command to run after the delay.
132    * @return taskID -- an opaque reference that can be used to cancel execution request
133    * <p>
134    * <b>Sample Usage.</b>
135    * You can use a TimeDaemon to arrange timeout callbacks to break out
136    * of stuck IO. For example (code sketch):
137    * <pre>
138    * class X {   ...
139    * 
140    *   TimeDaemon timer = ...
141    *   Thread readerThread;
142    *   FileInputStream datafile;
143    * 
144    *   void startReadThread() {
145    *     datafile = new FileInputStream("data", ...);
146    * 
147    *     readerThread = new Thread(new Runnable() {
148    *      public void run() {
149    *        for(;;) {
150    *          // try to gracefully exit before blocking
151    *         if (Thread.currentThread().isInterrupted()) {
152    *           quietlyWrapUpAndReturn();
153    *         }
154    *         else {
155    *           try {
156    *             int c = datafile.read();
157    *             if (c == -1) break;
158    *             else process(c);
159    *           }
160    *           catch (IOException ex) {
161    *            cleanup();
162    *            return;
163    *          }
164    *       }
165    *     } };
166    *
167    *    readerThread.start();
168    *
169    *    // establish callback to cancel after 60 seconds
170    *    timer.executeAfterDelay(60000, new Runnable() {
171    *      readerThread.interrupt();    // try to interrupt thread
172    *      datafile.close(); // force thread to lose its input file 
173    *    });
174    *   } 
175    * }
176    * </pre>
177    **/
178   public Object executeAfterDelay(long millisecondsToDelay, Runnable command) {
179     long runtime = System.currentTimeMillis() + millisecondsToDelay;
180     TaskNode task = new TaskNode(runtime, command);
181     heap_.insert(task);
182     restart();
183     return task;
184   }
185 
186   /** 
187    * Execute the given command every <code>period</code> milliseconds.
188    * If <code>startNow</code> is true, execution begins immediately,
189    * otherwise, it begins after the first <code>period</code> delay.
190    * @param period -- the period, in milliseconds. Periods are
191    *  measured from start-of-task to the next start-of-task. It is
192    * generally a bad idea to use a period that is shorter than 
193    * the expected task duration.
194    * @param command -- the command to run at each cycle
195    * @param startNow -- true if the cycle should start with execution
196    * of the task now. Otherwise, the cycle starts with a delay of
197    * <code>period</code> milliseconds.
198    * @exception IllegalArgumentException if period less than or equal to zero.
199    * @return taskID -- an opaque reference that can be used to cancel execution request
200    **/
201   public Object executePeriodically(long period,
202                                     Runnable command, 
203                                     boolean startNow) {
204 
205     if (period <= 0) throw new IllegalArgumentException();
206 
207     long firstTime = System.currentTimeMillis();
208     if (!startNow) firstTime += period;
209 
210     TaskNode task = new TaskNode(firstTime, command, period); 
211     heap_.insert(task);
212     restart();
213     return task;
214   }
215 
216   /** 
217    * Cancel a scheduled task. The task will be cancelled
218    * upon the <em>next</em> opportunity to run it. This has no effect if
219    * this is a one-shot task that has already executed.
220    * If an execution is in progress, it will complete normally,
221    * but if it is a periodic task, future iterations are cancelled. 
222    * @param taskID -- a task reference returned by one of
223    * the execute commands
224    * @exception ClassCastException if the taskID argument is not 
225    * of the type returned by an execute command.
226    **/
227   public static void cancel(Object taskID) {
228     ((TaskNode)taskID).setCancelled();
229   }
230    
231 
232   /** The thread used to process commands **/
233   protected Thread thread_;
234 
235   
236   /**
237    * Return the thread being used to process commands, or
238    * null if there is no such thread. You can use this
239    * to invoke any special methods on the thread, for
240    * example, to interrupt it.
241    **/
242   public synchronized Thread getThread() { 
243     return thread_;
244   }
245 
246   /** set thread_ to null to indicate termination **/
247   protected synchronized void clearThread() {
248     thread_ = null;
249   }
250 
251   /**
252    * Start (or restart) a thread to process commands, or wake
253    * up an existing thread if one is already running.
254    **/
255 
256   protected synchronized void restart() {
257     if (thread_ == null) {
258       thread_ = threadFactory_.newThread(runLoop_);
259       thread_.start();
260     }
261     else
262       notify();
263   }
264 
265 
266   /**
267    * Cancel all tasks and interrupt the background thread executing
268    * the current task, if any.
269    * (A new background thread will be started if new execution
270    * requests are encountered.)
271    **/
272   public synchronized void shutDown() {
273     heap_.clear();
274     if (thread_ != null) 
275       thread_.interrupt();
276   }
277 
278   /** Return the next task to execute, or null if thread is interrupted **/
279   protected synchronized TaskNode nextTask() {
280 
281     try {
282 
283       for (;;) {
284 
285         if (Thread.interrupted()) return null;
286 
287         long waitTime = 0; // assume indefinite wait below
288 
289         // Using peek simplifies dealing with spurious wakeups
290 
291         TaskNode task = (TaskNode)(heap_.peek());
292 
293         if (task != null) {
294           long now = System.currentTimeMillis();
295           long when = task.getTimeToRun();
296 
297           if (when > now) // false alarm wakeup
298             waitTime = when - now;
299 
300           else {
301             // Even if a new task was inserted since peek
302             // the current least must be right one to return
303             task = (TaskNode)(heap_.extract());
304 
305             // Task cannot be null here, but could in plausible subclasses
306             if (task != null) {
307 
308               // Skip if cancelled
309               if (task.getCancelled()) {
310                 waitTime = -1; // bypass the wait below
311               }
312 
313               else {
314                 // If periodic, requeue 
315                 if (task.period > 0) {
316                   task.setTimeToRun(now + task.period);
317                   heap_.insert(task);
318                 }
319                 
320                 return task;
321               }
322             }
323           }
324         }
325         if (waitTime >= 0)
326           wait(waitTime);
327       }
328     }
329     catch (InterruptedException ex) {
330       return null;
331     }
332   }
333 
334   /**
335    * The runloop is isolated in its own Runnable class
336    * just so that the main 
337    * class need not implement Runnable,  which would
338    * allow others to directly invoke run, which would
339    * never make sense here.
340    **/
341 
342   protected class RunLoop implements Runnable {
343     public void run() {
344       try {
345         for (;;) {
346           TaskNode task = nextTask();
347           if (task != null) 
348             task.command.run();
349           else
350             break;
351         }
352       }
353       finally {
354         clearThread();
355       }
356     }
357   }
358 
359   protected final RunLoop runLoop_;
360 
361   /** 
362    * Create a new TimeDaemon 
363    **/
364 
365   public TimeDaemon() {
366     runLoop_ = new RunLoop();
367   }
368 
369     
370 
371 }