Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/mortbay/util/ThreadPool.java


1   // ========================================================================
2   // Copyright (c) 1999 Mort Bay Consulting (Australia) Pty. Ltd.
3   // $Id: ThreadPool.java,v 1.31 2003/11/16 11:54:30 gregwilkins Exp $
4   // ========================================================================
5   
6   package org.mortbay.util;
7   
8   import java.io.Serializable;
9   
10  import org.apache.commons.logging.Log;
11  import org.apache.commons.logging.LogFactory;
12  
13  /* ------------------------------------------------------------ */
14  /** A pool of threads.
15   * <p>
16   * Avoids the expense of thread creation by pooling threads after
17   * their run methods exit for reuse.
18   * <p>
19   * If the maximum pool size is reached, jobs wait for a free thread.
20   * By default there is no maximum pool size.  Idle threads timeout
21   * and terminate until the minimum number of threads are running.
22   * <p>
23   * This implementation uses the run(Object) method to place a
24   * job on a queue, which is read by the getJob(timeout) method.
25   * Derived implementations may specialize getJob(timeout) to
26   * obtain jobs from other sources without queing overheads.
27   *
28   * @version $Id: ThreadPool.java,v 1.31 2003/11/16 11:54:30 gregwilkins Exp $
29   * @author Juancarlo Aņez <juancarlo@modelistica.com>
30   * @author Greg Wilkins <gregw@mortbay.com>
31   */
32  public class ThreadPool
33      implements LifeCycle, Serializable
34  {
35      static Log log = LogFactory.getLog(ThreadPool.class);
36  
37      public static final String __DAEMON="org.mortbay.util.ThreadPool.daemon";
38      public static final String __PRIORITY="org.mortbay.util.ThreadPool.priority";
39      
40      /* ------------------------------------------------------------------- */
41      private String _name;
42      private Pool _pool;
43      private Object _join="";
44  
45      private transient boolean _started;
46      
47      /* ------------------------------------------------------------------- */
48      /* Construct
49       */
50      public ThreadPool()
51      {
52          _pool=new Pool();
53          _pool.setPoolClass(ThreadPool.PoolThread.class);
54          _name=this.getClass().getName();
55          int dot=_name.lastIndexOf('.');
56          if (dot>=0)
57              _name=_name.substring(dot+1);
58      }
59      
60      /* ------------------------------------------------------------ */
61      /** 
62       * @return The name of the ThreadPool.
63       */
64      public String getName()
65      {
66          return _name;
67      }
68  
69      /* ------------------------------------------------------------ */
70      /** 
71       * @param name Name of the ThreadPool to use when naming Threads.
72       */
73      public void setName(String name)
74      {
75          _name=name;
76      }
77      
78      /* ------------------------------------------------------------ */
79      /** 
80       * @return Name of the Pool instance this ThreadPool uses or null for
81       * an anonymous private pool.
82       */
83      public String getPoolName()
84      {
85          return _pool.getPoolName();
86      }
87  
88      /* ------------------------------------------------------------ */
89      /** Set the Pool name.
90       * All ThreadPool instances with the same Pool name will share the
91       * same Pool instance. Thus they will share the same max, min and
92       * available Threads.  The field values of the first ThreadPool to call
93       * setPoolName with a specific name are used for the named
94       * Pool. Subsequent ThreadPools that join the name pool will loose their
95       * private values.
96       * @param name Name of the Pool instance this ThreadPool uses or null for
97       * an anonymous private pool.
98       */
99      public void setPoolName(String name)
100     {
101         synchronized(Pool.class)
102         {
103             if (isStarted())
104             {
105                 if ((name==null && _pool.getPoolName()!=null) ||
106                     (name!=null && !name.equals(_pool.getPoolName())))
107                     throw new IllegalStateException("started");
108                 return;
109             }
110             
111             if (name==null)
112             {
113                 if (_pool.getPoolName()!=null)
114                     _pool=new Pool();
115             }
116             else
117             {
118                 Pool pool=Pool.getPool(name);
119                 if (pool==null)
120                     _pool.setPoolName(name);
121                 else
122                     _pool=pool;
123             }
124         }
125     }
126 
127     /* ------------------------------------------------------------ */
128     /** 
129      * Delegated to the named or anonymous Pool.
130      */
131     public boolean isDaemon()
132     {
133         return _pool.getAttribute(__DAEMON)!=null;
134     }
135 
136     /* ------------------------------------------------------------ */
137     /** 
138      * Delegated to the named or anonymous Pool.
139      */
140     public void setDaemon(boolean daemon)
141     {
142         _pool.setAttribute(__DAEMON,daemon?"true":null);
143     }
144     
145     /* ------------------------------------------------------------ */
146     /** Is the pool running jobs.
147      * @return True if start() has been called.
148      */
149     public boolean isStarted()
150     {
151         return _started;
152     }
153     
154     /* ------------------------------------------------------------ */
155     /** Get the number of threads in the pool.
156      * Delegated to the named or anonymous Pool.
157      * @see #getIdleThreads
158      * @return Number of threads
159      */
160     public int getThreads()
161     {
162         return _pool.size();
163     }
164     
165     /* ------------------------------------------------------------ */
166     /** Get the number of idle threads in the pool.
167      * Delegated to the named or anonymous Pool.
168      * @see #getThreads
169      * @return Number of threads
170      */
171     public int getIdleThreads()
172     {
173         return _pool.available();
174     }
175     
176     /* ------------------------------------------------------------ */
177     /** Get the minimum number of threads.
178      * Delegated to the named or anonymous Pool.
179      * @see #setMinThreads
180      * @return minimum number of threads.
181      */
182     public int getMinThreads()
183     {
184         return _pool.getMinSize();
185     }
186     
187     /* ------------------------------------------------------------ */
188     /** Set the minimum number of threads.
189      * Delegated to the named or anonymous Pool.
190      * @see #getMinThreads
191      * @param minThreads minimum number of threads
192      */
193     public void setMinThreads(int minThreads)
194     {
195         _pool.setMinSize(minThreads);
196     }
197     
198     /* ------------------------------------------------------------ */
199     /** Set the maximum number of threads.
200      * Delegated to the named or anonymous Pool.
201      * @see #setMaxThreads
202      * @return maximum number of threads.
203      */
204     public int getMaxThreads()
205     {
206         return _pool.getMaxSize();
207     }
208     
209     /* ------------------------------------------------------------ */
210     /** Set the maximum number of threads.
211      * Delegated to the named or anonymous Pool.
212      * @see #getMaxThreads
213      * @param maxThreads maximum number of threads.
214      */
215     public void setMaxThreads(int maxThreads)
216     {
217         _pool.setMaxSize(maxThreads);
218     }
219     
220     /* ------------------------------------------------------------ */
221     /** Get the maximum thread idle time.
222      * Delegated to the named or anonymous Pool.
223      * @see #setMaxIdleTimeMs
224      * @return Max idle time in ms.
225      */
226     public int getMaxIdleTimeMs()
227     {
228         return _pool.getMaxIdleTimeMs();
229     }
230     
231     /* ------------------------------------------------------------ */
232     /** Set the maximum thread idle time.
233      * Threads that are idle for longer than this period may be
234      * stopped.
235      * Delegated to the named or anonymous Pool.
236      * @see #getMaxIdleTimeMs
237      * @param maxIdleTimeMs Max idle time in ms.
238      */
239     public void setMaxIdleTimeMs(int maxIdleTimeMs)
240     {
241         _pool.setMaxIdleTimeMs(maxIdleTimeMs);
242     }
243     
244     /* ------------------------------------------------------------ */
245     /** Get the priority of the pool threads.
246      *  @return the priority of the pool threads.
247      */ 
248     public int getThreadsPriority()
249     {
250         int priority = Thread.NORM_PRIORITY;
251 
252         Object o = _pool.getAttribute(__PRIORITY);
253         if (o != null)
254         {
255             priority = ((Integer) o).intValue();
256         }
257 
258         return priority;
259     }
260     
261     /* ------------------------------------------------------------ */
262     /** Set the priority of the pool threads.
263      *  @param priority the new thread priority.
264      */
265     public void setThreadsPriority(int priority)
266     {
267         _pool.setAttribute(__PRIORITY, new Integer(priority));
268     }
269 
270     /* ------------------------------------------------------------ */
271     /** Set Max Read Time.
272      * @deprecated maxIdleTime is used instead.
273      */
274     public void setMaxStopTimeMs(int ms)
275     {
276         log.warn("setMaxStopTimeMs is deprecated. No longer required.");
277     }
278     
279     /* ------------------------------------------------------------ */
280     /* Start the ThreadPool.
281      * Construct the minimum number of threads.
282      */
283     public void start()
284         throws Exception
285     {
286         _started=true;
287         _pool.start();
288     }
289 
290     /* ------------------------------------------------------------ */
291     /** Stop the ThreadPool.
292      * New jobs are no longer accepted,idle threads are interrupted
293      * and stopJob is called on active threads.
294      * The method then waits 
295      * min(getMaxStopTimeMs(),getMaxIdleTimeMs()), for all jobs to
296      * stop, at which time killJob is called.
297      */
298     public void stop()
299         throws InterruptedException
300     {
301         _started=false;
302         _pool.stop();
303         synchronized(_join)
304         {
305             _join.notifyAll();
306         }
307     }
308     
309     /* ------------------------------------------------------------ */
310     public void join()
311     {
312         while(isStarted() && _pool!=null)
313         {
314             synchronized(_join)
315             {
316                 try{if (isStarted() && _pool!=null)_join.wait(30000);}
317                 catch (Exception e)
318                 {
319                     LogSupport.ignore(log,e);
320                 }
321             }
322         }
323     }
324     
325     /* ------------------------------------------------------------ */
326     public void shrink()
327         throws InterruptedException
328     {
329         _pool.shrink();
330     }
331     
332 
333     /* ------------------------------------------------------------ */
334     /** Run job.
335      * Give a job to the pool. 
336      * @param job  If the job is derived from Runnable, the run method
337      * is called, otherwise it is passed as the argument to the handle
338      * method.
339      */
340     public void run(Object job)
341         throws InterruptedException
342     {
343         if (job==null)
344             return;
345         
346         try
347         {
348             PoolThread thread=(PoolThread)_pool.get(getMaxIdleTimeMs());
349             
350             if (thread!=null)
351                 thread.run(this,job);
352             else
353             {
354                 log.warn("No thread for "+job);
355                 stopJob(null,job);
356             }
357         }
358         catch (InterruptedException e) {throw e;}
359         catch (Exception e){log.warn(LogSupport.EXCEPTION,e);}
360     }
361     
362 
363     /* ------------------------------------------------------------ */
364     /** Handle a job.
365      * Called by the allocated thread to handle a job. If the job is a
366      * Runnable, it's run method is called. Otherwise this method needs to be
367      * specialized by a derived class to provide specific handling.
368      * @param job The job to execute.
369      * @exception InterruptedException 
370      */
371     protected void handle(Object job)
372         throws InterruptedException
373     {
374         if (job!=null && job instanceof Runnable)
375             ((Runnable)job).run();
376         else
377             log.warn("Invalid job: "+job);
378     }
379     
380     /* ------------------------------------------------------------ */
381     /** Stop a Job.
382      * This method is called by the Pool if a job needs to be stopped.
383      * The default implementation does nothing and should be extended by a
384      * derived thread pool class if special action is required.
385      * @param thread The thread allocated to the job, or null if no thread allocated.
386      * @param job The job object passed to run.
387      */
388     protected void stopJob(Thread thread, Object job)
389     {
390     }
391     
392     /* ------------------------------------------------------------ */
393     /** Pool Thread class.
394      * The PoolThread allows the threads job to be
395      * retrieved and active status to be indicated.
396      */
397     public static class PoolThread extends Thread implements Pool.PondLife
398     {
399         ThreadPool _threadPool;
400         Pool _pool;
401         Object _job;
402         int _id;
403         String _name;
404         
405         /* ------------------------------------------------------------ */
406         public void enterPool(Pool pool,int id)
407         {
408             _pool=pool;
409             _id=id;
410             _name=_pool.getPoolName()==null
411                 ?("PoolThread-"+id):(_pool.getPoolName()+"-"+id);
412             this.setName(_name);
413             this.setDaemon(pool.getAttribute(__DAEMON)!=null);
414 
415             Object o = pool.getAttribute(__PRIORITY);
416             if (o != null)
417             {
418                 this.setPriority(((Integer) o).intValue());
419             }
420 
421             this.start();
422             if(LogSupport.isTraceEnabled(log))if(LogSupport.isTraceEnabled(log))log.trace("enterPool "+this+" -> "+pool);
423         }
424 
425         /* ------------------------------------------------------------ */
426         public int getID()
427         {
428             return _id;
429         }
430         
431         /* ------------------------------------------------------------ */
432         public void poolClosing()
433         {
434             synchronized(this)
435             {
436                 _pool=null;
437                 if (_job==null)
438                     notify();
439                 else
440                     interrupt();
441             }
442         }
443         /* ------------------------------------------------------------ */
444         public void leavePool()
445         {
446             if(LogSupport.isTraceEnabled(log))log.trace("leavePool "+this+" <- "+_pool);
447             synchronized(this)
448             {
449                 _pool=null;
450                 if (_job==null || _threadPool==null)
451                     notify();
452                 else
453                 {
454                     _threadPool.stopJob(this,_job);
455                     _job=null;
456                 }
457             }
458         }
459         
460         
461         /* ------------------------------------------------------------ */
462         public void run(ThreadPool pool, Object job)
463         {
464             synchronized(this)
465             {
466                 _threadPool=pool;
467                 _job=job;
468                 notify();
469             }
470         }
471         
472         /* ------------------------------------------------------------ */
473         /** ThreadPool run.
474          * Loop getting jobs and handling them until idle or stopped.
475          */
476         public void run() 
477         {
478       Object job=null;
479             while (_pool!=null && _pool.isStarted())
480             {
481                 try
482                 { 
483                     synchronized(this)
484                     {
485                         // Wait for a job.
486                         if (job==null && _pool!=null && _pool.isStarted() && _job==null)
487                             wait(_pool.getMaxIdleTimeMs());
488 
489       if (_job!=null)
490       {
491           job=_job;
492           _job=null;
493       }
494                     }
495                     
496                     // handle
497                     if (job!=null)
498                         _threadPool.handle(job);
499                 }
500                 catch (InterruptedException e)
501                 {
502                     LogSupport.ignore(log,e);
503                 }
504                 finally
505                 {
506                     synchronized(this)
507                     {
508                         boolean got=job!=null;
509                         job=null;
510                         _threadPool=null;
511                         try
512                         {
513                             if (got&&_pool!=null)
514                                 _pool.put(this);
515                         }
516                         catch (InterruptedException e){LogSupport.ignore(log,e);}
517                     }
518                 }
519             }
520         }
521 
522         public String toString()
523         {
524             return _name;
525         }
526     }    
527 }
528