1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 package org.apache.tomcat.util.threads; 19 20 import java.util; 21 22 import org.apache.juli.logging.Log; 23 import org.apache.juli.logging.LogFactory; 24 import org.apache.tomcat.util.res.StringManager; 25 26 /** 27 * A thread pool that is trying to copy the apache process management. 28 * 29 * Should we remove this in favor of Doug Lea's thread package? 30 * 31 * @author Gal Shachor 32 * @author Yoav Shapira <yoavs@apache.org> 33 */ 34 public class ThreadPool { 35 36 private static Log log = LogFactory.getLog(ThreadPool.class); 37 38 private static StringManager sm = 39 StringManager.getManager("org.apache.tomcat.util.threads.res"); 40 41 private static boolean logfull=true; 42 43 /* 44 * Default values ... 45 */ 46 public static final int MAX_THREADS = 200; 47 public static final int MAX_THREADS_MIN = 10; 48 public static final int MAX_SPARE_THREADS = 50; 49 public static final int MIN_SPARE_THREADS = 4; 50 public static final int WORK_WAIT_TIMEOUT = 60*1000; 51 52 /* 53 * Where the threads are held. 54 */ 55 protected ControlRunnable[] pool = null; 56 57 /* 58 * A monitor thread that monitors the pool for idel threads. 59 */ 60 protected MonitorRunnable monitor; 61 62 63 /* 64 * Max number of threads that you can open in the pool. 65 */ 66 protected int maxThreads; 67 68 /* 69 * Min number of idel threads that you can leave in the pool. 70 */ 71 protected int minSpareThreads; 72 73 /* 74 * Max number of idel threads that you can leave in the pool. 75 */ 76 protected int maxSpareThreads; 77 78 /* 79 * Number of threads in the pool. 80 */ 81 protected int currentThreadCount; 82 83 /* 84 * Number of busy threads in the pool. 85 */ 86 protected int currentThreadsBusy; 87 88 /* 89 * Flag that the pool should terminate all the threads and stop. 90 */ 91 protected boolean stopThePool; 92 93 /* Flag to control if the main thread is 'daemon' */ 94 protected boolean isDaemon=true; 95 96 /** The threads that are part of the pool. 97 * Key is Thread, value is the ControlRunnable 98 */ 99 protected Hashtable threads=new Hashtable(); 100 101 protected Vector listeners=new Vector(); 102 103 /** Name of the threadpool 104 */ 105 protected String name = "TP"; 106 107 /** 108 * Sequence. 109 */ 110 protected int sequence = 1; 111 112 /** 113 * Thread priority. 114 */ 115 protected int threadPriority = Thread.NORM_PRIORITY; 116 117 118 /** 119 * Constructor. 120 */ 121 public ThreadPool() { 122 maxThreads = MAX_THREADS; 123 maxSpareThreads = MAX_SPARE_THREADS; 124 minSpareThreads = MIN_SPARE_THREADS; 125 currentThreadCount = 0; 126 currentThreadsBusy = 0; 127 stopThePool = false; 128 } 129 130 131 /** Create a ThreadPool instance. 132 * 133 * @param jmx UNUSED 134 * @return ThreadPool instance. If JMX support is requested, you need to 135 * call register() in order to set a name. 136 */ 137 public static ThreadPool createThreadPool(boolean jmx) { 138 return new ThreadPool(); 139 } 140 141 public synchronized void start() { 142 stopThePool=false; 143 currentThreadCount = 0; 144 currentThreadsBusy = 0; 145 146 adjustLimits(); 147 148 pool = new ControlRunnable[maxThreads]; 149 150 openThreads(minSpareThreads); 151 if (maxSpareThreads < maxThreads) { 152 monitor = new MonitorRunnable(this); 153 } 154 } 155 156 public MonitorRunnable getMonitor() { 157 return monitor; 158 } 159 160 /** 161 * Sets the thread priority for current 162 * and future threads in this pool. 163 * 164 * @param threadPriority The new priority 165 * @throws IllegalArgumentException If the specified 166 * priority is less than Thread.MIN_PRIORITY or 167 * more than Thread.MAX_PRIORITY 168 */ 169 public synchronized void setThreadPriority(int threadPriority) { 170 if(log.isDebugEnabled()) 171 log.debug(getClass().getName() + 172 ": setPriority(" + threadPriority + "): here."); 173 174 if (threadPriority < Thread.MIN_PRIORITY) { 175 throw new IllegalArgumentException("new priority < MIN_PRIORITY"); 176 } else if (threadPriority > Thread.MAX_PRIORITY) { 177 throw new IllegalArgumentException("new priority > MAX_PRIORITY"); 178 } 179 180 // Set for future threads 181 this.threadPriority = threadPriority; 182 183 Enumeration currentThreads = getThreads(); 184 Thread t = null; 185 while(currentThreads.hasMoreElements()) { 186 t = (Thread) currentThreads.nextElement(); 187 t.setPriority(threadPriority); 188 } 189 } 190 191 /** 192 * Returns the priority level of current and 193 * future threads in this pool. 194 * 195 * @return The priority 196 */ 197 public int getThreadPriority() { 198 return threadPriority; 199 } 200 201 202 public void setMaxThreads(int maxThreads) { 203 this.maxThreads = maxThreads; 204 } 205 206 public int getMaxThreads() { 207 return maxThreads; 208 } 209 210 public void setMinSpareThreads(int minSpareThreads) { 211 this.minSpareThreads = minSpareThreads; 212 } 213 214 public int getMinSpareThreads() { 215 return minSpareThreads; 216 } 217 218 public void setMaxSpareThreads(int maxSpareThreads) { 219 this.maxSpareThreads = maxSpareThreads; 220 } 221 222 public int getMaxSpareThreads() { 223 return maxSpareThreads; 224 } 225 226 public int getCurrentThreadCount() { 227 return currentThreadCount; 228 } 229 230 public int getCurrentThreadsBusy() { 231 return currentThreadsBusy; 232 } 233 234 public boolean isDaemon() { 235 return isDaemon; 236 } 237 238 public static int getDebug() { 239 return 0; 240 } 241 242 /** The default is true - the created threads will be 243 * in daemon mode. If set to false, the control thread 244 * will not be daemon - and will keep the process alive. 245 */ 246 public void setDaemon( boolean b ) { 247 isDaemon=b; 248 } 249 250 public boolean getDaemon() { 251 return isDaemon; 252 } 253 254 public void setName(String name) { 255 this.name = name; 256 } 257 258 public String getName() { 259 return name; 260 } 261 262 public int getSequence() { 263 return sequence; 264 } 265 266 public int incSequence() { 267 return sequence++; 268 } 269 270 public void addThread( Thread t, ControlRunnable cr ) { 271 threads.put( t, cr ); 272 for( int i=0; i<listeners.size(); i++ ) { 273 ThreadPoolListener tpl=(ThreadPoolListener)listeners.elementAt(i); 274 tpl.threadStart(this, t); 275 } 276 } 277 278 public void removeThread( Thread t ) { 279 threads.remove(t); 280 for( int i=0; i<listeners.size(); i++ ) { 281 ThreadPoolListener tpl=(ThreadPoolListener)listeners.elementAt(i); 282 tpl.threadEnd(this, t); 283 } 284 } 285 286 public void addThreadPoolListener( ThreadPoolListener tpl ) { 287 listeners.addElement( tpl ); 288 } 289 290 public Enumeration getThreads(){ 291 return threads.keys(); 292 } 293 294 public void run(Runnable r) { 295 ControlRunnable c = findControlRunnable(); 296 c.runIt(r); 297 } 298 299 // 300 // You may wonder what you see here ... basically I am trying 301 // to maintain a stack of threads. This way locality in time 302 // is kept and there is a better chance to find residues of the 303 // thread in memory next time it runs. 304 // 305 306 /** 307 * Executes a given Runnable on a thread in the pool, block if needed. 308 */ 309 public void runIt(ThreadPoolRunnable r) { 310 if(null == r) { 311 throw new NullPointerException(); 312 } 313 314 ControlRunnable c = findControlRunnable(); 315 c.runIt(r); 316 } 317 318 private ControlRunnable findControlRunnable() { 319 ControlRunnable c=null; 320 321 if ( stopThePool ) { 322 throw new IllegalStateException(); 323 } 324 325 // Obtain a free thread from the pool. 326 synchronized(this) { 327 328 while (currentThreadsBusy == currentThreadCount) { 329 // All threads are busy 330 if (currentThreadCount < maxThreads) { 331 // Not all threads were open, 332 // Open new threads up to the max number of idel threads 333 int toOpen = currentThreadCount + minSpareThreads; 334 openThreads(toOpen); 335 } else { 336 logFull(log, currentThreadCount, maxThreads); 337 // Wait for a thread to become idel. 338 try { 339 this.wait(); 340 } 341 // was just catch Throwable -- but no other 342 // exceptions can be thrown by wait, right? 343 // So we catch and ignore this one, since 344 // it'll never actually happen, since nowhere 345 // do we say pool.interrupt(). 346 catch(InterruptedException e) { 347 log.error("Unexpected exception", e); 348 } 349 if( log.isDebugEnabled() ) { 350 log.debug("Finished waiting: CTC="+currentThreadCount + 351 ", CTB=" + currentThreadsBusy); 352 } 353 // Pool was stopped. Get away of the pool. 354 if( stopThePool) { 355 break; 356 } 357 } 358 } 359 // Pool was stopped. Get away of the pool. 360 if(0 == currentThreadCount || stopThePool) { 361 throw new IllegalStateException(); 362 } 363 364 // If we are here it means that there is a free thread. Take it. 365 int pos = currentThreadCount - currentThreadsBusy - 1; 366 c = pool[pos]; 367 pool[pos] = null; 368 currentThreadsBusy++; 369 370 } 371 return c; 372 } 373 374 private static void logFull(Log loghelper, int currentThreadCount, 375 int maxThreads) { 376 if( logfull ) { 377 log.error(sm.getString("threadpool.busy", 378 new Integer(currentThreadCount), 379 new Integer(maxThreads))); 380 logfull=false; 381 } else if( log.isDebugEnabled() ) { 382 log.debug("All threads are busy " + currentThreadCount + " " + 383 maxThreads ); 384 } 385 } 386 387 /** 388 * Stop the thread pool 389 */ 390 public synchronized void shutdown() { 391 if(!stopThePool) { 392 stopThePool = true; 393 if (monitor != null) { 394 monitor.terminate(); 395 monitor = null; 396 } 397 for(int i = 0; i < currentThreadCount - currentThreadsBusy; i++) { 398 try { 399 pool[i].terminate(); 400 } catch(Throwable t) { 401 /* 402 * Do nothing... The show must go on, we are shutting 403 * down the pool and nothing should stop that. 404 */ 405 log.error("Ignored exception while shutting down thread pool", t); 406 } 407 } 408 currentThreadsBusy = currentThreadCount = 0; 409 pool = null; 410 notifyAll(); 411 } 412 } 413 414 /** 415 * Called by the monitor thread to harvest idle threads. 416 */ 417 protected synchronized void checkSpareControllers() { 418 419 if(stopThePool) { 420 return; 421 } 422 423 if((currentThreadCount - currentThreadsBusy) > maxSpareThreads) { 424 int toFree = currentThreadCount - 425 currentThreadsBusy - 426 maxSpareThreads; 427 428 for(int i = 0 ; i < toFree ; i++) { 429 ControlRunnable c = pool[currentThreadCount - currentThreadsBusy - 1]; 430 c.terminate(); 431 pool[currentThreadCount - currentThreadsBusy - 1] = null; 432 currentThreadCount --; 433 } 434 435 } 436 437 } 438 439 /** 440 * Returns the thread to the pool. 441 * Called by threads as they are becoming idel. 442 */ 443 protected synchronized void returnController(ControlRunnable c) { 444 445 if(0 == currentThreadCount || stopThePool) { 446 c.terminate(); 447 return; 448 } 449 450 // atomic 451 currentThreadsBusy--; 452 453 pool[currentThreadCount - currentThreadsBusy - 1] = c; 454 notify(); 455 } 456 457 /** 458 * Inform the pool that the specific thread finish. 459 * 460 * Called by the ControlRunnable.run() when the runnable 461 * throws an exception. 462 */ 463 protected synchronized void notifyThreadEnd(ControlRunnable c) { 464 currentThreadsBusy--; 465 currentThreadCount --; 466 notify(); 467 } 468 469 470 /* 471 * Checks for problematic configuration and fix it. 472 * The fix provides reasonable settings for a single CPU 473 * with medium load. 474 */ 475 protected void adjustLimits() { 476 if(maxThreads <= 0) { 477 maxThreads = MAX_THREADS; 478 } else if (maxThreads < MAX_THREADS_MIN) { 479 log.warn(sm.getString("threadpool.max_threads_too_low", 480 new Integer(maxThreads), 481 new Integer(MAX_THREADS_MIN))); 482 maxThreads = MAX_THREADS_MIN; 483 } 484 485 if(maxSpareThreads >= maxThreads) { 486 maxSpareThreads = maxThreads; 487 } 488 489 if(maxSpareThreads <= 0) { 490 if(1 == maxThreads) { 491 maxSpareThreads = 1; 492 } else { 493 maxSpareThreads = maxThreads/2; 494 } 495 } 496 497 if(minSpareThreads > maxSpareThreads) { 498 minSpareThreads = maxSpareThreads; 499 } 500 501 if(minSpareThreads <= 0) { 502 if(1 == maxSpareThreads) { 503 minSpareThreads = 1; 504 } else { 505 minSpareThreads = maxSpareThreads/2; 506 } 507 } 508 } 509 510 /** Create missing threads. 511 * 512 * @param toOpen Total number of threads we'll have open 513 */ 514 protected void openThreads(int toOpen) { 515 516 if(toOpen > maxThreads) { 517 toOpen = maxThreads; 518 } 519 520 for(int i = currentThreadCount ; i < toOpen ; i++) { 521 pool[i - currentThreadsBusy] = new ControlRunnable(this); 522 } 523 524 currentThreadCount = toOpen; 525 } 526 527 /** @deprecated */ 528 void log( String s ) { 529 log.info(s); 530 //loghelper.flush(); 531 } 532 533 /** 534 * Periodically execute an action - cleanup in this case 535 */ 536 public static class MonitorRunnable implements Runnable { 537 ThreadPool p; 538 Thread t; 539 int interval=WORK_WAIT_TIMEOUT; 540 boolean shouldTerminate; 541 542 MonitorRunnable(ThreadPool p) { 543 this.p=p; 544 this.start(); 545 } 546 547 public void start() { 548 shouldTerminate = false; 549 t = new Thread(this); 550 t.setDaemon(p.getDaemon() ); 551 t.setName(p.getName() + "-Monitor"); 552 t.start(); 553 } 554 555 public void setInterval(int i ) { 556 this.interval=i; 557 } 558 559 public void run() { 560 while(true) { 561 try { 562 563 // Sleep for a while. 564 synchronized(this) { 565 this.wait(interval); 566 } 567 568 // Check if should terminate. 569 // termination happens when the pool is shutting down. 570 if(shouldTerminate) { 571 break; 572 } 573 574 // Harvest idle threads. 575 p.checkSpareControllers(); 576 577 } catch(Throwable t) { 578 ThreadPool.log.error("Unexpected exception", t); 579 } 580 } 581 } 582 583 public void stop() { 584 this.terminate(); 585 } 586 587 /** Stop the monitor 588 */ 589 public synchronized void terminate() { 590 shouldTerminate = true; 591 this.notify(); 592 } 593 } 594 595 /** 596 * A Thread object that executes various actions ( ThreadPoolRunnable ) 597 * under control of ThreadPool 598 */ 599 public static class ControlRunnable implements Runnable { 600 /** 601 * ThreadPool where this thread will be returned 602 */ 603 private ThreadPool p; 604 605 /** 606 * The thread that executes the actions 607 */ 608 private ThreadWithAttributes t; 609 610 /** 611 * The method that is executed in this thread 612 */ 613 614 private ThreadPoolRunnable toRun; 615 private Runnable toRunRunnable; 616 617 /** 618 * Stop this thread 619 */ 620 private boolean shouldTerminate; 621 622 /** 623 * Activate the execution of the action 624 */ 625 private boolean shouldRun; 626 627 /** 628 * Per thread data - can be used only if all actions are 629 * of the same type. 630 * A better mechanism is possible ( that would allow association of 631 * thread data with action type ), but right now it's enough. 632 */ 633 private boolean noThData; 634 635 /** 636 * Start a new thread, with no method in it 637 */ 638 ControlRunnable(ThreadPool p) { 639 toRun = null; 640 shouldTerminate = false; 641 shouldRun = false; 642 this.p = p; 643 t = new ThreadWithAttributes(p, this); 644 t.setDaemon(true); 645 t.setName(p.getName() + "-Processor" + p.incSequence()); 646 t.setPriority(p.getThreadPriority()); 647 p.addThread( t, this ); 648 noThData=true; 649 t.start(); 650 } 651 652 public void run() { 653 boolean _shouldRun = false; 654 boolean _shouldTerminate = false; 655 ThreadPoolRunnable _toRun = null; 656 try { 657 while (true) { 658 try { 659 /* Wait for work. */ 660 synchronized (this) { 661 while (!shouldRun && !shouldTerminate) { 662 this.wait(); 663 } 664 _shouldRun = shouldRun; 665 _shouldTerminate = shouldTerminate; 666 _toRun = toRun; 667 } 668 669 if (_shouldTerminate) { 670 if (ThreadPool.log.isDebugEnabled()) 671 ThreadPool.log.debug("Terminate"); 672 break; 673 } 674 675 /* Check if should execute a runnable. */ 676 try { 677 if (noThData) { 678 if (_toRun != null) { 679 Object thData[] = _toRun.getInitData(); 680 t.setThreadData(p, thData); 681 if (ThreadPool.log.isDebugEnabled()) 682 ThreadPool.log.debug( 683 "Getting new thread data"); 684 } 685 noThData = false; 686 } 687 688 if (_shouldRun) { 689 if (_toRun != null) { 690 _toRun.runIt(t.getThreadData(p)); 691 } else if (toRunRunnable != null) { 692 toRunRunnable.run(); 693 } else { 694 if (ThreadPool.log.isDebugEnabled()) 695 ThreadPool.log.debug("No toRun ???"); 696 } 697 } 698 } catch (Throwable t) { 699 ThreadPool.log.error(sm.getString 700 ("threadpool.thread_error", t, toRun.toString())); 701 /* 702 * The runnable throw an exception (can be even a ThreadDeath), 703 * signalling that the thread die. 704 * 705 * The meaning is that we should release the thread from 706 * the pool. 707 */ 708 _shouldTerminate = true; 709 _shouldRun = false; 710 p.notifyThreadEnd(this); 711 } finally { 712 if (_shouldRun) { 713 shouldRun = false; 714 /* 715 * Notify the pool that the thread is now idle. 716 */ 717 p.returnController(this); 718 } 719 } 720 721 /* 722 * Check if should terminate. 723 * termination happens when the pool is shutting down. 724 */ 725 if (_shouldTerminate) { 726 break; 727 } 728 } catch (InterruptedException ie) { /* for the wait operation */ 729 // can never happen, since we don't call interrupt 730 ThreadPool.log.error("Unexpected exception", ie); 731 } 732 } 733 } finally { 734 p.removeThread(Thread.currentThread()); 735 } 736 } 737 /** Run a task 738 * 739 * @param toRun 740 */ 741 public synchronized void runIt(Runnable toRun) { 742 this.toRunRunnable = toRun; 743 // Do not re-init, the whole idea is to run init only once per 744 // thread - the pool is supposed to run a single task, that is 745 // initialized once. 746 // noThData = true; 747 shouldRun = true; 748 this.notify(); 749 } 750 751 /** Run a task 752 * 753 * @param toRun 754 */ 755 public synchronized void runIt(ThreadPoolRunnable toRun) { 756 this.toRun = toRun; 757 // Do not re-init, the whole idea is to run init only once per 758 // thread - the pool is supposed to run a single task, that is 759 // initialized once. 760 // noThData = true; 761 shouldRun = true; 762 this.notify(); 763 } 764 765 public void stop() { 766 this.terminate(); 767 } 768 769 public void kill() { 770 t.stop(); 771 } 772 773 public synchronized void terminate() { 774 shouldTerminate = true; 775 this.notify(); 776 } 777 } 778 779 /** 780 * Debug display of the stage of each thread. The return is html style, 781 * for display in the console ( it can be easily parsed too ). 782 * 783 * @return The thread status display 784 */ 785 public String threadStatusString() { 786 StringBuffer sb=new StringBuffer(); 787 Iterator it=threads.keySet().iterator(); 788 sb.append("<ul>"); 789 while( it.hasNext()) { 790 sb.append("<li>"); 791 ThreadWithAttributes twa=(ThreadWithAttributes) 792 it.next(); 793 sb.append(twa.getCurrentStage(this) ).append(" "); 794 sb.append( twa.getParam(this)); 795 sb.append( "</li>\n"); 796 } 797 sb.append("</ul>"); 798 return sb.toString(); 799 } 800 801 /** Return an array with the status of each thread. The status 802 * indicates the current request processing stage ( for tomcat ) or 803 * whatever the thread is doing ( if the application using TP provide 804 * this info ) 805 * 806 * @return The status of all threads 807 */ 808 public String[] getThreadStatus() { 809 String status[]=new String[ threads.size()]; 810 Iterator it=threads.keySet().iterator(); 811 for( int i=0; ( i<status.length && it.hasNext()); i++ ) { 812 ThreadWithAttributes twa=(ThreadWithAttributes) 813 it.next(); 814 status[i]=twa.getCurrentStage(this); 815 } 816 return status; 817 } 818 819 /** Return an array with the current "param" ( XXX better name ? ) 820 * of each thread. This is typically the last request. 821 * 822 * @return The params of all threads 823 */ 824 public String[] getThreadParam() { 825 String status[]=new String[ threads.size()]; 826 Iterator it=threads.keySet().iterator(); 827 for( int i=0; ( i<status.length && it.hasNext()); i++ ) { 828 ThreadWithAttributes twa=(ThreadWithAttributes) 829 it.next(); 830 Object o=twa.getParam(this); 831 status[i]=(o==null)? null : o.toString(); 832 } 833 return status; 834 } 835 836 /** Interface to allow applications to be notified when 837 * a threads are created and stopped. 838 */ 839 public static interface ThreadPoolListener { 840 public void threadStart( ThreadPool tp, Thread t); 841 842 public void threadEnd( ThreadPool tp, Thread t); 843 } 844 }