Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/apache/tomcat/util/threads/ThreadPool.java


1   /*
2    *  Copyright 1999-2005 The Apache Software Foundation
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package org.apache.tomcat.util.threads;
18  
19  import java.util.*;
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.apache.tomcat.util.res.StringManager;
23  
24  /**
25   * A thread pool that is trying to copy the apache process management.
26   *
27   * Should we remove this in favor of Doug Lea's thread package?
28   *
29   * @author Gal Shachor
30   * @author Yoav Shapira <yoavs@apache.org>
31   */
32  public class ThreadPool  {
33  
34      private static Log log = LogFactory.getLog(ThreadPool.class);
35  
36      private static StringManager sm =
37          StringManager.getManager("org.apache.tomcat.util.threads.res");
38  
39      private static boolean logfull=true;
40  
41      /*
42       * Default values ...
43       */
44      public static final int MAX_THREADS = 200;
45      public static final int MAX_THREADS_MIN = 10;
46      public static final int MAX_SPARE_THREADS = 50;
47      public static final int MIN_SPARE_THREADS = 4;
48      public static final int WORK_WAIT_TIMEOUT = 60*1000;
49  
50      /*
51       * Where the threads are held.
52       */
53      protected ControlRunnable[] pool = null;
54  
55      /*
56       * A monitor thread that monitors the pool for idel threads.
57       */
58      protected MonitorRunnable monitor;
59  
60  
61      /*
62       * Max number of threads that you can open in the pool.
63       */
64      protected int maxThreads;
65  
66      /*
67       * Min number of idel threads that you can leave in the pool.
68       */
69      protected int minSpareThreads;
70  
71      /*
72       * Max number of idel threads that you can leave in the pool.
73       */
74      protected int maxSpareThreads;
75  
76      /*
77       * Number of threads in the pool.
78       */
79      protected int currentThreadCount;
80  
81      /*
82       * Number of busy threads in the pool.
83       */
84      protected int currentThreadsBusy;
85  
86      /*
87       * Flag that the pool should terminate all the threads and stop.
88       */
89      protected boolean stopThePool;
90  
91      /* Flag to control if the main thread is 'daemon' */
92      protected boolean isDaemon=true;
93  
94      /** The threads that are part of the pool.
95       * Key is Thread, value is the ControlRunnable
96       */
97      protected Hashtable threads=new Hashtable();
98  
99      protected Vector listeners=new Vector();
100 
101     /** Name of the threadpool
102      */
103     protected String name = "TP";
104 
105     /**
106      * Sequence.
107      */
108     protected int sequence = 1;
109 
110     /**
111      * Thread priority.
112      */
113     protected int threadPriority = Thread.NORM_PRIORITY;
114 
115 
116     /**
117      * Constructor.
118      */    
119     public ThreadPool() {
120         maxThreads = MAX_THREADS;
121         maxSpareThreads = MAX_SPARE_THREADS;
122         minSpareThreads = MIN_SPARE_THREADS;
123         currentThreadCount = 0;
124         currentThreadsBusy = 0;
125         stopThePool = false;
126     }
127 
128 
129     /** Create a ThreadPool instance.
130      *
131      * @param jmx UNUSED 
132      * @return ThreadPool instance. If JMX support is requested, you need to
133      *   call register() in order to set a name.
134      */
135     public static ThreadPool createThreadPool(boolean jmx) {
136         return new ThreadPool();
137     }
138 
139     public synchronized void start() {
140   stopThePool=false;
141         currentThreadCount  = 0;
142         currentThreadsBusy  = 0;
143 
144         adjustLimits();
145 
146         pool = new ControlRunnable[maxThreads];
147 
148         openThreads(minSpareThreads);
149         if (maxSpareThreads < maxThreads) {
150             monitor = new MonitorRunnable(this);
151         }
152     }
153 
154     public MonitorRunnable getMonitor() {
155         return monitor;
156     }
157   
158     /**
159      * Sets the thread priority for current
160      * and future threads in this pool.
161      *
162      * @param threadPriority The new priority
163      * @throws IllegalArgumentException If the specified
164      *  priority is less than Thread.MIN_PRIORITY or
165      *  more than Thread.MAX_PRIORITY 
166      */
167     public synchronized void setThreadPriority(int threadPriority) {
168         if(log.isDebugEnabled())
169             log.debug(getClass().getName() +
170                       ": setPriority(" + threadPriority + "): here.");
171 
172       if (threadPriority < Thread.MIN_PRIORITY) {
173         throw new IllegalArgumentException("new priority < MIN_PRIORITY");
174       } else if (threadPriority > Thread.MAX_PRIORITY) {
175         throw new IllegalArgumentException("new priority > MAX_PRIORITY");
176       }
177 
178       // Set for future threads
179       this.threadPriority = threadPriority;
180 
181       Enumeration currentThreads = getThreads();
182       Thread t = null;
183       while(currentThreads.hasMoreElements()) {
184         t = (Thread) currentThreads.nextElement();
185         t.setPriority(threadPriority);
186       } 
187     }
188 
189     /**
190      * Returns the priority level of current and
191      * future threads in this pool.
192      *
193      * @return The priority
194      */
195     public int getThreadPriority() {
196       return threadPriority;
197     }   
198      
199 
200     public void setMaxThreads(int maxThreads) {
201         this.maxThreads = maxThreads;
202     }
203 
204     public int getMaxThreads() {
205         return maxThreads;
206     }
207 
208     public void setMinSpareThreads(int minSpareThreads) {
209         this.minSpareThreads = minSpareThreads;
210     }
211 
212     public int getMinSpareThreads() {
213         return minSpareThreads;
214     }
215 
216     public void setMaxSpareThreads(int maxSpareThreads) {
217         this.maxSpareThreads = maxSpareThreads;
218     }
219 
220     public int getMaxSpareThreads() {
221         return maxSpareThreads;
222     }
223 
224     public int getCurrentThreadCount() {
225         return currentThreadCount;
226     }
227 
228     public int getCurrentThreadsBusy() {
229         return currentThreadsBusy;
230     }
231 
232     public boolean isDaemon() {
233         return isDaemon;
234     }
235 
236     public static int getDebug() {
237         return 0;
238     }
239 
240     /** The default is true - the created threads will be
241      *  in daemon mode. If set to false, the control thread
242      *  will not be daemon - and will keep the process alive.
243      */
244     public void setDaemon( boolean b ) {
245         isDaemon=b;
246     }
247     
248     public boolean getDaemon() {
249         return isDaemon;
250     }
251 
252     public void setName(String name) {
253         this.name = name;
254     }
255 
256     public String getName() {
257         return name;
258     }
259 
260     public int getSequence() {
261         return sequence++;
262     }
263 
264     public void addThread( Thread t, ControlRunnable cr ) {
265         threads.put( t, cr );
266         for( int i=0; i<listeners.size(); i++ ) {
267             ThreadPoolListener tpl=(ThreadPoolListener)listeners.elementAt(i);
268             tpl.threadStart(this, t);
269         }
270     }
271 
272     public void removeThread( Thread t ) {
273         threads.remove(t);
274         for( int i=0; i<listeners.size(); i++ ) {
275             ThreadPoolListener tpl=(ThreadPoolListener)listeners.elementAt(i);
276             tpl.threadEnd(this, t);
277         }
278     }
279 
280     public void addThreadPoolListener( ThreadPoolListener tpl ) {
281         listeners.addElement( tpl );
282     }
283 
284     public Enumeration getThreads(){
285         return threads.keys();
286     }
287 
288     public void run(Runnable r) {
289         ControlRunnable c = findControlRunnable();
290         c.runIt(r);
291     }    
292     
293     //
294     // You may wonder what you see here ... basically I am trying
295     // to maintain a stack of threads. This way locality in time
296     // is kept and there is a better chance to find residues of the
297     // thread in memory next time it runs.
298     //
299 
300     /**
301      * Executes a given Runnable on a thread in the pool, block if needed.
302      */
303     public void runIt(ThreadPoolRunnable r) {
304         if(null == r) {
305             throw new NullPointerException();
306         }
307 
308         ControlRunnable c = findControlRunnable();
309         c.runIt(r);
310     }
311 
312     private ControlRunnable findControlRunnable() {
313         ControlRunnable c=null;
314 
315         if ( stopThePool ) {
316             throw new IllegalStateException();
317         }
318 
319         // Obtain a free thread from the pool.
320         synchronized(this) {
321 
322             while (currentThreadsBusy == currentThreadCount) {
323                  // All threads are busy
324                 if (currentThreadCount < maxThreads) {
325                     // Not all threads were open,
326                     // Open new threads up to the max number of idel threads
327                     int toOpen = currentThreadCount + minSpareThreads;
328                     openThreads(toOpen);
329                 } else {
330                     logFull(log, currentThreadCount, maxThreads);
331                     // Wait for a thread to become idel.
332                     try {
333                         this.wait();
334                     }
335                     // was just catch Throwable -- but no other
336                     // exceptions can be thrown by wait, right?
337                     // So we catch and ignore this one, since
338                     // it'll never actually happen, since nowhere
339                     // do we say pool.interrupt().
340                     catch(InterruptedException e) {
341                         log.error("Unexpected exception", e);
342                     }
343         if( log.isDebugEnabled() ) {
344       log.debug("Finished waiting: CTC="+currentThreadCount +
345           ", CTB=" + currentThreadsBusy);
346                     }
347                     // Pool was stopped. Get away of the pool.
348                     if( stopThePool) {
349                         break;
350                     }
351                 }
352             }
353             // Pool was stopped. Get away of the pool.
354             if(0 == currentThreadCount || stopThePool) {
355                 throw new IllegalStateException();
356             }
357                     
358             // If we are here it means that there is a free thread. Take it.
359             int pos = currentThreadCount - currentThreadsBusy - 1;
360             c = pool[pos];
361             pool[pos] = null;
362             currentThreadsBusy++;
363 
364         }
365         return c;
366     }
367 
368     private static void logFull(Log loghelper, int currentThreadCount,
369                                 int maxThreads) {
370   if( logfull ) {
371             log.error(sm.getString("threadpool.busy",
372                                    new Integer(currentThreadCount),
373                                    new Integer(maxThreads)));
374             logfull=false;
375         } else if( log.isDebugEnabled() ) {
376             log.debug("All threads are busy " + currentThreadCount + " " +
377                       maxThreads );
378         }
379     }
380 
381     /**
382      * Stop the thread pool
383      */
384     public synchronized void shutdown() {
385         if(!stopThePool) {
386             stopThePool = true;
387             if (monitor != null) {
388                 monitor.terminate();
389                 monitor = null;
390             }
391             for(int i = 0; i < currentThreadCount - currentThreadsBusy; i++) {
392                 try {
393                     pool[i].terminate();
394                 } catch(Throwable t) {
395                     /*
396          * Do nothing... The show must go on, we are shutting
397          * down the pool and nothing should stop that.
398          */
399         log.error("Ignored exception while shutting down thread pool", t);
400                 }
401             }
402             currentThreadsBusy = currentThreadCount = 0;
403             pool = null;
404             notifyAll();
405         }
406     }
407 
408     /**
409      * Called by the monitor thread to harvest idle threads.
410      */
411     protected synchronized void checkSpareControllers() {
412 
413         if(stopThePool) {
414             return;
415         }
416 
417         if((currentThreadCount - currentThreadsBusy) > maxSpareThreads) {
418             int toFree = currentThreadCount -
419                          currentThreadsBusy -
420                          maxSpareThreads;
421 
422             for(int i = 0 ; i < toFree ; i++) {
423                 ControlRunnable c = pool[currentThreadCount - currentThreadsBusy - 1];
424                 c.terminate();
425                 pool[currentThreadCount - currentThreadsBusy - 1] = null;
426                 currentThreadCount --;
427             }
428 
429         }
430 
431     }
432 
433     /**
434      * Returns the thread to the pool.
435      * Called by threads as they are becoming idel.
436      */
437     protected synchronized void returnController(ControlRunnable c) {
438 
439         if(0 == currentThreadCount || stopThePool) {
440             c.terminate();
441             return;
442         }
443 
444         // atomic
445         currentThreadsBusy--;
446 
447         pool[currentThreadCount - currentThreadsBusy - 1] = c;
448         notify();
449     }
450 
451     /**
452      * Inform the pool that the specific thread finish.
453      *
454      * Called by the ControlRunnable.run() when the runnable
455      * throws an exception.
456      */
457     protected synchronized void notifyThreadEnd(ControlRunnable c) {
458         currentThreadsBusy--;
459         currentThreadCount --;
460         notify();
461     }
462 
463 
464     /*
465      * Checks for problematic configuration and fix it.
466      * The fix provides reasonable settings for a single CPU
467      * with medium load.
468      */
469     protected void adjustLimits() {
470         if(maxThreads <= 0) {
471             maxThreads = MAX_THREADS;
472         } else if (maxThreads < MAX_THREADS_MIN) {
473             log.warn(sm.getString("threadpool.max_threads_too_low",
474                                   new Integer(maxThreads),
475                                   new Integer(MAX_THREADS_MIN)));
476             maxThreads = MAX_THREADS_MIN;
477         }
478 
479         if(maxSpareThreads >= maxThreads) {
480             maxSpareThreads = maxThreads;
481         }
482 
483         if(maxSpareThreads <= 0) {
484             if(1 == maxThreads) {
485                 maxSpareThreads = 1;
486             } else {
487                 maxSpareThreads = maxThreads/2;
488             }
489         }
490 
491         if(minSpareThreads >  maxSpareThreads) {
492             minSpareThreads =  maxSpareThreads;
493         }
494 
495         if(minSpareThreads <= 0) {
496             if(1 == maxSpareThreads) {
497                 minSpareThreads = 1;
498             } else {
499                 minSpareThreads = maxSpareThreads/2;
500             }
501         }
502     }
503 
504     /** Create missing threads.
505      *
506      * @param toOpen Total number of threads we'll have open
507      */
508     protected void openThreads(int toOpen) {
509 
510         if(toOpen > maxThreads) {
511             toOpen = maxThreads;
512         }
513 
514         for(int i = currentThreadCount ; i < toOpen ; i++) {
515             pool[i - currentThreadsBusy] = new ControlRunnable(this);
516         }
517 
518         currentThreadCount = toOpen;
519     }
520 
521     /** @deprecated */
522     void log( String s ) {
523   log.info(s);
524   //loghelper.flush();
525     }
526     
527     /** 
528      * Periodically execute an action - cleanup in this case
529      */
530     public static class MonitorRunnable implements Runnable {
531         ThreadPool p;
532         Thread     t;
533         int interval=WORK_WAIT_TIMEOUT;
534         boolean    shouldTerminate;
535 
536         MonitorRunnable(ThreadPool p) {
537             this.p=p;
538             this.start();
539         }
540 
541         public void start() {
542             shouldTerminate = false;
543             t = new Thread(this);
544             t.setDaemon(p.getDaemon() );
545       t.setName(p.getName() + "-Monitor");
546             t.start();
547         }
548 
549         public void setInterval(int i ) {
550             this.interval=i;
551         }
552 
553         public void run() {
554             while(true) {
555                 try {
556 
557                     // Sleep for a while.
558                     synchronized(this) {
559                         this.wait(interval);
560                     }
561 
562                     // Check if should terminate.
563                     // termination happens when the pool is shutting down.
564                     if(shouldTerminate) {
565                         break;
566                     }
567 
568                     // Harvest idle threads.
569                     p.checkSpareControllers();
570 
571                 } catch(Throwable t) {
572         ThreadPool.log.error("Unexpected exception", t);
573                 }
574             }
575         }
576 
577         public void stop() {
578             this.terminate();
579         }
580 
581   /** Stop the monitor
582    */
583         public synchronized void terminate() {
584             shouldTerminate = true;
585             this.notify();
586         }
587     }
588 
589     /**
590      * A Thread object that executes various actions ( ThreadPoolRunnable )
591      *  under control of ThreadPool
592      */
593     public static class ControlRunnable implements Runnable {
594         /**
595    * ThreadPool where this thread will be returned
596    */
597         private ThreadPool p;
598 
599   /**
600    * The thread that executes the actions
601    */
602         private ThreadWithAttributes     t;
603 
604   /**
605    * The method that is executed in this thread
606    */
607         
608         private ThreadPoolRunnable   toRun;
609         private Runnable toRunRunnable;
610 
611   /**
612    * Stop this thread
613    */
614   private boolean    shouldTerminate;
615 
616   /**
617    * Activate the execution of the action
618    */
619         private boolean    shouldRun;
620 
621   /**
622    * Per thread data - can be used only if all actions are
623    *  of the same type.
624    *  A better mechanism is possible ( that would allow association of
625    *  thread data with action type ), but right now it's enough.
626    */
627   private boolean noThData;
628 
629   /**
630    * Start a new thread, with no method in it
631    */
632         ControlRunnable(ThreadPool p) {
633             toRun = null;
634             shouldTerminate = false;
635             shouldRun = false;
636             this.p = p;
637             t = new ThreadWithAttributes(p, this);
638             t.setDaemon(true);
639             t.setName(p.getName() + "-Processor" + p.getSequence());
640             t.setPriority(p.getThreadPriority());
641             p.addThread( t, this );
642       noThData=true;
643             t.start();
644         }
645 
646         public void run() {
647             boolean _shouldRun = false;
648             boolean _shouldTerminate = false;
649             ThreadPoolRunnable _toRun = null;
650             try {
651                 while (true) {
652                     try {
653                         /* Wait for work. */
654                         synchronized (this) {
655                             while (!shouldRun && !shouldTerminate) {
656                                 this.wait();
657                             }
658                             _shouldRun = shouldRun;
659                             _shouldTerminate = shouldTerminate;
660                             _toRun = toRun;
661                         }
662 
663                         if (_shouldTerminate) {
664                             if (ThreadPool.log.isDebugEnabled())
665                                 ThreadPool.log.debug("Terminate");
666                             break;
667                         }
668 
669                         /* Check if should execute a runnable.  */
670                         try {
671                             if (noThData) {
672                                 if (_toRun != null) {
673                                     Object thData[] = _toRun.getInitData();
674                                     t.setThreadData(p, thData);
675                                     if (ThreadPool.log.isDebugEnabled())
676                                         ThreadPool.log.debug(
677                                             "Getting new thread data");
678                                 }
679                                 noThData = false;
680                             }
681 
682                             if (_shouldRun) {
683                                 if (_toRun != null) {
684                                     _toRun.runIt(t.getThreadData(p));
685                                 } else if (toRunRunnable != null) {
686                                     toRunRunnable.run();
687                                 } else {
688                                     if (ThreadPool.log.isDebugEnabled())
689                                     ThreadPool.log.debug("No toRun ???");
690                                 }
691                             }
692                         } catch (Throwable t) {
693                             ThreadPool.log.error(sm.getString
694                                 ("threadpool.thread_error", t, toRun.toString()));
695                             /*
696                              * The runnable throw an exception (can be even a ThreadDeath),
697                              * signalling that the thread die.
698                              *
699                             * The meaning is that we should release the thread from
700                             * the pool.
701                             */
702                             _shouldTerminate = true;
703                             _shouldRun = false;
704                             p.notifyThreadEnd(this);
705                         } finally {
706                             if (_shouldRun) {
707                                 shouldRun = false;
708                                 /*
709                                 * Notify the pool that the thread is now idle.
710                                  */
711                                 p.returnController(this);
712                             }
713                         }
714 
715                         /*
716                         * Check if should terminate.
717                         * termination happens when the pool is shutting down.
718                         */
719                         if (_shouldTerminate) {
720                             break;
721                         }
722                     } catch (InterruptedException ie) { /* for the wait operation */
723                         // can never happen, since we don't call interrupt
724                         ThreadPool.log.error("Unexpected exception", ie);
725                     }
726                 }
727             } finally {
728                 p.removeThread(Thread.currentThread());
729             }
730         }
731         /** Run a task
732          *
733          * @param toRun
734          */
735         public synchronized void runIt(Runnable toRun) {
736       this.toRunRunnable = toRun;
737       // Do not re-init, the whole idea is to run init only once per
738       // thread - the pool is supposed to run a single task, that is
739       // initialized once.
740             // noThData = true;
741             shouldRun = true;
742             this.notify();
743         }
744 
745         /** Run a task
746          *
747          * @param toRun
748          */
749         public synchronized void runIt(ThreadPoolRunnable toRun) {
750       this.toRun = toRun;
751       // Do not re-init, the whole idea is to run init only once per
752       // thread - the pool is supposed to run a single task, that is
753       // initialized once.
754             // noThData = true;
755             shouldRun = true;
756             this.notify();
757         }
758 
759         public void stop() {
760             this.terminate();
761         }
762 
763         public void kill() {
764             t.stop();
765         }
766 
767         public synchronized void terminate() {
768             shouldTerminate = true;
769             this.notify();
770         }
771     }
772 
773     /** 
774      * Debug display of the stage of each thread. The return is html style,
775      * for display in the console ( it can be easily parsed too ).
776      *
777      * @return The thread status display
778      */
779     public String threadStatusString() {
780         StringBuffer sb=new StringBuffer();
781         Iterator it=threads.keySet().iterator();
782         sb.append("<ul>");
783         while( it.hasNext()) {
784             sb.append("<li>");
785             ThreadWithAttributes twa=(ThreadWithAttributes)
786                     it.next();
787             sb.append(twa.getCurrentStage(this) ).append(" ");
788             sb.append( twa.getParam(this));
789             sb.append( "</li>\n");
790         }
791         sb.append("</ul>");
792         return sb.toString();
793     }
794 
795     /** Return an array with the status of each thread. The status
796      * indicates the current request processing stage ( for tomcat ) or
797      * whatever the thread is doing ( if the application using TP provide
798      * this info )
799      *
800      * @return The status of all threads
801      */
802     public String[] getThreadStatus() {
803         String status[]=new String[ threads.size()];
804         Iterator it=threads.keySet().iterator();
805         for( int i=0; ( i<status.length && it.hasNext()); i++ ) {
806             ThreadWithAttributes twa=(ThreadWithAttributes)
807                     it.next();
808             status[i]=twa.getCurrentStage(this);
809         }
810         return status;
811     }
812 
813     /** Return an array with the current "param" ( XXX better name ? )
814      * of each thread. This is typically the last request.
815      *
816      * @return The params of all threads
817      */
818     public String[] getThreadParam() {
819         String status[]=new String[ threads.size()];
820         Iterator it=threads.keySet().iterator();
821         for( int i=0; ( i<status.length && it.hasNext()); i++ ) {
822             ThreadWithAttributes twa=(ThreadWithAttributes)
823                     it.next();
824             Object o=twa.getParam(this);
825             status[i]=(o==null)? null : o.toString();
826         }
827         return status;
828     }
829     
830     /** Interface to allow applications to be notified when
831      * a threads are created and stopped.
832      */
833     public static interface ThreadPoolListener {
834         public void threadStart( ThreadPool tp, Thread t);
835 
836         public void threadEnd( ThreadPool tp, Thread t);
837     }
838 }