Source code: org/mortbay/util/ThreadedServer.java
1 // ===========================================================================
2 // Copyright (c) 1996-2003 Mort Bay Consulting Pty. Ltd. All rights reserved.
3 // $Id: ThreadedServer.java,v 1.28 2003/10/05 23:46:29 gregwilkins Exp $
4 // ---------------------------------------------------------------------------
5
6 package org.mortbay.util;
7 import java.io.IOException;
8 import java.io.InputStream;
9 import java.io.InterruptedIOException;
10 import java.io.OutputStream;
11 import java.net.InetAddress;
12 import java.net.ServerSocket;
13 import java.net.Socket;
14 import java.net.UnknownHostException;
15
16 import org.apache.commons.logging.Log;
17 import org.apache.commons.logging.LogFactory;
18
19
20 /* ======================================================================= */
21 /** Threaded socket server.
22 * This class listens on a server socket and hands the connections it
23 * accepts to a pool of threads.
24 * <P>
25 * The class is abstract and derived classes must provide the handling
26 * for the connections.
27 * <P>
28 * The properties THREADED_SERVER_MIN_THREADS and THREADED_SERVER_MAX_THREADS
29 * can be set to control the number of threads created.
30 * <P>
31 * @version $Id: ThreadedServer.java,v 1.28 2003/10/05 23:46:29 gregwilkins Exp $
32 * @author Greg Wilkins
33 */
34 abstract public class ThreadedServer extends ThreadPool
35 {
36 private static Log log = LogFactory.getLog(ThreadedServer.class);
37
38 /* ------------------------------------------------------------------- */
39 private InetAddrPort _address = null;
40 private int _soTimeOut=-1;
41 private int _lingerTimeSecs=30;
42 private boolean _tcpNoDelay=true;
43
44 private transient Acceptor _acceptor=null;
45 private transient ServerSocket _listen = null;
46 private transient boolean _running=false;
47
48 /* ------------------------------------------------------------------- */
49 /* Construct
50 */
51 public ThreadedServer()
52 {}
53
54 /* ------------------------------------------------------------ */
55 /**
56 * @return The ServerSocket
57 */
58 public ServerSocket getServerSocket()
59 {
60 return _listen;
61 }
62
63 /* ------------------------------------------------------------------- */
64 /** Construct for specific port.
65 */
66 public ThreadedServer(int port)
67 {
68 setInetAddrPort(new InetAddrPort(port));
69 }
70
71 /* ------------------------------------------------------------------- */
72 /** Construct for specific address and port.
73 */
74 public ThreadedServer(InetAddress address, int port)
75 {
76 setInetAddrPort(new InetAddrPort(address,port));
77 }
78
79 /* ------------------------------------------------------------------- */
80 /** Construct for specific address and port.
81 */
82 public ThreadedServer(String host, int port)
83 throws UnknownHostException
84 {
85 setInetAddrPort(new InetAddrPort(host,port));
86 }
87
88 /* ------------------------------------------------------------------- */
89 /** Construct for specific address and port.
90 */
91 public ThreadedServer(InetAddrPort address)
92 {
93 setInetAddrPort(address);
94 }
95
96 /* ------------------------------------------------------------ */
97 /** Set the server InetAddress and port.
98 * @param address The Address to listen on, or 0.0.0.0:port for
99 * all interfaces.
100 */
101 public synchronized void setInetAddrPort(InetAddrPort address)
102 {
103 if (_address!=null && _address.equals(address))
104 return;
105
106 if (isStarted())
107 log.warn(this+" is started");
108
109 _address=address;
110 }
111
112 /* ------------------------------------------------------------ */
113 /**
114 * @return IP Address and port in a new Instance of InetAddrPort.
115 */
116 public InetAddrPort getInetAddrPort()
117 {
118 if (_address==null)
119 return null;
120 return new InetAddrPort(_address);
121 }
122
123 /* ------------------------------------------------------------ */
124 /**
125 * @param host
126 */
127 public synchronized void setHost(String host)
128 throws UnknownHostException
129 {
130 if (_address!=null && _address.getHost()!=null && _address.getHost().equals(host))
131 return;
132
133 if (isStarted())
134 log.warn(this+" is started");
135
136 if (_address==null)
137 _address=new InetAddrPort(host,0);
138 else
139 _address.setHost(host);
140 }
141
142 /* ------------------------------------------------------------ */
143 /**
144 * @return Host name
145 */
146 public String getHost()
147 {
148 if (_address==null || _address.getInetAddress()==null)
149 return null;
150 return _address.getHost();
151 }
152
153
154 /* ------------------------------------------------------------ */
155 /**
156 * @param addr
157 */
158 public synchronized void setInetAddress(InetAddress addr)
159 {
160 if (_address!=null &&
161 _address.getInetAddress()!=null &&
162 _address.getInetAddress().equals(addr))
163 return;
164
165 if (isStarted())
166 log.warn(this+" is started");
167
168 if (_address==null)
169 _address=new InetAddrPort(addr,0);
170 else
171 _address.setInetAddress(addr);
172 }
173
174 /* ------------------------------------------------------------ */
175 /**
176 * @return IP Address
177 */
178 public InetAddress getInetAddress()
179 {
180 if (_address==null)
181 return null;
182 return _address.getInetAddress();
183 }
184
185 /* ------------------------------------------------------------ */
186 /**
187 * @param port
188 */
189 public synchronized void setPort(int port)
190 {
191 if (_address!=null && _address.getPort()==port)
192 return;
193
194 if (isStarted())
195 log.warn(this+" is started");
196
197 if (_address==null)
198 _address=new InetAddrPort(port);
199 else
200 _address.setPort(port);
201 }
202
203 /* ------------------------------------------------------------ */
204 /**
205 * @return port number
206 */
207 public int getPort()
208 {
209 if (_address==null)
210 return 0;
211 return _address.getPort();
212 }
213
214 /* ------------------------------------------------------------ */
215 /** Set Max Read Time.
216 * @deprecated maxIdleTime is used instead.
217 */
218 public void setMaxReadTimeMs(int ms)
219 {
220 log.warn("setMaxReadTimeMs is deprecated. Use setMaxIdleTimeMs()");
221 }
222
223 /* ------------------------------------------------------------ */
224 /**
225 * @return milliseconds
226 */
227 public int getMaxReadTimeMs()
228 {
229 return getMaxIdleTimeMs();
230 }
231
232 /* ------------------------------------------------------------ */
233 /**
234 * @param ls seconds to linger or -1 to disable linger.
235 */
236 public void setLingerTimeSecs(int ls)
237 {
238 _lingerTimeSecs=ls;
239 }
240
241 /* ------------------------------------------------------------ */
242 /**
243 * @return seconds.
244 */
245 public int getLingerTimeSecs()
246 {
247 return _lingerTimeSecs;
248 }
249
250
251 /* ------------------------------------------------------------ */
252 /**
253 * @param tcpNoDelay if true then setTcpNoDelay(true) is called on accepted sockets.
254 */
255 public void setTcpNoDelay(boolean tcpNoDelay)
256 {
257 _tcpNoDelay=tcpNoDelay;
258 }
259
260 /* ------------------------------------------------------------ */
261 /**
262 * @return true if setTcpNoDelay(true) is called on accepted sockets.
263 */
264 public boolean getTcpNoDelay()
265 {
266 return _tcpNoDelay;
267 }
268
269
270 /* ------------------------------------------------------------------- */
271 /** Handle new connection.
272 * This method should be overridden by the derived class to implement
273 * the required handling. It is called by a thread created for it and
274 * does not need to return until it has finished it's task
275 */
276 protected void handleConnection(InputStream in,OutputStream out)
277 {
278 throw new Error("Either handlerConnection must be overridden");
279 }
280
281 /* ------------------------------------------------------------------- */
282 /** Handle new connection.
283 * If access is required to the actual socket, override this method
284 * instead of handleConnection(InputStream in,OutputStream out).
285 * The default implementation of this just calls
286 * handleConnection(InputStream in,OutputStream out).
287 */
288 protected void handleConnection(Socket connection)
289 throws IOException
290 {
291 if(log.isDebugEnabled())log.debug("Handle "+connection);
292 InputStream in = connection.getInputStream();
293 OutputStream out = connection.getOutputStream();
294
295 handleConnection(in,out);
296 out.flush();
297
298 in=null;
299 out=null;
300 connection.close();
301 }
302
303 /* ------------------------------------------------------------ */
304 /** Handle Job.
305 * Implementation of ThreadPool.handle(), calls handleConnection.
306 * @param job A Connection.
307 */
308 public void handle(Object job)
309 {
310 Socket socket =(Socket)job;
311 try
312 {
313 if (_tcpNoDelay)
314 socket.setTcpNoDelay(true);
315 handleConnection(socket);
316 }
317 catch(Exception e){log.warn("Connection problem",e);}
318 finally
319 {
320 try {socket.close();}
321 catch(Exception e){log.warn("Connection problem",e);}
322 }
323 }
324
325
326
327 /* ------------------------------------------------------------ */
328 /** New server socket.
329 * Creates a new servers socket. May be overriden by derived class
330 * to create specialist serversockets (eg SSL).
331 * @param address Address and port
332 * @param acceptQueueSize Accept queue size
333 * @return The new ServerSocket
334 * @exception java.io.IOException
335 */
336 protected ServerSocket newServerSocket(InetAddrPort address,
337 int acceptQueueSize)
338 throws java.io.IOException
339 {
340 if (address==null)
341 return new ServerSocket(0,acceptQueueSize);
342
343 return new ServerSocket(address.getPort(),
344 acceptQueueSize,
345 address.getInetAddress());
346 }
347
348 /* ------------------------------------------------------------ */
349 /** Accept socket connection.
350 * May be overriden by derived class
351 * to create specialist serversockets (eg SSL).
352 * @param serverSocket
353 * @param timeout The time to wait for a connection. Normally
354 * passed the ThreadPool maxIdleTime.
355 * @return Accepted Socket
356 */
357 protected Socket acceptSocket(ServerSocket serverSocket,
358 int timeout)
359 {
360 try
361 {
362 Socket s=null;
363
364 if (_soTimeOut!=timeout)
365 {
366 _soTimeOut=timeout;
367 _listen.setSoTimeout(_soTimeOut);
368 }
369
370 if (_listen!=null)
371 {
372 s=_listen.accept();
373
374 try {
375 if (getMaxIdleTimeMs()>=0)
376 s.setSoTimeout(getMaxIdleTimeMs());
377 if (_lingerTimeSecs>=0)
378 s.setSoLinger(true,_lingerTimeSecs);
379 else
380 s.setSoLinger(false,0);
381 }
382 catch(Exception e){LogSupport.ignore(log,e);}
383 }
384 return s;
385 }
386 catch(java.net.SocketException e)
387 {
388 // TODO - this is caught and ignored due strange
389 // exception from linux java1.2.v1a
390 LogSupport.ignore(log,e);
391 }
392 catch(InterruptedIOException e)
393 {
394 LogSupport.ignore(log,e);
395 }
396 catch(IOException e)
397 {
398 log.warn(LogSupport.EXCEPTION,e);
399 }
400 return null;
401 }
402
403 /* ------------------------------------------------------------------- */
404 /** Open the server socket.
405 * This method can be called to open the server socket in advance of starting the
406 * listener. This can be used to test if the port is available.
407 *
408 * @exception IOException if an error occurs
409 */
410 public void open()
411 throws IOException
412 {
413 if (_listen==null)
414 {
415 _listen=newServerSocket(_address,
416 (getMaxThreads()>0?(getMaxThreads()+1):50));
417 if (_address==null)
418 _address=new InetAddrPort(_listen.getInetAddress(),_listen.getLocalPort());
419 else
420 {
421 if(_address.getInetAddress()==null)
422 _address.setInetAddress(_listen.getInetAddress());
423 if(_address.getPort()==0)
424 _address.setPort(_listen.getLocalPort());
425 }
426
427 _soTimeOut=getMaxIdleTimeMs();
428 if (_soTimeOut>=0)
429 _listen.setSoTimeout(_soTimeOut);
430 }
431 }
432
433 /* ------------------------------------------------------------------- */
434 /* Start the ThreadedServer listening
435 */
436 synchronized public void start()
437 throws Exception
438 {
439 try
440 {
441 if (isStarted())
442 return;
443
444 open();
445
446 _running=true;
447 _acceptor=new Acceptor();
448 _acceptor.setDaemon(isDaemon());
449 _acceptor.start();
450
451 super.start();
452 }
453 catch(Exception e)
454 {
455 log.warn("Failed to start: "+this);
456 throw e;
457 }
458 }
459
460
461 /* --------------------------------------------------------------- */
462 public void stop()
463 throws InterruptedException
464 {
465 synchronized(this)
466 {
467 // Signal that we are stopping
468 _running=false;
469
470 // Close the listener socket.
471 if(log.isDebugEnabled())log.debug("closing "+_listen);
472 try {if (_listen!=null)_listen.close();}catch(IOException e){log.warn(LogSupport.EXCEPTION,e);}
473
474 // Do we have an acceptor thread (running or not)
475 Thread.yield();
476 if (_acceptor!=null)
477 {
478 // Tell the acceptor to exit and wake it up
479 _acceptor.interrupt();
480 wait(getMaxIdleTimeMs());
481
482 // Do we still have an acceptor thread? It is playing hard to stop!
483 // Try forcing the stop to be noticed by making a connection to self.
484 if (_acceptor!=null)
485 {
486 _acceptor.forceStop();
487 // Assume that worked and go on as if it did.
488 _acceptor=null;
489 }
490 }
491 }
492
493 // Stop the thread pool
494 try{super.stop();}catch(Exception e){log.warn(LogSupport.EXCEPTION,e);}
495
496 // Clean up
497 _listen=null;
498 _acceptor=null;
499 }
500
501 /* ------------------------------------------------------------ */
502 /* ------------------------------------------------------------ */
503 /** Kill a job.
504 * This method closes IDLE and socket associated with a job
505 * @param thread
506 * @param job
507 */
508 protected void stopJob(Thread thread,Object job)
509 {
510 if (job instanceof Socket)
511 {
512 try{((Socket)job).close();}
513 catch(Exception e){LogSupport.ignore(log,e);}
514 }
515 super.stopJob(thread,job);
516 }
517
518
519 /* ------------------------------------------------------------ */
520 public String toString()
521 {
522 if (_address==null)
523 return getName()+"@0.0.0.0:0";
524 if (_listen!=null)
525 return getName()+
526 "@"+_listen.getInetAddress().getHostAddress()+
527 ":"+_listen.getLocalPort();
528 return getName()+"@"+getInetAddrPort();
529 }
530
531 /* ------------------------------------------------------------ */
532 /* ------------------------------------------------------------ */
533 /* ------------------------------------------------------------ */
534 private class Acceptor extends Thread
535 {
536 /* ------------------------------------------------------------ */
537 public void run()
538 {
539 ThreadedServer threadedServer = ThreadedServer.this;
540 try
541 {
542 this.setName("Acceptor "+_listen);
543 while(_running)
544 {
545 try
546 {
547 // Accept a socket
548 Socket socket=acceptSocket(_listen,_soTimeOut);
549
550 // Handle the socket
551 if (_running)
552 {
553 if (socket==null)
554 threadedServer.shrink();
555 else
556 threadedServer.run(socket);
557 }
558 else if (socket!=null)
559 socket.close();
560 }
561 catch(InterruptedException e)
562 {}
563 catch(Exception e)
564 {
565 if (_running)
566 log.warn(LogSupport.EXCEPTION,e);
567 else
568 log.debug(LogSupport.EXCEPTION,e);
569 }
570 catch(Error e)
571 {
572 log.warn(LogSupport.EXCEPTION,e);
573 break;
574 }
575 }
576 }
577 finally
578 {
579 if (_running)
580 log.warn("Stopping "+this.getName());
581 else
582 log.info("Stopping "+this.getName());
583 synchronized(threadedServer)
584 {
585 _acceptor=null;
586 threadedServer.notifyAll();
587 }
588 }
589 }
590
591 /* ------------------------------------------------------------ */
592 void forceStop()
593 {
594 if(_listen!=null && _address!=null)
595 {
596 InetAddress addr=_address.getInetAddress();
597 try{
598 if (addr==null || addr.toString().startsWith("0.0.0.0"))
599 addr=InetAddress.getByName("127.0.0.1");
600 if(log.isDebugEnabled())log.debug("Self connect to close listener "+addr+
601 ":"+_address.getPort());
602 Socket socket = new
603 Socket(addr,_address.getPort());
604 Thread.yield();
605 socket.close();
606 Thread.yield();
607 }
608 catch(IOException e)
609 {
610 if(log.isDebugEnabled())log.debug("problem stopping acceptor "+addr+": ",e);
611 }
612 }
613 }
614 }
615 }