Source code: org/mortbay/util/ThreadPool.java
1 // ========================================================================
2 // Copyright (c) 1999 Mort Bay Consulting (Australia) Pty. Ltd.
3 // $Id: ThreadPool.java,v 1.31 2003/11/16 11:54:30 gregwilkins Exp $
4 // ========================================================================
5
6 package org.mortbay.util;
7
8 import java.io.Serializable;
9
10 import org.apache.commons.logging.Log;
11 import org.apache.commons.logging.LogFactory;
12
13 /* ------------------------------------------------------------ */
14 /** A pool of threads.
15 * <p>
16 * Avoids the expense of thread creation by pooling threads after
17 * their run methods exit for reuse.
18 * <p>
19 * If the maximum pool size is reached, jobs wait for a free thread.
20 * By default there is no maximum pool size. Idle threads timeout
21 * and terminate until the minimum number of threads are running.
22 * <p>
23 * This implementation uses the run(Object) method to place a
24 * job on a queue, which is read by the getJob(timeout) method.
25 * Derived implementations may specialize getJob(timeout) to
26 * obtain jobs from other sources without queing overheads.
27 *
28 * @version $Id: ThreadPool.java,v 1.31 2003/11/16 11:54:30 gregwilkins Exp $
29 * @author Juancarlo Aņez <juancarlo@modelistica.com>
30 * @author Greg Wilkins <gregw@mortbay.com>
31 */
32 public class ThreadPool
33 implements LifeCycle, Serializable
34 {
35 static Log log = LogFactory.getLog(ThreadPool.class);
36
37 public static final String __DAEMON="org.mortbay.util.ThreadPool.daemon";
38 public static final String __PRIORITY="org.mortbay.util.ThreadPool.priority";
39
40 /* ------------------------------------------------------------------- */
41 private String _name;
42 private Pool _pool;
43 private Object _join="";
44
45 private transient boolean _started;
46
47 /* ------------------------------------------------------------------- */
48 /* Construct
49 */
50 public ThreadPool()
51 {
52 _pool=new Pool();
53 _pool.setPoolClass(ThreadPool.PoolThread.class);
54 _name=this.getClass().getName();
55 int dot=_name.lastIndexOf('.');
56 if (dot>=0)
57 _name=_name.substring(dot+1);
58 }
59
60 /* ------------------------------------------------------------ */
61 /**
62 * @return The name of the ThreadPool.
63 */
64 public String getName()
65 {
66 return _name;
67 }
68
69 /* ------------------------------------------------------------ */
70 /**
71 * @param name Name of the ThreadPool to use when naming Threads.
72 */
73 public void setName(String name)
74 {
75 _name=name;
76 }
77
78 /* ------------------------------------------------------------ */
79 /**
80 * @return Name of the Pool instance this ThreadPool uses or null for
81 * an anonymous private pool.
82 */
83 public String getPoolName()
84 {
85 return _pool.getPoolName();
86 }
87
88 /* ------------------------------------------------------------ */
89 /** Set the Pool name.
90 * All ThreadPool instances with the same Pool name will share the
91 * same Pool instance. Thus they will share the same max, min and
92 * available Threads. The field values of the first ThreadPool to call
93 * setPoolName with a specific name are used for the named
94 * Pool. Subsequent ThreadPools that join the name pool will loose their
95 * private values.
96 * @param name Name of the Pool instance this ThreadPool uses or null for
97 * an anonymous private pool.
98 */
99 public void setPoolName(String name)
100 {
101 synchronized(Pool.class)
102 {
103 if (isStarted())
104 {
105 if ((name==null && _pool.getPoolName()!=null) ||
106 (name!=null && !name.equals(_pool.getPoolName())))
107 throw new IllegalStateException("started");
108 return;
109 }
110
111 if (name==null)
112 {
113 if (_pool.getPoolName()!=null)
114 _pool=new Pool();
115 }
116 else
117 {
118 Pool pool=Pool.getPool(name);
119 if (pool==null)
120 _pool.setPoolName(name);
121 else
122 _pool=pool;
123 }
124 }
125 }
126
127 /* ------------------------------------------------------------ */
128 /**
129 * Delegated to the named or anonymous Pool.
130 */
131 public boolean isDaemon()
132 {
133 return _pool.getAttribute(__DAEMON)!=null;
134 }
135
136 /* ------------------------------------------------------------ */
137 /**
138 * Delegated to the named or anonymous Pool.
139 */
140 public void setDaemon(boolean daemon)
141 {
142 _pool.setAttribute(__DAEMON,daemon?"true":null);
143 }
144
145 /* ------------------------------------------------------------ */
146 /** Is the pool running jobs.
147 * @return True if start() has been called.
148 */
149 public boolean isStarted()
150 {
151 return _started;
152 }
153
154 /* ------------------------------------------------------------ */
155 /** Get the number of threads in the pool.
156 * Delegated to the named or anonymous Pool.
157 * @see #getIdleThreads
158 * @return Number of threads
159 */
160 public int getThreads()
161 {
162 return _pool.size();
163 }
164
165 /* ------------------------------------------------------------ */
166 /** Get the number of idle threads in the pool.
167 * Delegated to the named or anonymous Pool.
168 * @see #getThreads
169 * @return Number of threads
170 */
171 public int getIdleThreads()
172 {
173 return _pool.available();
174 }
175
176 /* ------------------------------------------------------------ */
177 /** Get the minimum number of threads.
178 * Delegated to the named or anonymous Pool.
179 * @see #setMinThreads
180 * @return minimum number of threads.
181 */
182 public int getMinThreads()
183 {
184 return _pool.getMinSize();
185 }
186
187 /* ------------------------------------------------------------ */
188 /** Set the minimum number of threads.
189 * Delegated to the named or anonymous Pool.
190 * @see #getMinThreads
191 * @param minThreads minimum number of threads
192 */
193 public void setMinThreads(int minThreads)
194 {
195 _pool.setMinSize(minThreads);
196 }
197
198 /* ------------------------------------------------------------ */
199 /** Set the maximum number of threads.
200 * Delegated to the named or anonymous Pool.
201 * @see #setMaxThreads
202 * @return maximum number of threads.
203 */
204 public int getMaxThreads()
205 {
206 return _pool.getMaxSize();
207 }
208
209 /* ------------------------------------------------------------ */
210 /** Set the maximum number of threads.
211 * Delegated to the named or anonymous Pool.
212 * @see #getMaxThreads
213 * @param maxThreads maximum number of threads.
214 */
215 public void setMaxThreads(int maxThreads)
216 {
217 _pool.setMaxSize(maxThreads);
218 }
219
220 /* ------------------------------------------------------------ */
221 /** Get the maximum thread idle time.
222 * Delegated to the named or anonymous Pool.
223 * @see #setMaxIdleTimeMs
224 * @return Max idle time in ms.
225 */
226 public int getMaxIdleTimeMs()
227 {
228 return _pool.getMaxIdleTimeMs();
229 }
230
231 /* ------------------------------------------------------------ */
232 /** Set the maximum thread idle time.
233 * Threads that are idle for longer than this period may be
234 * stopped.
235 * Delegated to the named or anonymous Pool.
236 * @see #getMaxIdleTimeMs
237 * @param maxIdleTimeMs Max idle time in ms.
238 */
239 public void setMaxIdleTimeMs(int maxIdleTimeMs)
240 {
241 _pool.setMaxIdleTimeMs(maxIdleTimeMs);
242 }
243
244 /* ------------------------------------------------------------ */
245 /** Get the priority of the pool threads.
246 * @return the priority of the pool threads.
247 */
248 public int getThreadsPriority()
249 {
250 int priority = Thread.NORM_PRIORITY;
251
252 Object o = _pool.getAttribute(__PRIORITY);
253 if (o != null)
254 {
255 priority = ((Integer) o).intValue();
256 }
257
258 return priority;
259 }
260
261 /* ------------------------------------------------------------ */
262 /** Set the priority of the pool threads.
263 * @param priority the new thread priority.
264 */
265 public void setThreadsPriority(int priority)
266 {
267 _pool.setAttribute(__PRIORITY, new Integer(priority));
268 }
269
270 /* ------------------------------------------------------------ */
271 /** Set Max Read Time.
272 * @deprecated maxIdleTime is used instead.
273 */
274 public void setMaxStopTimeMs(int ms)
275 {
276 log.warn("setMaxStopTimeMs is deprecated. No longer required.");
277 }
278
279 /* ------------------------------------------------------------ */
280 /* Start the ThreadPool.
281 * Construct the minimum number of threads.
282 */
283 public void start()
284 throws Exception
285 {
286 _started=true;
287 _pool.start();
288 }
289
290 /* ------------------------------------------------------------ */
291 /** Stop the ThreadPool.
292 * New jobs are no longer accepted,idle threads are interrupted
293 * and stopJob is called on active threads.
294 * The method then waits
295 * min(getMaxStopTimeMs(),getMaxIdleTimeMs()), for all jobs to
296 * stop, at which time killJob is called.
297 */
298 public void stop()
299 throws InterruptedException
300 {
301 _started=false;
302 _pool.stop();
303 synchronized(_join)
304 {
305 _join.notifyAll();
306 }
307 }
308
309 /* ------------------------------------------------------------ */
310 public void join()
311 {
312 while(isStarted() && _pool!=null)
313 {
314 synchronized(_join)
315 {
316 try{if (isStarted() && _pool!=null)_join.wait(30000);}
317 catch (Exception e)
318 {
319 LogSupport.ignore(log,e);
320 }
321 }
322 }
323 }
324
325 /* ------------------------------------------------------------ */
326 public void shrink()
327 throws InterruptedException
328 {
329 _pool.shrink();
330 }
331
332
333 /* ------------------------------------------------------------ */
334 /** Run job.
335 * Give a job to the pool.
336 * @param job If the job is derived from Runnable, the run method
337 * is called, otherwise it is passed as the argument to the handle
338 * method.
339 */
340 public void run(Object job)
341 throws InterruptedException
342 {
343 if (job==null)
344 return;
345
346 try
347 {
348 PoolThread thread=(PoolThread)_pool.get(getMaxIdleTimeMs());
349
350 if (thread!=null)
351 thread.run(this,job);
352 else
353 {
354 log.warn("No thread for "+job);
355 stopJob(null,job);
356 }
357 }
358 catch (InterruptedException e) {throw e;}
359 catch (Exception e){log.warn(LogSupport.EXCEPTION,e);}
360 }
361
362
363 /* ------------------------------------------------------------ */
364 /** Handle a job.
365 * Called by the allocated thread to handle a job. If the job is a
366 * Runnable, it's run method is called. Otherwise this method needs to be
367 * specialized by a derived class to provide specific handling.
368 * @param job The job to execute.
369 * @exception InterruptedException
370 */
371 protected void handle(Object job)
372 throws InterruptedException
373 {
374 if (job!=null && job instanceof Runnable)
375 ((Runnable)job).run();
376 else
377 log.warn("Invalid job: "+job);
378 }
379
380 /* ------------------------------------------------------------ */
381 /** Stop a Job.
382 * This method is called by the Pool if a job needs to be stopped.
383 * The default implementation does nothing and should be extended by a
384 * derived thread pool class if special action is required.
385 * @param thread The thread allocated to the job, or null if no thread allocated.
386 * @param job The job object passed to run.
387 */
388 protected void stopJob(Thread thread, Object job)
389 {
390 }
391
392 /* ------------------------------------------------------------ */
393 /** Pool Thread class.
394 * The PoolThread allows the threads job to be
395 * retrieved and active status to be indicated.
396 */
397 public static class PoolThread extends Thread implements Pool.PondLife
398 {
399 ThreadPool _threadPool;
400 Pool _pool;
401 Object _job;
402 int _id;
403 String _name;
404
405 /* ------------------------------------------------------------ */
406 public void enterPool(Pool pool,int id)
407 {
408 _pool=pool;
409 _id=id;
410 _name=_pool.getPoolName()==null
411 ?("PoolThread-"+id):(_pool.getPoolName()+"-"+id);
412 this.setName(_name);
413 this.setDaemon(pool.getAttribute(__DAEMON)!=null);
414
415 Object o = pool.getAttribute(__PRIORITY);
416 if (o != null)
417 {
418 this.setPriority(((Integer) o).intValue());
419 }
420
421 this.start();
422 if(LogSupport.isTraceEnabled(log))if(LogSupport.isTraceEnabled(log))log.trace("enterPool "+this+" -> "+pool);
423 }
424
425 /* ------------------------------------------------------------ */
426 public int getID()
427 {
428 return _id;
429 }
430
431 /* ------------------------------------------------------------ */
432 public void poolClosing()
433 {
434 synchronized(this)
435 {
436 _pool=null;
437 if (_job==null)
438 notify();
439 else
440 interrupt();
441 }
442 }
443 /* ------------------------------------------------------------ */
444 public void leavePool()
445 {
446 if(LogSupport.isTraceEnabled(log))log.trace("leavePool "+this+" <- "+_pool);
447 synchronized(this)
448 {
449 _pool=null;
450 if (_job==null || _threadPool==null)
451 notify();
452 else
453 {
454 _threadPool.stopJob(this,_job);
455 _job=null;
456 }
457 }
458 }
459
460
461 /* ------------------------------------------------------------ */
462 public void run(ThreadPool pool, Object job)
463 {
464 synchronized(this)
465 {
466 _threadPool=pool;
467 _job=job;
468 notify();
469 }
470 }
471
472 /* ------------------------------------------------------------ */
473 /** ThreadPool run.
474 * Loop getting jobs and handling them until idle or stopped.
475 */
476 public void run()
477 {
478 Object job=null;
479 while (_pool!=null && _pool.isStarted())
480 {
481 try
482 {
483 synchronized(this)
484 {
485 // Wait for a job.
486 if (job==null && _pool!=null && _pool.isStarted() && _job==null)
487 wait(_pool.getMaxIdleTimeMs());
488
489 if (_job!=null)
490 {
491 job=_job;
492 _job=null;
493 }
494 }
495
496 // handle
497 if (job!=null)
498 _threadPool.handle(job);
499 }
500 catch (InterruptedException e)
501 {
502 LogSupport.ignore(log,e);
503 }
504 finally
505 {
506 synchronized(this)
507 {
508 boolean got=job!=null;
509 job=null;
510 _threadPool=null;
511 try
512 {
513 if (got&&_pool!=null)
514 _pool.put(this);
515 }
516 catch (InterruptedException e){LogSupport.ignore(log,e);}
517 }
518 }
519 }
520 }
521
522 public String toString()
523 {
524 return _name;
525 }
526 }
527 }
528