Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/mortbay/util/ThreadedServer.java


1   // ===========================================================================
2   // Copyright (c) 1996-2003 Mort Bay Consulting Pty. Ltd. All rights reserved.
3   // $Id: ThreadedServer.java,v 1.28 2003/10/05 23:46:29 gregwilkins Exp $
4   // ---------------------------------------------------------------------------
5   
6   package org.mortbay.util;
7   import java.io.IOException;
8   import java.io.InputStream;
9   import java.io.InterruptedIOException;
10  import java.io.OutputStream;
11  import java.net.InetAddress;
12  import java.net.ServerSocket;
13  import java.net.Socket;
14  import java.net.UnknownHostException;
15  
16  import org.apache.commons.logging.Log;
17  import org.apache.commons.logging.LogFactory;
18  
19  
20  /* ======================================================================= */
21  /** Threaded socket server.
22   * This class listens at a socket and gives the connections received
23   * to a pool of Threads
24   * <P>
25   * The class is abstract and derived classes must provide the handling
26   * for the connections.
27   * <P>
28   * The properties THREADED_SERVER_MIN_THREADS and THREADED_SERVER_MAX_THREADS
29   * can be set to control the number of threads created.
30   * <P>
31   * @version $Id: ThreadedServer.java,v 1.28 2003/10/05 23:46:29 gregwilkins Exp $
32   * @author Greg Wilkins
33   */
34  abstract public class ThreadedServer extends ThreadPool
35  {
36      private static Log log = LogFactory.getLog(ThreadedServer.class);
37      
38      /* ------------------------------------------------------------------- */
39      private InetAddrPort _address = null;  
40      private int _soTimeOut=-1;
41      private int _lingerTimeSecs=30;
42      private boolean _tcpNoDelay=true;
43      
44      private transient Acceptor _acceptor=null;  
45      private transient ServerSocket _listen = null;
46      private transient boolean _running=false;
47      
48      /* ------------------------------------------------------------------- */
49      /* Construct
50       */
51      public ThreadedServer() 
52      {}
53  
54      /* ------------------------------------------------------------ */
55      /** 
56       * @return The ServerSocket
57       */
58      public ServerSocket getServerSocket()
59      {
60          return _listen;
61      }
62      
63      /* ------------------------------------------------------------------- */
64      /** Construct for specific port.
65       */
66      public ThreadedServer(int port)
67      {
68          setInetAddrPort(new InetAddrPort(port));
69      }
70      
71      /* ------------------------------------------------------------------- */
72      /** Construct for specific address and port.
73       */
74      public ThreadedServer(InetAddress address, int port) 
75      {
76          setInetAddrPort(new InetAddrPort(address,port));
77      }
78      
79      /* ------------------------------------------------------------------- */
80      /** Construct for specific address and port.
81       */
82      public ThreadedServer(String host, int port) 
83          throws UnknownHostException
84      {
85          setInetAddrPort(new InetAddrPort(host,port));
86      }
87      
88      /* ------------------------------------------------------------------- */
89      /** Construct for specific address and port.
90       */
91      public ThreadedServer(InetAddrPort address) 
92      {
93          setInetAddrPort(address);
94      }    
95      
96      /* ------------------------------------------------------------ */
97      /** Set the server InetAddress and port.
98       * @param address The Address to listen on, or 0.0.0.0:port for
99       * all interfaces.
100      */
101     public synchronized void setInetAddrPort(InetAddrPort address) 
102     {
103         if (_address!=null && _address.equals(address))
104             return;
105 
106         if (isStarted())
107             log.warn(this+" is started");
108         
109         _address=address;
110     }
111 
112     /* ------------------------------------------------------------ */
113     /** 
114      * @return IP Address and port in a new Instance of InetAddrPort.
115      */
116     public InetAddrPort getInetAddrPort()
117     {
118         if (_address==null)
119             return null;
120         return new InetAddrPort(_address);
121     }
122     
123     /* ------------------------------------------------------------ */
124     /** 
125      * @param host 
126      */
127     public synchronized void setHost(String host)
128         throws UnknownHostException
129     {
130         if (_address!=null && _address.getHost()!=null && _address.getHost().equals(host))
131             return;
132 
133         if (isStarted())
134             log.warn(this+" is started");
135 
136         if (_address==null)
137             _address=new InetAddrPort(host,0);
138         else
139             _address.setHost(host);
140     }
141     
142     /* ------------------------------------------------------------ */
143     /** 
144      * @return Host name
145      */
146     public String getHost()
147     {
148         if (_address==null || _address.getInetAddress()==null)
149             return null;
150         return _address.getHost();
151     }
152 
153     
154     /* ------------------------------------------------------------ */
155     /** 
156      * @param addr 
157      */
158     public synchronized void setInetAddress(InetAddress addr)
159     {
160         if (_address!=null &&
161             _address.getInetAddress()!=null &&
162             _address.getInetAddress().equals(addr))
163             return;
164 
165         if (isStarted())
166             log.warn(this+" is started");
167 
168         if (_address==null)
169             _address=new InetAddrPort(addr,0);
170         else
171             _address.setInetAddress(addr);
172     }
173     
174     /* ------------------------------------------------------------ */
175     /** 
176      * @return IP Address
177      */
178     public InetAddress getInetAddress()
179     {
180         if (_address==null)
181             return null;
182         return _address.getInetAddress();
183     }
184     
185     /* ------------------------------------------------------------ */
186     /** 
187      * @param port 
188      */
189     public synchronized void setPort(int port)
190     {
191         if (_address!=null && _address.getPort()==port)
192             return;
193         
194         if (isStarted())
195             log.warn(this+" is started");
196         
197         if (_address==null)
198             _address=new InetAddrPort(port);
199         else
200             _address.setPort(port);
201     }
202 
203     /* ------------------------------------------------------------ */
204     /** 
205      * @return port number
206      */
207     public int getPort()
208     {
209         if (_address==null)
210             return 0;
211         return _address.getPort();
212     }
213     
214     /* ------------------------------------------------------------ */
215     /** Set Max Read Time.
216      * @deprecated maxIdleTime is used instead.
217      */
218     public void setMaxReadTimeMs(int ms)
219     {
220         log.warn("setMaxReadTimeMs is deprecated. Use setMaxIdleTimeMs()");
221     }
222     
223     /* ------------------------------------------------------------ */
224     /** 
225      * @return milliseconds
226      */
227     public int getMaxReadTimeMs()
228     {
229         return getMaxIdleTimeMs();
230     }
231     
232     /* ------------------------------------------------------------ */
233     /** 
234      * @param ls seconds to linger or -1 to disable linger.
235      */
236     public void setLingerTimeSecs(int ls)
237     {
238         _lingerTimeSecs=ls;
239     }
240     
241     /* ------------------------------------------------------------ */
242     /** 
243      * @return seconds.
244      */
245     public int getLingerTimeSecs()
246     {
247         return _lingerTimeSecs;
248     }
249     
250     
251     /* ------------------------------------------------------------ */
252     /** 
253      * @param tcpNoDelay if true then setTcpNoDelay(true) is called on accepted sockets.
254      */
255     public void setTcpNoDelay(boolean tcpNoDelay)
256     {
257         _tcpNoDelay=tcpNoDelay;
258     }
259     
260     /* ------------------------------------------------------------ */
261     /** 
262      * @return true if setTcpNoDelay(true) is called on accepted sockets.
263      */
264     public boolean getTcpNoDelay()
265     {
266         return _tcpNoDelay;
267     }
268     
269     
270     /* ------------------------------------------------------------------- */
271     /** Handle new connection.
272      * This method should be overridden by the derived class to implement
273      * the required handling.  It is called by a thread created for it and
274      * does not need to return until it has finished it's task
275      */
276     protected void handleConnection(InputStream in,OutputStream out)
277     {
278         throw new Error("Either handlerConnection must be overridden");
279     }
280 
281     /* ------------------------------------------------------------------- */
282     /** Handle new connection.
283      * If access is required to the actual socket, override this method
284      * instead of handleConnection(InputStream in,OutputStream out).
285      * The default implementation of this just calls
286      * handleConnection(InputStream in,OutputStream out).
287      */
288     protected void handleConnection(Socket connection)
289         throws IOException
290     {
291         if(log.isDebugEnabled())log.debug("Handle "+connection);
292         InputStream in  = connection.getInputStream();
293         OutputStream out = connection.getOutputStream();
294         
295         handleConnection(in,out);
296         out.flush();
297         
298         in=null;
299         out=null;
300         connection.close();
301     }
302     
303     /* ------------------------------------------------------------ */
304     /** Handle Job.
305      * Implementation of ThreadPool.handle(), calls handleConnection.
306      * @param job A Connection.
307      */
308     public void handle(Object job)
309     {
310         Socket socket =(Socket)job;
311         try
312         {
313             if (_tcpNoDelay)
314                 socket.setTcpNoDelay(true);
315             handleConnection(socket);
316         }
317         catch(Exception e){log.warn("Connection problem",e);}
318         finally
319         {
320             try {socket.close();}
321             catch(Exception e){log.warn("Connection problem",e);}
322         }
323     }
324     
325     
326     
327     /* ------------------------------------------------------------ */
328     /** New server socket.
329      * Creates a new servers socket. May be overriden by derived class
330      * to create specialist serversockets (eg SSL).
331      * @param address Address and port
332      * @param acceptQueueSize Accept queue size
333      * @return The new ServerSocket
334      * @exception java.io.IOException 
335      */
336     protected ServerSocket newServerSocket(InetAddrPort address,
337                                            int acceptQueueSize)
338          throws java.io.IOException
339     {
340         if (address==null)
341             return new ServerSocket(0,acceptQueueSize);
342 
343         return new ServerSocket(address.getPort(),
344                                 acceptQueueSize,
345                                 address.getInetAddress());
346     }
347     
348     /* ------------------------------------------------------------ */
349     /** Accept socket connection.
350      * May be overriden by derived class
351      * to create specialist serversockets (eg SSL).
352      * @param serverSocket
353      * @param timeout The time to wait for a connection. Normally
354      *                 passed the ThreadPool maxIdleTime.
355      * @return Accepted Socket
356      */
357     protected Socket acceptSocket(ServerSocket serverSocket,
358                                   int timeout)
359     {
360         try
361         {
362             Socket s=null;
363             
364             if (_soTimeOut!=timeout)
365             {
366                 _soTimeOut=timeout;
367                 _listen.setSoTimeout(_soTimeOut);
368             }
369 
370             if (_listen!=null)
371             {
372                 s=_listen.accept();
373                 
374                 try {
375                     if (getMaxIdleTimeMs()>=0)
376                         s.setSoTimeout(getMaxIdleTimeMs());
377                     if (_lingerTimeSecs>=0)
378                         s.setSoLinger(true,_lingerTimeSecs);
379                     else
380                         s.setSoLinger(false,0);
381                 }
382                 catch(Exception e){LogSupport.ignore(log,e);}
383             }
384             return s;
385         }
386         catch(java.net.SocketException e)
387         {
388             // TODO - this is caught and ignored due strange
389             // exception from linux java1.2.v1a
390             LogSupport.ignore(log,e);
391         }
392         catch(InterruptedIOException e)
393         {
394             LogSupport.ignore(log,e);
395         }
396         catch(IOException e)
397         {
398             log.warn(LogSupport.EXCEPTION,e);
399         }
400         return null;
401     }
402 
403     /* ------------------------------------------------------------------- */
404     /** Open the server socket.
405      * This method can be called to open the server socket in advance of starting the
406      * listener. This can be used to test if the port is available.
407      *
408      * @exception IOException if an error occurs
409      */
410     public void open()
411         throws IOException
412     {
413         if (_listen==null)
414         {
415             _listen=newServerSocket(_address,
416                                     (getMaxThreads()>0?(getMaxThreads()+1):50));
417             if (_address==null)
418                 _address=new InetAddrPort(_listen.getInetAddress(),_listen.getLocalPort());
419             else
420             {
421                 if(_address.getInetAddress()==null)
422                     _address.setInetAddress(_listen.getInetAddress());
423                 if(_address.getPort()==0)
424                     _address.setPort(_listen.getLocalPort());
425             }
426             
427             _soTimeOut=getMaxIdleTimeMs();
428             if (_soTimeOut>=0)
429                 _listen.setSoTimeout(_soTimeOut);
430         }
431     }
432     
433     /* ------------------------------------------------------------------- */
434     /* Start the ThreadedServer listening
435      */
436     synchronized public void start()
437         throws Exception
438     {
439         try
440         {
441             if (isStarted())
442                 return;
443 
444             open();
445             
446             _running=true;
447             _acceptor=new Acceptor();
448             _acceptor.setDaemon(isDaemon());
449             _acceptor.start();
450             
451             super.start();
452         }
453         catch(Exception e)
454         {
455             log.warn("Failed to start: "+this);
456             throw e;
457         }
458     }
459     
460 
461     /* --------------------------------------------------------------- */
462     public void stop()
463         throws InterruptedException
464     {
465         synchronized(this)
466         {
467             // Signal that we are stopping
468             _running=false;
469             
470             // Close the listener socket.
471             if(log.isDebugEnabled())log.debug("closing "+_listen);
472             try {if (_listen!=null)_listen.close();}catch(IOException e){log.warn(LogSupport.EXCEPTION,e);}
473             
474             // Do we have an acceptor thread (running or not)
475             Thread.yield();
476             if (_acceptor!=null)
477             {
478                 // Tell the acceptor to exit and wake it up
479                 _acceptor.interrupt();
480                 wait(getMaxIdleTimeMs());
481                 
482                 // Do we still have an acceptor thread? It is playing hard to stop!
483                 // Try forcing the stop to be noticed by making a connection to self.
484                 if (_acceptor!=null)
485                 {
486                     _acceptor.forceStop();     
487                     // Assume that worked and go on as if it did.
488                     _acceptor=null;
489                 }
490             }
491         }
492 
493         // Stop the thread pool
494         try{super.stop();}catch(Exception e){log.warn(LogSupport.EXCEPTION,e);}
495 
496         // Clean up
497         _listen=null;
498         _acceptor=null;
499     }
500     
501     /* ------------------------------------------------------------ */
502     /* ------------------------------------------------------------ */
503     /** Kill a job.
504      * This method closes IDLE and socket associated with a job
505      * @param thread 
506      * @param job 
507      */
508     protected void stopJob(Thread thread,Object job)
509     {
510          if (job instanceof Socket)
511          {
512              try{((Socket)job).close();}
513              catch(Exception e){LogSupport.ignore(log,e);}
514          }
515          super.stopJob(thread,job);
516     }
517 
518     
519     /* ------------------------------------------------------------ */
520     public String toString()
521     {
522         if (_address==null)    
523             return getName()+"@0.0.0.0:0";
524         if (_listen!=null)
525             return getName()+
526                 "@"+_listen.getInetAddress().getHostAddress()+
527                 ":"+_listen.getLocalPort();
528         return getName()+"@"+getInetAddrPort();
529     }
530 
531     /* ------------------------------------------------------------ */
532     /* ------------------------------------------------------------ */
533     /* ------------------------------------------------------------ */
534     private class Acceptor extends Thread
535     {
536         /* ------------------------------------------------------------ */
537         public void run()
538         {
539             ThreadedServer threadedServer = ThreadedServer.this;
540             try
541             {
542                 this.setName("Acceptor "+_listen);
543                 while(_running)
544                 {
545                     try
546                     {
547                         // Accept a socket
548                         Socket socket=acceptSocket(_listen,_soTimeOut);
549 
550                         // Handle the socket
551                         if (_running)
552                         {
553                             if (socket==null)
554                                 threadedServer.shrink();
555                             else
556                                 threadedServer.run(socket);
557                         }
558                         else if (socket!=null)
559                             socket.close();
560                     }
561                     catch(InterruptedException e)
562                     {}
563                     catch(Exception e)
564                     {
565                         if (_running)
566                             log.warn(LogSupport.EXCEPTION,e);
567                         else
568                             log.debug(LogSupport.EXCEPTION,e);
569                     }
570                     catch(Error e)
571                     {
572                         log.warn(LogSupport.EXCEPTION,e);
573                         break;
574                     }  
575                 }
576             }
577             finally
578             {
579                 if (_running)
580                     log.warn("Stopping "+this.getName());
581                 else
582                     log.info("Stopping "+this.getName());
583                 synchronized(threadedServer)
584                 {
585                     _acceptor=null;
586                     threadedServer.notifyAll();
587                 }
588             }
589         }
590 
591         /* ------------------------------------------------------------ */
592         void forceStop()
593         {
594             if(_listen!=null && _address!=null)
595             {
596     InetAddress addr=_address.getInetAddress();
597                 try{
598                     if (addr==null || addr.toString().startsWith("0.0.0.0"))
599                         addr=InetAddress.getByName("127.0.0.1");
600                     if(log.isDebugEnabled())log.debug("Self connect to close listener "+addr+
601                                ":"+_address.getPort());
602                     Socket socket = new
603                         Socket(addr,_address.getPort());
604                     Thread.yield();
605                     socket.close();
606                     Thread.yield();
607                 }
608                 catch(IOException e)
609                 {
610                     if(log.isDebugEnabled())log.debug("problem stopping acceptor "+addr+": ",e);
611                 }
612             }
613         }
614     }
615 }