Save This Page
Home » quartz-1.6.0 » org » quartz » core » [javadoc | source]
    1   
    2   /* 
    3    * Copyright 2004-2005 OpenSymphony 
    4    * 
    5    * Licensed under the Apache License, Version 2.0 (the "License"); you may not 
    6    * use this file except in compliance with the License. You may obtain a copy 
    7    * of the License at 
    8    * 
    9    *   http://www.apache.org/licenses/LICENSE-2.0 
   10    *   
   11    * Unless required by applicable law or agreed to in writing, software 
   12    * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 
   13    * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 
   14    * License for the specific language governing permissions and limitations 
   15    * under the License.
   16    * 
   17    */
   18   
   19   /*
   20    * Previously Copyright (c) 2001-2004 James House
   21    */
   22   package org.quartz.core;
   23   
   24   import org.apache.commons.logging.Log;
   25   import org.apache.commons.logging.LogFactory;
   26   import org.quartz.JobPersistenceException;
   27   import org.quartz.SchedulerException;
   28   import org.quartz.Trigger;
   29   import org.quartz.spi.TriggerFiredBundle;
   30   
   31   import java.util.Random;
   32   
   33   /**
   34    * <p>
   35    * The thread responsible for performing the work of firing <code>{@link Trigger}</code>
   36    * s that are registered with the <code>{@link QuartzScheduler}</code>.
   37    * </p>
   38    * 
   39    * @see QuartzScheduler
   40    * @see org.quartz.Job
   41    * @see Trigger
   42    * 
   43    * @author James House
   44    */
   45   public class QuartzSchedulerThread extends Thread {
   46       /*
   47        * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
   48        * 
   49        * Data members.
   50        * 
   51        * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
   52        */
   53       private QuartzScheduler qs;
   54   
   55       private QuartzSchedulerResources qsRsrcs;
   56   
   57       private Object pauseLock = new Object();
   58   
   59       private Object idleLock = new Object();
   60   
   61       private boolean signaled;
   62   
   63       private boolean paused;
   64   
   65       private boolean halted;
   66   
   67       private SchedulingContext ctxt = null;
   68   
   69       private Random random = new Random(System.currentTimeMillis());
   70   
   71       // When the scheduler finds there is no current trigger to fire, how long
   72       // it should wait until checking again...
   73       private static long DEFAULT_IDLE_WAIT_TIME = 30L * 1000L;
   74   
   75       private long idleWaitTime = DEFAULT_IDLE_WAIT_TIME;
   76   
   77       private int idleWaitVariablness = 7 * 1000;
   78   
   79       private long dbFailureRetryInterval = 15L * 1000L;
   80   
   81       private final Log log = LogFactory.getLog(getClass());
   82   
   83       /*
   84        * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
   85        * 
   86        * Constructors.
   87        * 
   88        * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
   89        */
   90   
   91       /**
   92        * <p>
   93        * Construct a new <code>QuartzSchedulerThread</code> for the given
   94        * <code>QuartzScheduler</code> as a non-daemon <code>Thread</code>
   95        * with normal priority.
   96        * </p>
   97        */
   98       QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs,
   99               SchedulingContext ctxt) {
  100           this(qs, qsRsrcs, ctxt, qsRsrcs.getMakeSchedulerThreadDaemon(), Thread.NORM_PRIORITY);
  101       }
  102   
  103       /**
  104        * <p>
  105        * Construct a new <code>QuartzSchedulerThread</code> for the given
  106        * <code>QuartzScheduler</code> as a <code>Thread</code> with the given
  107        * attributes.
  108        * </p>
  109        */
  110       QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs,
  111               SchedulingContext ctxt, boolean setDaemon, int threadPrio) {
  112           super(qs.getSchedulerThreadGroup(), qsRsrcs.getThreadName());
  113           this.qs = qs;
  114           this.qsRsrcs = qsRsrcs;
  115           this.ctxt = ctxt;
  116           this.setDaemon(setDaemon);
  117           this.setPriority(threadPrio);
  118   
  119           // start the underlying thread, but put this object into the 'paused'
  120           // state
  121           // so processing doesn't start yet...
  122           paused = true;
  123           halted = false;
  124           this.start();
  125       }
  126   
  127       /*
  128        * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  129        * 
  130        * Interface.
  131        * 
  132        * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  133        */
  134   
  135       void setIdleWaitTime(long waitTime) {
  136           idleWaitTime = waitTime;
  137           idleWaitVariablness = (int) (waitTime * 0.2);
  138       }
  139   
  140       private long getDbFailureRetryInterval() {
  141           return dbFailureRetryInterval;
  142       }
  143   
  144       public void setDbFailureRetryInterval(long dbFailureRetryInterval) {
  145           this.dbFailureRetryInterval = dbFailureRetryInterval;
  146       }
  147   
  148       private long getRandomizedIdleWaitTime() {
  149           return idleWaitTime - random.nextInt(idleWaitVariablness);
  150       }
  151   
  152       /**
  153        * <p>
  154        * Signals the main processing loop to pause at the next possible point.
  155        * </p>
  156        */
  157       void togglePause(boolean pause) {
  158           synchronized (pauseLock) {
  159               paused = pause;
  160   
  161               if (paused) {
  162                   signalSchedulingChange();
  163               } else {
  164                   pauseLock.notify();
  165               }
  166           }
  167       }
  168   
  169       /**
  170        * <p>
  171        * Signals the main processing loop to pause at the next possible point.
  172        * </p>
  173        */
  174       void halt() {
  175           synchronized (pauseLock) {
  176               halted = true;
  177   
  178               if (paused) {
  179                   pauseLock.notify();
  180               } else {
  181                   signalSchedulingChange();
  182               }
  183           }
  184       }
  185   
  186       boolean isPaused() {
  187           return paused;
  188       }
  189   
  190       /**
  191        * <p>
  192        * Signals the main processing loop that a change in scheduling has been
  193        * made - in order to interrupt any sleeping that may be occuring while
  194        * waiting for the fire time to arrive.
  195        * </p>
  196        */
  197       void signalSchedulingChange() {
  198           signaled = true;
  199       }
  200   
  201       /**
  202        * <p>
  203        * The main processing loop of the <code>QuartzSchedulerThread</code>.
  204        * </p>
  205        */
  206       public void run() {
  207           boolean lastAcquireFailed = false;
  208           
  209           while (!halted) {
  210               try {
  211                   // check if we're supposed to pause...
  212                   synchronized (pauseLock) {
  213                       while (paused && !halted) {
  214                           try {
  215                               // wait until togglePause(false) is called...
  216                               pauseLock.wait(100L);
  217                           } catch (InterruptedException ignore) {
  218                           }
  219                       }
  220       
  221                       if (halted) {
  222                           break;
  223                       }
  224                   }
  225   
  226                   int availTreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
  227                   if(availTreadCount > 0) {
  228   
  229                       Trigger trigger = null;
  230   
  231                       long now = System.currentTimeMillis();
  232   
  233                       signaled = false;
  234                       try {
  235                           trigger = qsRsrcs.getJobStore().acquireNextTrigger(
  236                                   ctxt, now + idleWaitTime);
  237                           lastAcquireFailed = false;
  238                       } catch (JobPersistenceException jpe) {
  239                           if(!lastAcquireFailed) {
  240                               qs.notifySchedulerListenersError(
  241                                   "An error occured while scanning for the next trigger to fire.",
  242                                   jpe);
  243                           }
  244                           lastAcquireFailed = true;
  245                       } catch (RuntimeException e) {
  246                           if(!lastAcquireFailed) {
  247                               getLog().error("quartzSchedulerThreadLoop: RuntimeException "
  248                                       +e.getMessage(), e);
  249                           }
  250                           lastAcquireFailed = true;
  251                       }
  252   
  253                       if (trigger != null) {
  254   
  255                           now = System.currentTimeMillis();
  256                           long triggerTime = trigger.getNextFireTime().getTime();
  257                           long timeUntilTrigger = triggerTime - now;
  258                           long spinInterval = 10;
  259   
  260                           // this looping may seem a bit silly, but it's the
  261                           // current work-around
  262                           // for a dead-lock that can occur if the Thread.sleep()
  263                           // is replaced with
  264                           // a obj.wait() that gets notified when the signal is
  265                           // set...
  266                           // so to be able to detect the signal change without
  267                           // sleeping the entire
  268                           // timeUntilTrigger, we spin here... don't worry
  269                           // though, this spinning
  270                           // doesn't even register 0.2% cpu usage on a pentium 4.
  271                           int numPauses = (int) (timeUntilTrigger / spinInterval);
  272                           while (numPauses >= 0 && !signaled) {
  273   
  274                               try {
  275                                   Thread.sleep(spinInterval);
  276                               } catch (InterruptedException ignore) {
  277                               }
  278   
  279                               now = System.currentTimeMillis();
  280                               timeUntilTrigger = triggerTime - now;
  281                               numPauses = (int) (timeUntilTrigger / spinInterval);
  282                           }
  283                           if (signaled) {
  284                               try {
  285                                   qsRsrcs.getJobStore().releaseAcquiredTrigger(
  286                                           ctxt, trigger);
  287                               } catch (JobPersistenceException jpe) {
  288                                   qs.notifySchedulerListenersError(
  289                                           "An error occured while releasing trigger '"
  290                                                   + trigger.getFullName() + "'",
  291                                           jpe);
  292                                   // db connection must have failed... keep
  293                                   // retrying until it's up...
  294                                   releaseTriggerRetryLoop(trigger);
  295                               } catch (RuntimeException e) {
  296                                   getLog().error(
  297                                       "releaseTriggerRetryLoop: RuntimeException "
  298                                       +e.getMessage(), e);
  299                                   // db connection must have failed... keep
  300                                   // retrying until it's up...
  301                                   releaseTriggerRetryLoop(trigger);
  302                               }
  303                               signaled = false;
  304                               continue;
  305                           }
  306   
  307                           // set trigger to 'executing'
  308                           TriggerFiredBundle bndle = null;
  309   
  310                           synchronized(pauseLock) {
  311                               if(!halted) {
  312                                   try {
  313                                       bndle = qsRsrcs.getJobStore().triggerFired(ctxt,
  314                                               trigger);
  315                                   } catch (SchedulerException se) {
  316                                       qs.notifySchedulerListenersError(
  317                                               "An error occured while firing trigger '"
  318                                                       + trigger.getFullName() + "'", se);
  319                                   } catch (RuntimeException e) {
  320                                       getLog().error(
  321                                           "RuntimeException while firing trigger " +
  322                                           trigger.getFullName(), e);
  323                                       // db connection must have failed... keep
  324                                       // retrying until it's up...
  325                                       releaseTriggerRetryLoop(trigger);
  326                                   }
  327                               }
  328   
  329                               // it's possible to get 'null' if the trigger was paused,
  330                               // blocked, or other similar occurances that prevent it being
  331                               // fired at this time...  or if the scheduler was shutdown (halted)
  332                               if (bndle == null) {
  333                                   try {
  334                                       qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt,
  335                                               trigger);
  336                                   } catch (SchedulerException se) {
  337                                       qs.notifySchedulerListenersError(
  338                                               "An error occured while releasing trigger '"
  339                                                       + trigger.getFullName() + "'", se);
  340                                       // db connection must have failed... keep retrying
  341                                       // until it's up...
  342                                       releaseTriggerRetryLoop(trigger);
  343                                   }
  344                                   continue;
  345                               }
  346   
  347                               // TODO: improvements:
  348                               //
  349                               // 2- make sure we can get a job runshell before firing trigger, or
  350                               //   don't let that throw an exception (right now it never does,
  351                               //   but the signature says it can).
  352                               // 3- acquire more triggers at a time (based on num threads available?)
  353   
  354   
  355                               JobRunShell shell = null;
  356                               try {
  357                                   shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
  358                                   shell.initialize(qs, bndle);
  359                               } catch (SchedulerException se) {
  360                                   try {
  361                                       qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
  362                                               trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
  363                                   } catch (SchedulerException se2) {
  364                                       qs.notifySchedulerListenersError(
  365                                               "An error occured while placing job's triggers in error state '"
  366                                                       + trigger.getFullName() + "'", se2);
  367                                       // db connection must have failed... keep retrying
  368                                       // until it's up...
  369                                       errorTriggerRetryLoop(bndle);
  370                                   }
  371                                   continue;
  372                               }
  373   
  374                               if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
  375                                   try {
  376                                       // this case should never happen, as it is indicative of the
  377                                       // scheduler being shutdown or a bug in the thread pool or
  378                                       // a thread pool being used concurrently - which the docs
  379                                       // say not to do...
  380                                       getLog().error("ThreadPool.runInThread() return false!");
  381                                       qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
  382                                               trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
  383                                   } catch (SchedulerException se2) {
  384                                       qs.notifySchedulerListenersError(
  385                                               "An error occured while placing job's triggers in error state '"
  386                                                       + trigger.getFullName() + "'", se2);
  387                                       // db connection must have failed... keep retrying
  388                                       // until it's up...
  389                                       releaseTriggerRetryLoop(trigger);
  390                                   }
  391                               }
  392                           }
  393   
  394                           continue;
  395                       }
  396                   } else { // if(availTreadCount > 0)
  397                       continue; // should never happen, if threadPool.blockForAvailableThreads() follows contract
  398                   }
  399   
  400                   // this looping may seem a bit silly, but it's the current
  401                   // work-around
  402                   // for a dead-lock that can occur if the Thread.sleep() is replaced
  403                   // with
  404                   // a obj.wait() that gets notified when the signal is set...
  405                   // so to be able to detect the signal change without sleeping the
  406                   // entier
  407                   // getRandomizedIdleWaitTime(), we spin here... don't worry though,
  408                   // the
  409                   // CPU usage of this spinning can't even be measured on a pentium
  410                   // 4.
  411                   long now = System.currentTimeMillis();
  412                   long waitTime = now + getRandomizedIdleWaitTime();
  413                   long timeUntilContinue = waitTime - now;
  414                   long spinInterval = 10;
  415                   int numPauses = (int) (timeUntilContinue / spinInterval);
  416       
  417                   while (numPauses > 0 && !signaled) {
  418       
  419                       try {
  420                           Thread.sleep(10L);
  421                       } catch (InterruptedException ignore) {
  422                       }
  423       
  424                       now = System.currentTimeMillis();
  425                       timeUntilContinue = waitTime - now;
  426                       numPauses = (int) (timeUntilContinue / spinInterval);
  427                   }
  428               } catch(RuntimeException re) {
  429                   getLog().error("Runtime error occured in main trigger firing loop.", re);
  430               }
  431           } // loop...
  432   
  433           // drop references to scheduler stuff to aid garbage collection...
  434           qs = null;
  435           qsRsrcs = null;
  436       }
  437   
  438       public void errorTriggerRetryLoop(TriggerFiredBundle bndle) {
  439           int retryCount = 0;
  440           try {
  441               while (!halted) {
  442                   try {
  443                       Thread.sleep(getDbFailureRetryInterval()); // retry every N
  444                       // seconds (the db
  445                       // connection must
  446                       // be failed)
  447                       retryCount++;
  448                       qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
  449                               bndle.getTrigger(), bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
  450                       retryCount = 0;
  451                       break;
  452                   } catch (JobPersistenceException jpe) {
  453                       if(retryCount % 4 == 0) {
  454                           qs.notifySchedulerListenersError(
  455                               "An error occured while releasing trigger '"
  456                                       + bndle.getTrigger().getFullName() + "'", jpe);
  457                       }
  458                   } catch (RuntimeException e) {
  459                       getLog().error("releaseTriggerRetryLoop: RuntimeException "+e.getMessage(), e);
  460                   } catch (InterruptedException e) {
  461                       getLog().error("releaseTriggerRetryLoop: InterruptedException "+e.getMessage(), e);
  462                   }
  463               }
  464           } finally {
  465               if(retryCount == 0) {
  466                   getLog().info("releaseTriggerRetryLoop: connection restored.");
  467               }
  468           }
  469       }
  470       
  471       public void releaseTriggerRetryLoop(Trigger trigger) {
  472           int retryCount = 0;
  473           try {
  474               while (!halted) {
  475                   try {
  476                       Thread.sleep(getDbFailureRetryInterval()); // retry every N
  477                       // seconds (the db
  478                       // connection must
  479                       // be failed)
  480                       retryCount++;
  481                       qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt, trigger);
  482                       retryCount = 0;
  483                       break;
  484                   } catch (JobPersistenceException jpe) {
  485                       if(retryCount % 4 == 0) {
  486                           qs.notifySchedulerListenersError(
  487                               "An error occured while releasing trigger '"
  488                                       + trigger.getFullName() + "'", jpe);
  489                       }
  490                   } catch (RuntimeException e) {
  491                       getLog().error("releaseTriggerRetryLoop: RuntimeException "+e.getMessage(), e);
  492                   } catch (InterruptedException e) {
  493                       getLog().error("releaseTriggerRetryLoop: InterruptedException "+e.getMessage(), e);
  494                   }
  495               }
  496           } finally {
  497               if(retryCount == 0) {
  498                   getLog().info("releaseTriggerRetryLoop: connection restored.");
  499               }
  500           }
  501       }
  502       
  503       public Log getLog() {
  504           return log;
  505       }
  506   
  507   } // end of QuartzSchedulerThread

Save This Page
Home » quartz-1.6.0 » org » quartz » core » [javadoc | source]