1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.tomcat.util.net;
19
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.net.BindException;
23 import java.net.InetAddress;
24 import java.net.ServerSocket;
25 import java.net.Socket;
26 import java.net.SocketException;
27 import java.security.AccessControlException;
28 import java.util.Stack;
29 import java.util.Vector;
30
31 import org.apache.juli.logging.Log;
32 import org.apache.juli.logging.LogFactory;
33 import org.apache.tomcat.util.res.StringManager;
34 import org.apache.tomcat.util.threads.ThreadPool;
35 import org.apache.tomcat.util.threads.ThreadPoolRunnable;
36
37 /* Similar with MPM module in Apache2.0. Handles all the details related with
38 "tcp server" functionality - thread management, accept policy, etc.
39 It should do nothing more - as soon as it get a socket ( and all socket options
40 are set, etc), it just handle the stream to ConnectionHandler.processConnection. (costin)
41 */
42
43
44
45 /**
46 * Handle incoming TCP connections.
47 *
48 * This class implement a simple server model: one listener thread accepts on a socket and
49 * creates a new worker thread for each incoming connection.
50 *
51 * More advanced Endpoints will reuse the threads, use queues, etc.
52 *
53 * @author James Duncan Davidson [duncan@eng.sun.com]
54 * @author Jason Hunter [jch@eng.sun.com]
55 * @author James Todd [gonzo@eng.sun.com]
56 * @author Costin@eng.sun.com
57 * @author Gal Shachor [shachor@il.ibm.com]
58 * @author Yoav Shapira <yoavs@apache.org>
59 */
60 public class PoolTcpEndpoint implements Runnable { // implements Endpoint {
61
62 static Log log=LogFactory.getLog(PoolTcpEndpoint.class );
63
64 private StringManager sm =
65 StringManager.getManager("org.apache.tomcat.util.net.res");
66
67 private static final int BACKLOG = 100;
68 private static final int TIMEOUT = 1000;
69
70 private final Object threadSync = new Object();
71
72 private int backlog = BACKLOG;
73 private int serverTimeout = TIMEOUT;
74
75 private InetAddress inet;
76 private int port;
77
78 private ServerSocketFactory factory;
79 private ServerSocket serverSocket;
80
81 private volatile boolean running = false;
82 private volatile boolean paused = false;
83 private boolean initialized = false;
84 private boolean reinitializing = false;
85 static final int debug=0;
86
87 protected boolean tcpNoDelay=false;
88 protected int linger=100;
89 protected int socketTimeout=-1;
90 private boolean lf = true;
91
92
93 // ------ Leader follower fields
94
95
96 TcpConnectionHandler handler;
97 ThreadPoolRunnable listener;
98 ThreadPool tp;
99
100
101 // ------ Master slave fields
102
103 /* The background thread. */
104 private Thread thread = null;
105 /* Available processors. */
106 private Stack workerThreads = new Stack();
107 private int curThreads = 0;
108 private int maxThreads = 20;
109 /* All processors which have been created. */
110 private Vector created = new Vector();
111
112
113 public PoolTcpEndpoint() {
114 tp = new ThreadPool();
115 }
116
117 public PoolTcpEndpoint( ThreadPool tp ) {
118 this.tp=tp;
119 }
120
121 // -------------------- Configuration --------------------
122
123 public void setMaxThreads(int maxThreads) {
124 if( maxThreads > 0)
125 tp.setMaxThreads(maxThreads);
126 }
127
128 public int getMaxThreads() {
129 return tp.getMaxThreads();
130 }
131
132 public void setMaxSpareThreads(int maxThreads) {
133 if(maxThreads > 0)
134 tp.setMaxSpareThreads(maxThreads);
135 }
136
137 public int getMaxSpareThreads() {
138 return tp.getMaxSpareThreads();
139 }
140
141 public void setMinSpareThreads(int minThreads) {
142 if(minThreads > 0)
143 tp.setMinSpareThreads(minThreads);
144 }
145
146 public int getMinSpareThreads() {
147 return tp.getMinSpareThreads();
148 }
149
150 public void setThreadPriority(int threadPriority) {
151 tp.setThreadPriority(threadPriority);
152 }
153
154 public int getThreadPriority() {
155 return tp.getThreadPriority();
156 }
157
158 public int getPort() {
159 return port;
160 }
161
162 public void setPort(int port ) {
163 this.port=port;
164 }
165
166 public InetAddress getAddress() {
167 return inet;
168 }
169
170 public void setAddress(InetAddress inet) {
171 this.inet=inet;
172 }
173
174 public void setServerSocket(ServerSocket ss) {
175 serverSocket = ss;
176 }
177
178 public void setServerSocketFactory( ServerSocketFactory factory ) {
179 this.factory=factory;
180 }
181
182 ServerSocketFactory getServerSocketFactory() {
183 return factory;
184 }
185
186 public void setConnectionHandler( TcpConnectionHandler handler ) {
187 this.handler=handler;
188 }
189
190 public TcpConnectionHandler getConnectionHandler() {
191 return handler;
192 }
193
194 public boolean isRunning() {
195 return running;
196 }
197
198 public boolean isPaused() {
199 return paused;
200 }
201
202 /**
203 * Allows the server developer to specify the backlog that
204 * should be used for server sockets. By default, this value
205 * is 100.
206 */
207 public void setBacklog(int backlog) {
208 if( backlog>0)
209 this.backlog = backlog;
210 }
211
212 public int getBacklog() {
213 return backlog;
214 }
215
216 /**
217 * Sets the timeout in ms of the server sockets created by this
218 * server. This method allows the developer to make servers
219 * more or less responsive to having their server sockets
220 * shut down.
221 *
222 * <p>By default this value is 1000ms.
223 */
224 public void setServerTimeout(int timeout) {
225 this.serverTimeout = timeout;
226 }
227
228 public boolean getTcpNoDelay() {
229 return tcpNoDelay;
230 }
231
232 public void setTcpNoDelay( boolean b ) {
233 tcpNoDelay=b;
234 }
235
236 public int getSoLinger() {
237 return linger;
238 }
239
240 public void setSoLinger( int i ) {
241 linger=i;
242 }
243
244 public int getSoTimeout() {
245 return socketTimeout;
246 }
247
248 public void setSoTimeout( int i ) {
249 socketTimeout=i;
250 }
251
252 public int getServerSoTimeout() {
253 return serverTimeout;
254 }
255
256 public void setServerSoTimeout( int i ) {
257 serverTimeout=i;
258 }
259
260 public String getStrategy() {
261 if (lf) {
262 return "lf";
263 } else {
264 return "ms";
265 }
266 }
267
268 public void setStrategy(String strategy) {
269 if ("ms".equals(strategy)) {
270 lf = false;
271 } else {
272 lf = true;
273 }
274 }
275
276 public int getCurrentThreadCount() {
277 return curThreads;
278 }
279
280 public int getCurrentThreadsBusy() {
281 return curThreads - workerThreads.size();
282 }
283
284 // -------------------- Public methods --------------------
285
286 public void initEndpoint() throws IOException, InstantiationException {
287 try {
288 if(factory==null)
289 factory=ServerSocketFactory.getDefault();
290 if(serverSocket==null) {
291 try {
292 if (inet == null) {
293 serverSocket = factory.createSocket(port, backlog);
294 } else {
295 serverSocket = factory.createSocket(port, backlog, inet);
296 }
297 } catch (BindException orig) {
298 String msg;
299 if (inet == null)
300 msg = orig.getMessage() + "<null>:" + port;
301 else
302 msg = orig.getMessage() + " " +
303 inet.toString() + ":" + port;
304 BindException be = new BindException(msg);
305 be.initCause(orig);
306 throw be;
307 }
308 }
309 if( serverTimeout >= 0 )
310 serverSocket.setSoTimeout( serverTimeout );
311 } catch( IOException ex ) {
312 throw ex;
313 } catch( InstantiationException ex1 ) {
314 throw ex1;
315 }
316 initialized = true;
317 }
318
319 public void startEndpoint() throws IOException, InstantiationException {
320 if (!initialized) {
321 initEndpoint();
322 }
323 if (lf) {
324 tp.start();
325 }
326 running = true;
327 paused = false;
328 if (lf) {
329 listener = new LeaderFollowerWorkerThread(this);
330 tp.runIt(listener);
331 } else {
332 maxThreads = getMaxThreads();
333 threadStart();
334 }
335 }
336
337 public void pauseEndpoint() {
338 if (running && !paused) {
339 paused = true;
340 unlockAccept();
341 }
342 }
343
344 public void resumeEndpoint() {
345 if (running) {
346 paused = false;
347 }
348 }
349
350 public void stopEndpoint() {
351 if (running) {
352 if (lf) {
353 tp.shutdown();
354 }
355 running = false;
356 if (serverSocket != null) {
357 closeServerSocket();
358 }
359 if (!lf) {
360 threadStop();
361 }
362 initialized=false ;
363 }
364 }
365
366 protected void closeServerSocket() {
367 if (!paused)
368 unlockAccept();
369 try {
370 if( serverSocket!=null)
371 serverSocket.close();
372 } catch(Exception e) {
373 log.error(sm.getString("endpoint.err.close"), e);
374 }
375 serverSocket = null;
376 }
377
378 protected void unlockAccept() {
379 Socket s = null;
380 try {
381 // Need to create a connection to unlock the accept();
382 if (inet == null) {
383 s = new Socket(InetAddress.getByName("localhost").getHostAddress(), port);
384 } else {
385 s = new Socket(inet, port);
386 // setting soLinger to a small value will help shutdown the
387 // connection quicker
388 s.setSoLinger(true, 0);
389 }
390 } catch(Exception e) {
391 if (log.isDebugEnabled()) {
392 log.debug(sm.getString("endpoint.debug.unlock", "" + port), e);
393 }
394 } finally {
395 if (s != null) {
396 try {
397 s.close();
398 } catch (Exception e) {
399 // Ignore
400 }
401 }
402 }
403 }
404
405 // -------------------- Private methods
406
407 Socket acceptSocket() {
408 if( !running || serverSocket==null ) return null;
409
410 Socket accepted = null;
411
412 try {
413 if(factory==null) {
414 accepted = serverSocket.accept();
415 } else {
416 accepted = factory.acceptSocket(serverSocket);
417 }
418 if (null == accepted) {
419 log.warn(sm.getString("endpoint.warn.nullSocket"));
420 } else {
421 if (!running) {
422 accepted.close(); // rude, but unlikely!
423 accepted = null;
424 } else if (factory != null) {
425 factory.initSocket( accepted );
426 }
427 }
428 }
429 catch(InterruptedIOException iioe) {
430 // normal part -- should happen regularly so
431 // that the endpoint can release if the server
432 // is shutdown.
433 }
434 catch (AccessControlException ace) {
435 // When using the Java SecurityManager this exception
436 // can be thrown if you are restricting access to the
437 // socket with SocketPermission's.
438 // Log the unauthorized access and continue
439 String msg = sm.getString("endpoint.warn.security",
440 serverSocket, ace);
441 log.warn(msg);
442 }
443 catch (IOException e) {
444
445 String msg = null;
446
447 if (running) {
448 msg = sm.getString("endpoint.err.nonfatal",
449 serverSocket, e);
450 log.error(msg, e);
451 }
452
453 if (accepted != null) {
454 try {
455 accepted.close();
456 } catch(Throwable ex) {
457 msg = sm.getString("endpoint.err.nonfatal",
458 accepted, ex);
459 log.warn(msg, ex);
460 }
461 accepted = null;
462 }
463
464 if( ! running ) return null;
465 reinitializing = true;
466 // Restart endpoint when getting an IOException during accept
467 synchronized (threadSync) {
468 if (reinitializing) {
469 reinitializing = false;
470 // 1) Attempt to close server socket
471 closeServerSocket();
472 initialized = false;
473 // 2) Reinit endpoint (recreate server socket)
474 try {
475 msg = sm.getString("endpoint.warn.reinit");
476 log.warn(msg);
477 initEndpoint();
478 } catch (Throwable t) {
479 msg = sm.getString("endpoint.err.nonfatal",
480 serverSocket, t);
481 log.error(msg, t);
482 }
483 // 3) If failed, attempt to restart endpoint
484 if (!initialized) {
485 msg = sm.getString("endpoint.warn.restart");
486 log.warn(msg);
487 try {
488 stopEndpoint();
489 initEndpoint();
490 startEndpoint();
491 } catch (Throwable t) {
492 msg = sm.getString("endpoint.err.fatal",
493 serverSocket, t);
494 log.error(msg, t);
495 }
496 // Current thread is now invalid: kill it
497 throw new ThreadDeath();
498 }
499 }
500 }
501
502 }
503
504 return accepted;
505 }
506
507 void setSocketOptions(Socket socket)
508 throws SocketException {
509 if(linger >= 0 )
510 socket.setSoLinger( true, linger);
511 if( tcpNoDelay )
512 socket.setTcpNoDelay(tcpNoDelay);
513 if( socketTimeout > 0 )
514 socket.setSoTimeout( socketTimeout );
515 }
516
517
518 void processSocket(Socket s, TcpConnection con, Object[] threadData) {
519 // Process the connection
520 int step = 1;
521 try {
522
523 // 1: Set socket options: timeout, linger, etc
524 setSocketOptions(s);
525
526 // 2: SSL handshake
527 step = 2;
528 if (getServerSocketFactory() != null) {
529 getServerSocketFactory().handshake(s);
530 }
531
532 // 3: Process the connection
533 step = 3;
534 con.setEndpoint(this);
535 con.setSocket(s);
536 getConnectionHandler().processConnection(con, threadData);
537
538 } catch (SocketException se) {
539 log.debug(sm.getString("endpoint.err.socket", s.getInetAddress()),
540 se);
541 // Try to close the socket
542 try {
543 s.close();
544 } catch (IOException e) {
545 }
546 } catch (Throwable t) {
547 if (step == 2) {
548 if (log.isDebugEnabled()) {
549 log.debug(sm.getString("endpoint.err.handshake"), t);
550 }
551 } else {
552 log.error(sm.getString("endpoint.err.unexpected"), t);
553 }
554 // Try to close the socket
555 try {
556 s.close();
557 } catch (IOException e) {
558 }
559 } finally {
560 if (con != null) {
561 con.recycle();
562 }
563 }
564 }
565
566
567 // -------------------------------------------------- Master Slave Methods
568
569
570 /**
571 * Create (or allocate) and return an available processor for use in
572 * processing a specific HTTP request, if possible. If the maximum
573 * allowed processors have already been created and are in use, return
574 * <code>null</code> instead.
575 */
576 private MasterSlaveWorkerThread createWorkerThread() {
577
578 synchronized (workerThreads) {
579 if (workerThreads.size() > 0) {
580 return ((MasterSlaveWorkerThread) workerThreads.pop());
581 }
582 if ((maxThreads > 0) && (curThreads < maxThreads)) {
583 return (newWorkerThread());
584 } else {
585 if (maxThreads < 0) {
586 return (newWorkerThread());
587 } else {
588 return (null);
589 }
590 }
591 }
592
593 }
594
595
596 /**
597 * Create and return a new processor suitable for processing HTTP
598 * requests and returning the corresponding responses.
599 */
600 private MasterSlaveWorkerThread newWorkerThread() {
601
602 MasterSlaveWorkerThread workerThread =
603 new MasterSlaveWorkerThread(this, tp.getName() + "-" + (++curThreads));
604 workerThread.start();
605 created.addElement(workerThread);
606 return (workerThread);
607
608 }
609
610
611 /**
612 * Recycle the specified Processor so that it can be used again.
613 *
614 * @param processor The processor to be recycled
615 */
616 void recycleWorkerThread(MasterSlaveWorkerThread workerThread) {
617 workerThreads.push(workerThread);
618 }
619
620
621 /**
622 * The background thread that listens for incoming TCP/IP connections and
623 * hands them off to an appropriate processor.
624 */
625 public void run() {
626
627 // Loop until we receive a shutdown command
628 while (running) {
629
630 // Loop if endpoint is paused
631 while (paused) {
632 try {
633 Thread.sleep(1000);
634 } catch (InterruptedException e) {
635 // Ignore
636 }
637 }
638
639 // Allocate a new worker thread
640 MasterSlaveWorkerThread workerThread = createWorkerThread();
641 if (workerThread == null) {
642 try {
643 // Wait a little for load to go down: as a result,
644 // no accept will be made until the concurrency is
645 // lower than the specified maxThreads, and current
646 // connections will wait for a little bit instead of
647 // failing right away.
648 Thread.sleep(100);
649 } catch (InterruptedException e) {
650 // Ignore
651 }
652 continue;
653 }
654
655 // Accept the next incoming connection from the server socket
656 Socket socket = acceptSocket();
657
658 // Hand this socket off to an appropriate processor
659 workerThread.assign(socket);
660
661 // The processor will recycle itself when it finishes
662
663 }
664
665 // Notify the threadStop() method that we have shut ourselves down
666 synchronized (threadSync) {
667 threadSync.notifyAll();
668 }
669
670 }
671
672
673 /**
674 * Start the background processing thread.
675 */
676 private void threadStart() {
677 thread = new Thread(this, tp.getName());
678 thread.setPriority(getThreadPriority());
679 thread.setDaemon(true);
680 thread.start();
681 }
682
683
684 /**
685 * Stop the background processing thread.
686 */
687 private void threadStop() {
688 thread = null;
689 }
690
691
692 }