Save This Page
Home » quartz-1.6.0 » org » quartz » core » [javadoc | source]
    1   
    2   /* 
    3    * Copyright 2004-2005 OpenSymphony 
    4    * 
    5    * Licensed under the Apache License, Version 2.0 (the "License"); you may not 
    6    * use this file except in compliance with the License. You may obtain a copy 
    7    * of the License at 
    8    * 
    9    *   http://www.apache.org/licenses/LICENSE-2.0 
   10    *   
   11    * Unless required by applicable law or agreed to in writing, software 
   12    * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 
   13    * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 
   14    * License for the specific language governing permissions and limitations 
   15    * under the License.
   16    * 
   17    */
   18   
   19   /*
   20    * Previously Copyright (c) 2001-2004 James House
   21    */
   22   package org.quartz.core;
   23   
   24   import org.apache.commons.logging.Log;
   25   import org.apache.commons.logging.LogFactory;
   26   import org.quartz.Job;
   27   import org.quartz.JobDetail;
   28   import org.quartz.JobExecutionContext;
   29   import org.quartz.JobExecutionException;
   30   import org.quartz.JobPersistenceException;
   31   import org.quartz.Scheduler;
   32   import org.quartz.SchedulerException;
   33   import org.quartz.Trigger;
   34   import org.quartz.spi.TriggerFiredBundle;
   35   
   36   /**
   37    * <p>
   38    * JobRunShell instances are responsible for providing the 'safe' environment
   39    * for <code>Job</code> s to run in, and for performing all of the work of
   40    * executing the <code>Job</code>, catching ANY thrown exceptions, updating
   41    * the <code>Trigger</code> with the <code>Job</code>'s completion code,
   42    * etc.
   43    * </p>
   44    * 
   45    * <p>
   46    * A <code>JobRunShell</code> instance is created by a <code>JobRunShellFactory</code>
   47    * on behalf of the <code>QuartzSchedulerThread</code> which then runs the
   48    * shell in a thread from the configured <code>ThreadPool</code> when the
   49    * scheduler determines that a <code>Job</code> has been triggered.
   50    * </p>
   51    * 
   52    * @see JobRunShellFactory
   53    * @see org.quartz.core.QuartzSchedulerThread
   54    * @see org.quartz.Job
   55    * @see org.quartz.Trigger
   56    * 
   57    * @author James House
   58    */
   59   public class JobRunShell implements Runnable {
   60       /*
   61        * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
   62        * 
   63        * Data members.
   64        * 
   65        * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
   66        */
   67   
   68       protected JobExecutionContext jec = null;
   69   
   70       protected QuartzScheduler qs = null;
   71   
   72       protected Scheduler scheduler = null;
   73   
   74       protected SchedulingContext schdCtxt = null;
   75   
   76       protected JobRunShellFactory jobRunShellFactory = null;
   77   
   78       protected boolean shutdownRequested = false;
   79   
   80       private final Log log = LogFactory.getLog(getClass());
   81       
   82       /*
   83        * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
   84        * 
   85        * Constructors.
   86        * 
   87        * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
   88        */
   89   
   90       /**
   91        * <p>
   92        * Create a JobRunShell instance with the given settings.
   93        * </p>
   94        * 
   95        * @param jobRunShellFactory
   96        *          A handle to the <code>JobRunShellFactory</code> that produced
   97        *          this <code>JobRunShell</code>.
   98        * @param scheduler
   99        *          The <code>Scheduler</code> instance that should be made
  100        *          available within the <code>JobExecutionContext</code>.
  101        * @param schdCtxt
  102        *          the <code>SchedulingContext</code> that should be used by the
  103        *          <code>JobRunShell</code> when making updates to the <code>JobStore</code>.
  104        */
  105       public JobRunShell(JobRunShellFactory jobRunShellFactory,
  106               Scheduler scheduler, SchedulingContext schdCtxt) {
  107           this.jobRunShellFactory = jobRunShellFactory;
  108           this.scheduler = scheduler;
  109           this.schdCtxt = schdCtxt;
  110       }
  111   
  112       /*
  113        * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  114        * 
  115        * Interface.
  116        * 
  117        * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  118        */
  119   
  120       protected Log getLog() {
  121           return log;
  122       }
  123       
  124       public void initialize(QuartzScheduler qs, TriggerFiredBundle firedBundle)
  125           throws SchedulerException {
  126           this.qs = qs;
  127   
  128           Job job = null;
  129           JobDetail jobDetail = firedBundle.getJobDetail();
  130   
  131           try {
  132               job = qs.getJobFactory().newJob(firedBundle);
  133           } catch (SchedulerException se) {
  134               qs.notifySchedulerListenersError(
  135                       "An error occured instantiating job to be executed. job= '"
  136                               + jobDetail.getFullName() + "'", se);
  137               throw se;
  138           } catch (Throwable ncdfe) { // such as NoClassDefFoundError
  139               SchedulerException se = new SchedulerException(
  140                       "Problem instantiating class '"
  141                               + jobDetail.getJobClass().getName() + "' - ", ncdfe);
  142               qs.notifySchedulerListenersError(
  143                       "An error occured instantiating job to be executed. job= '"
  144                               + jobDetail.getFullName() + "'", se);
  145               throw se;
  146           }
  147   
  148           this.jec = new JobExecutionContext(scheduler, firedBundle, job);
  149       }
  150   
  151       public void requestShutdown() {
  152           shutdownRequested = true;
  153       }
  154   
  155       public void run() {
  156           Trigger trigger = jec.getTrigger();
  157           JobDetail jobDetail = jec.getJobDetail();
  158   
  159           do {
  160   
  161               JobExecutionException jobExEx = null;
  162               Job job = jec.getJobInstance();
  163   
  164               try {
  165                   begin();
  166               } catch (SchedulerException se) {
  167                   qs.notifySchedulerListenersError("Error executing Job ("
  168                           + jec.getJobDetail().getFullName()
  169                           + ": couldn't begin execution.", se);
  170                   break;
  171               }
  172   
  173               // notify job & trigger listeners...
  174               try {
  175                   if (!notifyListenersBeginning(jec)) {
  176                       break;
  177                   }
  178               } catch(VetoedException ve) {
  179                   try {
  180                       int instCode = trigger.executionComplete(jec, null);
  181                       try {
  182                           qs.notifyJobStoreJobVetoed(schdCtxt, trigger, jobDetail, instCode);
  183                       }
  184                       catch(JobPersistenceException jpe) {
  185                           vetoedJobRetryLoop(trigger, jobDetail, instCode);
  186                       }
  187                       complete(true);
  188                   } catch (SchedulerException se) {
  189                       qs.notifySchedulerListenersError("Error during veto of Job ("
  190                               + jec.getJobDetail().getFullName()
  191                               + ": couldn't finalize execution.", se);
  192                   }
  193                   break;
  194               }
  195   
  196               long startTime = System.currentTimeMillis();
  197               long endTime = startTime;
  198               
  199               // execute the job
  200               try {
  201                   log.debug("Calling execute on job " + jobDetail.getFullName());
  202                   job.execute(jec);
  203                   endTime = System.currentTimeMillis();
  204               } catch (JobExecutionException jee) {
  205                   endTime = System.currentTimeMillis();
  206                   jobExEx = jee;
  207                   getLog().info("Job " + jobDetail.getFullName() + 
  208                           " threw a JobExecutionException: ", jobExEx);
  209               } catch (Throwable e) {
  210                   endTime = System.currentTimeMillis();
  211                   getLog().error("Job " + jobDetail.getFullName() + 
  212                           " threw an unhandled Exception: ", e);
  213                   SchedulerException se = new SchedulerException(
  214                           "Job threw an unhandled exception.", e);
  215                   se.setErrorCode(SchedulerException.ERR_JOB_EXECUTION_THREW_EXCEPTION);
  216                   qs.notifySchedulerListenersError("Job ("
  217                           + jec.getJobDetail().getFullName()
  218                           + " threw an exception.", se);
  219                   jobExEx = new JobExecutionException(se, false);
  220                   jobExEx.setErrorCode(JobExecutionException.ERR_JOB_EXECUTION_THREW_EXCEPTION);
  221               } 
  222               
  223               jec.setJobRunTime(endTime - startTime);
  224   
  225               // notify all job listeners
  226               if (!notifyJobListenersComplete(jec, jobExEx)) {
  227                   break;
  228               }
  229   
  230               int instCode = Trigger.INSTRUCTION_NOOP;
  231   
  232               // update the trigger
  233               try {
  234                   instCode = trigger.executionComplete(jec, jobExEx);
  235               } catch (Exception e) {
  236                   // If this happens, there's a bug in the trigger...
  237                   SchedulerException se = new SchedulerException(
  238                           "Trigger threw an unhandled exception.", e);
  239                   se.setErrorCode(SchedulerException.ERR_TRIGGER_THREW_EXCEPTION);
  240                   qs.notifySchedulerListenersError(
  241                           "Please report this error to the Quartz developers.",
  242                           se);
  243               }
  244   
  245               // notify all trigger listeners
  246               if (!notifyTriggerListenersComplete(jec, instCode)) {
  247                   break;
  248               }
  249   
  250               // update job/trigger or re-execute job
  251               if (instCode == Trigger.INSTRUCTION_RE_EXECUTE_JOB) {
  252                   jec.incrementRefireCount();
  253                   try {
  254                       complete(false);
  255                   } catch (SchedulerException se) {
  256                       qs.notifySchedulerListenersError("Error executing Job ("
  257                               + jec.getJobDetail().getFullName()
  258                               + ": couldn't finalize execution.", se);
  259                   }
  260                   continue;
  261               }
  262   
  263               try {
  264                   complete(true);
  265               } catch (SchedulerException se) {
  266                   qs.notifySchedulerListenersError("Error executing Job ("
  267                           + jec.getJobDetail().getFullName()
  268                           + ": couldn't finalize execution.", se);
  269                   continue;
  270               }
  271   
  272               try {
  273                   qs.notifyJobStoreJobComplete(schdCtxt, trigger, jobDetail,
  274                           instCode);
  275               } catch (JobPersistenceException jpe) {
  276                   qs.notifySchedulerListenersError(
  277                           "An error occured while marking executed job complete. job= '"
  278                                   + jobDetail.getFullName() + "'", jpe);
  279                   if (!completeTriggerRetryLoop(trigger, jobDetail, instCode)) {
  280                       return;
  281                   }
  282               }
  283   
  284               break;
  285           } while (true);
  286   
  287           qs.notifySchedulerThread();
  288   
  289           jobRunShellFactory.returnJobRunShell(this);
  290       }
  291   
  292       protected void begin() throws SchedulerException {
  293       }
  294   
  295       protected void complete(boolean successfulExecution)
  296           throws SchedulerException {
  297       }
  298   
  299       public void passivate() {
  300           jec = null;
  301           qs = null;
  302       }
  303   
  304       private boolean notifyListenersBeginning(JobExecutionContext jec) throws VetoedException {
  305           
  306           boolean vetoed = false;
  307           
  308           // notify all trigger listeners
  309           try {
  310               vetoed = qs.notifyTriggerListenersFired(jec);
  311           } catch (SchedulerException se) {
  312               qs.notifySchedulerListenersError(
  313                       "Unable to notify TriggerListener(s) while firing trigger "
  314                               + "(Trigger and Job will NOT be fired!). trigger= "
  315                               + jec.getTrigger().getFullName() + " job= "
  316                               + jec.getJobDetail().getFullName(), se);
  317   
  318               return false;
  319           }
  320   
  321           if(vetoed) {
  322               try {
  323                   qs.notifyJobListenersWasVetoed(jec);
  324               } catch (SchedulerException se) {
  325                   qs.notifySchedulerListenersError(
  326                           "Unable to notify JobListener(s) of vetoed execution " +
  327                           "while firing trigger (Trigger and Job will NOT be " +
  328                           "fired!). trigger= "
  329                           + jec.getTrigger().getFullName() + " job= "
  330                           + jec.getJobDetail().getFullName(), se);
  331   
  332               }
  333               throw new VetoedException();
  334           }
  335               
  336           // notify all job listeners
  337           try {
  338               qs.notifyJobListenersToBeExecuted(jec);
  339           } catch (SchedulerException se) {
  340               qs.notifySchedulerListenersError(
  341                       "Unable to notify JobListener(s) of Job to be executed: "
  342                               + "(Job will NOT be executed!). trigger= "
  343                               + jec.getTrigger().getFullName() + " job= "
  344                               + jec.getJobDetail().getFullName(), se);
  345   
  346               return false;
  347           }
  348   
  349           return true;
  350       }
  351   
  352       private boolean notifyJobListenersComplete(JobExecutionContext jec,
  353               JobExecutionException jobExEx) {
  354           try {
  355               qs.notifyJobListenersWasExecuted(jec, jobExEx);
  356           } catch (SchedulerException se) {
  357               qs.notifySchedulerListenersError(
  358                       "Unable to notify JobListener(s) of Job that was executed: "
  359                               + "(error will be ignored). trigger= "
  360                               + jec.getTrigger().getFullName() + " job= "
  361                               + jec.getJobDetail().getFullName(), se);
  362   
  363               return false;
  364           }
  365   
  366           return true;
  367       }
  368   
  369       private boolean notifyTriggerListenersComplete(JobExecutionContext jec,
  370               int instCode) {
  371           try {
  372               qs.notifyTriggerListenersComplete(jec, instCode);
  373   
  374           } catch (SchedulerException se) {
  375               qs.notifySchedulerListenersError(
  376                       "Unable to notify TriggerListener(s) of Job that was executed: "
  377                               + "(error will be ignored). trigger= "
  378                               + jec.getTrigger().getFullName() + " job= "
  379                               + jec.getJobDetail().getFullName(), se);
  380   
  381               return false;
  382           }
  383           if (jec.getTrigger().getNextFireTime() == null) {
  384               qs.notifySchedulerListenersFinalized(jec.getTrigger());
  385           }
  386   
  387           return true;
  388       }
  389   
  390       public boolean completeTriggerRetryLoop(Trigger trigger,
  391               JobDetail jobDetail, int instCode) {
  392           while (!shutdownRequested) {
  393               try {
  394                   Thread.sleep(5 * 1000L); // retry every 5 seconds (the db
  395                   // connection must be failed)
  396                   qs.notifyJobStoreJobComplete(schdCtxt, trigger, jobDetail,
  397                           instCode);
  398                   return true;
  399               } catch (JobPersistenceException jpe) {
  400                   qs.notifySchedulerListenersError(
  401                           "An error occured while marking executed job complete. job= '"
  402                                   + jobDetail.getFullName() + "'", jpe);
  403               } catch (InterruptedException ignore) {
  404               }
  405           }
  406           return false;
  407       }
  408   
  409       public boolean vetoedJobRetryLoop(Trigger trigger, JobDetail jobDetail, int instCode) {
  410           while (!shutdownRequested) {
  411               try {
  412                   Thread.sleep(5 * 1000L); // retry every 5 seconds (the db
  413                   // connection must be failed)
  414                   qs.notifyJobStoreJobVetoed(schdCtxt, trigger, jobDetail, instCode);
  415                   return true;
  416               } catch (JobPersistenceException jpe) {
  417                   qs.notifySchedulerListenersError(
  418                           "An error occured while marking executed job vetoed. job= '"
  419                                   + jobDetail.getFullName() + "'", jpe);
  420               } catch (InterruptedException ignore) {
  421               }
  422           }
  423           return false;
  424       }
  425   
  426       class VetoedException extends Exception {
  427           public VetoedException() {
  428           }
  429       }
  430   }

Save This Page
Home » quartz-1.6.0 » org » quartz » core » [javadoc | source]