Source code: org/apache/tomcat/util/threads/ThreadPool.java
1 /*
2 * Copyright 1999-2005 The Apache Software Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.apache.tomcat.util.threads;
18
19 import java.util.*;
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.apache.tomcat.util.res.StringManager;
23
24 /**
25 * A thread pool that is trying to copy the apache process management.
26 *
27 * Should we remove this in favor of Doug Lea's thread package?
28 *
29 * @author Gal Shachor
30 * @author Yoav Shapira <yoavs@apache.org>
31 */
32 public class ThreadPool {
33
34 private static Log log = LogFactory.getLog(ThreadPool.class);
35
36 private static StringManager sm =
37 StringManager.getManager("org.apache.tomcat.util.threads.res");
38
39 private static boolean logfull=true;
40
41 /*
42 * Default values ...
43 */
44 public static final int MAX_THREADS = 200;
45 public static final int MAX_THREADS_MIN = 10;
46 public static final int MAX_SPARE_THREADS = 50;
47 public static final int MIN_SPARE_THREADS = 4;
48 public static final int WORK_WAIT_TIMEOUT = 60*1000;
49
50 /*
51 * Where the threads are held.
52 */
53 protected ControlRunnable[] pool = null;
54
55 /*
56 * A monitor thread that monitors the pool for idel threads.
57 */
58 protected MonitorRunnable monitor;
59
60
61 /*
62 * Max number of threads that you can open in the pool.
63 */
64 protected int maxThreads;
65
66 /*
67 * Min number of idel threads that you can leave in the pool.
68 */
69 protected int minSpareThreads;
70
71 /*
72 * Max number of idel threads that you can leave in the pool.
73 */
74 protected int maxSpareThreads;
75
76 /*
77 * Number of threads in the pool.
78 */
79 protected int currentThreadCount;
80
81 /*
82 * Number of busy threads in the pool.
83 */
84 protected int currentThreadsBusy;
85
86 /*
87 * Flag that the pool should terminate all the threads and stop.
88 */
89 protected boolean stopThePool;
90
91 /* Flag to control if the main thread is 'daemon' */
92 protected boolean isDaemon=true;
93
94 /** The threads that are part of the pool.
95 * Key is Thread, value is the ControlRunnable
96 */
97 protected Hashtable threads=new Hashtable();
98
99 protected Vector listeners=new Vector();
100
101 /** Name of the threadpool
102 */
103 protected String name = "TP";
104
105 /**
106 * Sequence.
107 */
108 protected int sequence = 1;
109
110 /**
111 * Thread priority.
112 */
113 protected int threadPriority = Thread.NORM_PRIORITY;
114
115
116 /**
117 * Constructor.
118 */
119 public ThreadPool() {
120 maxThreads = MAX_THREADS;
121 maxSpareThreads = MAX_SPARE_THREADS;
122 minSpareThreads = MIN_SPARE_THREADS;
123 currentThreadCount = 0;
124 currentThreadsBusy = 0;
125 stopThePool = false;
126 }
127
128
129 /** Create a ThreadPool instance.
130 *
131 * @param jmx UNUSED
132 * @return ThreadPool instance. If JMX support is requested, you need to
133 * call register() in order to set a name.
134 */
135 public static ThreadPool createThreadPool(boolean jmx) {
136 return new ThreadPool();
137 }
138
139 public synchronized void start() {
140 stopThePool=false;
141 currentThreadCount = 0;
142 currentThreadsBusy = 0;
143
144 adjustLimits();
145
146 pool = new ControlRunnable[maxThreads];
147
148 openThreads(minSpareThreads);
149 if (maxSpareThreads < maxThreads) {
150 monitor = new MonitorRunnable(this);
151 }
152 }
153
154 public MonitorRunnable getMonitor() {
155 return monitor;
156 }
157
158 /**
159 * Sets the thread priority for current
160 * and future threads in this pool.
161 *
162 * @param threadPriority The new priority
163 * @throws IllegalArgumentException If the specified
164 * priority is less than Thread.MIN_PRIORITY or
165 * more than Thread.MAX_PRIORITY
166 */
167 public synchronized void setThreadPriority(int threadPriority) {
168 if(log.isDebugEnabled())
169 log.debug(getClass().getName() +
170 ": setPriority(" + threadPriority + "): here.");
171
172 if (threadPriority < Thread.MIN_PRIORITY) {
173 throw new IllegalArgumentException("new priority < MIN_PRIORITY");
174 } else if (threadPriority > Thread.MAX_PRIORITY) {
175 throw new IllegalArgumentException("new priority > MAX_PRIORITY");
176 }
177
178 // Set for future threads
179 this.threadPriority = threadPriority;
180
181 Enumeration currentThreads = getThreads();
182 Thread t = null;
183 while(currentThreads.hasMoreElements()) {
184 t = (Thread) currentThreads.nextElement();
185 t.setPriority(threadPriority);
186 }
187 }
188
189 /**
190 * Returns the priority level of current and
191 * future threads in this pool.
192 *
193 * @return The priority
194 */
195 public int getThreadPriority() {
196 return threadPriority;
197 }
198
199
200 public void setMaxThreads(int maxThreads) {
201 this.maxThreads = maxThreads;
202 }
203
204 public int getMaxThreads() {
205 return maxThreads;
206 }
207
208 public void setMinSpareThreads(int minSpareThreads) {
209 this.minSpareThreads = minSpareThreads;
210 }
211
212 public int getMinSpareThreads() {
213 return minSpareThreads;
214 }
215
216 public void setMaxSpareThreads(int maxSpareThreads) {
217 this.maxSpareThreads = maxSpareThreads;
218 }
219
220 public int getMaxSpareThreads() {
221 return maxSpareThreads;
222 }
223
224 public int getCurrentThreadCount() {
225 return currentThreadCount;
226 }
227
228 public int getCurrentThreadsBusy() {
229 return currentThreadsBusy;
230 }
231
232 public boolean isDaemon() {
233 return isDaemon;
234 }
235
236 public static int getDebug() {
237 return 0;
238 }
239
240 /** The default is true - the created threads will be
241 * in daemon mode. If set to false, the control thread
242 * will not be daemon - and will keep the process alive.
243 */
244 public void setDaemon( boolean b ) {
245 isDaemon=b;
246 }
247
248 public boolean getDaemon() {
249 return isDaemon;
250 }
251
252 public void setName(String name) {
253 this.name = name;
254 }
255
256 public String getName() {
257 return name;
258 }
259
260 public int getSequence() {
261 return sequence++;
262 }
263
264 public void addThread( Thread t, ControlRunnable cr ) {
265 threads.put( t, cr );
266 for( int i=0; i<listeners.size(); i++ ) {
267 ThreadPoolListener tpl=(ThreadPoolListener)listeners.elementAt(i);
268 tpl.threadStart(this, t);
269 }
270 }
271
272 public void removeThread( Thread t ) {
273 threads.remove(t);
274 for( int i=0; i<listeners.size(); i++ ) {
275 ThreadPoolListener tpl=(ThreadPoolListener)listeners.elementAt(i);
276 tpl.threadEnd(this, t);
277 }
278 }
279
280 public void addThreadPoolListener( ThreadPoolListener tpl ) {
281 listeners.addElement( tpl );
282 }
283
284 public Enumeration getThreads(){
285 return threads.keys();
286 }
287
288 public void run(Runnable r) {
289 ControlRunnable c = findControlRunnable();
290 c.runIt(r);
291 }
292
293 //
294 // You may wonder what you see here ... basically I am trying
295 // to maintain a stack of threads. This way locality in time
296 // is kept and there is a better chance to find residues of the
297 // thread in memory next time it runs.
298 //
299
300 /**
301 * Executes a given Runnable on a thread in the pool, block if needed.
302 */
303 public void runIt(ThreadPoolRunnable r) {
304 if(null == r) {
305 throw new NullPointerException();
306 }
307
308 ControlRunnable c = findControlRunnable();
309 c.runIt(r);
310 }
311
312 private ControlRunnable findControlRunnable() {
313 ControlRunnable c=null;
314
315 if ( stopThePool ) {
316 throw new IllegalStateException();
317 }
318
319 // Obtain a free thread from the pool.
320 synchronized(this) {
321
322 while (currentThreadsBusy == currentThreadCount) {
323 // All threads are busy
324 if (currentThreadCount < maxThreads) {
325 // Not all threads were open,
326 // Open new threads up to the max number of idel threads
327 int toOpen = currentThreadCount + minSpareThreads;
328 openThreads(toOpen);
329 } else {
330 logFull(log, currentThreadCount, maxThreads);
331 // Wait for a thread to become idel.
332 try {
333 this.wait();
334 }
335 // was just catch Throwable -- but no other
336 // exceptions can be thrown by wait, right?
337 // So we catch and ignore this one, since
338 // it'll never actually happen, since nowhere
339 // do we say pool.interrupt().
340 catch(InterruptedException e) {
341 log.error("Unexpected exception", e);
342 }
343 if( log.isDebugEnabled() ) {
344 log.debug("Finished waiting: CTC="+currentThreadCount +
345 ", CTB=" + currentThreadsBusy);
346 }
347 // Pool was stopped. Get away of the pool.
348 if( stopThePool) {
349 break;
350 }
351 }
352 }
353 // Pool was stopped. Get away of the pool.
354 if(0 == currentThreadCount || stopThePool) {
355 throw new IllegalStateException();
356 }
357
358 // If we are here it means that there is a free thread. Take it.
359 int pos = currentThreadCount - currentThreadsBusy - 1;
360 c = pool[pos];
361 pool[pos] = null;
362 currentThreadsBusy++;
363
364 }
365 return c;
366 }
367
368 private static void logFull(Log loghelper, int currentThreadCount,
369 int maxThreads) {
370 if( logfull ) {
371 log.error(sm.getString("threadpool.busy",
372 new Integer(currentThreadCount),
373 new Integer(maxThreads)));
374 logfull=false;
375 } else if( log.isDebugEnabled() ) {
376 log.debug("All threads are busy " + currentThreadCount + " " +
377 maxThreads );
378 }
379 }
380
381 /**
382 * Stop the thread pool
383 */
384 public synchronized void shutdown() {
385 if(!stopThePool) {
386 stopThePool = true;
387 if (monitor != null) {
388 monitor.terminate();
389 monitor = null;
390 }
391 for(int i = 0; i < currentThreadCount - currentThreadsBusy; i++) {
392 try {
393 pool[i].terminate();
394 } catch(Throwable t) {
395 /*
396 * Do nothing... The show must go on, we are shutting
397 * down the pool and nothing should stop that.
398 */
399 log.error("Ignored exception while shutting down thread pool", t);
400 }
401 }
402 currentThreadsBusy = currentThreadCount = 0;
403 pool = null;
404 notifyAll();
405 }
406 }
407
408 /**
409 * Called by the monitor thread to harvest idle threads.
410 */
411 protected synchronized void checkSpareControllers() {
412
413 if(stopThePool) {
414 return;
415 }
416
417 if((currentThreadCount - currentThreadsBusy) > maxSpareThreads) {
418 int toFree = currentThreadCount -
419 currentThreadsBusy -
420 maxSpareThreads;
421
422 for(int i = 0 ; i < toFree ; i++) {
423 ControlRunnable c = pool[currentThreadCount - currentThreadsBusy - 1];
424 c.terminate();
425 pool[currentThreadCount - currentThreadsBusy - 1] = null;
426 currentThreadCount --;
427 }
428
429 }
430
431 }
432
433 /**
434 * Returns the thread to the pool.
435 * Called by threads as they are becoming idel.
436 */
437 protected synchronized void returnController(ControlRunnable c) {
438
439 if(0 == currentThreadCount || stopThePool) {
440 c.terminate();
441 return;
442 }
443
444 // atomic
445 currentThreadsBusy--;
446
447 pool[currentThreadCount - currentThreadsBusy - 1] = c;
448 notify();
449 }
450
451 /**
452 * Inform the pool that the specific thread finish.
453 *
454 * Called by the ControlRunnable.run() when the runnable
455 * throws an exception.
456 */
457 protected synchronized void notifyThreadEnd(ControlRunnable c) {
458 currentThreadsBusy--;
459 currentThreadCount --;
460 notify();
461 }
462
463
464 /*
465 * Checks for problematic configuration and fix it.
466 * The fix provides reasonable settings for a single CPU
467 * with medium load.
468 */
469 protected void adjustLimits() {
470 if(maxThreads <= 0) {
471 maxThreads = MAX_THREADS;
472 } else if (maxThreads < MAX_THREADS_MIN) {
473 log.warn(sm.getString("threadpool.max_threads_too_low",
474 new Integer(maxThreads),
475 new Integer(MAX_THREADS_MIN)));
476 maxThreads = MAX_THREADS_MIN;
477 }
478
479 if(maxSpareThreads >= maxThreads) {
480 maxSpareThreads = maxThreads;
481 }
482
483 if(maxSpareThreads <= 0) {
484 if(1 == maxThreads) {
485 maxSpareThreads = 1;
486 } else {
487 maxSpareThreads = maxThreads/2;
488 }
489 }
490
491 if(minSpareThreads > maxSpareThreads) {
492 minSpareThreads = maxSpareThreads;
493 }
494
495 if(minSpareThreads <= 0) {
496 if(1 == maxSpareThreads) {
497 minSpareThreads = 1;
498 } else {
499 minSpareThreads = maxSpareThreads/2;
500 }
501 }
502 }
503
504 /** Create missing threads.
505 *
506 * @param toOpen Total number of threads we'll have open
507 */
508 protected void openThreads(int toOpen) {
509
510 if(toOpen > maxThreads) {
511 toOpen = maxThreads;
512 }
513
514 for(int i = currentThreadCount ; i < toOpen ; i++) {
515 pool[i - currentThreadsBusy] = new ControlRunnable(this);
516 }
517
518 currentThreadCount = toOpen;
519 }
520
521 /** @deprecated */
522 void log( String s ) {
523 log.info(s);
524 //loghelper.flush();
525 }
526
527 /**
528 * Periodically execute an action - cleanup in this case
529 */
530 public static class MonitorRunnable implements Runnable {
531 ThreadPool p;
532 Thread t;
533 int interval=WORK_WAIT_TIMEOUT;
534 boolean shouldTerminate;
535
536 MonitorRunnable(ThreadPool p) {
537 this.p=p;
538 this.start();
539 }
540
541 public void start() {
542 shouldTerminate = false;
543 t = new Thread(this);
544 t.setDaemon(p.getDaemon() );
545 t.setName(p.getName() + "-Monitor");
546 t.start();
547 }
548
549 public void setInterval(int i ) {
550 this.interval=i;
551 }
552
553 public void run() {
554 while(true) {
555 try {
556
557 // Sleep for a while.
558 synchronized(this) {
559 this.wait(interval);
560 }
561
562 // Check if should terminate.
563 // termination happens when the pool is shutting down.
564 if(shouldTerminate) {
565 break;
566 }
567
568 // Harvest idle threads.
569 p.checkSpareControllers();
570
571 } catch(Throwable t) {
572 ThreadPool.log.error("Unexpected exception", t);
573 }
574 }
575 }
576
577 public void stop() {
578 this.terminate();
579 }
580
581 /** Stop the monitor
582 */
583 public synchronized void terminate() {
584 shouldTerminate = true;
585 this.notify();
586 }
587 }
588
589 /**
590 * A Thread object that executes various actions ( ThreadPoolRunnable )
591 * under control of ThreadPool
592 */
593 public static class ControlRunnable implements Runnable {
594 /**
595 * ThreadPool where this thread will be returned
596 */
597 private ThreadPool p;
598
599 /**
600 * The thread that executes the actions
601 */
602 private ThreadWithAttributes t;
603
604 /**
605 * The method that is executed in this thread
606 */
607
608 private ThreadPoolRunnable toRun;
609 private Runnable toRunRunnable;
610
611 /**
612 * Stop this thread
613 */
614 private boolean shouldTerminate;
615
616 /**
617 * Activate the execution of the action
618 */
619 private boolean shouldRun;
620
621 /**
622 * Per thread data - can be used only if all actions are
623 * of the same type.
624 * A better mechanism is possible ( that would allow association of
625 * thread data with action type ), but right now it's enough.
626 */
627 private boolean noThData;
628
629 /**
630 * Start a new thread, with no method in it
631 */
632 ControlRunnable(ThreadPool p) {
633 toRun = null;
634 shouldTerminate = false;
635 shouldRun = false;
636 this.p = p;
637 t = new ThreadWithAttributes(p, this);
638 t.setDaemon(true);
639 t.setName(p.getName() + "-Processor" + p.getSequence());
640 t.setPriority(p.getThreadPriority());
641 p.addThread( t, this );
642 noThData=true;
643 t.start();
644 }
645
646 public void run() {
647 boolean _shouldRun = false;
648 boolean _shouldTerminate = false;
649 ThreadPoolRunnable _toRun = null;
650 try {
651 while (true) {
652 try {
653 /* Wait for work. */
654 synchronized (this) {
655 while (!shouldRun && !shouldTerminate) {
656 this.wait();
657 }
658 _shouldRun = shouldRun;
659 _shouldTerminate = shouldTerminate;
660 _toRun = toRun;
661 }
662
663 if (_shouldTerminate) {
664 if (ThreadPool.log.isDebugEnabled())
665 ThreadPool.log.debug("Terminate");
666 break;
667 }
668
669 /* Check if should execute a runnable. */
670 try {
671 if (noThData) {
672 if (_toRun != null) {
673 Object thData[] = _toRun.getInitData();
674 t.setThreadData(p, thData);
675 if (ThreadPool.log.isDebugEnabled())
676 ThreadPool.log.debug(
677 "Getting new thread data");
678 }
679 noThData = false;
680 }
681
682 if (_shouldRun) {
683 if (_toRun != null) {
684 _toRun.runIt(t.getThreadData(p));
685 } else if (toRunRunnable != null) {
686 toRunRunnable.run();
687 } else {
688 if (ThreadPool.log.isDebugEnabled())
689 ThreadPool.log.debug("No toRun ???");
690 }
691 }
692 } catch (Throwable t) {
693 ThreadPool.log.error(sm.getString
694 ("threadpool.thread_error", t, toRun.toString()));
695 /*
696 * The runnable throw an exception (can be even a ThreadDeath),
697 * signalling that the thread die.
698 *
699 * The meaning is that we should release the thread from
700 * the pool.
701 */
702 _shouldTerminate = true;
703 _shouldRun = false;
704 p.notifyThreadEnd(this);
705 } finally {
706 if (_shouldRun) {
707 shouldRun = false;
708 /*
709 * Notify the pool that the thread is now idle.
710 */
711 p.returnController(this);
712 }
713 }
714
715 /*
716 * Check if should terminate.
717 * termination happens when the pool is shutting down.
718 */
719 if (_shouldTerminate) {
720 break;
721 }
722 } catch (InterruptedException ie) { /* for the wait operation */
723 // can never happen, since we don't call interrupt
724 ThreadPool.log.error("Unexpected exception", ie);
725 }
726 }
727 } finally {
728 p.removeThread(Thread.currentThread());
729 }
730 }
731 /** Run a task
732 *
733 * @param toRun
734 */
735 public synchronized void runIt(Runnable toRun) {
736 this.toRunRunnable = toRun;
737 // Do not re-init, the whole idea is to run init only once per
738 // thread - the pool is supposed to run a single task, that is
739 // initialized once.
740 // noThData = true;
741 shouldRun = true;
742 this.notify();
743 }
744
745 /** Run a task
746 *
747 * @param toRun
748 */
749 public synchronized void runIt(ThreadPoolRunnable toRun) {
750 this.toRun = toRun;
751 // Do not re-init, the whole idea is to run init only once per
752 // thread - the pool is supposed to run a single task, that is
753 // initialized once.
754 // noThData = true;
755 shouldRun = true;
756 this.notify();
757 }
758
759 public void stop() {
760 this.terminate();
761 }
762
763 public void kill() {
764 t.stop();
765 }
766
767 public synchronized void terminate() {
768 shouldTerminate = true;
769 this.notify();
770 }
771 }
772
773 /**
774 * Debug display of the stage of each thread. The return is html style,
775 * for display in the console ( it can be easily parsed too ).
776 *
777 * @return The thread status display
778 */
779 public String threadStatusString() {
780 StringBuffer sb=new StringBuffer();
781 Iterator it=threads.keySet().iterator();
782 sb.append("<ul>");
783 while( it.hasNext()) {
784 sb.append("<li>");
785 ThreadWithAttributes twa=(ThreadWithAttributes)
786 it.next();
787 sb.append(twa.getCurrentStage(this) ).append(" ");
788 sb.append( twa.getParam(this));
789 sb.append( "</li>\n");
790 }
791 sb.append("</ul>");
792 return sb.toString();
793 }
794
795 /** Return an array with the status of each thread. The status
796 * indicates the current request processing stage ( for tomcat ) or
797 * whatever the thread is doing ( if the application using TP provide
798 * this info )
799 *
800 * @return The status of all threads
801 */
802 public String[] getThreadStatus() {
803 String status[]=new String[ threads.size()];
804 Iterator it=threads.keySet().iterator();
805 for( int i=0; ( i<status.length && it.hasNext()); i++ ) {
806 ThreadWithAttributes twa=(ThreadWithAttributes)
807 it.next();
808 status[i]=twa.getCurrentStage(this);
809 }
810 return status;
811 }
812
813 /** Return an array with the current "param" ( XXX better name ? )
814 * of each thread. This is typically the last request.
815 *
816 * @return The params of all threads
817 */
818 public String[] getThreadParam() {
819 String status[]=new String[ threads.size()];
820 Iterator it=threads.keySet().iterator();
821 for( int i=0; ( i<status.length && it.hasNext()); i++ ) {
822 ThreadWithAttributes twa=(ThreadWithAttributes)
823 it.next();
824 Object o=twa.getParam(this);
825 status[i]=(o==null)? null : o.toString();
826 }
827 return status;
828 }
829
830 /** Interface to allow applications to be notified when
831 * a threads are created and stopped.
832 */
833 public static interface ThreadPoolListener {
834 public void threadStart( ThreadPool tp, Thread t);
835
836 public void threadEnd( ThreadPool tp, Thread t);
837 }
838 }