Save This Page
Home » quartz-1.6.0 » org » quartz » simpl » [javadoc | source]
    1   /* 
    2    * Copyright 2004-2005 OpenSymphony 
    3    * 
    4    * Licensed under the Apache License, Version 2.0 (the "License"); you may not 
    5    * use this file except in compliance with the License. You may obtain a copy 
    6    * of the License at 
    7    * 
    8    *   http://www.apache.org/licenses/LICENSE-2.0 
    9    *   
   10    * Unless required by applicable law or agreed to in writing, software 
   11    * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 
   12    * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 
   13    * License for the specific language governing permissions and limitations 
   14    * under the License.
   15    * 
   16    */
   17   
   18   /*
   19    * Previously Copyright (c) 2001-2004 James House
   20    */
   21   package org.quartz.simpl;
   22   
   23   import org.apache.commons.logging.Log;
   24   import org.apache.commons.logging.LogFactory;
   25   import org.quartz.SchedulerConfigException;
   26   import org.quartz.spi.ThreadPool;
   27   
   28   import java.util.Iterator;
   29   import java.util.LinkedList;
   30   import java.util.List;
   31   
   32   /**
   33    * <p>
   34    * This class is a simple implementation of a thread pool, based on the
   35    * <code>{@link org.quartz.spi.ThreadPool}</code> interface.
   36    * </p>
   37    * 
   38    * <p>
   39    * <CODE>Runnable</CODE> objects are sent to the pool with the <code>{@link #runInThread(Runnable)}</code>
   40    * method, which blocks until a <code>Thread</code> becomes available.
   41    * </p>
   42    * 
   43    * <p>
   44    * The pool has a fixed number of <code>Thread</code>s, and does not grow or
   45    * shrink based on demand.
   46    * </p>
   47    * 
   48    * @author James House
   49    * @author Juergen Donnerstag
   50    */
   51   public class SimpleThreadPool implements ThreadPool {
   52   
   53       /*
   54        * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
   55        * 
   56        * Data members.
   57        * 
   58        * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
   59        */
   60   
   61       private int count = -1;
   62   
   63       private int prio = Thread.NORM_PRIORITY;
   64   
   65       private boolean isShutdown = false;
   66       private boolean handoffPending = false;
   67   
   68       private boolean inheritLoader = false;
   69   
   70       private boolean inheritGroup = true;
   71   
   72       private boolean makeThreadsDaemons = false;
   73   
   74       private ThreadGroup threadGroup;
   75   
   76       private final Object nextRunnableLock = new Object();
   77   
   78       private List workers;
   79       private LinkedList availWorkers = new LinkedList();
   80       private LinkedList busyWorkers = new LinkedList();
   81   
   82       private String threadNamePrefix = "SimpleThreadPoolWorker";
   83   
   84       private final Log log = LogFactory.getLog(getClass());
   85   
   86       /*
   87        * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
   88        * 
   89        * Constructors.
   90        * 
   91        * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
   92        */
   93   
   94       /**
   95        * <p>
   96        * Create a new (unconfigured) <code>SimpleThreadPool</code>.
   97        * </p>
   98        * 
   99        * @see #setThreadCount(int)
  100        * @see #setThreadPriority(int)
  101        */
  102       public SimpleThreadPool() {
  103       }
  104   
  105       /**
  106        * <p>
  107        * Create a new <code>SimpleThreadPool</code> with the specified number
  108        * of <code>Thread</code> s that have the given priority.
  109        * </p>
  110        * 
  111        * @param threadCount
  112        *          the number of worker <code>Threads</code> in the pool, must
  113        *          be > 0.
  114        * @param threadPriority
  115        *          the thread priority for the worker threads.
  116        * 
  117        * @see java.lang.Thread
  118        */
  119       public SimpleThreadPool(int threadCount, int threadPriority) {
  120           setThreadCount(threadCount);
  121           setThreadPriority(threadPriority);
  122       }
  123   
  124       /*
  125        * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  126        * 
  127        * Interface.
  128        * 
  129        * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  130        */
  131   
  132       public Log getLog() {
  133           return log;
  134       }
  135   
  136       public int getPoolSize() {
  137           return getThreadCount();
  138       }
  139   
  140       /**
  141        * <p>
  142        * Set the number of worker threads in the pool - has no effect after
  143        * <code>initialize()</code> has been called.
  144        * </p>
  145        */
  146       public void setThreadCount(int count) {
  147           this.count = count;
  148       }
  149   
  150       /**
  151        * <p>
  152        * Get the number of worker threads in the pool.
  153        * </p>
  154        */
  155       public int getThreadCount() {
  156           return count;
  157       }
  158   
  159       /**
  160        * <p>
  161        * Set the thread priority of worker threads in the pool - has no effect
  162        * after <code>initialize()</code> has been called.
  163        * </p>
  164        */
  165       public void setThreadPriority(int prio) {
  166           this.prio = prio;
  167       }
  168   
  169       /**
  170        * <p>
  171        * Get the thread priority of worker threads in the pool.
  172        * </p>
  173        */
  174       public int getThreadPriority() {
  175           return prio;
  176       }
  177   
  178       public void setThreadNamePrefix(String prfx) {
  179           this.threadNamePrefix = prfx;
  180       }
  181   
  182       public String getThreadNamePrefix() {
  183           return threadNamePrefix;
  184       }
  185   
  186       /**
  187        * @return Returns the
  188        *         threadsInheritContextClassLoaderOfInitializingThread.
  189        */
  190       public boolean isThreadsInheritContextClassLoaderOfInitializingThread() {
  191           return inheritLoader;
  192       }
  193   
  194       /**
  195        * @param inheritLoader
  196        *          The threadsInheritContextClassLoaderOfInitializingThread to
  197        *          set.
  198        */
  199       public void setThreadsInheritContextClassLoaderOfInitializingThread(
  200               boolean inheritLoader) {
  201           this.inheritLoader = inheritLoader;
  202       }
  203   
  204       public boolean isThreadsInheritGroupOfInitializingThread() {
  205           return inheritGroup;
  206       }
  207   
  208       public void setThreadsInheritGroupOfInitializingThread(
  209               boolean inheritGroup) {
  210           this.inheritGroup = inheritGroup;
  211       }
  212   
  213   
  214       /**
  215        * @return Returns the value of makeThreadsDaemons.
  216        */
  217       public boolean isMakeThreadsDaemons() {
  218           return makeThreadsDaemons;
  219       }
  220   
  221       /**
  222        * @param makeThreadsDaemons
  223        *          The value of makeThreadsDaemons to set.
  224        */
  225       public void setMakeThreadsDaemons(boolean makeThreadsDaemons) {
  226           this.makeThreadsDaemons = makeThreadsDaemons;
  227       }
  228   
  229       public void initialize() throws SchedulerConfigException {
  230   
  231           if (count <= 0) {
  232               throw new SchedulerConfigException(
  233                       "Thread count must be > 0");
  234           }
  235           if (prio <= 0 || prio > 9) {
  236               throw new SchedulerConfigException(
  237                       "Thread priority must be > 0 and <= 9");
  238           }
  239   
  240           if(isThreadsInheritGroupOfInitializingThread()) {
  241               threadGroup = Thread.currentThread().getThreadGroup();
  242           } else {
  243               // follow the threadGroup tree to the root thread group.
  244               threadGroup = Thread.currentThread().getThreadGroup();
  245               ThreadGroup parent = threadGroup;
  246               while ( !parent.getName().equals("main") ) {
  247                   threadGroup = parent;
  248                   parent = threadGroup.getParent();
  249               }
  250               threadGroup = new ThreadGroup(parent, "SimpleThreadPool");
  251               if (isMakeThreadsDaemons()) {
  252                   threadGroup.setDaemon(true);
  253               }
  254           }
  255   
  256   
  257           if (isThreadsInheritContextClassLoaderOfInitializingThread()) {
  258               getLog().info(
  259                       "Job execution threads will use class loader of thread: "
  260                               + Thread.currentThread().getName());
  261           }
  262   
  263           // create the worker threads and start them
  264           Iterator workerThreads = createWorkerThreads(count).iterator();
  265           while(workerThreads.hasNext()) {
  266               WorkerThread wt = (WorkerThread) workerThreads.next();
  267               wt.start();
  268               availWorkers.add(wt);
  269           }
  270       }
  271   
  272       protected List createWorkerThreads(int count) {
  273           workers = new LinkedList();
  274           for (int i = 1; i<= count; ++i) {
  275               WorkerThread wt = new WorkerThread(this, threadGroup,
  276                   getThreadNamePrefix() + "-" + i,
  277                   getThreadPriority(),
  278                   isMakeThreadsDaemons());
  279               if (isThreadsInheritContextClassLoaderOfInitializingThread()) {
  280                   wt.setContextClassLoader(Thread.currentThread()
  281                           .getContextClassLoader());
  282               }
  283               workers.add(wt);
  284           }
  285   
  286           return workers;
  287       }
  288   
  289       /**
  290        * <p>
  291        * Terminate any worker threads in this thread group.
  292        * </p>
  293        * 
  294        * <p>
  295        * Jobs currently in progress will complete.
  296        * </p>
  297        */
  298       public void shutdown() {
  299           shutdown(true);
  300       }
  301   
  302       /**
  303        * <p>
  304        * Terminate any worker threads in this thread group.
  305        * </p>
  306        * 
  307        * <p>
  308        * Jobs currently in progress will complete.
  309        * </p>
  310        */
  311       public void shutdown(boolean waitForJobsToComplete) {
  312   
  313           synchronized (nextRunnableLock) {
  314               isShutdown = true;
  315   
  316               // signal each worker thread to shut down
  317               Iterator workerThreads = workers.iterator();
  318               while(workerThreads.hasNext()) {
  319                   WorkerThread wt = (WorkerThread) workerThreads.next();
  320                   wt.shutdown();
  321                   availWorkers.remove(wt);
  322               }
  323   
  324               // Give waiting (wait(1000)) worker threads a chance to shut down.
  325               // Active worker threads will shut down after finishing their
  326               // current job.
  327               nextRunnableLock.notifyAll();
  328   
  329               if (waitForJobsToComplete == true) {
  330   
  331                   // wait for hand-off in runInThread to complete...
  332                   while(handoffPending)
  333                           try { nextRunnableLock.wait(100); } catch(Throwable t) {}
  334   
  335                   // Wait until all worker threads are shut down
  336                   while (busyWorkers.size() > 0) {
  337                       WorkerThread wt = (WorkerThread) busyWorkers.getFirst();
  338                       try {
  339                           getLog().debug(
  340                                   "Waiting for thread " + wt.getName()
  341                                           + " to shut down");
  342   
  343                           // note: with waiting infinite time the
  344                           // application may appear to 'hang'.
  345                           nextRunnableLock.wait(2000);
  346                       } catch (InterruptedException ex) {
  347                       }
  348                   }
  349   
  350                   int activeCount = threadGroup.activeCount();
  351                   if (activeCount > 0) {
  352                       getLog().info(
  353                           "There are still " + activeCount + " worker threads active."
  354                           + " See javadoc runInThread(Runnable) for a possible explanation");
  355                   }
  356   
  357                   getLog().debug("shutdown complete");
  358               }
  359           }
  360       }
  361   
  362       /**
  363        * <p>
  364        * Run the given <code>Runnable</code> object in the next available
  365        * <code>Thread</code>. If while waiting the thread pool is asked to
  366        * shut down, the Runnable is executed immediately within a new additional
  367        * thread.
  368        * </p>
  369        * 
  370        * @param runnable
  371        *          the <code>Runnable</code> to be added.
  372        */
  373       public boolean runInThread(Runnable runnable) {
  374           if (runnable == null) {
  375               return false;
  376           }
  377   
  378           synchronized (nextRunnableLock) {
  379   
  380               handoffPending = true;
  381   
  382               // Wait until a worker thread is available
  383               while ((availWorkers.size() < 1) && !isShutdown) {
  384                   try {
  385                       nextRunnableLock.wait(500);
  386                   } catch (InterruptedException ignore) {
  387                   }
  388               }
  389   
  390               if (!isShutdown) {
  391                   WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
  392                   busyWorkers.add(wt);
  393                   wt.run(runnable);
  394               }
  395               else {
  396                   // If the thread pool is going down, execute the Runnable
  397                   // within a new additional worker thread (no thread from the pool).
  398                   WorkerThread wt = new WorkerThread(this, threadGroup,
  399                           "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
  400                   busyWorkers.add(wt);
  401                   workers.add(wt);
  402                   wt.start();
  403               }
  404               nextRunnableLock.notifyAll();
  405               handoffPending = false;
  406           }
  407   
  408           return true;
  409       }
  410   
  411       public int blockForAvailableThreads() {
  412           synchronized(nextRunnableLock) {
  413   
  414               while((availWorkers.size() < 1 || handoffPending) && !isShutdown) {
  415                   try {
  416                       nextRunnableLock.wait(500);
  417                   } catch (InterruptedException ignore) {
  418                   }
  419               }
  420   
  421               return availWorkers.size();
  422           }
  423       }
  424   
  425       protected void makeAvailable(WorkerThread wt) {
  426           synchronized(nextRunnableLock) {
  427               if(!isShutdown)
  428                   availWorkers.add(wt);
  429               busyWorkers.remove(wt);
  430               nextRunnableLock.notifyAll();
  431           }
  432       }
  433   
  434       /*
  435        * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  436        * 
  437        * WorkerThread Class.
  438        * 
  439        * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  440        */
  441   
  442       /**
  443        * <p>
  444        * A Worker loops, waiting to execute tasks.
  445        * </p>
  446        */
  447       class WorkerThread extends Thread {
  448   
  449           // A flag that signals the WorkerThread to terminate.
  450           private boolean run = true;
  451   
  452           private SimpleThreadPool tp;
  453   
  454           private Runnable runnable = null;
  455   
  456           /**
  457            * <p>
  458            * Create a worker thread and start it. Waiting for the next Runnable,
  459            * executing it, and waiting for the next Runnable, until the shutdown
  460            * flag is set.
  461            * </p>
  462            */
  463           WorkerThread(SimpleThreadPool tp, ThreadGroup threadGroup, String name,
  464                        int prio, boolean isDaemon) {
  465   
  466               this(tp, threadGroup, name, prio, isDaemon, null);
  467           }
  468   
  469           /**
  470            * <p>
  471            * Create a worker thread, start it, execute the runnable and terminate
  472            * the thread (one time execution).
  473            * </p>
  474            */
  475           WorkerThread(SimpleThreadPool tp, ThreadGroup threadGroup, String name,
  476                        int prio, boolean isDaemon, Runnable runnable) {
  477   
  478               super(threadGroup, name);
  479               this.tp = tp;
  480               this.runnable = runnable;
  481               setPriority(prio);
  482               setDaemon(isDaemon);
  483           }
  484   
  485           /**
  486            * <p>
  487            * Signal the thread that it should terminate.
  488            * </p>
  489            */
  490           void shutdown() {
  491               run = false;
  492   
  493               // Javadoc mentions that it interrupts blocked I/O operations as
  494               // well. Hence the job will most likely fail. I think we should
  495               // shut the work thread gracefully, by letting the job finish
  496               // uninterrupted. See SimpleThreadPool.shutdown()
  497               //interrupt();
  498           }
  499   
  500           public void run(Runnable newRunnable) {
  501               synchronized(this) {
  502                   if(runnable != null)
  503                       throw new IllegalStateException("Already running a Runnable!");
  504   
  505                   runnable = newRunnable;
  506                   this.notifyAll();
  507               }
  508           }
  509   
  510           /**
  511            * <p>
  512            * Loop, executing targets as they are received.
  513            * </p>
  514            */
  515           public void run() {
  516               boolean runOnce = (runnable != null);
  517   
  518               boolean ran = false;
  519               while (run) {
  520                   try {
  521                       synchronized(this) {
  522                           while (runnable == null && run) {
  523                               this.wait(500);
  524                           }
  525                       }
  526   
  527                       if (runnable != null) {
  528                           ran = true;
  529                           runnable.run();
  530                       }
  531                   } catch (InterruptedException unblock) {
  532                       // do nothing (loop will terminate if shutdown() was called
  533                       try {
  534                           getLog().error("worker threat got 'interrupt'ed.", unblock);
  535                       } catch(Exception e) {
  536                           // ignore to help with a tomcat glitch
  537                       }
  538                   } catch (Exception exceptionInRunnable) {
  539                       try {
  540                           getLog().error("Error while executing the Runnable: ",
  541                               exceptionInRunnable);
  542                       } catch(Exception e) {
  543                           // ignore to help with a tomcat glitch
  544                       }
  545                   } finally {
  546                       runnable = null;
  547                       // repair the thread in case the runnable mucked it up...
  548                       if(getPriority() != tp.getThreadPriority())
  549                           setPriority(tp.getThreadPriority());
  550   
  551                       if (runOnce) {
  552                           run = false;
  553                       }
  554                       else if(ran) {
  555                           ran = false;
  556                           makeAvailable(this);
  557                       }
  558   
  559                   }
  560               }
  561   
  562               //if (log.isDebugEnabled())
  563               try {
  564                   getLog().debug("WorkerThread is shutting down");
  565               } catch(Exception e) {
  566                   // ignore to help with a tomcat glitch
  567               }
  568           }
  569       }
  570   }

Save This Page
Home » quartz-1.6.0 » org » quartz » simpl » [javadoc | source]