1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.tomcat.util.threads;
19
import java.util.*;

import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.res.StringManager;
25
26 /**
27 * A thread pool that is trying to copy the apache process management.
28 *
29 * Should we remove this in favor of Doug Lea's thread package?
30 *
31 * @author Gal Shachor
32 * @author Yoav Shapira <yoavs@apache.org>
33 */
34 public class ThreadPool {
35
36 private static Log log = LogFactory.getLog(ThreadPool.class);
37
38 private static StringManager sm =
39 StringManager.getManager("org.apache.tomcat.util.threads.res");
40
41 private static boolean logfull=true;
42
/*
 * Default values ...
 */
/** Value the constructor gives maxThreads; adjustLimits() also falls back to it when maxThreads is not positive. */
public static final int MAX_THREADS = 200;
/** Lower bound enforced by adjustLimits(): a positive maxThreads below this is raised to it. */
public static final int MAX_THREADS_MIN = 10;
/** Value the constructor gives maxSpareThreads. */
public static final int MAX_SPARE_THREADS = 50;
/** Value the constructor gives minSpareThreads. */
public static final int MIN_SPARE_THREADS = 4;
/** Initial MonitorRunnable interval, handed to Object.wait(long): 60 seconds expressed in milliseconds. */
public static final int WORK_WAIT_TIMEOUT = 60*1000;
51
/*
 * Where the idle threads are held. Allocated by start() with maxThreads
 * slots; the idle workers occupy the lowest indices and the most recently
 * returned one is handed out first.
 */
protected ControlRunnable[] pool = null;

/*
 * A monitor thread that monitors the pool for idle threads.
 * Only created by start() when maxSpareThreads < maxThreads.
 */
protected MonitorRunnable monitor;


/*
 * Max number of threads that you can open in the pool.
 */
protected int maxThreads;

/*
 * Min number of idle threads that you can leave in the pool.
 * Also the number of threads opened by start() and the step by which
 * the pool grows when every thread is busy.
 */
protected int minSpareThreads;

/*
 * Max number of idle threads that you can leave in the pool.
 * Idle threads above this number are terminated by checkSpareControllers().
 */
protected int maxSpareThreads;

/*
 * Number of threads in the pool (busy and idle).
 */
protected int currentThreadCount;

/*
 * Number of busy threads in the pool.
 */
protected int currentThreadsBusy;

/*
 * Flag that the pool should terminate all the threads and stop.
 */
protected boolean stopThePool;

/* Flag to control if the main thread is 'daemon'. Read by MonitorRunnable when it creates its thread. */
protected boolean isDaemon=true;

/** The threads that are part of the pool.
 * Key is Thread, value is the ControlRunnable
 */
protected Hashtable threads=new Hashtable();

/** ThreadPoolListener instances notified by addThread() and removeThread(). */
protected Vector listeners=new Vector();

/** Name of the threadpool. Used as the prefix of the worker and monitor thread names.
 */
protected String name = "TP";

/**
 * Sequence. The current value is appended to the name of each new worker
 * thread and then incremented (see incSequence()).
 */
protected int sequence = 1;

/**
 * Thread priority applied to the worker threads.
 */
protected int threadPriority = Thread.NORM_PRIORITY;
116
117
/**
 * Builds a stopped-flag-cleared, empty pool configured with the default
 * limits (MAX_THREADS, MAX_SPARE_THREADS, MIN_SPARE_THREADS).
 * The constructor itself does not create any thread.
 */
public ThreadPool() {
    this.stopThePool = false;
    this.currentThreadsBusy = 0;
    this.currentThreadCount = 0;
    this.minSpareThreads = MIN_SPARE_THREADS;
    this.maxSpareThreads = MAX_SPARE_THREADS;
    this.maxThreads = MAX_THREADS;
}
129
130
131 /** Create a ThreadPool instance.
132 *
133 * @param jmx UNUSED
134 * @return ThreadPool instance. If JMX support is requested, you need to
135 * call register() in order to set a name.
136 */
137 public static ThreadPool createThreadPool(boolean jmx) {
138 return new ThreadPool();
139 }
140
141 public synchronized void start() {
142 stopThePool=false;
143 currentThreadCount = 0;
144 currentThreadsBusy = 0;
145
146 adjustLimits();
147
148 pool = new ControlRunnable[maxThreads];
149
150 openThreads(minSpareThreads);
151 if (maxSpareThreads < maxThreads) {
152 monitor = new MonitorRunnable(this);
153 }
154 }
155
156 public MonitorRunnable getMonitor() {
157 return monitor;
158 }
159
160 /**
161 * Sets the thread priority for current
162 * and future threads in this pool.
163 *
164 * @param threadPriority The new priority
165 * @throws IllegalArgumentException If the specified
166 * priority is less than Thread.MIN_PRIORITY or
167 * more than Thread.MAX_PRIORITY
168 */
169 public synchronized void setThreadPriority(int threadPriority) {
170 if(log.isDebugEnabled())
171 log.debug(getClass().getName() +
172 ": setPriority(" + threadPriority + "): here.");
173
174 if (threadPriority < Thread.MIN_PRIORITY) {
175 throw new IllegalArgumentException("new priority < MIN_PRIORITY");
176 } else if (threadPriority > Thread.MAX_PRIORITY) {
177 throw new IllegalArgumentException("new priority > MAX_PRIORITY");
178 }
179
180 // Set for future threads
181 this.threadPriority = threadPriority;
182
183 Enumeration currentThreads = getThreads();
184 Thread t = null;
185 while(currentThreads.hasMoreElements()) {
186 t = (Thread) currentThreads.nextElement();
187 t.setPriority(threadPriority);
188 }
189 }
190
191 /**
192 * Returns the priority level of current and
193 * future threads in this pool.
194 *
195 * @return The priority
196 */
197 public int getThreadPriority() {
198 return threadPriority;
199 }
200
201
202 public void setMaxThreads(int maxThreads) {
203 this.maxThreads = maxThreads;
204 }
205
206 public int getMaxThreads() {
207 return maxThreads;
208 }
209
210 public void setMinSpareThreads(int minSpareThreads) {
211 this.minSpareThreads = minSpareThreads;
212 }
213
214 public int getMinSpareThreads() {
215 return minSpareThreads;
216 }
217
218 public void setMaxSpareThreads(int maxSpareThreads) {
219 this.maxSpareThreads = maxSpareThreads;
220 }
221
222 public int getMaxSpareThreads() {
223 return maxSpareThreads;
224 }
225
226 public int getCurrentThreadCount() {
227 return currentThreadCount;
228 }
229
230 public int getCurrentThreadsBusy() {
231 return currentThreadsBusy;
232 }
233
234 public boolean isDaemon() {
235 return isDaemon;
236 }
237
238 public static int getDebug() {
239 return 0;
240 }
241
242 /** The default is true - the created threads will be
243 * in daemon mode. If set to false, the control thread
244 * will not be daemon - and will keep the process alive.
245 */
246 public void setDaemon( boolean b ) {
247 isDaemon=b;
248 }
249
250 public boolean getDaemon() {
251 return isDaemon;
252 }
253
254 public void setName(String name) {
255 this.name = name;
256 }
257
258 public String getName() {
259 return name;
260 }
261
262 public int getSequence() {
263 return sequence;
264 }
265
266 public int incSequence() {
267 return sequence++;
268 }
269
270 public void addThread( Thread t, ControlRunnable cr ) {
271 threads.put( t, cr );
272 for( int i=0; i<listeners.size(); i++ ) {
273 ThreadPoolListener tpl=(ThreadPoolListener)listeners.elementAt(i);
274 tpl.threadStart(this, t);
275 }
276 }
277
278 public void removeThread( Thread t ) {
279 threads.remove(t);
280 for( int i=0; i<listeners.size(); i++ ) {
281 ThreadPoolListener tpl=(ThreadPoolListener)listeners.elementAt(i);
282 tpl.threadEnd(this, t);
283 }
284 }
285
286 public void addThreadPoolListener( ThreadPoolListener tpl ) {
287 listeners.addElement( tpl );
288 }
289
290 public Enumeration getThreads(){
291 return threads.keys();
292 }
293
294 public void run(Runnable r) {
295 ControlRunnable c = findControlRunnable();
296 c.runIt(r);
297 }
298
299 //
300 // You may wonder what you see here ... basically I am trying
301 // to maintain a stack of threads. This way locality in time
302 // is kept and there is a better chance to find residues of the
303 // thread in memory next time it runs.
304 //
305
306 /**
307 * Executes a given Runnable on a thread in the pool, block if needed.
308 */
309 public void runIt(ThreadPoolRunnable r) {
310 if(null == r) {
311 throw new NullPointerException();
312 }
313
314 ControlRunnable c = findControlRunnable();
315 c.runIt(r);
316 }
317
318 private ControlRunnable findControlRunnable() {
319 ControlRunnable c=null;
320
321 if ( stopThePool ) {
322 throw new IllegalStateException();
323 }
324
325 // Obtain a free thread from the pool.
326 synchronized(this) {
327
328 while (currentThreadsBusy == currentThreadCount) {
329 // All threads are busy
330 if (currentThreadCount < maxThreads) {
331 // Not all threads were open,
332 // Open new threads up to the max number of idel threads
333 int toOpen = currentThreadCount + minSpareThreads;
334 openThreads(toOpen);
335 } else {
336 logFull(log, currentThreadCount, maxThreads);
337 // Wait for a thread to become idel.
338 try {
339 this.wait();
340 }
341 // was just catch Throwable -- but no other
342 // exceptions can be thrown by wait, right?
343 // So we catch and ignore this one, since
344 // it'll never actually happen, since nowhere
345 // do we say pool.interrupt().
346 catch(InterruptedException e) {
347 log.error("Unexpected exception", e);
348 }
349 if( log.isDebugEnabled() ) {
350 log.debug("Finished waiting: CTC="+currentThreadCount +
351 ", CTB=" + currentThreadsBusy);
352 }
353 // Pool was stopped. Get away of the pool.
354 if( stopThePool) {
355 break;
356 }
357 }
358 }
359 // Pool was stopped. Get away of the pool.
360 if(0 == currentThreadCount || stopThePool) {
361 throw new IllegalStateException();
362 }
363
364 // If we are here it means that there is a free thread. Take it.
365 int pos = currentThreadCount - currentThreadsBusy - 1;
366 c = pool[pos];
367 pool[pos] = null;
368 currentThreadsBusy++;
369
370 }
371 return c;
372 }
373
374 private static void logFull(Log loghelper, int currentThreadCount,
375 int maxThreads) {
376 if( logfull ) {
377 log.error(sm.getString("threadpool.busy",
378 new Integer(currentThreadCount),
379 new Integer(maxThreads)));
380 logfull=false;
381 } else if( log.isDebugEnabled() ) {
382 log.debug("All threads are busy " + currentThreadCount + " " +
383 maxThreads );
384 }
385 }
386
387 /**
388 * Stop the thread pool
389 */
390 public synchronized void shutdown() {
391 if(!stopThePool) {
392 stopThePool = true;
393 if (monitor != null) {
394 monitor.terminate();
395 monitor = null;
396 }
397 for(int i = 0; i < currentThreadCount - currentThreadsBusy; i++) {
398 try {
399 pool[i].terminate();
400 } catch(Throwable t) {
401 /*
402 * Do nothing... The show must go on, we are shutting
403 * down the pool and nothing should stop that.
404 */
405 log.error("Ignored exception while shutting down thread pool", t);
406 }
407 }
408 currentThreadsBusy = currentThreadCount = 0;
409 pool = null;
410 notifyAll();
411 }
412 }
413
414 /**
415 * Called by the monitor thread to harvest idle threads.
416 */
417 protected synchronized void checkSpareControllers() {
418
419 if(stopThePool) {
420 return;
421 }
422
423 if((currentThreadCount - currentThreadsBusy) > maxSpareThreads) {
424 int toFree = currentThreadCount -
425 currentThreadsBusy -
426 maxSpareThreads;
427
428 for(int i = 0 ; i < toFree ; i++) {
429 ControlRunnable c = pool[currentThreadCount - currentThreadsBusy - 1];
430 c.terminate();
431 pool[currentThreadCount - currentThreadsBusy - 1] = null;
432 currentThreadCount --;
433 }
434
435 }
436
437 }
438
439 /**
440 * Returns the thread to the pool.
441 * Called by threads as they are becoming idel.
442 */
443 protected synchronized void returnController(ControlRunnable c) {
444
445 if(0 == currentThreadCount || stopThePool) {
446 c.terminate();
447 return;
448 }
449
450 // atomic
451 currentThreadsBusy--;
452
453 pool[currentThreadCount - currentThreadsBusy - 1] = c;
454 notify();
455 }
456
457 /**
458 * Inform the pool that the specific thread finish.
459 *
460 * Called by the ControlRunnable.run() when the runnable
461 * throws an exception.
462 */
463 protected synchronized void notifyThreadEnd(ControlRunnable c) {
464 currentThreadsBusy--;
465 currentThreadCount --;
466 notify();
467 }
468
469
470 /*
471 * Checks for problematic configuration and fix it.
472 * The fix provides reasonable settings for a single CPU
473 * with medium load.
474 */
475 protected void adjustLimits() {
476 if(maxThreads <= 0) {
477 maxThreads = MAX_THREADS;
478 } else if (maxThreads < MAX_THREADS_MIN) {
479 log.warn(sm.getString("threadpool.max_threads_too_low",
480 new Integer(maxThreads),
481 new Integer(MAX_THREADS_MIN)));
482 maxThreads = MAX_THREADS_MIN;
483 }
484
485 if(maxSpareThreads >= maxThreads) {
486 maxSpareThreads = maxThreads;
487 }
488
489 if(maxSpareThreads <= 0) {
490 if(1 == maxThreads) {
491 maxSpareThreads = 1;
492 } else {
493 maxSpareThreads = maxThreads/2;
494 }
495 }
496
497 if(minSpareThreads > maxSpareThreads) {
498 minSpareThreads = maxSpareThreads;
499 }
500
501 if(minSpareThreads <= 0) {
502 if(1 == maxSpareThreads) {
503 minSpareThreads = 1;
504 } else {
505 minSpareThreads = maxSpareThreads/2;
506 }
507 }
508 }
509
510 /** Create missing threads.
511 *
512 * @param toOpen Total number of threads we'll have open
513 */
514 protected void openThreads(int toOpen) {
515
516 if(toOpen > maxThreads) {
517 toOpen = maxThreads;
518 }
519
520 for(int i = currentThreadCount ; i < toOpen ; i++) {
521 pool[i - currentThreadsBusy] = new ControlRunnable(this);
522 }
523
524 currentThreadCount = toOpen;
525 }
526
527 /** @deprecated */
528 void log( String s ) {
529 log.info(s);
530 //loghelper.flush();
531 }
532
533 /**
534 * Periodically execute an action - cleanup in this case
535 */
536 public static class MonitorRunnable implements Runnable {
537 ThreadPool p;
538 Thread t;
539 int interval=WORK_WAIT_TIMEOUT;
540 boolean shouldTerminate;
541
542 MonitorRunnable(ThreadPool p) {
543 this.p=p;
544 this.start();
545 }
546
547 public void start() {
548 shouldTerminate = false;
549 t = new Thread(this);
550 t.setDaemon(p.getDaemon() );
551 t.setName(p.getName() + "-Monitor");
552 t.start();
553 }
554
555 public void setInterval(int i ) {
556 this.interval=i;
557 }
558
559 public void run() {
560 while(true) {
561 try {
562
563 // Sleep for a while.
564 synchronized(this) {
565 this.wait(interval);
566 }
567
568 // Check if should terminate.
569 // termination happens when the pool is shutting down.
570 if(shouldTerminate) {
571 break;
572 }
573
574 // Harvest idle threads.
575 p.checkSpareControllers();
576
577 } catch(Throwable t) {
578 ThreadPool.log.error("Unexpected exception", t);
579 }
580 }
581 }
582
583 public void stop() {
584 this.terminate();
585 }
586
587 /** Stop the monitor
588 */
589 public synchronized void terminate() {
590 shouldTerminate = true;
591 this.notify();
592 }
593 }
594
595 /**
596 * A Thread object that executes various actions ( ThreadPoolRunnable )
597 * under control of ThreadPool
598 */
599 public static class ControlRunnable implements Runnable {
600 /**
601 * ThreadPool where this thread will be returned
602 */
603 private ThreadPool p;
604
605 /**
606 * The thread that executes the actions
607 */
608 private ThreadWithAttributes t;
609
610 /**
611 * The method that is executed in this thread
612 */
613
614 private ThreadPoolRunnable toRun;
615 private Runnable toRunRunnable;
616
617 /**
618 * Stop this thread
619 */
620 private boolean shouldTerminate;
621
622 /**
623 * Activate the execution of the action
624 */
625 private boolean shouldRun;
626
627 /**
628 * Per thread data - can be used only if all actions are
629 * of the same type.
630 * A better mechanism is possible ( that would allow association of
631 * thread data with action type ), but right now it's enough.
632 */
633 private boolean noThData;
634
635 /**
636 * Start a new thread, with no method in it
637 */
638 ControlRunnable(ThreadPool p) {
639 toRun = null;
640 shouldTerminate = false;
641 shouldRun = false;
642 this.p = p;
643 t = new ThreadWithAttributes(p, this);
644 t.setDaemon(true);
645 t.setName(p.getName() + "-Processor" + p.incSequence());
646 t.setPriority(p.getThreadPriority());
647 p.addThread( t, this );
648 noThData=true;
649 t.start();
650 }
651
652 public void run() {
653 boolean _shouldRun = false;
654 boolean _shouldTerminate = false;
655 ThreadPoolRunnable _toRun = null;
656 try {
657 while (true) {
658 try {
659 /* Wait for work. */
660 synchronized (this) {
661 while (!shouldRun && !shouldTerminate) {
662 this.wait();
663 }
664 _shouldRun = shouldRun;
665 _shouldTerminate = shouldTerminate;
666 _toRun = toRun;
667 }
668
669 if (_shouldTerminate) {
670 if (ThreadPool.log.isDebugEnabled())
671 ThreadPool.log.debug("Terminate");
672 break;
673 }
674
675 /* Check if should execute a runnable. */
676 try {
677 if (noThData) {
678 if (_toRun != null) {
679 Object thData[] = _toRun.getInitData();
680 t.setThreadData(p, thData);
681 if (ThreadPool.log.isDebugEnabled())
682 ThreadPool.log.debug(
683 "Getting new thread data");
684 }
685 noThData = false;
686 }
687
688 if (_shouldRun) {
689 if (_toRun != null) {
690 _toRun.runIt(t.getThreadData(p));
691 } else if (toRunRunnable != null) {
692 toRunRunnable.run();
693 } else {
694 if (ThreadPool.log.isDebugEnabled())
695 ThreadPool.log.debug("No toRun ???");
696 }
697 }
698 } catch (Throwable t) {
699 ThreadPool.log.error(sm.getString
700 ("threadpool.thread_error", t, toRun.toString()));
701 /*
702 * The runnable throw an exception (can be even a ThreadDeath),
703 * signalling that the thread die.
704 *
705 * The meaning is that we should release the thread from
706 * the pool.
707 */
708 _shouldTerminate = true;
709 _shouldRun = false;
710 p.notifyThreadEnd(this);
711 } finally {
712 if (_shouldRun) {
713 shouldRun = false;
714 /*
715 * Notify the pool that the thread is now idle.
716 */
717 p.returnController(this);
718 }
719 }
720
721 /*
722 * Check if should terminate.
723 * termination happens when the pool is shutting down.
724 */
725 if (_shouldTerminate) {
726 break;
727 }
728 } catch (InterruptedException ie) { /* for the wait operation */
729 // can never happen, since we don't call interrupt
730 ThreadPool.log.error("Unexpected exception", ie);
731 }
732 }
733 } finally {
734 p.removeThread(Thread.currentThread());
735 }
736 }
737 /** Run a task
738 *
739 * @param toRun
740 */
741 public synchronized void runIt(Runnable toRun) {
742 this.toRunRunnable = toRun;
743 // Do not re-init, the whole idea is to run init only once per
744 // thread - the pool is supposed to run a single task, that is
745 // initialized once.
746 // noThData = true;
747 shouldRun = true;
748 this.notify();
749 }
750
751 /** Run a task
752 *
753 * @param toRun
754 */
755 public synchronized void runIt(ThreadPoolRunnable toRun) {
756 this.toRun = toRun;
757 // Do not re-init, the whole idea is to run init only once per
758 // thread - the pool is supposed to run a single task, that is
759 // initialized once.
760 // noThData = true;
761 shouldRun = true;
762 this.notify();
763 }
764
765 public void stop() {
766 this.terminate();
767 }
768
769 public void kill() {
770 t.stop();
771 }
772
773 public synchronized void terminate() {
774 shouldTerminate = true;
775 this.notify();
776 }
777 }
778
779 /**
780 * Debug display of the stage of each thread. The return is html style,
781 * for display in the console ( it can be easily parsed too ).
782 *
783 * @return The thread status display
784 */
785 public String threadStatusString() {
786 StringBuffer sb=new StringBuffer();
787 Iterator it=threads.keySet().iterator();
788 sb.append("<ul>");
789 while( it.hasNext()) {
790 sb.append("<li>");
791 ThreadWithAttributes twa=(ThreadWithAttributes)
792 it.next();
793 sb.append(twa.getCurrentStage(this) ).append(" ");
794 sb.append( twa.getParam(this));
795 sb.append( "</li>\n");
796 }
797 sb.append("</ul>");
798 return sb.toString();
799 }
800
801 /** Return an array with the status of each thread. The status
802 * indicates the current request processing stage ( for tomcat ) or
803 * whatever the thread is doing ( if the application using TP provide
804 * this info )
805 *
806 * @return The status of all threads
807 */
808 public String[] getThreadStatus() {
809 String status[]=new String[ threads.size()];
810 Iterator it=threads.keySet().iterator();
811 for( int i=0; ( i<status.length && it.hasNext()); i++ ) {
812 ThreadWithAttributes twa=(ThreadWithAttributes)
813 it.next();
814 status[i]=twa.getCurrentStage(this);
815 }
816 return status;
817 }
818
819 /** Return an array with the current "param" ( XXX better name ? )
820 * of each thread. This is typically the last request.
821 *
822 * @return The params of all threads
823 */
824 public String[] getThreadParam() {
825 String status[]=new String[ threads.size()];
826 Iterator it=threads.keySet().iterator();
827 for( int i=0; ( i<status.length && it.hasNext()); i++ ) {
828 ThreadWithAttributes twa=(ThreadWithAttributes)
829 it.next();
830 Object o=twa.getParam(this);
831 status[i]=(o==null)? null : o.toString();
832 }
833 return status;
834 }
835
836 /** Interface to allow applications to be notified when
837 * a threads are created and stopped.
838 */
839 public static interface ThreadPoolListener {
840 public void threadStart( ThreadPool tp, Thread t);
841
842 public void threadEnd( ThreadPool tp, Thread t);
843 }
844 }