1
2 /*
3 * Copyright 2004-2005 OpenSymphony
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
6 * use this file except in compliance with the License. You may obtain a copy
7 * of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 * License for the specific language governing permissions and limitations
15 * under the License.
16 *
17 */
18
19 /*
20 * Previously Copyright (c) 2001-2004 James House
21 */
22 package org.quartz.core;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.quartz.JobPersistenceException;
27 import org.quartz.SchedulerException;
28 import org.quartz.Trigger;
29 import org.quartz.spi.TriggerFiredBundle;
30
31 import java.util.Random;
32
33 /**
34 * <p>
 * The thread responsible for performing the work of firing
 * <code>{@link Trigger}</code>s that are registered with the <code>{@link QuartzScheduler}</code>.
37 * </p>
38 *
39 * @see QuartzScheduler
40 * @see org.quartz.Job
41 * @see Trigger
42 *
43 * @author James House
44 */
45 public class QuartzSchedulerThread extends Thread {
46 /*
47 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
48 *
49 * Data members.
50 *
51 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
52 */
53 private QuartzScheduler qs;
54
55 private QuartzSchedulerResources qsRsrcs;
56
57 private Object pauseLock = new Object();
58
59 private Object idleLock = new Object();
60
61 private boolean signaled;
62
63 private boolean paused;
64
65 private boolean halted;
66
67 private SchedulingContext ctxt = null;
68
69 private Random random = new Random(System.currentTimeMillis());
70
71 // When the scheduler finds there is no current trigger to fire, how long
72 // it should wait until checking again...
73 private static long DEFAULT_IDLE_WAIT_TIME = 30L * 1000L;
74
75 private long idleWaitTime = DEFAULT_IDLE_WAIT_TIME;
76
77 private int idleWaitVariablness = 7 * 1000;
78
79 private long dbFailureRetryInterval = 15L * 1000L;
80
81 private final Log log = LogFactory.getLog(getClass());
82
83 /*
84 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
85 *
86 * Constructors.
87 *
88 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
89 */
90
/**
 * <p>
 * Construct a new <code>QuartzSchedulerThread</code> for the given
 * <code>QuartzScheduler</code> as a <code>Thread</code> with normal
 * priority. Whether the thread is a daemon is taken from
 * <code>qsRsrcs.getMakeSchedulerThreadDaemon()</code>.
 * </p>
 *
 * <p>
 * Delegates to the five-argument constructor.
 * </p>
 */
QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs,
        SchedulingContext ctxt) {
    this(qs, qsRsrcs, ctxt, qsRsrcs.getMakeSchedulerThreadDaemon(), Thread.NORM_PRIORITY);
}
102
/**
 * <p>
 * Create the scheduler thread for the given <code>QuartzScheduler</code>,
 * using the supplied daemon flag and priority.
 * </p>
 *
 * <p>
 * The thread group comes from <code>qs.getSchedulerThreadGroup()</code> and
 * the thread name from <code>qsRsrcs.getThreadName()</code>. The underlying
 * thread is started by this constructor, with the object in the 'paused'
 * state so that the processing loop does no work yet.
 * </p>
 */
QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs,
        SchedulingContext ctxt, boolean setDaemon, int threadPrio) {
    super(qs.getSchedulerThreadGroup(), qsRsrcs.getThreadName());

    // Initial state: paused and not halted, so run() parks on the pause
    // lock as soon as the thread starts.
    this.halted = false;
    this.paused = true;

    this.ctxt = ctxt;
    this.qsRsrcs = qsRsrcs;
    this.qs = qs;

    // Thread attributes must be in place before start().
    setDaemon(setDaemon);
    setPriority(threadPrio);

    start();
}
126
127 /*
128 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
129 *
130 * Interface.
131 *
132 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
133 */
134
135 void setIdleWaitTime(long waitTime) {
136 idleWaitTime = waitTime;
137 idleWaitVariablness = (int) (waitTime * 0.2);
138 }
139
/**
 * Returns the value passed to <code>Thread.sleep()</code> between attempts
 * in the retry loops of this class.
 */
private long getDbFailureRetryInterval() {
    return dbFailureRetryInterval;
}
143
/**
 * Sets the interval that the retry loops of this class sleep between
 * attempts.
 *
 * @param dbFailureRetryInterval the new interval; it is handed unchanged
 *                               to <code>Thread.sleep()</code>
 */
public void setDbFailureRetryInterval(long dbFailureRetryInterval) {
    this.dbFailureRetryInterval = dbFailureRetryInterval;
}
147
148 private long getRandomizedIdleWaitTime() {
149 return idleWaitTime - random.nextInt(idleWaitVariablness);
150 }
151
152 /**
153 * <p>
154 * Signals the main processing loop to pause at the next possible point.
155 * </p>
156 */
157 void togglePause(boolean pause) {
158 synchronized (pauseLock) {
159 paused = pause;
160
161 if (paused) {
162 signalSchedulingChange();
163 } else {
164 pauseLock.notify();
165 }
166 }
167 }
168
169 /**
170 * <p>
171 * Signals the main processing loop to pause at the next possible point.
172 * </p>
173 */
174 void halt() {
175 synchronized (pauseLock) {
176 halted = true;
177
178 if (paused) {
179 pauseLock.notify();
180 } else {
181 signalSchedulingChange();
182 }
183 }
184 }
185
/**
 * Returns the current value of the paused flag. The flag is read without
 * acquiring the pause lock.
 */
boolean isPaused() {
    return paused;
}
189
/**
 * <p>
 * Signals the main processing loop that a change in scheduling has been
 * made - in order to interrupt any sleeping that may be occurring while
 * waiting for the fire time to arrive.
 * </p>
 *
 * <p>
 * Only sets the <code>signaled</code> flag; the processing loop notices it
 * by polling the flag between short sleeps.
 * </p>
 */
void signalSchedulingChange() {
    signaled = true;
}
200
201 /**
202 * <p>
203 * The main processing loop of the <code>QuartzSchedulerThread</code>.
204 * </p>
205 */
206 public void run() {
207 boolean lastAcquireFailed = false;
208
209 while (!halted) {
210 try {
211 // check if we're supposed to pause...
212 synchronized (pauseLock) {
213 while (paused && !halted) {
214 try {
215 // wait until togglePause(false) is called...
216 pauseLock.wait(100L);
217 } catch (InterruptedException ignore) {
218 }
219 }
220
221 if (halted) {
222 break;
223 }
224 }
225
226 int availTreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
227 if(availTreadCount > 0) {
228
229 Trigger trigger = null;
230
231 long now = System.currentTimeMillis();
232
233 signaled = false;
234 try {
235 trigger = qsRsrcs.getJobStore().acquireNextTrigger(
236 ctxt, now + idleWaitTime);
237 lastAcquireFailed = false;
238 } catch (JobPersistenceException jpe) {
239 if(!lastAcquireFailed) {
240 qs.notifySchedulerListenersError(
241 "An error occured while scanning for the next trigger to fire.",
242 jpe);
243 }
244 lastAcquireFailed = true;
245 } catch (RuntimeException e) {
246 if(!lastAcquireFailed) {
247 getLog().error("quartzSchedulerThreadLoop: RuntimeException "
248 +e.getMessage(), e);
249 }
250 lastAcquireFailed = true;
251 }
252
253 if (trigger != null) {
254
255 now = System.currentTimeMillis();
256 long triggerTime = trigger.getNextFireTime().getTime();
257 long timeUntilTrigger = triggerTime - now;
258 long spinInterval = 10;
259
260 // this looping may seem a bit silly, but it's the
261 // current work-around
262 // for a dead-lock that can occur if the Thread.sleep()
263 // is replaced with
264 // a obj.wait() that gets notified when the signal is
265 // set...
266 // so to be able to detect the signal change without
267 // sleeping the entire
268 // timeUntilTrigger, we spin here... don't worry
269 // though, this spinning
270 // doesn't even register 0.2% cpu usage on a pentium 4.
271 int numPauses = (int) (timeUntilTrigger / spinInterval);
272 while (numPauses >= 0 && !signaled) {
273
274 try {
275 Thread.sleep(spinInterval);
276 } catch (InterruptedException ignore) {
277 }
278
279 now = System.currentTimeMillis();
280 timeUntilTrigger = triggerTime - now;
281 numPauses = (int) (timeUntilTrigger / spinInterval);
282 }
283 if (signaled) {
284 try {
285 qsRsrcs.getJobStore().releaseAcquiredTrigger(
286 ctxt, trigger);
287 } catch (JobPersistenceException jpe) {
288 qs.notifySchedulerListenersError(
289 "An error occured while releasing trigger '"
290 + trigger.getFullName() + "'",
291 jpe);
292 // db connection must have failed... keep
293 // retrying until it's up...
294 releaseTriggerRetryLoop(trigger);
295 } catch (RuntimeException e) {
296 getLog().error(
297 "releaseTriggerRetryLoop: RuntimeException "
298 +e.getMessage(), e);
299 // db connection must have failed... keep
300 // retrying until it's up...
301 releaseTriggerRetryLoop(trigger);
302 }
303 signaled = false;
304 continue;
305 }
306
307 // set trigger to 'executing'
308 TriggerFiredBundle bndle = null;
309
310 synchronized(pauseLock) {
311 if(!halted) {
312 try {
313 bndle = qsRsrcs.getJobStore().triggerFired(ctxt,
314 trigger);
315 } catch (SchedulerException se) {
316 qs.notifySchedulerListenersError(
317 "An error occured while firing trigger '"
318 + trigger.getFullName() + "'", se);
319 } catch (RuntimeException e) {
320 getLog().error(
321 "RuntimeException while firing trigger " +
322 trigger.getFullName(), e);
323 // db connection must have failed... keep
324 // retrying until it's up...
325 releaseTriggerRetryLoop(trigger);
326 }
327 }
328
329 // it's possible to get 'null' if the trigger was paused,
330 // blocked, or other similar occurances that prevent it being
331 // fired at this time... or if the scheduler was shutdown (halted)
332 if (bndle == null) {
333 try {
334 qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt,
335 trigger);
336 } catch (SchedulerException se) {
337 qs.notifySchedulerListenersError(
338 "An error occured while releasing trigger '"
339 + trigger.getFullName() + "'", se);
340 // db connection must have failed... keep retrying
341 // until it's up...
342 releaseTriggerRetryLoop(trigger);
343 }
344 continue;
345 }
346
347 // TODO: improvements:
348 //
349 // 2- make sure we can get a job runshell before firing trigger, or
350 // don't let that throw an exception (right now it never does,
351 // but the signature says it can).
352 // 3- acquire more triggers at a time (based on num threads available?)
353
354
355 JobRunShell shell = null;
356 try {
357 shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
358 shell.initialize(qs, bndle);
359 } catch (SchedulerException se) {
360 try {
361 qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
362 trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
363 } catch (SchedulerException se2) {
364 qs.notifySchedulerListenersError(
365 "An error occured while placing job's triggers in error state '"
366 + trigger.getFullName() + "'", se2);
367 // db connection must have failed... keep retrying
368 // until it's up...
369 errorTriggerRetryLoop(bndle);
370 }
371 continue;
372 }
373
374 if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
375 try {
376 // this case should never happen, as it is indicative of the
377 // scheduler being shutdown or a bug in the thread pool or
378 // a thread pool being used concurrently - which the docs
379 // say not to do...
380 getLog().error("ThreadPool.runInThread() return false!");
381 qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
382 trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
383 } catch (SchedulerException se2) {
384 qs.notifySchedulerListenersError(
385 "An error occured while placing job's triggers in error state '"
386 + trigger.getFullName() + "'", se2);
387 // db connection must have failed... keep retrying
388 // until it's up...
389 releaseTriggerRetryLoop(trigger);
390 }
391 }
392 }
393
394 continue;
395 }
396 } else { // if(availTreadCount > 0)
397 continue; // should never happen, if threadPool.blockForAvailableThreads() follows contract
398 }
399
400 // this looping may seem a bit silly, but it's the current
401 // work-around
402 // for a dead-lock that can occur if the Thread.sleep() is replaced
403 // with
404 // a obj.wait() that gets notified when the signal is set...
405 // so to be able to detect the signal change without sleeping the
406 // entier
407 // getRandomizedIdleWaitTime(), we spin here... don't worry though,
408 // the
409 // CPU usage of this spinning can't even be measured on a pentium
410 // 4.
411 long now = System.currentTimeMillis();
412 long waitTime = now + getRandomizedIdleWaitTime();
413 long timeUntilContinue = waitTime - now;
414 long spinInterval = 10;
415 int numPauses = (int) (timeUntilContinue / spinInterval);
416
417 while (numPauses > 0 && !signaled) {
418
419 try {
420 Thread.sleep(10L);
421 } catch (InterruptedException ignore) {
422 }
423
424 now = System.currentTimeMillis();
425 timeUntilContinue = waitTime - now;
426 numPauses = (int) (timeUntilContinue / spinInterval);
427 }
428 } catch(RuntimeException re) {
429 getLog().error("Runtime error occured in main trigger firing loop.", re);
430 }
431 } // loop...
432
433 // drop references to scheduler stuff to aid garbage collection...
434 qs = null;
435 qsRsrcs = null;
436 }
437
438 public void errorTriggerRetryLoop(TriggerFiredBundle bndle) {
439 int retryCount = 0;
440 try {
441 while (!halted) {
442 try {
443 Thread.sleep(getDbFailureRetryInterval()); // retry every N
444 // seconds (the db
445 // connection must
446 // be failed)
447 retryCount++;
448 qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
449 bndle.getTrigger(), bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
450 retryCount = 0;
451 break;
452 } catch (JobPersistenceException jpe) {
453 if(retryCount % 4 == 0) {
454 qs.notifySchedulerListenersError(
455 "An error occured while releasing trigger '"
456 + bndle.getTrigger().getFullName() + "'", jpe);
457 }
458 } catch (RuntimeException e) {
459 getLog().error("releaseTriggerRetryLoop: RuntimeException "+e.getMessage(), e);
460 } catch (InterruptedException e) {
461 getLog().error("releaseTriggerRetryLoop: InterruptedException "+e.getMessage(), e);
462 }
463 }
464 } finally {
465 if(retryCount == 0) {
466 getLog().info("releaseTriggerRetryLoop: connection restored.");
467 }
468 }
469 }
470
471 public void releaseTriggerRetryLoop(Trigger trigger) {
472 int retryCount = 0;
473 try {
474 while (!halted) {
475 try {
476 Thread.sleep(getDbFailureRetryInterval()); // retry every N
477 // seconds (the db
478 // connection must
479 // be failed)
480 retryCount++;
481 qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt, trigger);
482 retryCount = 0;
483 break;
484 } catch (JobPersistenceException jpe) {
485 if(retryCount % 4 == 0) {
486 qs.notifySchedulerListenersError(
487 "An error occured while releasing trigger '"
488 + trigger.getFullName() + "'", jpe);
489 }
490 } catch (RuntimeException e) {
491 getLog().error("releaseTriggerRetryLoop: RuntimeException "+e.getMessage(), e);
492 } catch (InterruptedException e) {
493 getLog().error("releaseTriggerRetryLoop: InterruptedException "+e.getMessage(), e);
494 }
495 }
496 } finally {
497 if(retryCount == 0) {
498 getLog().info("releaseTriggerRetryLoop: connection restored.");
499 }
500 }
501 }
502
/**
 * Returns the <code>Log</code> of this instance, obtained from
 * <code>LogFactory.getLog(getClass())</code> when the instance was created.
 */
public Log getLog() {
    return log;
}
506
507 } // end of QuartzSchedulerThread