/**
 * Main loop of the scheduler thread.
 *
 * <p>Until <code>halted</code> becomes true, each pass of the loop:</p>
 * <ol>
 * <li>waits on <code>pauseLock</code> while <code>paused</code> is set;</li>
 * <li>asks the thread pool for its available-thread count;</li>
 * <li>asks the job store for the next trigger, passing
 * <code>now + idleWaitTime</code> as the time argument;</li>
 * <li>sleeps in 10 ms slices until the trigger's next fire time, giving up
 * early (and releasing the trigger) if <code>signaled</code> becomes true;</li>
 * <li>while holding <code>pauseLock</code>, marks the trigger as fired,
 * borrows and initializes a <code>JobRunShell</code>, and hands it to the
 * thread pool;</li>
 * <li>if no trigger was acquired, idles in 10 ms slices for
 * <code>getRandomizedIdleWaitTime()</code> ms or until signaled.</li>
 * </ol>
 *
 * <p>A <code>RuntimeException</code> escaping a pass is logged and the loop
 * carries on. When the loop ends, the <code>qs</code> and
 * <code>qsRsrcs</code> references are cleared.</p>
 */
public void run() {
// true when the most recent acquireNextTrigger() call threw; used so that
// only the first of a run of consecutive acquisition failures is reported.
boolean lastAcquireFailed = false;
while (!halted) {
try {
// check if we're supposed to pause...
synchronized (pauseLock) {
while (paused && !halted) {
try {
// wait until togglePause(false) is called... the 100 ms timeout
// means paused/halted are re-tested at least that often even
// if no notification arrives.
pauseLock.wait(100L);
} catch (InterruptedException ignore) {
// deliberately ignored: the enclosing while re-tests paused/halted
}
}
if (halted) {
// leaves the outer while(!halted) loop; cleanup at the bottom still runs
break;
}
}
// presumably blocks until at least one worker thread is free (see the
// "should never happen" note on the else branch below) - confirm against
// the ThreadPool contract.
int availTreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availTreadCount > 0) {
Trigger trigger = null;
long now = System.currentTimeMillis();
// clear the flag before acquiring, so that the wait loops below only
// react to a signal raised from this point on. (The flag is set
// elsewhere; the setter is not visible in this method.)
signaled = false;
try {
trigger = qsRsrcs.getJobStore().acquireNextTrigger(
ctxt, now + idleWaitTime);
lastAcquireFailed = false;
} catch (JobPersistenceException jpe) {
// report only the first failure of a consecutive run
if(!lastAcquireFailed) {
qs.notifySchedulerListenersError(
"An error occured while scanning for the next trigger to fire.",
jpe);
}
lastAcquireFailed = true;
} catch (RuntimeException e) {
// same suppression as above, but logged instead of sent to listeners
if(!lastAcquireFailed) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
lastAcquireFailed = true;
}
// trigger is still null here if nothing was returned or if the acquire
// threw; in that case control falls through to the idle wait at the
// bottom of the loop body.
if (trigger != null) {
now = System.currentTimeMillis();
long triggerTime = trigger.getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
long spinInterval = 10;
// This looping may seem a bit silly, but it's the current work-around
// for a dead-lock that can occur if the Thread.sleep() is replaced
// with an obj.wait() that gets notified when the signal is set...
// So, to be able to detect the signal change without sleeping the
// entire timeUntilTrigger, we spin here... don't worry though, this
// spinning doesn't even register 0.2% cpu usage on a pentium 4.
//
// NOTE(review): the condition is ">= 0" and Java integer division
// truncates toward zero, so numPauses only goes negative once
// timeUntilTrigger <= -spinInterval. The loop therefore always sleeps
// at least once for a trigger that is less than 10 ms overdue, and
// otherwise ends only when the fire time is at least 10 ms in the
// past (or signaled is set). Confirm this lag is intended.
int numPauses = (int) (timeUntilTrigger / spinInterval);
while (numPauses >= 0 && !signaled) {
try {
Thread.sleep(spinInterval);
} catch (InterruptedException ignore) {
// deliberately ignored: remaining time is recomputed just below
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
numPauses = (int) (timeUntilTrigger / spinInterval);
}
// signaled while waiting: give the acquired trigger back to the job
// store, clear the flag and start the next pass of the main loop.
if (signaled) {
try {
qsRsrcs.getJobStore().releaseAcquiredTrigger(
ctxt, trigger);
} catch (JobPersistenceException jpe) {
qs.notifySchedulerListenersError(
"An error occured while releasing trigger '"
+ trigger.getFullName() + "'",
jpe);
// db connection must have failed... keep
// retrying until it's up...
releaseTriggerRetryLoop(trigger);
} catch (RuntimeException e) {
// NOTE(review): the log text names releaseTriggerRetryLoop, but the
// exception caught here came from releaseAcquiredTrigger() above.
getLog().error(
"releaseTriggerRetryLoop: RuntimeException "
+e.getMessage(), e);
// db connection must have failed... keep
// retrying until it's up...
releaseTriggerRetryLoop(trigger);
}
signaled = false;
continue;
}
// set trigger to 'executing'
TriggerFiredBundle bndle = null;
// pauseLock is held from here until the job has been handed to the
// thread pool (the block closes just before the final "continue" of
// this branch), and halted is re-tested under the lock. Each
// "continue" inside the block leaves it, releasing the monitor.
synchronized(pauseLock) {
if(!halted) {
try {
bndle = qsRsrcs.getJobStore().triggerFired(ctxt,
trigger);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occured while firing trigger '"
+ trigger.getFullName() + "'", se);
} catch (RuntimeException e) {
getLog().error(
"RuntimeException while firing trigger " +
trigger.getFullName(), e);
// db connection must have failed... keep
// retrying until it's up...
// NOTE(review): bndle is still null after this path, so the
// "bndle == null" branch below calls releaseAcquiredTrigger() on
// the same trigger a second time.
releaseTriggerRetryLoop(trigger);
}
}
// it's possible to get 'null' if the trigger was paused,
// blocked, or other similar occurrences that prevent it being
// fired at this time... or if the scheduler was shutdown (halted).
// It is also null here when triggerFired() threw.
if (bndle == null) {
try {
qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt,
trigger);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occured while releasing trigger '"
+ trigger.getFullName() + "'", se);
// db connection must have failed... keep retrying
// until it's up...
releaseTriggerRetryLoop(trigger);
}
continue;
}
// TODO: improvements:
//
// 2- make sure we can get a job runshell before firing trigger, or
// don't let that throw an exception (right now it never does,
// but the signature says it can).
// 3- acquire more triggers at a time (based on num threads available?)
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
shell.initialize(qs, bndle);
} catch (SchedulerException se) {
// could not obtain/initialize a run shell: report the job as complete
// with INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR, then move on to the
// next pass of the main loop.
try {
qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
} catch (SchedulerException se2) {
qs.notifySchedulerListenersError(
"An error occured while placing job's triggers in error state '"
+ trigger.getFullName() + "'", se2);
// db connection must have failed... keep retrying
// until it's up...
errorTriggerRetryLoop(bndle);
}
continue;
}
// hand the job to the thread pool; a false return is treated as an error
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
try {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
} catch (SchedulerException se2) {
qs.notifySchedulerListenersError(
"An error occured while placing job's triggers in error state '"
+ trigger.getFullName() + "'", se2);
// db connection must have failed... keep retrying
// until it's up...
// NOTE(review): the equivalent failure in the run-shell branch above
// recovers with errorTriggerRetryLoop(bndle); this one uses
// releaseTriggerRetryLoop(trigger). Confirm the difference is
// intended.
releaseTriggerRetryLoop(trigger);
}
}
}
// a trigger was handled: start the next pass immediately, skipping the
// idle wait below.
continue;
}
} else { // if(availTreadCount > 0)
continue; // should never happen, if threadPool.blockForAvailableThreads() follows contract
}
// Reached only when no trigger was acquired on this pass.
//
// This looping may seem a bit silly, but it's the current work-around
// for a dead-lock that can occur if the Thread.sleep() is replaced
// with an obj.wait() that gets notified when the signal is set...
// So, to be able to detect the signal change without sleeping the
// entire getRandomizedIdleWaitTime(), we spin here... don't worry
// though, the CPU usage of this spinning can't even be measured on a
// pentium 4.
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
long spinInterval = 10;
int numPauses = (int) (timeUntilContinue / spinInterval);
// unlike the fire-time wait above, this loop uses "> 0", so it stops as
// soon as less than one full spinInterval remains (or signaled is set).
while (numPauses > 0 && !signaled) {
try {
// literal 10L matches spinInterval above
Thread.sleep(10L);
} catch (InterruptedException ignore) {
// deliberately ignored: remaining time is recomputed just below
}
now = System.currentTimeMillis();
timeUntilContinue = waitTime - now;
numPauses = (int) (timeUntilContinue / spinInterval);
}
} catch(RuntimeException re) {
// last-resort guard: log and keep the scheduler thread alive for the
// next pass of the loop.
getLog().error("Runtime error occured in main trigger firing loop.", re);
}
} // loop...
// drop references to scheduler stuff to aid garbage collection...
qs = null;
qsRsrcs = null;
}
|