Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/ematgine/utils/concurrent/FJTaskRunnerGroup.java


1   /*
2     File: FJTaskRunnerGroup.java
3   
4     Originally written by Doug Lea and released into the public domain.
5     This may be used for any purposes whatsoever without acknowledgment.
6     Thanks for the assistance and support of Sun Microsystems Labs,
7     and everyone contributing, testing, and using this code.
8   
9     History:
10    Date       Who                What
11    7Jan1999   dl                 First public release
12    12Jan1999  dl                 made getActiveCount public; misc minor cleanup.
13    14Jan1999  dl                 Added executeTask
14    20Jan1999  dl                 Allow use of priorities; reformat stats
15    6Feb1999   dl                 Lazy thread starts
16    27Apr1999  dl                 Renamed
17  */
18  
19  package org.ematgine.utils.concurrent;
20  
21  /**
22   * A stripped down analog of a ThreadGroup used for
23   * establishing and managing FJTaskRunner threads.
24   * ThreadRunnerGroups serve as the control boundary separating
25   * the general world of normal threads from the specialized world
26   * of FJTasks. 
27   * <p>
28   * By intent, this class does not subclass java.lang.ThreadGroup, and
29   * does not support most methods found in ThreadGroups, since they
30   * would make no sense for FJTaskRunner threads. In fact, the class
31   * does not deal with ThreadGroups at all. If you want to restrict
32   * a FJTaskRunnerGroup to a particular ThreadGroup, you can create
33   * it from within that ThreadGroup.
34   * <p>
35   * The main contextual parameter for a FJTaskRunnerGroup is
36   * the group size, established in the constructor. 
37   * Groups must be of a fixed size.
38   * There is no way to dynamically increase or decrease the number
39   * of threads in an existing group.
40   * <p>
41   * In general, the group size should be equal to the number
42   * of CPUs on the system. (Unfortunately, there is no portable
43   * means of automatically detecting the number of CPUs on a JVM, so there is
44   * no good way to automate defaults.)  In principle, when
45   * FJTasks are used for computation-intensive tasks, having only 
46   * as many threads as CPUs should minimize bookkeeping overhead
47   * and contention, and so maximize throughput. However, because
48   * FJTaskRunners lie atop Java threads, and in turn operating system
49   * thread support and scheduling policies, 
50   * it is very possible that using more threads
51   * than CPUs will improve overall throughput even though it adds
52   * to overhead. This will always be so if FJTasks are I/O bound.
53   * So it may pay to experiment a bit when tuning on particular platforms.
54   * You can also use <code>setRunPriorities</code> to either
55   * increase or decrease the priorities of active threads, which
56   * may interact with group size choice.
57   * <p>
58   * In any case, overestimating group sizes never
59   * seriously degrades performance (at least within reasonable bounds). 
60   * You can also use a value
61   * less than the number of CPUs in order to reserve processing
62   * for unrelated threads. 
63   * <p>
64   * There are two general styles for using a FJTaskRunnerGroup.
65   * You can create one group per entire program execution, for example 
66   * as a static singleton, and use it for all parallel tasks:
67   * <pre>
68   * class Tasks {
69   *   static FJTaskRunnerGroup group;
70   *   public void initialize(int groupsize) {
71   *      group = new FJTaskRunnerGroup(groupSize);
72   *   }
73   *   // ...
74   * }
75   * </pre>
76   * Alternatively, you can make new groups on the fly and use them only for
77   * particular task sets. This is more flexible,,
78   * and leads to more controllable and deterministic execution patterns,
79   * but it encounters greater overhead on startup. Also, to reclaim
80   * system resources, you should
81   * call <code>FJTaskRunnerGroup.interruptAll</code> when you are done
82   * using one-shot groups. Otherwise, because FJTaskRunners set 
83   * <code>Thread.isDaemon</code>
84   * status, they will not normally be reclaimed until program termination.
85   * <p>
86   * The main supported methods are <code>execute</code>,
87   * which starts a task processed by FJTaskRunner threads,
88   * and <code>invoke</code>, which starts one and waits for completion.
89   * For example, you might extend the above <code>FJTasks</code>
90   * class to support a task-based computation, say, the
91   * <code>Fib</code> class from the <code>FJTask</code> documentation:
92   * <pre>
93   * class Tasks { // continued
94   *   // ...
95   *   static int fib(int n) {
96   *     try {
97   *       Fib f = new Fib(n);
98   *       group.invoke(f);
99   *       return f.getAnswer();
100  *     }
101  *     catch (InterruptedException ex) {
102  *       throw new Error("Interrupted during computation");
103  *     }
104  *   }
105  * }
106  * </pre>
107  * <p>
108  * Method <code>stats()</code> can be used to monitor performance.
109  * Both FJTaskRunnerGroup and FJTaskRunner may be compiled with
110  * the compile-time constant COLLECT_STATS set to false. In this
111  * case, various simple counts reported in stats() are not collected.
112  * On platforms tested,
113  * this leads to such a tiny performance improvement that there is 
114  * very little motivation to bother.
115  *
116  * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
117  * <p>
118  * @see FJTask
119  * @see FJTaskRunner
120  **/
121 
122 public class FJTaskRunnerGroup implements Executor {
123 
124   /** The threads in this group **/
125   protected final FJTaskRunner[] threads;
126 
127   /** Group-wide queue for tasks entered via execute() **/
128   protected final LinkedQueue entryQueue = new LinkedQueue();
129 
130   /** Number of threads that are not waiting for work **/
131   protected int activeCount = 0;
132 
133   /** Number of threads that have been started. Used to avoid
134       unecessary contention during startup of task sets.
135   **/
136   protected int nstarted = 0;
137 
138   /**
139    * Compile-time constant. If true, various counts of
140    * runs, waits, etc., are maintained. These are NOT
141    * updated with synchronization, so statistics reports
142    * might not be accurate.
143    **/
144   
145   static final boolean COLLECT_STATS = true;
146   //  static final boolean COLLECT_STATS = false;
147 
148   // for stats
149 
150   /** The time at which this ThreadRunnerGroup was constructed **/
151   long initTime = 0;
152 
153   /** Total number of executes or invokes **/
154   int entries = 0;
155 
156   static final int DEFAULT_SCAN_PRIORITY = Thread.MIN_PRIORITY+1;
157 
158   /** 
159    * Create a FJTaskRunnerGroup with the indicated number
160    * of FJTaskRunner threads. Normally, the best size to use is
161    * the number of CPUs on the system. 
162    * <p>
163    * The threads in a FJTaskRunnerGroup are created with their
164    * isDaemon status set, so do not normally need to be
165    * shut down manually upon program termination.
166    **/
167 
168   public FJTaskRunnerGroup(int groupSize) { 
169     threads = new FJTaskRunner[groupSize];
170     initializeThreads();
171     initTime = System.currentTimeMillis();
172   }
173 
174   /**
175    * Arrange for execution of the given task
176    * by placing it in a work queue. If the argument
177    * is not of type FJTask, it is embedded in a FJTask via 
178    * <code>FJTask.Wrap</code>.
179    * @exception InterruptedException if current Thread is
180    * currently interrupted 
181    **/
182 
183   public void execute(Runnable r) throws InterruptedException {
184     if (r instanceof FJTask) {
185       entryQueue.put((FJTask)r);
186     }
187     else {
188       entryQueue.put(new FJTask.Wrap(r));
189     }
190     signalNewTask();
191   }
192 
193 
194   /**
195    * Specialized form of execute called only from within FJTasks
196    **/
197   public void executeTask(FJTask t) {
198     try {
199       entryQueue.put(t);
200       signalNewTask();
201     }
202     catch (InterruptedException ex) {
203       Thread.currentThread().interrupt();
204     }
205   }
206 
207 
208   /**
209    * Start a task and wait it out. Returns when the task completes.
210    * @exception InterruptedException if current Thread is
211    * interrupted before completion of the task.
212    **/
213 
214   public void invoke(Runnable r) throws InterruptedException {
215     InvokableFJTask w = new InvokableFJTask(r);
216     entryQueue.put(w);
217     signalNewTask();
218     w.awaitTermination();
219   }
220 
221 
222   /**
223    * Try to shut down all FJTaskRunner threads in this group
224    * by interrupting them all. This method is designed
225    * to be used during cleanup when it is somehow known
226    * that all threads are idle.
227    * FJTaskRunners only
228    * check for interruption when they are not otherwise
229    * processing a task (and its generated subtasks,
230    * if any), so if any threads are active, shutdown may
231    * take a while, and may lead to unpredictable
232    * task processing.
233    **/
234 
235   public void interruptAll() {
236     // paranoically interrupt current thread last if in group.
237     Thread current = Thread.currentThread();
238     boolean stopCurrent = false;
239 
240     for (int i = 0; i < threads.length; ++i) {
241       Thread t = threads[i];
242       if (t == current) 
243         stopCurrent = true;
244       else
245         t.interrupt();
246     }
247     if (stopCurrent)
248       current.interrupt();
249   }
250 
251 
252   /**
253    * Set the priority to use while a FJTaskRunner is
254    * polling for new tasks to perform. Default
255    * is currently Thread.MIN_PRIORITY+1. The value
256    * set may not go into effect immediately, but
257    * will be used at least the next time a thread scans for work.
258    **/
259   public synchronized void setScanPriorities(int pri) {
260     for (int i = 0; i < threads.length; ++i) {
261       FJTaskRunner t = threads[i];
262       t.setScanPriority(pri);
263       if (!t.active) t.setPriority(pri);
264     }
265   }
266 
267 
268   /**
269    * Set the priority to use while a FJTaskRunner is
270    * actively running tasks. Default
271    * is the priority that was in effect by the thread that
272    * constructed this FJTaskRunnerGroup. Setting this value
273    * while threads are running may momentarily result in
274    * them running at this priority even when idly waiting for work.
275    **/
276   public synchronized void setRunPriorities(int pri) {
277     for (int i = 0; i < threads.length; ++i) {
278       FJTaskRunner t = threads[i];
279       t.setRunPriority(pri);
280       if (t.active) t.setPriority(pri);
281     }
282   }
283 
284     
285 
286   /** Return the number of FJTaskRunner threads in this group **/
287 
288   public int size() { return threads.length; }
289 
290 
291   /** 
292    * Return the number of threads that are not idly waiting for work.
293    * Beware that even active threads might not be doing any useful
294    * work, but just spinning waiting for other dependent tasks.
295    * Also, since this is just a snapshot value, some tasks
296    * may be in the process of becoming idle.
297    **/
298   public synchronized int getActiveCount() { return activeCount; }
299 
300   /**
301    * Prints various snapshot statistics to System.out.
302    * <ul>
303    *   <li> For each FJTaskRunner thread (labeled as T<em>n</em>, for
304    *         <em>n</em> from zero to group size - 1):
305    *     <ul>
306    *       <li> A star "*" is printed if the thread is currently active;
307    *            that is, not sleeping while waiting for work. Because
308    *            threads gradually enter sleep modes, an active thread
309    *            may in fact be about to sleep (or wake up).
310    *       <li> <em>Q Cap</em> The current capacity of its task queue.
311    *       <li> <em>Run</em> The total number of tasks that have been run.
312    *       <li> <em>New</em> The number of these tasks that were
313    *               taken from either the entry queue or from other 
314    *               thread queues; that is, the number of tasks run
315    *               that were <em>not</em> forked by the thread itself.
316    *       <li> <em>Scan</em> The number of times other task
317    *               queues or the entry queue were polled for tasks.
318    *     </ul>
319    *   <li> <em>Execute</em> The total number of tasks entered
320    *        (but not necessarily yet run) via execute or invoke.
321    *   <li> <em>Time</em> Time in seconds since construction of this
322    *         FJTaskRunnerGroup.
323    *   <li> <em>Rate</em> The total number of tasks processed
324    *          per second across all threads. This
325    *          may be useful as a simple throughput indicator
326    *          if all processed tasks take approximately the
327    *          same time to run.
328    * </ul>
329    * <p>
330    * Cautions: Some statistics are updated and gathered 
331    * without synchronization,
332    * so may not be accurate. However, reported counts may be considered
333    * as lower bounds of actual values. 
334    * Some values may be zero if classes are compiled
335    * with COLLECT_STATS set to false. (FJTaskRunner and FJTaskRunnerGroup
336    * classes can be independently compiled with different values of
337    * COLLECT_STATS.) Also, the counts are maintained as ints so could
338    * overflow in exceptionally long-lived applications.
339    * <p>
340    * These statistics can be useful when tuning algorithms or diagnosing
341    * problems. For example:
342    * <ul>
343    *  <li> High numbers of scans may mean that there is insufficient
344    *      parallelism to keep threads busy. However, high scan rates
345    *      are expected if the number
346    *      of Executes is also high or there is a lot of global
347    *      synchronization in the application, and the system is not otherwise
348    *      busy. Threads may scan
349    *      for work hundreds of times upon startup, shutdown, and
350    *      global synch points of task sets.
351    *  <li> Large imbalances in tasks run across different threads might
352    *      just reflect contention with unrelated threads on a system
353    *      (possibly including JVM threads such as GC), but may also
354    *      indicate some systematic bias in how you generate tasks.
355    *  <li> Large task queue capacities may mean that too many tasks are being
356    *     generated before they can be run. 
357    *     Capacities are reported rather than current numbers of tasks
358    *     in queues because they are better indicators of the existence
359    *     of these kinds of possibly-transient problems.
360    *     Queue capacities are
361    *     resized on demand from their initial value of 4096 elements,
362    *     which is much more than sufficient for the kinds of 
363    *     applications that this framework is intended to best support.
364    * </ul>
365    **/
366 
367   public void stats() {
368     long time = System.currentTimeMillis() - initTime;
369     double secs = ((double)time) / 1000.0;
370     long totalRuns = 0;
371     long totalScans = 0;
372     long totalSteals = 0;
373 
374     System.out.print("Thread" +
375                      "\tQ Cap" +
376                        "\tScans" +
377                        "\tNew" +
378                        "\tRuns" +
379                        "\n");
380 
381     for (int i = 0; i < threads.length; ++i) {
382       FJTaskRunner t = threads[i];
383       int truns = t.runs;
384       totalRuns += truns;
385 
386       int tscans = t.scans;
387       totalScans += tscans;
388 
389       int tsteals = t.steals;
390       totalSteals += tsteals;
391 
392       String star = (getActive(t))? "*" : " ";
393 
394 
395       System.out.print("T" + i + star +
396                        "\t" + t.deqSize() +
397                        "\t" + tscans +
398                        "\t" + tsteals +
399                        "\t" + truns +
400                        "\n");
401     }
402 
403     System.out.print("Total" +
404                      "\t    " +
405                      "\t" + totalScans +
406                      "\t" + totalSteals +
407                      "\t" + totalRuns +
408                      "\n");
409 
410     System.out.print("Execute: " + entries); 
411     
412     System.out.print("\tTime: " + secs);
413 
414     long rps = 0;
415     if (secs != 0) rps = Math.round((double)(totalRuns) / secs);
416 
417     System.out.println("\tRate: " + rps);
418   }
419 
420 
421   /* ------------ Methods called only by FJTaskRunners ------------- */
422 
423 
424   /**
425    * Return the array of threads in this group. 
426    * Called only by FJTaskRunner.scan().
427    **/
428 
429   protected FJTaskRunner[] getArray() { return threads; }
430 
431 
432   /**
433    * Return a task from entry queue, or null if empty.
434    * Called only by FJTaskRunner.scan().
435    **/
436 
437   protected FJTask pollEntryQueue() {
438     try {
439       FJTask t = (FJTask)(entryQueue.poll(0));
440       return t;
441     }
442     catch(InterruptedException ex) { // ignore interrupts
443       Thread.currentThread().interrupt();
444       return null;
445     }
446   }
447 
448 
449   /**
450    * Return active status of t.
451    * Per-thread active status can only be accessed and
452    * modified via synchronized method here in the group class.
453    **/
454 
455   protected synchronized boolean getActive(FJTaskRunner t) {
456     return t.active;
457   }
458 
459 
460   /**
461    * Set active status of thread t to true, and notify others
462    * that might be waiting for work. 
463    **/
464 
465   protected synchronized void setActive(FJTaskRunner t) {
466     if (!t.active) { 
467       t.active = true;
468       ++activeCount;
469       if (nstarted < threads.length) 
470         threads[nstarted++].start();
471       else
472         notifyAll();
473     }
474   }
475 
476   /**
477    * Set active status of thread t to false.
478    **/
479 
480   protected synchronized void setInactive(FJTaskRunner t) {
481     if (t.active) { 
482       t.active = false;
483       --activeCount;
484     }
485   }
486 
487   /**
488    * The number of times to scan other threads for tasks 
489    * before transitioning to a mode where scans are
490    * interleaved with sleeps (actually timed waits).
491    * Upon transition, sleeps are for duration of
492    * scans / SCANS_PER_SLEEP milliseconds.
493    * <p>
494    * This is not treated as a user-tunable parameter because
495    * good values do not appear to vary much across JVMs or
496    * applications. Its main role is to help avoid some
497    * useless spinning and contention during task startup.
498    **/
499   static final long SCANS_PER_SLEEP = 15;
500 
501   /**
502    * The maximum time (in msecs) to sleep when a thread is idle,
503    * yet others are not, so may eventually generate work that
504    * the current thread can steal. This value reflects the maximum time
505    * that a thread may sleep when it possibly should not, because there
506    * are other active threads that might generate work. In practice,
507    * designs in which some threads become stalled because others
508    * are running yet not generating tasks are not likely to work
509    * well in this framework anyway, so the exact value does not matter
510    * too much. However, keeping it in the sub-second range does
511    * help smooth out startup and shutdown effects.
512    **/
513 
514   static final long MAX_SLEEP_TIME = 100;
515 
516   /**
517    * Set active status of thread t to false, and
518    * then wait until: (a) there is a task in the entry 
519    * queue, or (b) other threads are active, or (c) the current
520    * thread is interrupted. Upon return, it
521    * is not certain that there will be work available.
522    * The thread must itself check. 
523    * <p>
524    * The main underlying reason
525    * for these mechanics is that threads do not
526    * signal each other when they add elements to their queues.
527    * (This would add to task overhead, reduce locality.
528    * and increase contention.)
529    * So we must rely on a tamed form of polling. However, tasks
530    * inserted into the entry queue do result in signals, so
531    * tasks can wait on these if all of them are otherwise idle.
532    **/
533 
534   protected synchronized void checkActive(FJTaskRunner t, long scans) {
535 
536     setInactive(t);
537 
538     try {
539       // if nothing available, do a hard wait
540       if (activeCount == 0 && entryQueue.peek() == null) { 
541         wait();
542       }
543       else { 
544         // If there is possibly some work,
545         // sleep for a while before rechecking 
546 
547         long msecs = scans / SCANS_PER_SLEEP;
548         if (msecs > MAX_SLEEP_TIME) msecs = MAX_SLEEP_TIME;
549         int nsecs = (msecs == 0) ? 1 : 0; // forces shortest possible sleep
550         wait(msecs, nsecs);
551       }
552     }
553     catch (InterruptedException ex) {
554       notify(); // avoid lost notifies on interrupts
555       Thread.currentThread().interrupt();
556     }
557   }
558 
559   /* ------------ Utility methods  ------------- */
560 
561   /**
562    * Start or wake up any threads waiting for work
563    **/
564 
565   protected synchronized void signalNewTask() {
566     if (COLLECT_STATS) ++entries;
567     if (nstarted < threads.length) 
568        threads[nstarted++].start();
569     else
570       notify();
571   }
572 
573   /**
574    * Create all FJTaskRunner threads in this group.
575    **/
576 
577   protected void initializeThreads() {
578     for (int i = 0; i < threads.length; ++i) threads[i] = new FJTaskRunner(this);
579   }
580 
581 
582 
583 
584   /**
585    * Wrap wait/notify mechanics around a task so that
586    * invoke() can wait it out 
587    **/
588   protected static final class InvokableFJTask extends FJTask {
589     protected final Runnable wrapped;
590     protected boolean terminated = false;
591 
592     protected InvokableFJTask(Runnable r) { wrapped = r; }
593 
594     public void run() {
595       try {
596         if (wrapped instanceof FJTask)
597           FJTask.invoke((FJTask)(wrapped));
598         else
599           wrapped.run();
600       }
601       finally {
602         setTerminated();
603       }
604     }
605 
606     protected synchronized void setTerminated() {
607       terminated = true;
608       notifyAll(); 
609     }
610 
611     protected synchronized void awaitTermination() throws InterruptedException {
612       while (!terminated) wait();
613     }
614   }
615 
616 
617 }
618