1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 package org.apache.tomcat.util.net; 19 20 import java.io.IOException; 21 import java.io.InterruptedIOException; 22 import java.net.BindException; 23 import java.net.InetAddress; 24 import java.net.ServerSocket; 25 import java.net.Socket; 26 import java.net.SocketException; 27 import java.security.AccessControlException; 28 import java.util.Stack; 29 import java.util.Vector; 30 31 import org.apache.juli.logging.Log; 32 import org.apache.juli.logging.LogFactory; 33 import org.apache.tomcat.util.res.StringManager; 34 import org.apache.tomcat.util.threads.ThreadPool; 35 import org.apache.tomcat.util.threads.ThreadPoolRunnable; 36 37 /* Similar with MPM module in Apache2.0. Handles all the details related with 38 "tcp server" functionality - thread management, accept policy, etc. 39 It should do nothing more - as soon as it get a socket ( and all socket options 40 are set, etc), it just handle the stream to ConnectionHandler.processConnection. (costin) 41 */ 42 43 44 45 /** 46 * Handle incoming TCP connections. 47 * 48 * This class implement a simple server model: one listener thread accepts on a socket and 49 * creates a new worker thread for each incoming connection. 50 * 51 * More advanced Endpoints will reuse the threads, use queues, etc. 52 * 53 * @author James Duncan Davidson [duncan@eng.sun.com] 54 * @author Jason Hunter [jch@eng.sun.com] 55 * @author James Todd [gonzo@eng.sun.com] 56 * @author Costin@eng.sun.com 57 * @author Gal Shachor [shachor@il.ibm.com] 58 * @author Yoav Shapira <yoavs@apache.org> 59 */ 60 public class PoolTcpEndpoint implements Runnable { // implements Endpoint { 61 62 static Log log=LogFactory.getLog(PoolTcpEndpoint.class ); 63 64 private StringManager sm = 65 StringManager.getManager("org.apache.tomcat.util.net.res"); 66 67 private static final int BACKLOG = 100; 68 private static final int TIMEOUT = 1000; 69 70 private final Object threadSync = new Object(); 71 72 private int backlog = BACKLOG; 73 private int serverTimeout = TIMEOUT; 74 75 private InetAddress inet; 76 private int port; 77 78 private ServerSocketFactory factory; 79 private ServerSocket serverSocket; 80 81 private volatile boolean running = false; 82 private volatile boolean paused = false; 83 private boolean initialized = false; 84 private boolean reinitializing = false; 85 static final int debug=0; 86 87 protected boolean tcpNoDelay=false; 88 protected int linger=100; 89 protected int socketTimeout=-1; 90 private boolean lf = true; 91 92 93 // ------ Leader follower fields 94 95 96 TcpConnectionHandler handler; 97 ThreadPoolRunnable listener; 98 ThreadPool tp; 99 100 101 // ------ Master slave fields 102 103 /* The background thread. */ 104 private Thread thread = null; 105 /* Available processors. */ 106 private Stack workerThreads = new Stack(); 107 private int curThreads = 0; 108 private int maxThreads = 20; 109 /* All processors which have been created. */ 110 private Vector created = new Vector(); 111 112 113 public PoolTcpEndpoint() { 114 tp = new ThreadPool(); 115 } 116 117 public PoolTcpEndpoint( ThreadPool tp ) { 118 this.tp=tp; 119 } 120 121 // -------------------- Configuration -------------------- 122 123 public void setMaxThreads(int maxThreads) { 124 if( maxThreads > 0) 125 tp.setMaxThreads(maxThreads); 126 } 127 128 public int getMaxThreads() { 129 return tp.getMaxThreads(); 130 } 131 132 public void setMaxSpareThreads(int maxThreads) { 133 if(maxThreads > 0) 134 tp.setMaxSpareThreads(maxThreads); 135 } 136 137 public int getMaxSpareThreads() { 138 return tp.getMaxSpareThreads(); 139 } 140 141 public void setMinSpareThreads(int minThreads) { 142 if(minThreads > 0) 143 tp.setMinSpareThreads(minThreads); 144 } 145 146 public int getMinSpareThreads() { 147 return tp.getMinSpareThreads(); 148 } 149 150 public void setThreadPriority(int threadPriority) { 151 tp.setThreadPriority(threadPriority); 152 } 153 154 public int getThreadPriority() { 155 return tp.getThreadPriority(); 156 } 157 158 public int getPort() { 159 return port; 160 } 161 162 public void setPort(int port ) { 163 this.port=port; 164 } 165 166 public InetAddress getAddress() { 167 return inet; 168 } 169 170 public void setAddress(InetAddress inet) { 171 this.inet=inet; 172 } 173 174 public void setServerSocket(ServerSocket ss) { 175 serverSocket = ss; 176 } 177 178 public void setServerSocketFactory( ServerSocketFactory factory ) { 179 this.factory=factory; 180 } 181 182 ServerSocketFactory getServerSocketFactory() { 183 return factory; 184 } 185 186 public void setConnectionHandler( TcpConnectionHandler handler ) { 187 this.handler=handler; 188 } 189 190 public TcpConnectionHandler getConnectionHandler() { 191 return handler; 192 } 193 194 public boolean isRunning() { 195 return running; 196 } 197 198 public boolean isPaused() { 199 return paused; 200 } 201 202 /** 203 * Allows the server developer to specify the backlog that 204 * should be used for server sockets. By default, this value 205 * is 100. 206 */ 207 public void setBacklog(int backlog) { 208 if( backlog>0) 209 this.backlog = backlog; 210 } 211 212 public int getBacklog() { 213 return backlog; 214 } 215 216 /** 217 * Sets the timeout in ms of the server sockets created by this 218 * server. This method allows the developer to make servers 219 * more or less responsive to having their server sockets 220 * shut down. 221 * 222 * <p>By default this value is 1000ms. 223 */ 224 public void setServerTimeout(int timeout) { 225 this.serverTimeout = timeout; 226 } 227 228 public boolean getTcpNoDelay() { 229 return tcpNoDelay; 230 } 231 232 public void setTcpNoDelay( boolean b ) { 233 tcpNoDelay=b; 234 } 235 236 public int getSoLinger() { 237 return linger; 238 } 239 240 public void setSoLinger( int i ) { 241 linger=i; 242 } 243 244 public int getSoTimeout() { 245 return socketTimeout; 246 } 247 248 public void setSoTimeout( int i ) { 249 socketTimeout=i; 250 } 251 252 public int getServerSoTimeout() { 253 return serverTimeout; 254 } 255 256 public void setServerSoTimeout( int i ) { 257 serverTimeout=i; 258 } 259 260 public String getStrategy() { 261 if (lf) { 262 return "lf"; 263 } else { 264 return "ms"; 265 } 266 } 267 268 public void setStrategy(String strategy) { 269 if ("ms".equals(strategy)) { 270 lf = false; 271 } else { 272 lf = true; 273 } 274 } 275 276 public int getCurrentThreadCount() { 277 return curThreads; 278 } 279 280 public int getCurrentThreadsBusy() { 281 return curThreads - workerThreads.size(); 282 } 283 284 // -------------------- Public methods -------------------- 285 286 public void initEndpoint() throws IOException, InstantiationException { 287 try { 288 if(factory==null) 289 factory=ServerSocketFactory.getDefault(); 290 if(serverSocket==null) { 291 try { 292 if (inet == null) { 293 serverSocket = factory.createSocket(port, backlog); 294 } else { 295 serverSocket = factory.createSocket(port, backlog, inet); 296 } 297 } catch (BindException orig) { 298 String msg; 299 if (inet == null) 300 msg = orig.getMessage() + "<null>:" + port; 301 else 302 msg = orig.getMessage() + " " + 303 inet.toString() + ":" + port; 304 BindException be = new BindException(msg); 305 be.initCause(orig); 306 throw be; 307 } 308 } 309 if( serverTimeout >= 0 ) 310 serverSocket.setSoTimeout( serverTimeout ); 311 } catch( IOException ex ) { 312 throw ex; 313 } catch( InstantiationException ex1 ) { 314 throw ex1; 315 } 316 initialized = true; 317 } 318 319 public void startEndpoint() throws IOException, InstantiationException { 320 if (!initialized) { 321 initEndpoint(); 322 } 323 if (lf) { 324 tp.start(); 325 } 326 running = true; 327 paused = false; 328 if (lf) { 329 listener = new LeaderFollowerWorkerThread(this); 330 tp.runIt(listener); 331 } else { 332 maxThreads = getMaxThreads(); 333 threadStart(); 334 } 335 } 336 337 public void pauseEndpoint() { 338 if (running && !paused) { 339 paused = true; 340 unlockAccept(); 341 } 342 } 343 344 public void resumeEndpoint() { 345 if (running) { 346 paused = false; 347 } 348 } 349 350 public void stopEndpoint() { 351 if (running) { 352 if (lf) { 353 tp.shutdown(); 354 } 355 running = false; 356 if (serverSocket != null) { 357 closeServerSocket(); 358 } 359 if (!lf) { 360 threadStop(); 361 } 362 initialized=false ; 363 } 364 } 365 366 protected void closeServerSocket() { 367 if (!paused) 368 unlockAccept(); 369 try { 370 if( serverSocket!=null) 371 serverSocket.close(); 372 } catch(Exception e) { 373 log.error(sm.getString("endpoint.err.close"), e); 374 } 375 serverSocket = null; 376 } 377 378 protected void unlockAccept() { 379 Socket s = null; 380 try { 381 // Need to create a connection to unlock the accept(); 382 if (inet == null) { 383 s = new Socket(InetAddress.getByName("localhost").getHostAddress(), port); 384 } else { 385 s = new Socket(inet, port); 386 // setting soLinger to a small value will help shutdown the 387 // connection quicker 388 s.setSoLinger(true, 0); 389 } 390 } catch(Exception e) { 391 if (log.isDebugEnabled()) { 392 log.debug(sm.getString("endpoint.debug.unlock", "" + port), e); 393 } 394 } finally { 395 if (s != null) { 396 try { 397 s.close(); 398 } catch (Exception e) { 399 // Ignore 400 } 401 } 402 } 403 } 404 405 // -------------------- Private methods 406 407 Socket acceptSocket() { 408 if( !running || serverSocket==null ) return null; 409 410 Socket accepted = null; 411 412 try { 413 if(factory==null) { 414 accepted = serverSocket.accept(); 415 } else { 416 accepted = factory.acceptSocket(serverSocket); 417 } 418 if (null == accepted) { 419 log.warn(sm.getString("endpoint.warn.nullSocket")); 420 } else { 421 if (!running) { 422 accepted.close(); // rude, but unlikely! 423 accepted = null; 424 } else if (factory != null) { 425 factory.initSocket( accepted ); 426 } 427 } 428 } 429 catch(InterruptedIOException iioe) { 430 // normal part -- should happen regularly so 431 // that the endpoint can release if the server 432 // is shutdown. 433 } 434 catch (AccessControlException ace) { 435 // When using the Java SecurityManager this exception 436 // can be thrown if you are restricting access to the 437 // socket with SocketPermission's. 438 // Log the unauthorized access and continue 439 String msg = sm.getString("endpoint.warn.security", 440 serverSocket, ace); 441 log.warn(msg); 442 } 443 catch (IOException e) { 444 445 String msg = null; 446 447 if (running) { 448 msg = sm.getString("endpoint.err.nonfatal", 449 serverSocket, e); 450 log.error(msg, e); 451 } 452 453 if (accepted != null) { 454 try { 455 accepted.close(); 456 } catch(Throwable ex) { 457 msg = sm.getString("endpoint.err.nonfatal", 458 accepted, ex); 459 log.warn(msg, ex); 460 } 461 accepted = null; 462 } 463 464 if( ! running ) return null; 465 reinitializing = true; 466 // Restart endpoint when getting an IOException during accept 467 synchronized (threadSync) { 468 if (reinitializing) { 469 reinitializing = false; 470 // 1) Attempt to close server socket 471 closeServerSocket(); 472 initialized = false; 473 // 2) Reinit endpoint (recreate server socket) 474 try { 475 msg = sm.getString("endpoint.warn.reinit"); 476 log.warn(msg); 477 initEndpoint(); 478 } catch (Throwable t) { 479 msg = sm.getString("endpoint.err.nonfatal", 480 serverSocket, t); 481 log.error(msg, t); 482 } 483 // 3) If failed, attempt to restart endpoint 484 if (!initialized) { 485 msg = sm.getString("endpoint.warn.restart"); 486 log.warn(msg); 487 try { 488 stopEndpoint(); 489 initEndpoint(); 490 startEndpoint(); 491 } catch (Throwable t) { 492 msg = sm.getString("endpoint.err.fatal", 493 serverSocket, t); 494 log.error(msg, t); 495 } 496 // Current thread is now invalid: kill it 497 throw new ThreadDeath(); 498 } 499 } 500 } 501 502 } 503 504 return accepted; 505 } 506 507 void setSocketOptions(Socket socket) 508 throws SocketException { 509 if(linger >= 0 ) 510 socket.setSoLinger( true, linger); 511 if( tcpNoDelay ) 512 socket.setTcpNoDelay(tcpNoDelay); 513 if( socketTimeout > 0 ) 514 socket.setSoTimeout( socketTimeout ); 515 } 516 517 518 void processSocket(Socket s, TcpConnection con, Object[] threadData) { 519 // Process the connection 520 int step = 1; 521 try { 522 523 // 1: Set socket options: timeout, linger, etc 524 setSocketOptions(s); 525 526 // 2: SSL handshake 527 step = 2; 528 if (getServerSocketFactory() != null) { 529 getServerSocketFactory().handshake(s); 530 } 531 532 // 3: Process the connection 533 step = 3; 534 con.setEndpoint(this); 535 con.setSocket(s); 536 getConnectionHandler().processConnection(con, threadData); 537 538 } catch (SocketException se) { 539 log.debug(sm.getString("endpoint.err.socket", s.getInetAddress()), 540 se); 541 // Try to close the socket 542 try { 543 s.close(); 544 } catch (IOException e) { 545 } 546 } catch (Throwable t) { 547 if (step == 2) { 548 if (log.isDebugEnabled()) { 549 log.debug(sm.getString("endpoint.err.handshake"), t); 550 } 551 } else { 552 log.error(sm.getString("endpoint.err.unexpected"), t); 553 } 554 // Try to close the socket 555 try { 556 s.close(); 557 } catch (IOException e) { 558 } 559 } finally { 560 if (con != null) { 561 con.recycle(); 562 } 563 } 564 } 565 566 567 // -------------------------------------------------- Master Slave Methods 568 569 570 /** 571 * Create (or allocate) and return an available processor for use in 572 * processing a specific HTTP request, if possible. If the maximum 573 * allowed processors have already been created and are in use, return 574 * <code>null</code> instead. 575 */ 576 private MasterSlaveWorkerThread createWorkerThread() { 577 578 synchronized (workerThreads) { 579 if (workerThreads.size() > 0) { 580 return ((MasterSlaveWorkerThread) workerThreads.pop()); 581 } 582 if ((maxThreads > 0) && (curThreads < maxThreads)) { 583 return (newWorkerThread()); 584 } else { 585 if (maxThreads < 0) { 586 return (newWorkerThread()); 587 } else { 588 return (null); 589 } 590 } 591 } 592 593 } 594 595 596 /** 597 * Create and return a new processor suitable for processing HTTP 598 * requests and returning the corresponding responses. 599 */ 600 private MasterSlaveWorkerThread newWorkerThread() { 601 602 MasterSlaveWorkerThread workerThread = 603 new MasterSlaveWorkerThread(this, tp.getName() + "-" + (++curThreads)); 604 workerThread.start(); 605 created.addElement(workerThread); 606 return (workerThread); 607 608 } 609 610 611 /** 612 * Recycle the specified Processor so that it can be used again. 613 * 614 * @param processor The processor to be recycled 615 */ 616 void recycleWorkerThread(MasterSlaveWorkerThread workerThread) { 617 workerThreads.push(workerThread); 618 } 619 620 621 /** 622 * The background thread that listens for incoming TCP/IP connections and 623 * hands them off to an appropriate processor. 624 */ 625 public void run() { 626 627 // Loop until we receive a shutdown command 628 while (running) { 629 630 // Loop if endpoint is paused 631 while (paused) { 632 try { 633 Thread.sleep(1000); 634 } catch (InterruptedException e) { 635 // Ignore 636 } 637 } 638 639 // Allocate a new worker thread 640 MasterSlaveWorkerThread workerThread = createWorkerThread(); 641 if (workerThread == null) { 642 try { 643 // Wait a little for load to go down: as a result, 644 // no accept will be made until the concurrency is 645 // lower than the specified maxThreads, and current 646 // connections will wait for a little bit instead of 647 // failing right away. 648 Thread.sleep(100); 649 } catch (InterruptedException e) { 650 // Ignore 651 } 652 continue; 653 } 654 655 // Accept the next incoming connection from the server socket 656 Socket socket = acceptSocket(); 657 658 // Hand this socket off to an appropriate processor 659 workerThread.assign(socket); 660 661 // The processor will recycle itself when it finishes 662 663 } 664 665 // Notify the threadStop() method that we have shut ourselves down 666 synchronized (threadSync) { 667 threadSync.notifyAll(); 668 } 669 670 } 671 672 673 /** 674 * Start the background processing thread. 675 */ 676 private void threadStart() { 677 thread = new Thread(this, tp.getName()); 678 thread.setPriority(getThreadPriority()); 679 thread.setDaemon(true); 680 thread.start(); 681 } 682 683 684 /** 685 * Stop the background processing thread. 686 */ 687 private void threadStop() { 688 thread = null; 689 } 690 691 692 }