Save This Page
Home » apache-tomcat-6.0.16-src » org.apache » tomcat » util » net » [javadoc | source]
    1   /*
    2    *  Licensed to the Apache Software Foundation (ASF) under one or more
    3    *  contributor license agreements.  See the NOTICE file distributed with
    4    *  this work for additional information regarding copyright ownership.
    5    *  The ASF licenses this file to You under the Apache License, Version 2.0
    6    *  (the "License"); you may not use this file except in compliance with
    7    *  the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    *  Unless required by applicable law or agreed to in writing, software
   12    *  distributed under the License is distributed on an "AS IS" BASIS,
   13    *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    *  See the License for the specific language governing permissions and
   15    *  limitations under the License.
   16    */
   17   
   18   package org.apache.tomcat.util.net;
   19   
   20   import java.io.IOException;
   21   import java.io.InterruptedIOException;
   22   import java.net.BindException;
   23   import java.net.InetAddress;
   24   import java.net.ServerSocket;
   25   import java.net.Socket;
   26   import java.net.SocketException;
   27   import java.security.AccessControlException;
   28   import java.util.Stack;
   29   import java.util.Vector;
   30   
   31   import org.apache.juli.logging.Log;
   32   import org.apache.juli.logging.LogFactory;
   33   import org.apache.tomcat.util.res.StringManager;
   34   import org.apache.tomcat.util.threads.ThreadPool;
   35   import org.apache.tomcat.util.threads.ThreadPoolRunnable;
   36   
   37   /* Similar to the MPM module in Apache 2.0. Handles all the details related to
   38      "tcp server" functionality - thread management, accept policy, etc.
   39      It should do nothing more - as soon as it gets a socket (and all socket options
   40      are set, etc.), it just hands the stream to ConnectionHandler.processConnection. (costin)
   41   */
   42   
   43   
   44   
   45   /**
   46    * Handle incoming TCP connections.
   47    *
   48    * This class implements a simple server model: one listener thread accepts on a socket and
   49    * creates a new worker thread for each incoming connection.
   50    *
   51    * More advanced Endpoints will reuse the threads, use queues, etc.
   52    *
   53    * @author James Duncan Davidson [duncan@eng.sun.com]
   54    * @author Jason Hunter [jch@eng.sun.com]
   55    * @author James Todd [gonzo@eng.sun.com]
   56    * @author Costin@eng.sun.com
   57    * @author Gal Shachor [shachor@il.ibm.com]
   58    * @author Yoav Shapira <yoavs@apache.org>
   59    */
   60   public class PoolTcpEndpoint implements Runnable { // implements Endpoint {
   61   
   62       static Log log=LogFactory.getLog(PoolTcpEndpoint.class );
   63   
   64       private StringManager sm = 
   65           StringManager.getManager("org.apache.tomcat.util.net.res");
   66   
   67       private static final int BACKLOG = 100;
   68       private static final int TIMEOUT = 1000;
   69   
   70       private final Object threadSync = new Object();
   71   
   72       private int backlog = BACKLOG;
   73       private int serverTimeout = TIMEOUT;
   74   
   75       private InetAddress inet;
   76       private int port;
   77   
   78       private ServerSocketFactory factory;
   79       private ServerSocket serverSocket;
   80   
   81       private volatile boolean running = false;
   82       private volatile boolean paused = false;
   83       private boolean initialized = false;
   84       private boolean reinitializing = false;
   85       static final int debug=0;
   86   
   87       protected boolean tcpNoDelay=false;
   88       protected int linger=100;
   89       protected int socketTimeout=-1;
   90       private boolean lf = true;
   91   
   92       
   93       // ------ Leader follower fields
   94   
   95       
   96       TcpConnectionHandler handler;
   97       ThreadPoolRunnable listener;
   98       ThreadPool tp;
   99   
  100       
  101       // ------ Master slave fields
  102   
  103       /* The background thread. */
  104       private Thread thread = null;
  105       /* Available processors. */
  106       private Stack workerThreads = new Stack();
  107       private int curThreads = 0;
  108       private int maxThreads = 20;
  109       /* All processors which have been created. */
  110       private Vector created = new Vector();
  111   
  112       
  113       public PoolTcpEndpoint() {
  114   	tp = new ThreadPool();
  115       }
  116   
  117       public PoolTcpEndpoint( ThreadPool tp ) {
  118           this.tp=tp;
  119       }
  120   
  121       // -------------------- Configuration --------------------
  122   
  123       public void setMaxThreads(int maxThreads) {
  124   	if( maxThreads > 0)
  125   	    tp.setMaxThreads(maxThreads);
  126       }
  127   
  128       public int getMaxThreads() {
  129           return tp.getMaxThreads();
  130       }
  131   
  132       public void setMaxSpareThreads(int maxThreads) {
  133   	if(maxThreads > 0) 
  134   	    tp.setMaxSpareThreads(maxThreads);
  135       }
  136   
  137       public int getMaxSpareThreads() {
  138           return tp.getMaxSpareThreads();
  139       }
  140   
  141       public void setMinSpareThreads(int minThreads) {
  142   	if(minThreads > 0) 
  143   	    tp.setMinSpareThreads(minThreads);
  144       }
  145   
  146       public int getMinSpareThreads() {
  147           return tp.getMinSpareThreads();
  148       }
  149   
  150       public void setThreadPriority(int threadPriority) {
  151         tp.setThreadPriority(threadPriority);
  152       }
  153   
  154       public int getThreadPriority() {
  155         return tp.getThreadPriority();
  156       }
  157   
  158       public int getPort() {
  159           return port;
  160       }
  161   
  162       public void setPort(int port ) {
  163           this.port=port;
  164       }
  165   
  166       public InetAddress getAddress() {
  167   	    return inet;
  168       }
  169   
  170       public void setAddress(InetAddress inet) {
  171   	    this.inet=inet;
  172       }
  173   
  174       public void setServerSocket(ServerSocket ss) {
  175   	    serverSocket = ss;
  176       }
  177   
  178       public void setServerSocketFactory(  ServerSocketFactory factory ) {
  179   	    this.factory=factory;
  180       }
  181   
  182      ServerSocketFactory getServerSocketFactory() {
  183    	    return factory;
  184      }
  185   
  186       public void setConnectionHandler( TcpConnectionHandler handler ) {
  187       	this.handler=handler;
  188       }
  189   
  190       public TcpConnectionHandler getConnectionHandler() {
  191   	    return handler;
  192       }
  193   
  194       public boolean isRunning() {
  195   	return running;
  196       }
  197       
  198       public boolean isPaused() {
  199   	return paused;
  200       }
  201       
  202       /**
  203        * Allows the server developer to specify the backlog that
  204        * should be used for server sockets. By default, this value
  205        * is 100.
  206        */
  207       public void setBacklog(int backlog) {
  208   	if( backlog>0)
  209   	    this.backlog = backlog;
  210       }
  211   
  212       public int getBacklog() {
  213           return backlog;
  214       }
  215   
  216       /**
  217        * Sets the timeout in ms of the server sockets created by this
  218        * server. This method allows the developer to make servers
  219        * more or less responsive to having their server sockets
  220        * shut down.
  221        *
  222        * <p>By default this value is 1000ms.
  223        */
  224       public void setServerTimeout(int timeout) {
  225   	this.serverTimeout = timeout;
  226       }
  227   
  228       public boolean getTcpNoDelay() {
  229           return tcpNoDelay;
  230       }
  231       
  232       public void setTcpNoDelay( boolean b ) {
  233   	tcpNoDelay=b;
  234       }
  235   
  236       public int getSoLinger() {
  237           return linger;
  238       }
  239       
  240       public void setSoLinger( int i ) {
  241   	linger=i;
  242       }
  243   
  244       public int getSoTimeout() {
  245           return socketTimeout;
  246       }
  247       
  248       public void setSoTimeout( int i ) {
  249   	socketTimeout=i;
  250       }
  251       
  252       public int getServerSoTimeout() {
  253           return serverTimeout;
  254       }  
  255       
  256       public void setServerSoTimeout( int i ) {
  257   	serverTimeout=i;
  258       }
  259   
  260       public String getStrategy() {
  261           if (lf) {
  262               return "lf";
  263           } else {
  264               return "ms";
  265           }
  266       }
  267       
  268       public void setStrategy(String strategy) {
  269           if ("ms".equals(strategy)) {
  270               lf = false;
  271           } else {
  272               lf = true;
  273           }
  274       }
  275   
  276       public int getCurrentThreadCount() {
  277           return curThreads;
  278       }
  279       
  280       public int getCurrentThreadsBusy() {
  281           return curThreads - workerThreads.size();
  282       }
  283       
  284       // -------------------- Public methods --------------------
  285   
  286       public void initEndpoint() throws IOException, InstantiationException {
  287           try {
  288               if(factory==null)
  289                   factory=ServerSocketFactory.getDefault();
  290               if(serverSocket==null) {
  291                   try {
  292                       if (inet == null) {
  293                           serverSocket = factory.createSocket(port, backlog);
  294                       } else {
  295                           serverSocket = factory.createSocket(port, backlog, inet);
  296                       }
  297                   } catch ( BindException be ) {
  298                       throw new BindException(be.getMessage() + ":" + port);
  299                   }
  300               }
  301               if( serverTimeout >= 0 )
  302                   serverSocket.setSoTimeout( serverTimeout );
  303           } catch( IOException ex ) {
  304               throw ex;
  305           } catch( InstantiationException ex1 ) {
  306               throw ex1;
  307           }
  308           initialized = true;
  309       }
  310       
  311       public void startEndpoint() throws IOException, InstantiationException {
  312           if (!initialized) {
  313               initEndpoint();
  314           }
  315           if (lf) {
  316               tp.start();
  317           }
  318           running = true;
  319           paused = false;
  320           if (lf) {
  321               listener = new LeaderFollowerWorkerThread(this);
  322               tp.runIt(listener);
  323           } else {
  324               maxThreads = getMaxThreads();
  325               threadStart();
  326           }
  327       }
  328   
  329       public void pauseEndpoint() {
  330           if (running && !paused) {
  331               paused = true;
  332               unlockAccept();
  333           }
  334       }
  335   
  336       public void resumeEndpoint() {
  337           if (running) {
  338               paused = false;
  339           }
  340       }
  341   
  342       public void stopEndpoint() {
  343           if (running) {
  344               if (lf) {
  345                   tp.shutdown();
  346               }
  347               running = false;
  348               if (serverSocket != null) {
  349                   closeServerSocket();
  350               }
  351               if (!lf) {
  352                   threadStop();
  353               }
  354               initialized=false ;
  355           }
  356       }
  357   
  358       protected void closeServerSocket() {
  359           if (!paused)
  360               unlockAccept();
  361           try {
  362               if( serverSocket!=null)
  363                   serverSocket.close();
  364           } catch(Exception e) {
  365               log.error(sm.getString("endpoint.err.close"), e);
  366           }
  367           serverSocket = null;
  368       }
  369   
  370       protected void unlockAccept() {
  371           Socket s = null;
  372           try {
  373               // Need to create a connection to unlock the accept();
  374               if (inet == null) {
  375                   s = new Socket(InetAddress.getByName("localhost").getHostAddress(), port);
  376               } else {
  377                   s = new Socket(inet, port);
  378                       // setting soLinger to a small value will help shutdown the
  379                       // connection quicker
  380                   s.setSoLinger(true, 0);
  381               }
  382           } catch(Exception e) {
  383               if (log.isDebugEnabled()) {
  384                   log.debug(sm.getString("endpoint.debug.unlock", "" + port), e);
  385               }
  386           } finally {
  387               if (s != null) {
  388                   try {
  389                       s.close();
  390                   } catch (Exception e) {
  391                       // Ignore
  392                   }
  393               }
  394           }
  395       }
  396   
  397       // -------------------- Private methods
  398   
  399       Socket acceptSocket() {
  400           if( !running || serverSocket==null ) return null;
  401   
  402           Socket accepted = null;
  403   
  404       	try {
  405               if(factory==null) {
  406                   accepted = serverSocket.accept();
  407               } else {
  408                   accepted = factory.acceptSocket(serverSocket);
  409               }
  410               if (null == accepted) {
  411                   log.warn(sm.getString("endpoint.warn.nullSocket"));
  412               } else {
  413                   if (!running) {
  414                       accepted.close();  // rude, but unlikely!
  415                       accepted = null;
  416                   } else if (factory != null) {
  417                       factory.initSocket( accepted );
  418                   }
  419               }
  420           }
  421           catch(InterruptedIOException iioe) {
  422               // normal part -- should happen regularly so
  423               // that the endpoint can release if the server
  424               // is shutdown.
  425           }
  426           catch (AccessControlException ace) {
  427               // When using the Java SecurityManager this exception
  428               // can be thrown if you are restricting access to the
  429               // socket with SocketPermission's.
  430               // Log the unauthorized access and continue
  431               String msg = sm.getString("endpoint.warn.security",
  432                                         serverSocket, ace);
  433               log.warn(msg);
  434           }
  435           catch (IOException e) {
  436   
  437               String msg = null;
  438   
  439               if (running) {
  440                   msg = sm.getString("endpoint.err.nonfatal",
  441                           serverSocket, e);
  442                   log.error(msg, e);
  443               }
  444   
  445               if (accepted != null) {
  446                   try {
  447                       accepted.close();
  448                   } catch(Throwable ex) {
  449                       msg = sm.getString("endpoint.err.nonfatal",
  450                                          accepted, ex);
  451                       log.warn(msg, ex);
  452                   }
  453                   accepted = null;
  454               }
  455   
  456               if( ! running ) return null;
  457               reinitializing = true;
  458               // Restart endpoint when getting an IOException during accept
  459               synchronized (threadSync) {
  460                   if (reinitializing) {
  461                       reinitializing = false;
  462                       // 1) Attempt to close server socket
  463                       closeServerSocket();
  464                       initialized = false;
  465                       // 2) Reinit endpoint (recreate server socket)
  466                       try {
  467                           msg = sm.getString("endpoint.warn.reinit");
  468                           log.warn(msg);
  469                           initEndpoint();
  470                       } catch (Throwable t) {
  471                           msg = sm.getString("endpoint.err.nonfatal",
  472                                              serverSocket, t);
  473                           log.error(msg, t);
  474                       }
  475                       // 3) If failed, attempt to restart endpoint
  476                       if (!initialized) {
  477                           msg = sm.getString("endpoint.warn.restart");
  478                           log.warn(msg);
  479                           try {
  480                               stopEndpoint();
  481                               initEndpoint();
  482                               startEndpoint();
  483                           } catch (Throwable t) {
  484                               msg = sm.getString("endpoint.err.fatal",
  485                                                  serverSocket, t);
  486                               log.error(msg, t);
  487                           }
  488                           // Current thread is now invalid: kill it
  489                           throw new ThreadDeath();
  490                       }
  491                   }
  492               }
  493   
  494           }
  495   
  496           return accepted;
  497       }
  498   
  499       void setSocketOptions(Socket socket)
  500           throws SocketException {
  501           if(linger >= 0 ) 
  502               socket.setSoLinger( true, linger);
  503           if( tcpNoDelay )
  504               socket.setTcpNoDelay(tcpNoDelay);
  505           if( socketTimeout > 0 )
  506               socket.setSoTimeout( socketTimeout );
  507       }
  508   
  509       
  510       void processSocket(Socket s, TcpConnection con, Object[] threadData) {
  511           // Process the connection
  512           int step = 1;
  513           try {
  514               
  515               // 1: Set socket options: timeout, linger, etc
  516               setSocketOptions(s);
  517               
  518               // 2: SSL handshake
  519               step = 2;
  520               if (getServerSocketFactory() != null) {
  521                   getServerSocketFactory().handshake(s);
  522               }
  523               
  524               // 3: Process the connection
  525               step = 3;
  526               con.setEndpoint(this);
  527               con.setSocket(s);
  528               getConnectionHandler().processConnection(con, threadData);
  529               
  530           } catch (SocketException se) {
  531               log.debug(sm.getString("endpoint.err.socket", s.getInetAddress()),
  532                       se);
  533               // Try to close the socket
  534               try {
  535                   s.close();
  536               } catch (IOException e) {
  537               }
  538           } catch (Throwable t) {
  539               if (step == 2) {
  540                   if (log.isDebugEnabled()) {
  541                       log.debug(sm.getString("endpoint.err.handshake"), t);
  542                   }
  543               } else {
  544                   log.error(sm.getString("endpoint.err.unexpected"), t);
  545               }
  546               // Try to close the socket
  547               try {
  548                   s.close();
  549               } catch (IOException e) {
  550               }
  551           } finally {
  552               if (con != null) {
  553                   con.recycle();
  554               }
  555           }
  556       }
  557       
  558   
  559       // -------------------------------------------------- Master Slave Methods
  560   
  561   
  562       /**
  563        * Create (or allocate) and return an available processor for use in
  564        * processing a specific HTTP request, if possible.  If the maximum
  565        * allowed processors have already been created and are in use, return
  566        * <code>null</code> instead.
  567        */
  568       private MasterSlaveWorkerThread createWorkerThread() {
  569   
  570           synchronized (workerThreads) {
  571               if (workerThreads.size() > 0) {
  572                   return ((MasterSlaveWorkerThread) workerThreads.pop());
  573               }
  574               if ((maxThreads > 0) && (curThreads < maxThreads)) {
  575                   return (newWorkerThread());
  576               } else {
  577                   if (maxThreads < 0) {
  578                       return (newWorkerThread());
  579                   } else {
  580                       return (null);
  581                   }
  582               }
  583           }
  584   
  585       }
  586   
  587       
  588       /**
  589        * Create and return a new processor suitable for processing HTTP
  590        * requests and returning the corresponding responses.
  591        */
  592       private MasterSlaveWorkerThread newWorkerThread() {
  593   
  594           MasterSlaveWorkerThread workerThread = 
  595               new MasterSlaveWorkerThread(this, tp.getName() + "-" + (++curThreads));
  596           workerThread.start();
  597           created.addElement(workerThread);
  598           return (workerThread);
  599   
  600       }
  601   
  602   
  603       /**
  604        * Recycle the specified Processor so that it can be used again.
  605        *
  606        * @param processor The processor to be recycled
  607        */
  608       void recycleWorkerThread(MasterSlaveWorkerThread workerThread) {
  609           workerThreads.push(workerThread);
  610       }
  611   
  612       
  613       /**
  614        * The background thread that listens for incoming TCP/IP connections and
  615        * hands them off to an appropriate processor.
  616        */
  617       public void run() {
  618   
  619           // Loop until we receive a shutdown command
  620           while (running) {
  621   
  622               // Loop if endpoint is paused
  623               while (paused) {
  624                   try {
  625                       Thread.sleep(1000);
  626                   } catch (InterruptedException e) {
  627                       // Ignore
  628                   }
  629               }
  630   
  631               // Allocate a new worker thread
  632               MasterSlaveWorkerThread workerThread = createWorkerThread();
  633               if (workerThread == null) {
  634                   try {
  635                       // Wait a little for load to go down: as a result, 
  636                       // no accept will be made until the concurrency is
  637                       // lower than the specified maxThreads, and current
  638                       // connections will wait for a little bit instead of
  639                       // failing right away.
  640                       Thread.sleep(100);
  641                   } catch (InterruptedException e) {
  642                       // Ignore
  643                   }
  644                   continue;
  645               }
  646               
  647               // Accept the next incoming connection from the server socket
  648               Socket socket = acceptSocket();
  649   
  650               // Hand this socket off to an appropriate processor
  651               workerThread.assign(socket);
  652   
  653               // The processor will recycle itself when it finishes
  654   
  655           }
  656   
  657           // Notify the threadStop() method that we have shut ourselves down
  658           synchronized (threadSync) {
  659               threadSync.notifyAll();
  660           }
  661   
  662       }
  663   
  664   
  665       /**
  666        * Start the background processing thread.
  667        */
  668       private void threadStart() {
  669           thread = new Thread(this, tp.getName());
  670           thread.setPriority(getThreadPriority());
  671           thread.setDaemon(true);
  672           thread.start();
  673       }
  674   
  675   
  676       /**
  677        * Stop the background processing thread.
  678        */
  679       private void threadStop() {
  680           thread = null;
  681       }
  682   
  683   
  684   }

Save This Page
Home » apache-tomcat-6.0.16-src » org.apache » tomcat » util » net » [javadoc | source]