1 /*
2 * Copyright 2004-2005 OpenSymphony
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5 * use this file except in compliance with the License. You may obtain a copy
6 * of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 *
16 */
17
18 /*
19 * Previously Copyright (c) 2001-2004 James House
20 */
21 package org.quartz.simpl;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.quartz.SchedulerConfigException;
26 import org.quartz.spi.ThreadPool;
27
28 import java.util.Iterator;
29 import java.util.LinkedList;
30 import java.util.List;
31
32 /**
33 * <p>
34 * This is class is a simple implementation of a thread pool, based on the
35 * <code>{@link org.quartz.spi.ThreadPool}</code> interface.
36 * </p>
37 *
38 * <p>
39 * <CODE>Runnable</CODE> objects are sent to the pool with the <code>{@link #runInThread(Runnable)}</code>
40 * method, which blocks until a <code>Thread</code> becomes available.
41 * </p>
42 *
43 * <p>
44 * The pool has a fixed number of <code>Thread</code>s, and does not grow or
45 * shrink based on demand.
46 * </p>
47 *
48 * @author James House
49 * @author Juergen Donnerstag
50 */
51 public class SimpleThreadPool implements ThreadPool {
52
53 /*
54 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
55 *
56 * Data members.
57 *
58 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
59 */
60
61 private int count = -1;
62
63 private int prio = Thread.NORM_PRIORITY;
64
65 private boolean isShutdown = false;
66 private boolean handoffPending = false;
67
68 private boolean inheritLoader = false;
69
70 private boolean inheritGroup = true;
71
72 private boolean makeThreadsDaemons = false;
73
74 private ThreadGroup threadGroup;
75
76 private final Object nextRunnableLock = new Object();
77
78 private List workers;
79 private LinkedList availWorkers = new LinkedList();
80 private LinkedList busyWorkers = new LinkedList();
81
82 private String threadNamePrefix = "SimpleThreadPoolWorker";
83
84 private final Log log = LogFactory.getLog(getClass());
85
86 /*
87 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
88 *
89 * Constructors.
90 *
91 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
92 */
93
94 /**
95 * <p>
96 * Create a new (unconfigured) <code>SimpleThreadPool</code>.
97 * </p>
98 *
99 * @see #setThreadCount(int)
100 * @see #setThreadPriority(int)
101 */
102 public SimpleThreadPool() {
103 }
104
105 /**
106 * <p>
107 * Create a new <code>SimpleThreadPool</code> with the specified number
108 * of <code>Thread</code> s that have the given priority.
109 * </p>
110 *
111 * @param threadCount
112 * the number of worker <code>Threads</code> in the pool, must
113 * be > 0.
114 * @param threadPriority
115 * the thread priority for the worker threads.
116 *
117 * @see java.lang.Thread
118 */
119 public SimpleThreadPool(int threadCount, int threadPriority) {
120 setThreadCount(threadCount);
121 setThreadPriority(threadPriority);
122 }
123
124 /*
125 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
126 *
127 * Interface.
128 *
129 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
130 */
131
132 public Log getLog() {
133 return log;
134 }
135
136 public int getPoolSize() {
137 return getThreadCount();
138 }
139
140 /**
141 * <p>
142 * Set the number of worker threads in the pool - has no effect after
143 * <code>initialize()</code> has been called.
144 * </p>
145 */
146 public void setThreadCount(int count) {
147 this.count = count;
148 }
149
150 /**
151 * <p>
152 * Get the number of worker threads in the pool.
153 * </p>
154 */
155 public int getThreadCount() {
156 return count;
157 }
158
159 /**
160 * <p>
161 * Set the thread priority of worker threads in the pool - has no effect
162 * after <code>initialize()</code> has been called.
163 * </p>
164 */
165 public void setThreadPriority(int prio) {
166 this.prio = prio;
167 }
168
169 /**
170 * <p>
171 * Get the thread priority of worker threads in the pool.
172 * </p>
173 */
174 public int getThreadPriority() {
175 return prio;
176 }
177
178 public void setThreadNamePrefix(String prfx) {
179 this.threadNamePrefix = prfx;
180 }
181
182 public String getThreadNamePrefix() {
183 return threadNamePrefix;
184 }
185
186 /**
187 * @return Returns the
188 * threadsInheritContextClassLoaderOfInitializingThread.
189 */
190 public boolean isThreadsInheritContextClassLoaderOfInitializingThread() {
191 return inheritLoader;
192 }
193
194 /**
195 * @param inheritLoader
196 * The threadsInheritContextClassLoaderOfInitializingThread to
197 * set.
198 */
199 public void setThreadsInheritContextClassLoaderOfInitializingThread(
200 boolean inheritLoader) {
201 this.inheritLoader = inheritLoader;
202 }
203
204 public boolean isThreadsInheritGroupOfInitializingThread() {
205 return inheritGroup;
206 }
207
208 public void setThreadsInheritGroupOfInitializingThread(
209 boolean inheritGroup) {
210 this.inheritGroup = inheritGroup;
211 }
212
213
214 /**
215 * @return Returns the value of makeThreadsDaemons.
216 */
217 public boolean isMakeThreadsDaemons() {
218 return makeThreadsDaemons;
219 }
220
221 /**
222 * @param makeThreadsDaemons
223 * The value of makeThreadsDaemons to set.
224 */
225 public void setMakeThreadsDaemons(boolean makeThreadsDaemons) {
226 this.makeThreadsDaemons = makeThreadsDaemons;
227 }
228
229 public void initialize() throws SchedulerConfigException {
230
231 if (count <= 0) {
232 throw new SchedulerConfigException(
233 "Thread count must be > 0");
234 }
235 if (prio <= 0 || prio > 9) {
236 throw new SchedulerConfigException(
237 "Thread priority must be > 0 and <= 9");
238 }
239
240 if(isThreadsInheritGroupOfInitializingThread()) {
241 threadGroup = Thread.currentThread().getThreadGroup();
242 } else {
243 // follow the threadGroup tree to the root thread group.
244 threadGroup = Thread.currentThread().getThreadGroup();
245 ThreadGroup parent = threadGroup;
246 while ( !parent.getName().equals("main") ) {
247 threadGroup = parent;
248 parent = threadGroup.getParent();
249 }
250 threadGroup = new ThreadGroup(parent, "SimpleThreadPool");
251 if (isMakeThreadsDaemons()) {
252 threadGroup.setDaemon(true);
253 }
254 }
255
256
257 if (isThreadsInheritContextClassLoaderOfInitializingThread()) {
258 getLog().info(
259 "Job execution threads will use class loader of thread: "
260 + Thread.currentThread().getName());
261 }
262
263 // create the worker threads and start them
264 Iterator workerThreads = createWorkerThreads(count).iterator();
265 while(workerThreads.hasNext()) {
266 WorkerThread wt = (WorkerThread) workerThreads.next();
267 wt.start();
268 availWorkers.add(wt);
269 }
270 }
271
272 protected List createWorkerThreads(int count) {
273 workers = new LinkedList();
274 for (int i = 1; i<= count; ++i) {
275 WorkerThread wt = new WorkerThread(this, threadGroup,
276 getThreadNamePrefix() + "-" + i,
277 getThreadPriority(),
278 isMakeThreadsDaemons());
279 if (isThreadsInheritContextClassLoaderOfInitializingThread()) {
280 wt.setContextClassLoader(Thread.currentThread()
281 .getContextClassLoader());
282 }
283 workers.add(wt);
284 }
285
286 return workers;
287 }
288
289 /**
290 * <p>
291 * Terminate any worker threads in this thread group.
292 * </p>
293 *
294 * <p>
295 * Jobs currently in progress will complete.
296 * </p>
297 */
298 public void shutdown() {
299 shutdown(true);
300 }
301
302 /**
303 * <p>
304 * Terminate any worker threads in this thread group.
305 * </p>
306 *
307 * <p>
308 * Jobs currently in progress will complete.
309 * </p>
310 */
311 public void shutdown(boolean waitForJobsToComplete) {
312
313 synchronized (nextRunnableLock) {
314 isShutdown = true;
315
316 // signal each worker thread to shut down
317 Iterator workerThreads = workers.iterator();
318 while(workerThreads.hasNext()) {
319 WorkerThread wt = (WorkerThread) workerThreads.next();
320 wt.shutdown();
321 availWorkers.remove(wt);
322 }
323
324 // Give waiting (wait(1000)) worker threads a chance to shut down.
325 // Active worker threads will shut down after finishing their
326 // current job.
327 nextRunnableLock.notifyAll();
328
329 if (waitForJobsToComplete == true) {
330
331 // wait for hand-off in runInThread to complete...
332 while(handoffPending)
333 try { nextRunnableLock.wait(100); } catch(Throwable t) {}
334
335 // Wait until all worker threads are shut down
336 while (busyWorkers.size() > 0) {
337 WorkerThread wt = (WorkerThread) busyWorkers.getFirst();
338 try {
339 getLog().debug(
340 "Waiting for thread " + wt.getName()
341 + " to shut down");
342
343 // note: with waiting infinite time the
344 // application may appear to 'hang'.
345 nextRunnableLock.wait(2000);
346 } catch (InterruptedException ex) {
347 }
348 }
349
350 int activeCount = threadGroup.activeCount();
351 if (activeCount > 0) {
352 getLog().info(
353 "There are still " + activeCount + " worker threads active."
354 + " See javadoc runInThread(Runnable) for a possible explanation");
355 }
356
357 getLog().debug("shutdown complete");
358 }
359 }
360 }
361
362 /**
363 * <p>
364 * Run the given <code>Runnable</code> object in the next available
365 * <code>Thread</code>. If while waiting the thread pool is asked to
366 * shut down, the Runnable is executed immediately within a new additional
367 * thread.
368 * </p>
369 *
370 * @param runnable
371 * the <code>Runnable</code> to be added.
372 */
373 public boolean runInThread(Runnable runnable) {
374 if (runnable == null) {
375 return false;
376 }
377
378 synchronized (nextRunnableLock) {
379
380 handoffPending = true;
381
382 // Wait until a worker thread is available
383 while ((availWorkers.size() < 1) && !isShutdown) {
384 try {
385 nextRunnableLock.wait(500);
386 } catch (InterruptedException ignore) {
387 }
388 }
389
390 if (!isShutdown) {
391 WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
392 busyWorkers.add(wt);
393 wt.run(runnable);
394 }
395 else {
396 // If the thread pool is going down, execute the Runnable
397 // within a new additional worker thread (no thread from the pool).
398 WorkerThread wt = new WorkerThread(this, threadGroup,
399 "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
400 busyWorkers.add(wt);
401 workers.add(wt);
402 wt.start();
403 }
404 nextRunnableLock.notifyAll();
405 handoffPending = false;
406 }
407
408 return true;
409 }
410
411 public int blockForAvailableThreads() {
412 synchronized(nextRunnableLock) {
413
414 while((availWorkers.size() < 1 || handoffPending) && !isShutdown) {
415 try {
416 nextRunnableLock.wait(500);
417 } catch (InterruptedException ignore) {
418 }
419 }
420
421 return availWorkers.size();
422 }
423 }
424
425 protected void makeAvailable(WorkerThread wt) {
426 synchronized(nextRunnableLock) {
427 if(!isShutdown)
428 availWorkers.add(wt);
429 busyWorkers.remove(wt);
430 nextRunnableLock.notifyAll();
431 }
432 }
433
434 /*
435 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
436 *
437 * WorkerThread Class.
438 *
439 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
440 */
441
442 /**
443 * <p>
444 * A Worker loops, waiting to execute tasks.
445 * </p>
446 */
447 class WorkerThread extends Thread {
448
449 // A flag that signals the WorkerThread to terminate.
450 private boolean run = true;
451
452 private SimpleThreadPool tp;
453
454 private Runnable runnable = null;
455
456 /**
457 * <p>
458 * Create a worker thread and start it. Waiting for the next Runnable,
459 * executing it, and waiting for the next Runnable, until the shutdown
460 * flag is set.
461 * </p>
462 */
463 WorkerThread(SimpleThreadPool tp, ThreadGroup threadGroup, String name,
464 int prio, boolean isDaemon) {
465
466 this(tp, threadGroup, name, prio, isDaemon, null);
467 }
468
469 /**
470 * <p>
471 * Create a worker thread, start it, execute the runnable and terminate
472 * the thread (one time execution).
473 * </p>
474 */
475 WorkerThread(SimpleThreadPool tp, ThreadGroup threadGroup, String name,
476 int prio, boolean isDaemon, Runnable runnable) {
477
478 super(threadGroup, name);
479 this.tp = tp;
480 this.runnable = runnable;
481 setPriority(prio);
482 setDaemon(isDaemon);
483 }
484
485 /**
486 * <p>
487 * Signal the thread that it should terminate.
488 * </p>
489 */
490 void shutdown() {
491 run = false;
492
493 // Javadoc mentions that it interrupts blocked I/O operations as
494 // well. Hence the job will most likely fail. I think we should
495 // shut the work thread gracefully, by letting the job finish
496 // uninterrupted. See SimpleThreadPool.shutdown()
497 //interrupt();
498 }
499
500 public void run(Runnable newRunnable) {
501 synchronized(this) {
502 if(runnable != null)
503 throw new IllegalStateException("Already running a Runnable!");
504
505 runnable = newRunnable;
506 this.notifyAll();
507 }
508 }
509
510 /**
511 * <p>
512 * Loop, executing targets as they are received.
513 * </p>
514 */
515 public void run() {
516 boolean runOnce = (runnable != null);
517
518 boolean ran = false;
519 while (run) {
520 try {
521 synchronized(this) {
522 while (runnable == null && run) {
523 this.wait(500);
524 }
525 }
526
527 if (runnable != null) {
528 ran = true;
529 runnable.run();
530 }
531 } catch (InterruptedException unblock) {
532 // do nothing (loop will terminate if shutdown() was called
533 try {
534 getLog().error("worker threat got 'interrupt'ed.", unblock);
535 } catch(Exception e) {
536 // ignore to help with a tomcat glitch
537 }
538 } catch (Exception exceptionInRunnable) {
539 try {
540 getLog().error("Error while executing the Runnable: ",
541 exceptionInRunnable);
542 } catch(Exception e) {
543 // ignore to help with a tomcat glitch
544 }
545 } finally {
546 runnable = null;
547 // repair the thread in case the runnable mucked it up...
548 if(getPriority() != tp.getThreadPriority())
549 setPriority(tp.getThreadPriority());
550
551 if (runOnce) {
552 run = false;
553 }
554 else if(ran) {
555 ran = false;
556 makeAvailable(this);
557 }
558
559 }
560 }
561
562 //if (log.isDebugEnabled())
563 try {
564 getLog().debug("WorkerThread is shutting down");
565 } catch(Exception e) {
566 // ignore to help with a tomcat glitch
567 }
568 }
569 }
570 }