1
2 /*
3 * Copyright 2004-2005 OpenSymphony
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
6 * use this file except in compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 * License for the specific language governing permissions and limitations
15 * under the License.
16 *
17 */
18
19 /*
20 * Previously Copyright (c) 2001-2004 James House
21 */
22 package org.quartz.core;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.quartz.Job;
27 import org.quartz.JobDetail;
28 import org.quartz.JobExecutionContext;
29 import org.quartz.JobExecutionException;
30 import org.quartz.JobPersistenceException;
31 import org.quartz.Scheduler;
32 import org.quartz.SchedulerException;
33 import org.quartz.Trigger;
34 import org.quartz.spi.TriggerFiredBundle;
35
36 /**
37 * <p>
38 * JobRunShell instances are responsible for providing the 'safe' environment
39 * for <code>Job</code> s to run in, and for performing all of the work of
40 * executing the <code>Job</code>, catching ANY thrown exceptions, updating
41 * the <code>Trigger</code> with the <code>Job</code>'s completion code,
42 * etc.
43 * </p>
44 *
45 * <p>
46 * A <code>JobRunShell</code> instance is created by a <code>JobRunShellFactory</code>
47 * on behalf of the <code>QuartzSchedulerThread</code> which then runs the
48 * shell in a thread from the configured <code>ThreadPool</code> when the
49 * scheduler determines that a <code>Job</code> has been triggered.
50 * </p>
51 *
52 * @see JobRunShellFactory
53 * @see org.quartz.core.QuartzSchedulerThread
54 * @see org.quartz.Job
55 * @see org.quartz.Trigger
56 *
57 * @author James House
58 */
59 public class JobRunShell implements Runnable {
60 /*
61 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
62 *
63 * Data members.
64 *
65 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
66 */
67
68 protected JobExecutionContext jec = null;
69
70 protected QuartzScheduler qs = null;
71
72 protected Scheduler scheduler = null;
73
74 protected SchedulingContext schdCtxt = null;
75
76 protected JobRunShellFactory jobRunShellFactory = null;
77
78 protected boolean shutdownRequested = false;
79
80 private final Log log = LogFactory.getLog(getClass());
81
82 /*
83 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
84 *
85 * Constructors.
86 *
87 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
88 */
89
90 /**
91 * <p>
92 * Create a JobRunShell instance with the given settings.
93 * </p>
94 *
95 * @param jobRunShellFactory
96 * A handle to the <code>JobRunShellFactory</code> that produced
97 * this <code>JobRunShell</code>.
98 * @param scheduler
99 * The <code>Scheduler</code> instance that should be made
100 * available within the <code>JobExecutionContext</code>.
101 * @param schdCtxt
102 * the <code>SchedulingContext</code> that should be used by the
103 * <code>JobRunShell</code> when making updates to the <code>JobStore</code>.
104 */
105 public JobRunShell(JobRunShellFactory jobRunShellFactory,
106 Scheduler scheduler, SchedulingContext schdCtxt) {
107 this.jobRunShellFactory = jobRunShellFactory;
108 this.scheduler = scheduler;
109 this.schdCtxt = schdCtxt;
110 }
111
112 /*
113 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
114 *
115 * Interface.
116 *
117 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
118 */
119
120 protected Log getLog() {
121 return log;
122 }
123
124 public void initialize(QuartzScheduler qs, TriggerFiredBundle firedBundle)
125 throws SchedulerException {
126 this.qs = qs;
127
128 Job job = null;
129 JobDetail jobDetail = firedBundle.getJobDetail();
130
131 try {
132 job = qs.getJobFactory().newJob(firedBundle);
133 } catch (SchedulerException se) {
134 qs.notifySchedulerListenersError(
135 "An error occured instantiating job to be executed. job= '"
136 + jobDetail.getFullName() + "'", se);
137 throw se;
138 } catch (Throwable ncdfe) { // such as NoClassDefFoundError
139 SchedulerException se = new SchedulerException(
140 "Problem instantiating class '"
141 + jobDetail.getJobClass().getName() + "' - ", ncdfe);
142 qs.notifySchedulerListenersError(
143 "An error occured instantiating job to be executed. job= '"
144 + jobDetail.getFullName() + "'", se);
145 throw se;
146 }
147
148 this.jec = new JobExecutionContext(scheduler, firedBundle, job);
149 }
150
151 public void requestShutdown() {
152 shutdownRequested = true;
153 }
154
155 public void run() {
156 Trigger trigger = jec.getTrigger();
157 JobDetail jobDetail = jec.getJobDetail();
158
159 do {
160
161 JobExecutionException jobExEx = null;
162 Job job = jec.getJobInstance();
163
164 try {
165 begin();
166 } catch (SchedulerException se) {
167 qs.notifySchedulerListenersError("Error executing Job ("
168 + jec.getJobDetail().getFullName()
169 + ": couldn't begin execution.", se);
170 break;
171 }
172
173 // notify job & trigger listeners...
174 try {
175 if (!notifyListenersBeginning(jec)) {
176 break;
177 }
178 } catch(VetoedException ve) {
179 try {
180 int instCode = trigger.executionComplete(jec, null);
181 try {
182 qs.notifyJobStoreJobVetoed(schdCtxt, trigger, jobDetail, instCode);
183 }
184 catch(JobPersistenceException jpe) {
185 vetoedJobRetryLoop(trigger, jobDetail, instCode);
186 }
187 complete(true);
188 } catch (SchedulerException se) {
189 qs.notifySchedulerListenersError("Error during veto of Job ("
190 + jec.getJobDetail().getFullName()
191 + ": couldn't finalize execution.", se);
192 }
193 break;
194 }
195
196 long startTime = System.currentTimeMillis();
197 long endTime = startTime;
198
199 // execute the job
200 try {
201 log.debug("Calling execute on job " + jobDetail.getFullName());
202 job.execute(jec);
203 endTime = System.currentTimeMillis();
204 } catch (JobExecutionException jee) {
205 endTime = System.currentTimeMillis();
206 jobExEx = jee;
207 getLog().info("Job " + jobDetail.getFullName() +
208 " threw a JobExecutionException: ", jobExEx);
209 } catch (Throwable e) {
210 endTime = System.currentTimeMillis();
211 getLog().error("Job " + jobDetail.getFullName() +
212 " threw an unhandled Exception: ", e);
213 SchedulerException se = new SchedulerException(
214 "Job threw an unhandled exception.", e);
215 se.setErrorCode(SchedulerException.ERR_JOB_EXECUTION_THREW_EXCEPTION);
216 qs.notifySchedulerListenersError("Job ("
217 + jec.getJobDetail().getFullName()
218 + " threw an exception.", se);
219 jobExEx = new JobExecutionException(se, false);
220 jobExEx.setErrorCode(JobExecutionException.ERR_JOB_EXECUTION_THREW_EXCEPTION);
221 }
222
223 jec.setJobRunTime(endTime - startTime);
224
225 // notify all job listeners
226 if (!notifyJobListenersComplete(jec, jobExEx)) {
227 break;
228 }
229
230 int instCode = Trigger.INSTRUCTION_NOOP;
231
232 // update the trigger
233 try {
234 instCode = trigger.executionComplete(jec, jobExEx);
235 } catch (Exception e) {
236 // If this happens, there's a bug in the trigger...
237 SchedulerException se = new SchedulerException(
238 "Trigger threw an unhandled exception.", e);
239 se.setErrorCode(SchedulerException.ERR_TRIGGER_THREW_EXCEPTION);
240 qs.notifySchedulerListenersError(
241 "Please report this error to the Quartz developers.",
242 se);
243 }
244
245 // notify all trigger listeners
246 if (!notifyTriggerListenersComplete(jec, instCode)) {
247 break;
248 }
249
250 // update job/trigger or re-execute job
251 if (instCode == Trigger.INSTRUCTION_RE_EXECUTE_JOB) {
252 jec.incrementRefireCount();
253 try {
254 complete(false);
255 } catch (SchedulerException se) {
256 qs.notifySchedulerListenersError("Error executing Job ("
257 + jec.getJobDetail().getFullName()
258 + ": couldn't finalize execution.", se);
259 }
260 continue;
261 }
262
263 try {
264 complete(true);
265 } catch (SchedulerException se) {
266 qs.notifySchedulerListenersError("Error executing Job ("
267 + jec.getJobDetail().getFullName()
268 + ": couldn't finalize execution.", se);
269 continue;
270 }
271
272 try {
273 qs.notifyJobStoreJobComplete(schdCtxt, trigger, jobDetail,
274 instCode);
275 } catch (JobPersistenceException jpe) {
276 qs.notifySchedulerListenersError(
277 "An error occured while marking executed job complete. job= '"
278 + jobDetail.getFullName() + "'", jpe);
279 if (!completeTriggerRetryLoop(trigger, jobDetail, instCode)) {
280 return;
281 }
282 }
283
284 break;
285 } while (true);
286
287 qs.notifySchedulerThread();
288
289 jobRunShellFactory.returnJobRunShell(this);
290 }
291
292 protected void begin() throws SchedulerException {
293 }
294
295 protected void complete(boolean successfulExecution)
296 throws SchedulerException {
297 }
298
299 public void passivate() {
300 jec = null;
301 qs = null;
302 }
303
304 private boolean notifyListenersBeginning(JobExecutionContext jec) throws VetoedException {
305
306 boolean vetoed = false;
307
308 // notify all trigger listeners
309 try {
310 vetoed = qs.notifyTriggerListenersFired(jec);
311 } catch (SchedulerException se) {
312 qs.notifySchedulerListenersError(
313 "Unable to notify TriggerListener(s) while firing trigger "
314 + "(Trigger and Job will NOT be fired!). trigger= "
315 + jec.getTrigger().getFullName() + " job= "
316 + jec.getJobDetail().getFullName(), se);
317
318 return false;
319 }
320
321 if(vetoed) {
322 try {
323 qs.notifyJobListenersWasVetoed(jec);
324 } catch (SchedulerException se) {
325 qs.notifySchedulerListenersError(
326 "Unable to notify JobListener(s) of vetoed execution " +
327 "while firing trigger (Trigger and Job will NOT be " +
328 "fired!). trigger= "
329 + jec.getTrigger().getFullName() + " job= "
330 + jec.getJobDetail().getFullName(), se);
331
332 }
333 throw new VetoedException();
334 }
335
336 // notify all job listeners
337 try {
338 qs.notifyJobListenersToBeExecuted(jec);
339 } catch (SchedulerException se) {
340 qs.notifySchedulerListenersError(
341 "Unable to notify JobListener(s) of Job to be executed: "
342 + "(Job will NOT be executed!). trigger= "
343 + jec.getTrigger().getFullName() + " job= "
344 + jec.getJobDetail().getFullName(), se);
345
346 return false;
347 }
348
349 return true;
350 }
351
352 private boolean notifyJobListenersComplete(JobExecutionContext jec,
353 JobExecutionException jobExEx) {
354 try {
355 qs.notifyJobListenersWasExecuted(jec, jobExEx);
356 } catch (SchedulerException se) {
357 qs.notifySchedulerListenersError(
358 "Unable to notify JobListener(s) of Job that was executed: "
359 + "(error will be ignored). trigger= "
360 + jec.getTrigger().getFullName() + " job= "
361 + jec.getJobDetail().getFullName(), se);
362
363 return false;
364 }
365
366 return true;
367 }
368
369 private boolean notifyTriggerListenersComplete(JobExecutionContext jec,
370 int instCode) {
371 try {
372 qs.notifyTriggerListenersComplete(jec, instCode);
373
374 } catch (SchedulerException se) {
375 qs.notifySchedulerListenersError(
376 "Unable to notify TriggerListener(s) of Job that was executed: "
377 + "(error will be ignored). trigger= "
378 + jec.getTrigger().getFullName() + " job= "
379 + jec.getJobDetail().getFullName(), se);
380
381 return false;
382 }
383 if (jec.getTrigger().getNextFireTime() == null) {
384 qs.notifySchedulerListenersFinalized(jec.getTrigger());
385 }
386
387 return true;
388 }
389
390 public boolean completeTriggerRetryLoop(Trigger trigger,
391 JobDetail jobDetail, int instCode) {
392 while (!shutdownRequested) {
393 try {
394 Thread.sleep(5 * 1000L); // retry every 5 seconds (the db
395 // connection must be failed)
396 qs.notifyJobStoreJobComplete(schdCtxt, trigger, jobDetail,
397 instCode);
398 return true;
399 } catch (JobPersistenceException jpe) {
400 qs.notifySchedulerListenersError(
401 "An error occured while marking executed job complete. job= '"
402 + jobDetail.getFullName() + "'", jpe);
403 } catch (InterruptedException ignore) {
404 }
405 }
406 return false;
407 }
408
409 public boolean vetoedJobRetryLoop(Trigger trigger, JobDetail jobDetail, int instCode) {
410 while (!shutdownRequested) {
411 try {
412 Thread.sleep(5 * 1000L); // retry every 5 seconds (the db
413 // connection must be failed)
414 qs.notifyJobStoreJobVetoed(schdCtxt, trigger, jobDetail, instCode);
415 return true;
416 } catch (JobPersistenceException jpe) {
417 qs.notifySchedulerListenersError(
418 "An error occured while marking executed job vetoed. job= '"
419 + jobDetail.getFullName() + "'", jpe);
420 } catch (InterruptedException ignore) {
421 }
422 }
423 return false;
424 }
425
426 class VetoedException extends Exception {
427 public VetoedException() {
428 }
429 }
430 }