Home » apache-tomcat-6.0.26-src » org.apache » tomcat » util » net » [javadoc | source]

    1   /*
    2    *  Licensed to the Apache Software Foundation (ASF) under one or more
    3    *  contributor license agreements.  See the NOTICE file distributed with
    4    *  this work for additional information regarding copyright ownership.
    5    *  The ASF licenses this file to You under the Apache License, Version 2.0
    6    *  (the "License"); you may not use this file except in compliance with
    7    *  the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    *  Unless required by applicable law or agreed to in writing, software
   12    *  distributed under the License is distributed on an "AS IS" BASIS,
   13    *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    *  See the License for the specific language governing permissions and
   15    *  limitations under the License.
   16    */
   17   
   18   package org.apache.tomcat.util.net;
   19   
   20   import java.io.IOException;
   21   import java.io.InterruptedIOException;
   22   import java.net.BindException;
   23   import java.net.InetAddress;
   24   import java.net.ServerSocket;
   25   import java.net.Socket;
   26   import java.net.SocketException;
   27   import java.security.AccessControlException;
   28   import java.util.Stack;
   29   import java.util.Vector;
   30   
   31   import org.apache.juli.logging.Log;
   32   import org.apache.juli.logging.LogFactory;
   33   import org.apache.tomcat.util.res.StringManager;
   34   import org.apache.tomcat.util.threads.ThreadPool;
   35   import org.apache.tomcat.util.threads.ThreadPoolRunnable;
   36   
   37   /* Similar with MPM module in Apache2.0. Handles all the details related with
   38      "tcp server" functionality - thread management, accept policy, etc.
   39      It should do nothing more - as soon as it get a socket ( and all socket options
   40      are set, etc), it just handle the stream to ConnectionHandler.processConnection. (costin)
   41   */
   42   
   43   
   44   
   45   /**
   46    * Handle incoming TCP connections.
   47    *
   48    * This class implement a simple server model: one listener thread accepts on a socket and
   49    * creates a new worker thread for each incoming connection.
   50    *
   51    * More advanced Endpoints will reuse the threads, use queues, etc.
   52    *
   53    * @author James Duncan Davidson [duncan@eng.sun.com]
   54    * @author Jason Hunter [jch@eng.sun.com]
   55    * @author James Todd [gonzo@eng.sun.com]
   56    * @author Costin@eng.sun.com
   57    * @author Gal Shachor [shachor@il.ibm.com]
   58    * @author Yoav Shapira <yoavs@apache.org>
   59    */
   60   public class PoolTcpEndpoint implements Runnable { // implements Endpoint {
   61   
   62       static Log log=LogFactory.getLog(PoolTcpEndpoint.class );
   63   
   64       private StringManager sm = 
   65           StringManager.getManager("org.apache.tomcat.util.net.res");
   66   
   67       private static final int BACKLOG = 100;
   68       private static final int TIMEOUT = 1000;
   69   
   70       private final Object threadSync = new Object();
   71   
   72       private int backlog = BACKLOG;
   73       private int serverTimeout = TIMEOUT;
   74   
   75       private InetAddress inet;
   76       private int port;
   77   
   78       private ServerSocketFactory factory;
   79       private ServerSocket serverSocket;
   80   
   81       private volatile boolean running = false;
   82       private volatile boolean paused = false;
   83       private boolean initialized = false;
   84       private boolean reinitializing = false;
   85       static final int debug=0;
   86   
   87       protected boolean tcpNoDelay=false;
   88       protected int linger=100;
   89       protected int socketTimeout=-1;
   90       private boolean lf = true;
   91   
   92       
   93       // ------ Leader follower fields
   94   
   95       
   96       TcpConnectionHandler handler;
   97       ThreadPoolRunnable listener;
   98       ThreadPool tp;
   99   
  100       
  101       // ------ Master slave fields
  102   
  103       /* The background thread. */
  104       private Thread thread = null;
  105       /* Available processors. */
  106       private Stack workerThreads = new Stack();
  107       private int curThreads = 0;
  108       private int maxThreads = 20;
  109       /* All processors which have been created. */
  110       private Vector created = new Vector();
  111   
  112       
  113       public PoolTcpEndpoint() {
  114   	tp = new ThreadPool();
  115       }
  116   
  117       public PoolTcpEndpoint( ThreadPool tp ) {
  118           this.tp=tp;
  119       }
  120   
  121       // -------------------- Configuration --------------------
  122   
  123       public void setMaxThreads(int maxThreads) {
  124   	if( maxThreads > 0)
  125   	    tp.setMaxThreads(maxThreads);
  126       }
  127   
  128       public int getMaxThreads() {
  129           return tp.getMaxThreads();
  130       }
  131   
  132       public void setMaxSpareThreads(int maxThreads) {
  133   	if(maxThreads > 0) 
  134   	    tp.setMaxSpareThreads(maxThreads);
  135       }
  136   
  137       public int getMaxSpareThreads() {
  138           return tp.getMaxSpareThreads();
  139       }
  140   
  141       public void setMinSpareThreads(int minThreads) {
  142   	if(minThreads > 0) 
  143   	    tp.setMinSpareThreads(minThreads);
  144       }
  145   
  146       public int getMinSpareThreads() {
  147           return tp.getMinSpareThreads();
  148       }
  149   
  150       public void setThreadPriority(int threadPriority) {
  151         tp.setThreadPriority(threadPriority);
  152       }
  153   
  154       public int getThreadPriority() {
  155         return tp.getThreadPriority();
  156       }
  157   
  158       public int getPort() {
  159           return port;
  160       }
  161   
  162       public void setPort(int port ) {
  163           this.port=port;
  164       }
  165   
  166       public InetAddress getAddress() {
  167   	    return inet;
  168       }
  169   
  170       public void setAddress(InetAddress inet) {
  171   	    this.inet=inet;
  172       }
  173   
  174       public void setServerSocket(ServerSocket ss) {
  175   	    serverSocket = ss;
  176       }
  177   
  178       public void setServerSocketFactory(  ServerSocketFactory factory ) {
  179   	    this.factory=factory;
  180       }
  181   
  182      ServerSocketFactory getServerSocketFactory() {
  183    	    return factory;
  184      }
  185   
  186       public void setConnectionHandler( TcpConnectionHandler handler ) {
  187       	this.handler=handler;
  188       }
  189   
  190       public TcpConnectionHandler getConnectionHandler() {
  191   	    return handler;
  192       }
  193   
  194       public boolean isRunning() {
  195   	return running;
  196       }
  197       
  198       public boolean isPaused() {
  199   	return paused;
  200       }
  201       
  202       /**
  203        * Allows the server developer to specify the backlog that
  204        * should be used for server sockets. By default, this value
  205        * is 100.
  206        */
  207       public void setBacklog(int backlog) {
  208   	if( backlog>0)
  209   	    this.backlog = backlog;
  210       }
  211   
  212       public int getBacklog() {
  213           return backlog;
  214       }
  215   
  216       /**
  217        * Sets the timeout in ms of the server sockets created by this
  218        * server. This method allows the developer to make servers
  219        * more or less responsive to having their server sockets
  220        * shut down.
  221        *
  222        * <p>By default this value is 1000ms.
  223        */
  224       public void setServerTimeout(int timeout) {
  225   	this.serverTimeout = timeout;
  226       }
  227   
  228       public boolean getTcpNoDelay() {
  229           return tcpNoDelay;
  230       }
  231       
  232       public void setTcpNoDelay( boolean b ) {
  233   	tcpNoDelay=b;
  234       }
  235   
  236       public int getSoLinger() {
  237           return linger;
  238       }
  239       
  240       public void setSoLinger( int i ) {
  241   	linger=i;
  242       }
  243   
  244       public int getSoTimeout() {
  245           return socketTimeout;
  246       }
  247       
  248       public void setSoTimeout( int i ) {
  249   	socketTimeout=i;
  250       }
  251       
  252       public int getServerSoTimeout() {
  253           return serverTimeout;
  254       }  
  255       
  256       public void setServerSoTimeout( int i ) {
  257   	serverTimeout=i;
  258       }
  259   
  260       public String getStrategy() {
  261           if (lf) {
  262               return "lf";
  263           } else {
  264               return "ms";
  265           }
  266       }
  267       
  268       public void setStrategy(String strategy) {
  269           if ("ms".equals(strategy)) {
  270               lf = false;
  271           } else {
  272               lf = true;
  273           }
  274       }
  275   
  276       public int getCurrentThreadCount() {
  277           return curThreads;
  278       }
  279       
  280       public int getCurrentThreadsBusy() {
  281           return curThreads - workerThreads.size();
  282       }
  283       
  284       // -------------------- Public methods --------------------
  285   
  286       public void initEndpoint() throws IOException, InstantiationException {
  287           try {
  288               if(factory==null)
  289                   factory=ServerSocketFactory.getDefault();
  290               if(serverSocket==null) {
  291                   try {
  292                       if (inet == null) {
  293                           serverSocket = factory.createSocket(port, backlog);
  294                       } else {
  295                           serverSocket = factory.createSocket(port, backlog, inet);
  296                       }
  297                   } catch (BindException orig) {
  298                       String msg;
  299                       if (inet == null)
  300                           msg = orig.getMessage() + "<null>:" + port;
  301                       else
  302                           msg = orig.getMessage() + " " +
  303                                   inet.toString() + ":" + port;
  304                       BindException be = new BindException(msg);
  305                       be.initCause(orig);
  306                       throw be;
  307                   }
  308               }
  309               if( serverTimeout >= 0 )
  310                   serverSocket.setSoTimeout( serverTimeout );
  311           } catch( IOException ex ) {
  312               throw ex;
  313           } catch( InstantiationException ex1 ) {
  314               throw ex1;
  315           }
  316           initialized = true;
  317       }
  318       
  319       public void startEndpoint() throws IOException, InstantiationException {
  320           if (!initialized) {
  321               initEndpoint();
  322           }
  323           if (lf) {
  324               tp.start();
  325           }
  326           running = true;
  327           paused = false;
  328           if (lf) {
  329               listener = new LeaderFollowerWorkerThread(this);
  330               tp.runIt(listener);
  331           } else {
  332               maxThreads = getMaxThreads();
  333               threadStart();
  334           }
  335       }
  336   
  337       public void pauseEndpoint() {
  338           if (running && !paused) {
  339               paused = true;
  340               unlockAccept();
  341           }
  342       }
  343   
  344       public void resumeEndpoint() {
  345           if (running) {
  346               paused = false;
  347           }
  348       }
  349   
  350       public void stopEndpoint() {
  351           if (running) {
  352               if (lf) {
  353                   tp.shutdown();
  354               }
  355               running = false;
  356               if (serverSocket != null) {
  357                   closeServerSocket();
  358               }
  359               if (!lf) {
  360                   threadStop();
  361               }
  362               initialized=false ;
  363           }
  364       }
  365   
  366       protected void closeServerSocket() {
  367           if (!paused)
  368               unlockAccept();
  369           try {
  370               if( serverSocket!=null)
  371                   serverSocket.close();
  372           } catch(Exception e) {
  373               log.error(sm.getString("endpoint.err.close"), e);
  374           }
  375           serverSocket = null;
  376       }
  377   
  378       protected void unlockAccept() {
  379           Socket s = null;
  380           try {
  381               // Need to create a connection to unlock the accept();
  382               if (inet == null) {
  383                   s = new Socket(InetAddress.getByName("localhost").getHostAddress(), port);
  384               } else {
  385                   s = new Socket(inet, port);
  386                       // setting soLinger to a small value will help shutdown the
  387                       // connection quicker
  388                   s.setSoLinger(true, 0);
  389               }
  390           } catch(Exception e) {
  391               if (log.isDebugEnabled()) {
  392                   log.debug(sm.getString("endpoint.debug.unlock", "" + port), e);
  393               }
  394           } finally {
  395               if (s != null) {
  396                   try {
  397                       s.close();
  398                   } catch (Exception e) {
  399                       // Ignore
  400                   }
  401               }
  402           }
  403       }
  404   
  405       // -------------------- Private methods
  406   
  407       Socket acceptSocket() {
  408           if( !running || serverSocket==null ) return null;
  409   
  410           Socket accepted = null;
  411   
  412       	try {
  413               if(factory==null) {
  414                   accepted = serverSocket.accept();
  415               } else {
  416                   accepted = factory.acceptSocket(serverSocket);
  417               }
  418               if (null == accepted) {
  419                   log.warn(sm.getString("endpoint.warn.nullSocket"));
  420               } else {
  421                   if (!running) {
  422                       accepted.close();  // rude, but unlikely!
  423                       accepted = null;
  424                   } else if (factory != null) {
  425                       factory.initSocket( accepted );
  426                   }
  427               }
  428           }
  429           catch(InterruptedIOException iioe) {
  430               // normal part -- should happen regularly so
  431               // that the endpoint can release if the server
  432               // is shutdown.
  433           }
  434           catch (AccessControlException ace) {
  435               // When using the Java SecurityManager this exception
  436               // can be thrown if you are restricting access to the
  437               // socket with SocketPermission's.
  438               // Log the unauthorized access and continue
  439               String msg = sm.getString("endpoint.warn.security",
  440                                         serverSocket, ace);
  441               log.warn(msg);
  442           }
  443           catch (IOException e) {
  444   
  445               String msg = null;
  446   
  447               if (running) {
  448                   msg = sm.getString("endpoint.err.nonfatal",
  449                           serverSocket, e);
  450                   log.error(msg, e);
  451               }
  452   
  453               if (accepted != null) {
  454                   try {
  455                       accepted.close();
  456                   } catch(Throwable ex) {
  457                       msg = sm.getString("endpoint.err.nonfatal",
  458                                          accepted, ex);
  459                       log.warn(msg, ex);
  460                   }
  461                   accepted = null;
  462               }
  463   
  464               if( ! running ) return null;
  465               reinitializing = true;
  466               // Restart endpoint when getting an IOException during accept
  467               synchronized (threadSync) {
  468                   if (reinitializing) {
  469                       reinitializing = false;
  470                       // 1) Attempt to close server socket
  471                       closeServerSocket();
  472                       initialized = false;
  473                       // 2) Reinit endpoint (recreate server socket)
  474                       try {
  475                           msg = sm.getString("endpoint.warn.reinit");
  476                           log.warn(msg);
  477                           initEndpoint();
  478                       } catch (Throwable t) {
  479                           msg = sm.getString("endpoint.err.nonfatal",
  480                                              serverSocket, t);
  481                           log.error(msg, t);
  482                       }
  483                       // 3) If failed, attempt to restart endpoint
  484                       if (!initialized) {
  485                           msg = sm.getString("endpoint.warn.restart");
  486                           log.warn(msg);
  487                           try {
  488                               stopEndpoint();
  489                               initEndpoint();
  490                               startEndpoint();
  491                           } catch (Throwable t) {
  492                               msg = sm.getString("endpoint.err.fatal",
  493                                                  serverSocket, t);
  494                               log.error(msg, t);
  495                           }
  496                           // Current thread is now invalid: kill it
  497                           throw new ThreadDeath();
  498                       }
  499                   }
  500               }
  501   
  502           }
  503   
  504           return accepted;
  505       }
  506   
  507       void setSocketOptions(Socket socket)
  508           throws SocketException {
  509           if(linger >= 0 ) 
  510               socket.setSoLinger( true, linger);
  511           if( tcpNoDelay )
  512               socket.setTcpNoDelay(tcpNoDelay);
  513           if( socketTimeout > 0 )
  514               socket.setSoTimeout( socketTimeout );
  515       }
  516   
  517       
  518       void processSocket(Socket s, TcpConnection con, Object[] threadData) {
  519           // Process the connection
  520           int step = 1;
  521           try {
  522               
  523               // 1: Set socket options: timeout, linger, etc
  524               setSocketOptions(s);
  525               
  526               // 2: SSL handshake
  527               step = 2;
  528               if (getServerSocketFactory() != null) {
  529                   getServerSocketFactory().handshake(s);
  530               }
  531               
  532               // 3: Process the connection
  533               step = 3;
  534               con.setEndpoint(this);
  535               con.setSocket(s);
  536               getConnectionHandler().processConnection(con, threadData);
  537               
  538           } catch (SocketException se) {
  539               log.debug(sm.getString("endpoint.err.socket", s.getInetAddress()),
  540                       se);
  541               // Try to close the socket
  542               try {
  543                   s.close();
  544               } catch (IOException e) {
  545               }
  546           } catch (Throwable t) {
  547               if (step == 2) {
  548                   if (log.isDebugEnabled()) {
  549                       log.debug(sm.getString("endpoint.err.handshake"), t);
  550                   }
  551               } else {
  552                   log.error(sm.getString("endpoint.err.unexpected"), t);
  553               }
  554               // Try to close the socket
  555               try {
  556                   s.close();
  557               } catch (IOException e) {
  558               }
  559           } finally {
  560               if (con != null) {
  561                   con.recycle();
  562               }
  563           }
  564       }
  565       
  566   
  567       // -------------------------------------------------- Master Slave Methods
  568   
  569   
  570       /**
  571        * Create (or allocate) and return an available processor for use in
  572        * processing a specific HTTP request, if possible.  If the maximum
  573        * allowed processors have already been created and are in use, return
  574        * <code>null</code> instead.
  575        */
  576       private MasterSlaveWorkerThread createWorkerThread() {
  577   
  578           synchronized (workerThreads) {
  579               if (workerThreads.size() > 0) {
  580                   return ((MasterSlaveWorkerThread) workerThreads.pop());
  581               }
  582               if ((maxThreads > 0) && (curThreads < maxThreads)) {
  583                   return (newWorkerThread());
  584               } else {
  585                   if (maxThreads < 0) {
  586                       return (newWorkerThread());
  587                   } else {
  588                       return (null);
  589                   }
  590               }
  591           }
  592   
  593       }
  594   
  595       
  596       /**
  597        * Create and return a new processor suitable for processing HTTP
  598        * requests and returning the corresponding responses.
  599        */
  600       private MasterSlaveWorkerThread newWorkerThread() {
  601   
  602           MasterSlaveWorkerThread workerThread = 
  603               new MasterSlaveWorkerThread(this, tp.getName() + "-" + (++curThreads));
  604           workerThread.start();
  605           created.addElement(workerThread);
  606           return (workerThread);
  607   
  608       }
  609   
  610   
  611       /**
  612        * Recycle the specified Processor so that it can be used again.
  613        *
  614        * @param processor The processor to be recycled
  615        */
  616       void recycleWorkerThread(MasterSlaveWorkerThread workerThread) {
  617           workerThreads.push(workerThread);
  618       }
  619   
  620       
  621       /**
  622        * The background thread that listens for incoming TCP/IP connections and
  623        * hands them off to an appropriate processor.
  624        */
  625       public void run() {
  626   
  627           // Loop until we receive a shutdown command
  628           while (running) {
  629   
  630               // Loop if endpoint is paused
  631               while (paused) {
  632                   try {
  633                       Thread.sleep(1000);
  634                   } catch (InterruptedException e) {
  635                       // Ignore
  636                   }
  637               }
  638   
  639               // Allocate a new worker thread
  640               MasterSlaveWorkerThread workerThread = createWorkerThread();
  641               if (workerThread == null) {
  642                   try {
  643                       // Wait a little for load to go down: as a result, 
  644                       // no accept will be made until the concurrency is
  645                       // lower than the specified maxThreads, and current
  646                       // connections will wait for a little bit instead of
  647                       // failing right away.
  648                       Thread.sleep(100);
  649                   } catch (InterruptedException e) {
  650                       // Ignore
  651                   }
  652                   continue;
  653               }
  654               
  655               // Accept the next incoming connection from the server socket
  656               Socket socket = acceptSocket();
  657   
  658               // Hand this socket off to an appropriate processor
  659               workerThread.assign(socket);
  660   
  661               // The processor will recycle itself when it finishes
  662   
  663           }
  664   
  665           // Notify the threadStop() method that we have shut ourselves down
  666           synchronized (threadSync) {
  667               threadSync.notifyAll();
  668           }
  669   
  670       }
  671   
  672   
  673       /**
  674        * Start the background processing thread.
  675        */
  676       private void threadStart() {
  677           thread = new Thread(this, tp.getName());
  678           thread.setPriority(getThreadPriority());
  679           thread.setDaemon(true);
  680           thread.start();
  681       }
  682   
  683   
  684       /**
  685        * Stop the background processing thread.
  686        */
  687       private void threadStop() {
  688           thread = null;
  689       }
  690   
  691   
  692   }

Home » apache-tomcat-6.0.26-src » org.apache » tomcat » util » net » [javadoc | source]