1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.tomcat.util.net;
19
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.net.BindException;
23 import java.net.InetAddress;
24 import java.net.ServerSocket;
25 import java.net.Socket;
26 import java.net.SocketException;
27 import java.security.AccessControlException;
28 import java.util.Stack;
29 import java.util.Vector;
30
31 import org.apache.juli.logging.Log;
32 import org.apache.juli.logging.LogFactory;
33 import org.apache.tomcat.util.res.StringManager;
34 import org.apache.tomcat.util.threads.ThreadPool;
35 import org.apache.tomcat.util.threads.ThreadPoolRunnable;
36
/* Similar to the MPM module in Apache 2.0. Handles all the details related to
   "tcp server" functionality - thread management, accept policy, etc.
   It should do nothing more - as soon as it gets a socket (and all socket options
   are set, etc.), it just hands the stream to ConnectionHandler.processConnection. (costin)
 */
42
43
44
45 /**
46 * Handle incoming TCP connections.
47 *
48 * This class implement a simple server model: one listener thread accepts on a socket and
49 * creates a new worker thread for each incoming connection.
50 *
51 * More advanced Endpoints will reuse the threads, use queues, etc.
52 *
53 * @author James Duncan Davidson [duncan@eng.sun.com]
54 * @author Jason Hunter [jch@eng.sun.com]
55 * @author James Todd [gonzo@eng.sun.com]
56 * @author Costin@eng.sun.com
57 * @author Gal Shachor [shachor@il.ibm.com]
58 * @author Yoav Shapira <yoavs@apache.org>
59 */
60 public class PoolTcpEndpoint implements Runnable { // implements Endpoint {
61
62 static Log log=LogFactory.getLog(PoolTcpEndpoint.class );
63
64 private StringManager sm =
65 StringManager.getManager("org.apache.tomcat.util.net.res");
66
67 private static final int BACKLOG = 100;
68 private static final int TIMEOUT = 1000;
69
70 private final Object threadSync = new Object();
71
72 private int backlog = BACKLOG;
73 private int serverTimeout = TIMEOUT;
74
75 private InetAddress inet;
76 private int port;
77
78 private ServerSocketFactory factory;
79 private ServerSocket serverSocket;
80
81 private volatile boolean running = false;
82 private volatile boolean paused = false;
83 private boolean initialized = false;
84 private boolean reinitializing = false;
85 static final int debug=0;
86
87 protected boolean tcpNoDelay=false;
88 protected int linger=100;
89 protected int socketTimeout=-1;
90 private boolean lf = true;
91
92
93 // ------ Leader follower fields
94
95
96 TcpConnectionHandler handler;
97 ThreadPoolRunnable listener;
98 ThreadPool tp;
99
100
101 // ------ Master slave fields
102
103 /* The background thread. */
104 private Thread thread = null;
105 /* Available processors. */
106 private Stack workerThreads = new Stack();
107 private int curThreads = 0;
108 private int maxThreads = 20;
109 /* All processors which have been created. */
110 private Vector created = new Vector();
111
112
113 public PoolTcpEndpoint() {
114 tp = new ThreadPool();
115 }
116
117 public PoolTcpEndpoint( ThreadPool tp ) {
118 this.tp=tp;
119 }
120
121 // -------------------- Configuration --------------------
122
123 public void setMaxThreads(int maxThreads) {
124 if( maxThreads > 0)
125 tp.setMaxThreads(maxThreads);
126 }
127
128 public int getMaxThreads() {
129 return tp.getMaxThreads();
130 }
131
132 public void setMaxSpareThreads(int maxThreads) {
133 if(maxThreads > 0)
134 tp.setMaxSpareThreads(maxThreads);
135 }
136
137 public int getMaxSpareThreads() {
138 return tp.getMaxSpareThreads();
139 }
140
141 public void setMinSpareThreads(int minThreads) {
142 if(minThreads > 0)
143 tp.setMinSpareThreads(minThreads);
144 }
145
146 public int getMinSpareThreads() {
147 return tp.getMinSpareThreads();
148 }
149
150 public void setThreadPriority(int threadPriority) {
151 tp.setThreadPriority(threadPriority);
152 }
153
154 public int getThreadPriority() {
155 return tp.getThreadPriority();
156 }
157
158 public int getPort() {
159 return port;
160 }
161
162 public void setPort(int port ) {
163 this.port=port;
164 }
165
166 public InetAddress getAddress() {
167 return inet;
168 }
169
170 public void setAddress(InetAddress inet) {
171 this.inet=inet;
172 }
173
174 public void setServerSocket(ServerSocket ss) {
175 serverSocket = ss;
176 }
177
178 public void setServerSocketFactory( ServerSocketFactory factory ) {
179 this.factory=factory;
180 }
181
182 ServerSocketFactory getServerSocketFactory() {
183 return factory;
184 }
185
186 public void setConnectionHandler( TcpConnectionHandler handler ) {
187 this.handler=handler;
188 }
189
190 public TcpConnectionHandler getConnectionHandler() {
191 return handler;
192 }
193
194 public boolean isRunning() {
195 return running;
196 }
197
198 public boolean isPaused() {
199 return paused;
200 }
201
202 /**
203 * Allows the server developer to specify the backlog that
204 * should be used for server sockets. By default, this value
205 * is 100.
206 */
207 public void setBacklog(int backlog) {
208 if( backlog>0)
209 this.backlog = backlog;
210 }
211
212 public int getBacklog() {
213 return backlog;
214 }
215
216 /**
217 * Sets the timeout in ms of the server sockets created by this
218 * server. This method allows the developer to make servers
219 * more or less responsive to having their server sockets
220 * shut down.
221 *
222 * <p>By default this value is 1000ms.
223 */
224 public void setServerTimeout(int timeout) {
225 this.serverTimeout = timeout;
226 }
227
228 public boolean getTcpNoDelay() {
229 return tcpNoDelay;
230 }
231
232 public void setTcpNoDelay( boolean b ) {
233 tcpNoDelay=b;
234 }
235
236 public int getSoLinger() {
237 return linger;
238 }
239
240 public void setSoLinger( int i ) {
241 linger=i;
242 }
243
244 public int getSoTimeout() {
245 return socketTimeout;
246 }
247
248 public void setSoTimeout( int i ) {
249 socketTimeout=i;
250 }
251
252 public int getServerSoTimeout() {
253 return serverTimeout;
254 }
255
256 public void setServerSoTimeout( int i ) {
257 serverTimeout=i;
258 }
259
260 public String getStrategy() {
261 if (lf) {
262 return "lf";
263 } else {
264 return "ms";
265 }
266 }
267
268 public void setStrategy(String strategy) {
269 if ("ms".equals(strategy)) {
270 lf = false;
271 } else {
272 lf = true;
273 }
274 }
275
276 public int getCurrentThreadCount() {
277 return curThreads;
278 }
279
280 public int getCurrentThreadsBusy() {
281 return curThreads - workerThreads.size();
282 }
283
284 // -------------------- Public methods --------------------
285
286 public void initEndpoint() throws IOException, InstantiationException {
287 try {
288 if(factory==null)
289 factory=ServerSocketFactory.getDefault();
290 if(serverSocket==null) {
291 try {
292 if (inet == null) {
293 serverSocket = factory.createSocket(port, backlog);
294 } else {
295 serverSocket = factory.createSocket(port, backlog, inet);
296 }
297 } catch ( BindException be ) {
298 throw new BindException(be.getMessage() + ":" + port);
299 }
300 }
301 if( serverTimeout >= 0 )
302 serverSocket.setSoTimeout( serverTimeout );
303 } catch( IOException ex ) {
304 throw ex;
305 } catch( InstantiationException ex1 ) {
306 throw ex1;
307 }
308 initialized = true;
309 }
310
311 public void startEndpoint() throws IOException, InstantiationException {
312 if (!initialized) {
313 initEndpoint();
314 }
315 if (lf) {
316 tp.start();
317 }
318 running = true;
319 paused = false;
320 if (lf) {
321 listener = new LeaderFollowerWorkerThread(this);
322 tp.runIt(listener);
323 } else {
324 maxThreads = getMaxThreads();
325 threadStart();
326 }
327 }
328
329 public void pauseEndpoint() {
330 if (running && !paused) {
331 paused = true;
332 unlockAccept();
333 }
334 }
335
336 public void resumeEndpoint() {
337 if (running) {
338 paused = false;
339 }
340 }
341
342 public void stopEndpoint() {
343 if (running) {
344 if (lf) {
345 tp.shutdown();
346 }
347 running = false;
348 if (serverSocket != null) {
349 closeServerSocket();
350 }
351 if (!lf) {
352 threadStop();
353 }
354 initialized=false ;
355 }
356 }
357
358 protected void closeServerSocket() {
359 if (!paused)
360 unlockAccept();
361 try {
362 if( serverSocket!=null)
363 serverSocket.close();
364 } catch(Exception e) {
365 log.error(sm.getString("endpoint.err.close"), e);
366 }
367 serverSocket = null;
368 }
369
370 protected void unlockAccept() {
371 Socket s = null;
372 try {
373 // Need to create a connection to unlock the accept();
374 if (inet == null) {
375 s = new Socket(InetAddress.getByName("localhost").getHostAddress(), port);
376 } else {
377 s = new Socket(inet, port);
378 // setting soLinger to a small value will help shutdown the
379 // connection quicker
380 s.setSoLinger(true, 0);
381 }
382 } catch(Exception e) {
383 if (log.isDebugEnabled()) {
384 log.debug(sm.getString("endpoint.debug.unlock", "" + port), e);
385 }
386 } finally {
387 if (s != null) {
388 try {
389 s.close();
390 } catch (Exception e) {
391 // Ignore
392 }
393 }
394 }
395 }
396
397 // -------------------- Private methods
398
399 Socket acceptSocket() {
400 if( !running || serverSocket==null ) return null;
401
402 Socket accepted = null;
403
404 try {
405 if(factory==null) {
406 accepted = serverSocket.accept();
407 } else {
408 accepted = factory.acceptSocket(serverSocket);
409 }
410 if (null == accepted) {
411 log.warn(sm.getString("endpoint.warn.nullSocket"));
412 } else {
413 if (!running) {
414 accepted.close(); // rude, but unlikely!
415 accepted = null;
416 } else if (factory != null) {
417 factory.initSocket( accepted );
418 }
419 }
420 }
421 catch(InterruptedIOException iioe) {
422 // normal part -- should happen regularly so
423 // that the endpoint can release if the server
424 // is shutdown.
425 }
426 catch (AccessControlException ace) {
427 // When using the Java SecurityManager this exception
428 // can be thrown if you are restricting access to the
429 // socket with SocketPermission's.
430 // Log the unauthorized access and continue
431 String msg = sm.getString("endpoint.warn.security",
432 serverSocket, ace);
433 log.warn(msg);
434 }
435 catch (IOException e) {
436
437 String msg = null;
438
439 if (running) {
440 msg = sm.getString("endpoint.err.nonfatal",
441 serverSocket, e);
442 log.error(msg, e);
443 }
444
445 if (accepted != null) {
446 try {
447 accepted.close();
448 } catch(Throwable ex) {
449 msg = sm.getString("endpoint.err.nonfatal",
450 accepted, ex);
451 log.warn(msg, ex);
452 }
453 accepted = null;
454 }
455
456 if( ! running ) return null;
457 reinitializing = true;
458 // Restart endpoint when getting an IOException during accept
459 synchronized (threadSync) {
460 if (reinitializing) {
461 reinitializing = false;
462 // 1) Attempt to close server socket
463 closeServerSocket();
464 initialized = false;
465 // 2) Reinit endpoint (recreate server socket)
466 try {
467 msg = sm.getString("endpoint.warn.reinit");
468 log.warn(msg);
469 initEndpoint();
470 } catch (Throwable t) {
471 msg = sm.getString("endpoint.err.nonfatal",
472 serverSocket, t);
473 log.error(msg, t);
474 }
475 // 3) If failed, attempt to restart endpoint
476 if (!initialized) {
477 msg = sm.getString("endpoint.warn.restart");
478 log.warn(msg);
479 try {
480 stopEndpoint();
481 initEndpoint();
482 startEndpoint();
483 } catch (Throwable t) {
484 msg = sm.getString("endpoint.err.fatal",
485 serverSocket, t);
486 log.error(msg, t);
487 }
488 // Current thread is now invalid: kill it
489 throw new ThreadDeath();
490 }
491 }
492 }
493
494 }
495
496 return accepted;
497 }
498
499 void setSocketOptions(Socket socket)
500 throws SocketException {
501 if(linger >= 0 )
502 socket.setSoLinger( true, linger);
503 if( tcpNoDelay )
504 socket.setTcpNoDelay(tcpNoDelay);
505 if( socketTimeout > 0 )
506 socket.setSoTimeout( socketTimeout );
507 }
508
509
510 void processSocket(Socket s, TcpConnection con, Object[] threadData) {
511 // Process the connection
512 int step = 1;
513 try {
514
515 // 1: Set socket options: timeout, linger, etc
516 setSocketOptions(s);
517
518 // 2: SSL handshake
519 step = 2;
520 if (getServerSocketFactory() != null) {
521 getServerSocketFactory().handshake(s);
522 }
523
524 // 3: Process the connection
525 step = 3;
526 con.setEndpoint(this);
527 con.setSocket(s);
528 getConnectionHandler().processConnection(con, threadData);
529
530 } catch (SocketException se) {
531 log.debug(sm.getString("endpoint.err.socket", s.getInetAddress()),
532 se);
533 // Try to close the socket
534 try {
535 s.close();
536 } catch (IOException e) {
537 }
538 } catch (Throwable t) {
539 if (step == 2) {
540 if (log.isDebugEnabled()) {
541 log.debug(sm.getString("endpoint.err.handshake"), t);
542 }
543 } else {
544 log.error(sm.getString("endpoint.err.unexpected"), t);
545 }
546 // Try to close the socket
547 try {
548 s.close();
549 } catch (IOException e) {
550 }
551 } finally {
552 if (con != null) {
553 con.recycle();
554 }
555 }
556 }
557
558
559 // -------------------------------------------------- Master Slave Methods
560
561
562 /**
563 * Create (or allocate) and return an available processor for use in
564 * processing a specific HTTP request, if possible. If the maximum
565 * allowed processors have already been created and are in use, return
566 * <code>null</code> instead.
567 */
568 private MasterSlaveWorkerThread createWorkerThread() {
569
570 synchronized (workerThreads) {
571 if (workerThreads.size() > 0) {
572 return ((MasterSlaveWorkerThread) workerThreads.pop());
573 }
574 if ((maxThreads > 0) && (curThreads < maxThreads)) {
575 return (newWorkerThread());
576 } else {
577 if (maxThreads < 0) {
578 return (newWorkerThread());
579 } else {
580 return (null);
581 }
582 }
583 }
584
585 }
586
587
588 /**
589 * Create and return a new processor suitable for processing HTTP
590 * requests and returning the corresponding responses.
591 */
592 private MasterSlaveWorkerThread newWorkerThread() {
593
594 MasterSlaveWorkerThread workerThread =
595 new MasterSlaveWorkerThread(this, tp.getName() + "-" + (++curThreads));
596 workerThread.start();
597 created.addElement(workerThread);
598 return (workerThread);
599
600 }
601
602
603 /**
604 * Recycle the specified Processor so that it can be used again.
605 *
606 * @param processor The processor to be recycled
607 */
608 void recycleWorkerThread(MasterSlaveWorkerThread workerThread) {
609 workerThreads.push(workerThread);
610 }
611
612
613 /**
614 * The background thread that listens for incoming TCP/IP connections and
615 * hands them off to an appropriate processor.
616 */
617 public void run() {
618
619 // Loop until we receive a shutdown command
620 while (running) {
621
622 // Loop if endpoint is paused
623 while (paused) {
624 try {
625 Thread.sleep(1000);
626 } catch (InterruptedException e) {
627 // Ignore
628 }
629 }
630
631 // Allocate a new worker thread
632 MasterSlaveWorkerThread workerThread = createWorkerThread();
633 if (workerThread == null) {
634 try {
635 // Wait a little for load to go down: as a result,
636 // no accept will be made until the concurrency is
637 // lower than the specified maxThreads, and current
638 // connections will wait for a little bit instead of
639 // failing right away.
640 Thread.sleep(100);
641 } catch (InterruptedException e) {
642 // Ignore
643 }
644 continue;
645 }
646
647 // Accept the next incoming connection from the server socket
648 Socket socket = acceptSocket();
649
650 // Hand this socket off to an appropriate processor
651 workerThread.assign(socket);
652
653 // The processor will recycle itself when it finishes
654
655 }
656
657 // Notify the threadStop() method that we have shut ourselves down
658 synchronized (threadSync) {
659 threadSync.notifyAll();
660 }
661
662 }
663
664
665 /**
666 * Start the background processing thread.
667 */
668 private void threadStart() {
669 thread = new Thread(this, tp.getName());
670 thread.setPriority(getThreadPriority());
671 thread.setDaemon(true);
672 thread.start();
673 }
674
675
676 /**
677 * Stop the background processing thread.
678 */
679 private void threadStop() {
680 thread = null;
681 }
682
683
684 }