Home » apache-tomcat-6.0.26-src » org.apache » tomcat » util » net » [javadoc | source]

    1   /*
    2    *  Licensed to the Apache Software Foundation (ASF) under one or more
    3    *  contributor license agreements.  See the NOTICE file distributed with
    4    *  this work for additional information regarding copyright ownership.
    5    *  The ASF licenses this file to You under the Apache License, Version 2.0
    6    *  (the "License"); you may not use this file except in compliance with
    7    *  the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    *  Unless required by applicable law or agreed to in writing, software
   12    *  distributed under the License is distributed on an "AS IS" BASIS,
   13    *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    *  See the License for the specific language governing permissions and
   15    *  limitations under the License.
   16    */
   17   
   18   package org.apache.tomcat.util.net;
   19   
   20   import java.io.File;
   21   import java.io.FileInputStream;
   22   import java.io.IOException;
   23   import java.net.InetAddress;
   24   import java.net.InetSocketAddress;
   25   import java.net.Socket;
   26   import java.net.SocketAddress;
   27   import java.net.SocketTimeoutException;
   28   import java.nio.ByteBuffer;
   29   import java.nio.channels.CancelledKeyException;
   30   import java.nio.channels.FileChannel;
   31   import java.nio.channels.SelectionKey;
   32   import java.nio.channels.Selector;
   33   import java.nio.channels.ServerSocketChannel;
   34   import java.nio.channels.SocketChannel;
   35   import java.nio.channels.WritableByteChannel;
   36   import java.security.KeyStore;
   37   import java.util.Collection;
   38   import java.util.Iterator;
   39   import java.util.Set;
   40   import java.util.StringTokenizer;
   41   import java.util.concurrent.ConcurrentLinkedQueue;
   42   import java.util.concurrent.CountDownLatch;
   43   import java.util.concurrent.Executor;
   44   import java.util.concurrent.LinkedBlockingQueue;
   45   import java.util.concurrent.ThreadFactory;
   46   import java.util.concurrent.ThreadPoolExecutor;
   47   import java.util.concurrent.TimeUnit;
   48   import java.util.concurrent.atomic.AtomicInteger;
   49   import java.util.concurrent.atomic.AtomicLong;
   50   
   51   import javax.net.ssl.KeyManager;
   52   import javax.net.ssl.KeyManagerFactory;
   53   import javax.net.ssl.SSLContext;
   54   import javax.net.ssl.SSLEngine;
   55   import javax.net.ssl.SSLSessionContext;
   56   import javax.net.ssl.TrustManagerFactory;
   57   import javax.net.ssl.X509KeyManager;
   58   
   59   import org.apache.juli.logging.Log;
   60   import org.apache.juli.logging.LogFactory;
   61   import org.apache.tomcat.util.IntrospectionUtils;
   62   import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler;
   63   import org.apache.tomcat.util.net.jsse.NioX509KeyManager;
   64   import org.apache.tomcat.util.res.StringManager;
   65   
   66   /**
   67    * NIO tailored thread pool, providing the following services:
   68    * <ul>
   69    * <li>Socket acceptor thread</li>
   70    * <li>Socket poller thread</li>
   71    * <li>Worker threads pool</li>
   72    * </ul>
   73    *
   74    * When switching to Java 5, there's an opportunity to use the virtual
   75    * machine's thread pool.
   76    *
   77    * @author Mladen Turk
   78    * @author Remy Maucherat
   79    * @author Filip Hanik
   80    */
   81   public class NioEndpoint {
   82   
   83   
   84       // -------------------------------------------------------------- Constants
   85   
   86   
   87       protected static Log log = LogFactory.getLog(NioEndpoint.class);
   88   
   89       protected static StringManager sm =
   90           StringManager.getManager("org.apache.tomcat.util.net.res");
   91   
   92   
   93       /**
   94        * The Request attribute key for the cipher suite.
   95        */
   96       public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite";
   97   
   98       /**
   99        * The Request attribute key for the key size.
  100        */
  101       public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";
  102   
  103       /**
  104        * The Request attribute key for the client certificate chain.
  105        */
  106       public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate";
  107   
  108       /**
  109        * The Request attribute key for the session id.
  110        * This one is a Tomcat extension to the Servlet spec.
  111        */
  112       public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session";
  113   
  114       public static final int OP_REGISTER = 0x100; //register interest op
  115       public static final int OP_CALLBACK = 0x200; //callback interest op
  116       
  117       // ----------------------------------------------------------------- Fields
  118   
  119   
  120       /**
  121        * Available workers.
  122        */
  123       protected WorkerStack workers = null;
  124   
  125   
  126       /**
  127        * Running state of the endpoint.
  128        */
  129       protected volatile boolean running = false;
  130   
  131   
  132       /**
  133        * Will be set to true whenever the endpoint is paused.
  134        */
  135       protected volatile boolean paused = false;
  136   
  137   
  138       /**
  139        * Track the initialization state of the endpoint.
  140        */
  141       protected boolean initialized = false;
  142   
  143   
  144       /**
  145        * Current worker threads busy count.
  146        */
  147       protected int curThreadsBusy = 0;
  148   
  149   
  150       /**
  151        * Current worker threads count.
  152        */
  153       protected int curThreads = 0;
  154   
  155   
  156       /**
  157        * Sequence number used to generate thread names.
  158        */
  159       protected int sequence = 0;
  160       
  161       protected NioSelectorPool selectorPool = new NioSelectorPool();
  162       
  163       /**
  164        * Server socket "pointer".
  165        */
  166       protected ServerSocketChannel serverSock = null;
  167       
  168       /**
  169        * use send file
  170        */
  171       protected boolean useSendfile = true;
  172       
  173       /**
  174        * The size of the OOM parachute.
  175        */
  176       protected int oomParachute = 1024*1024;
  177       /**
  178        * The oom parachute, when an OOM error happens, 
  179        * will release the data, giving the JVM instantly 
  180        * a chunk of data to be able to recover with.
  181        */
  182       protected byte[] oomParachuteData = null;
  183       
  184       /**
  185        * Make sure this string has already been allocated
  186        */
  187       protected static final String oomParachuteMsg = 
  188           "SEVERE:Memory usage is low, parachute is non existent, your system may start failing.";
  189       
  190       /**
  191        * Keep track of OOM warning messages.
  192        */
  193       long lastParachuteCheck = System.currentTimeMillis();
  194       
  195       /**
  196        * Keep track of how many threads are in use
  197        */
  198       protected AtomicInteger activeSocketProcessors = new AtomicInteger(0);
  199       
  200       /**
  201        * 
  202        */
  203       protected volatile CountDownLatch stopLatch = null;
  204       
  205       /**
  206        * Cache for SocketProcessor objects
  207        */
  208       protected ConcurrentLinkedQueue<SocketProcessor> processorCache = new ConcurrentLinkedQueue<SocketProcessor>() {
  209           protected AtomicInteger size = new AtomicInteger(0);
  210           public boolean offer(SocketProcessor sc) {
  211               sc.reset(null,null);
  212               boolean offer = socketProperties.getProcessorCache()==-1?true:size.get()<socketProperties.getProcessorCache();
  213               //avoid over growing our cache or add after we have stopped
  214               if ( running && (!paused) && (offer) ) {
  215                   boolean result = super.offer(sc);
  216                   if ( result ) {
  217                       size.incrementAndGet();
  218                   }
  219                   return result;
  220               }
  221               else return false;
  222           }
  223           
  224           public SocketProcessor poll() {
  225               SocketProcessor result = super.poll();
  226               if ( result != null ) {
  227                   size.decrementAndGet();
  228               }
  229               return result;
  230           }
  231           
  232           public void clear() {
  233               super.clear();
  234               size.set(0);
  235           }
  236       };
  237   
  238   
  239       /**
  240        * Cache for key attachment objects
  241        */
  242       protected ConcurrentLinkedQueue<KeyAttachment> keyCache = new ConcurrentLinkedQueue<KeyAttachment>() {
  243           protected AtomicInteger size = new AtomicInteger(0);
  244           public boolean offer(KeyAttachment ka) {
  245               ka.reset();
  246               boolean offer = socketProperties.getKeyCache()==-1?true:size.get()<socketProperties.getKeyCache();
  247               //avoid over growing our cache or add after we have stopped
  248               if ( running && (!paused) && (offer) ) {
  249                   boolean result = super.offer(ka);
  250                   if ( result ) {
  251                       size.incrementAndGet();
  252                   }
  253                   return result;
  254               }
  255               else return false;
  256           }
  257   
  258           public KeyAttachment poll() {
  259               KeyAttachment result = super.poll();
  260               if ( result != null ) {
  261                   size.decrementAndGet();
  262               }
  263               return result;
  264           }
  265   
  266           public void clear() {
  267               super.clear();
  268               size.set(0);
  269           }
  270       };
  271   
  272       
  273       /**
  274        * Cache for poller events
  275        */
  276       protected ConcurrentLinkedQueue<PollerEvent> eventCache = new ConcurrentLinkedQueue<PollerEvent>() {
  277           protected AtomicInteger size = new AtomicInteger(0);
  278           public boolean offer(PollerEvent pe) {
  279               pe.reset();
  280               boolean offer = socketProperties.getEventCache()==-1?true:size.get()<socketProperties.getEventCache();
  281               //avoid over growing our cache or add after we have stopped
  282               if ( running && (!paused) && (offer) ) {
  283                   boolean result = super.offer(pe);
  284                   if ( result ) {
  285                       size.incrementAndGet();
  286                   }
  287                   return result;
  288               }
  289               else return false;
  290           }
  291   
  292           public PollerEvent poll() {
  293               PollerEvent result = super.poll();
  294               if ( result != null ) {
  295                   size.decrementAndGet();
  296               }
  297               return result;
  298           }
  299   
  300           public void clear() {
  301               super.clear();
  302               size.set(0);
  303           }
  304       };
  305   
  306   
  307       /**
  308        * Bytebuffer cache, each channel holds a set of buffers (two, except for SSL holds four)
  309        */
  310       protected ConcurrentLinkedQueue<NioChannel> nioChannels = new ConcurrentLinkedQueue<NioChannel>() {
  311           protected AtomicInteger size = new AtomicInteger(0);
  312           protected AtomicInteger bytes = new AtomicInteger(0);
  313           public boolean offer(NioChannel socket) {
  314               boolean offer = socketProperties.getBufferPool()==-1?true:size.get()<socketProperties.getBufferPool();
  315               offer = offer && (socketProperties.getBufferPoolSize()==-1?true:(bytes.get()+socket.getBufferSize())<socketProperties.getBufferPoolSize());
  316               //avoid over growing our cache or add after we have stopped
  317               if ( running && (!paused) && (offer) ) {
  318                   boolean result = super.offer(socket);
  319                   if ( result ) {
  320                       size.incrementAndGet();
  321                       bytes.addAndGet(socket.getBufferSize());
  322                   }
  323                   return result;
  324               }
  325               else return false;
  326           }
  327           
  328           public NioChannel poll() {
  329               NioChannel result = super.poll();
  330               if ( result != null ) {
  331                   size.decrementAndGet();
  332                   bytes.addAndGet(-result.getBufferSize());
  333               }
  334               return result;
  335           }
  336           
  337           public void clear() {
  338               super.clear();
  339               size.set(0);
  340               bytes.set(0);
  341           }
  342       };
  343   
  344       
  345   
  346       // ------------------------------------------------------------- Properties
  347   
  348   
  349       /**
  350        * External Executor based thread pool.
  351        */
  352       protected Executor executor = null;
  353       public void setExecutor(Executor executor) { this.executor = executor; }
  354       public Executor getExecutor() { return executor; }
  355       
  356       protected boolean useExecutor = true;
  357       public void setUseExecutor(boolean useexec) { useExecutor = useexec;}
  358       public boolean getUseExecutor() { return useExecutor || (executor!=null);}
  359   
  360       /**
  361        * Maximum amount of worker threads.
  362        */
  363       protected int maxThreads = 200;
  364       public void setMaxThreads(int maxThreads) {
  365           this.maxThreads = maxThreads;
  366           if (running) {
  367               if (getUseExecutor() && executor!=null) {
  368                   if (executor instanceof ThreadPoolExecutor) {
  369                       ((ThreadPoolExecutor)executor).setMaximumPoolSize(maxThreads);
  370                   }
  371               }else if (workers!=null){            
  372                   synchronized(workers) {
  373                       workers.resize(maxThreads);
  374                   }
  375               }
  376           }
  377       }
  378       public int getMaxThreads() {
  379           if (running && getUseExecutor() && executor!=null) {
  380               if (executor instanceof ThreadPoolExecutor) {
  381                   return ((ThreadPoolExecutor)executor).getMaximumPoolSize();
  382               } else {
  383                   return -1;
  384               }
  385           } else {
  386               return maxThreads;
  387           }
  388       }
  389   
  390   
  391       /**
  392        * Priority of the worker threads.
  393        */
  394       protected int threadPriority = Thread.NORM_PRIORITY;
  395       public void setThreadPriority(int threadPriority) { this.threadPriority = threadPriority; }
  396       public int getThreadPriority() { return threadPriority; }
  397   
  398       /**
  399        * Priority of the acceptor threads.
  400        */
  401       protected int acceptorThreadPriority = Thread.NORM_PRIORITY;
  402       public void setAcceptorThreadPriority(int acceptorThreadPriority) { this.acceptorThreadPriority = acceptorThreadPriority; }
  403       public int getAcceptorThreadPriority() { return acceptorThreadPriority; }
  404   
  405       /**
  406        * Priority of the poller threads.
  407        */
  408       protected int pollerThreadPriority = Thread.NORM_PRIORITY;
  409       public void setPollerThreadPriority(int pollerThreadPriority) { this.pollerThreadPriority = pollerThreadPriority; }
  410       public int getPollerThreadPriority() { return pollerThreadPriority; }
  411   
  412       /**
  413        * Server socket port.
  414        */
  415       protected int port;
  416       public int getPort() { return port; }
  417       public void setPort(int port ) { this.port=port; }
  418   
  419   
  420       /**
  421        * Address for the server socket.
  422        */
  423       protected InetAddress address;
  424       public InetAddress getAddress() { return address; }
  425       public void setAddress(InetAddress address) { this.address = address; }
  426   
  427   
  428       /**
  429        * Handling of accepted sockets.
  430        */
  431       protected Handler handler = null;
  432       public void setHandler(Handler handler ) { this.handler = handler; }
  433       public Handler getHandler() { return handler; }
  434   
  435   
  436       /**
  437        * Allows the server developer to specify the backlog that
  438        * should be used for server sockets. By default, this value
  439        * is 100.
  440        */
  441       protected int backlog = 100;
  442       public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; }
  443       public int getBacklog() { return backlog; }
  444   
  445       protected SocketProperties socketProperties = new SocketProperties();
  446   
  447       /**
  448        * Socket TCP no delay.
  449        */
  450       public boolean getTcpNoDelay() { return socketProperties.getTcpNoDelay();}
  451       public void setTcpNoDelay(boolean tcpNoDelay) { socketProperties.setTcpNoDelay(tcpNoDelay); }
  452   
  453   
  454       /**
  455        * Socket linger.
  456        */
  457       public int getSoLinger() { return socketProperties.getSoLingerTime(); }
  458       public void setSoLinger(int soLinger) { 
  459           socketProperties.setSoLingerTime(soLinger);
  460           socketProperties.setSoLingerOn(soLinger>=0);
  461       }
  462   
  463   
  464       /**
  465        * Socket timeout.
  466        */
  467       public int getSoTimeout() { return socketProperties.getSoTimeout(); }
  468       public void setSoTimeout(int soTimeout) { socketProperties.setSoTimeout(soTimeout); }
  469   
  470       /**
  471        * The default is true - the created threads will be
  472        *  in daemon mode. If set to false, the control thread
  473        *  will not be daemon - and will keep the process alive.
  474        */
  475       protected boolean daemon = true;
  476       public void setDaemon(boolean b) { daemon = b; }
  477       public boolean getDaemon() { return daemon; }
  478   
  479   
  480       /**
  481        * Name of the thread pool, which will be used for naming child threads.
  482        */
  483       protected String name = "TP";
  484       public void setName(String name) { this.name = name; }
  485       public String getName() { return name; }
  486   
  487   
  488   
  489       /**
  490        * Allow comet request handling.
  491        */
  492       protected boolean useComet = true;
  493       public void setUseComet(boolean useComet) { this.useComet = useComet; }
  494       public boolean getUseComet() { return useComet; }
  495   
  496   
  497       /**
  498        * Acceptor thread count.
  499        */
  500       protected int acceptorThreadCount = 1;
  501       public void setAcceptorThreadCount(int acceptorThreadCount) { this.acceptorThreadCount = acceptorThreadCount; }
  502       public int getAcceptorThreadCount() { return acceptorThreadCount; }
  503   
  504   
  505   
  506       /**
  507        * Poller thread count.
  508        */
  509       protected int pollerThreadCount = Runtime.getRuntime().availableProcessors();
  510       public void setPollerThreadCount(int pollerThreadCount) { this.pollerThreadCount = pollerThreadCount; }
  511       public int getPollerThreadCount() { return pollerThreadCount; }
  512   
  513       protected long selectorTimeout = 1000;
  514       public void setSelectorTimeout(long timeout){ this.selectorTimeout = timeout;}
  515       public long getSelectorTimeout(){ return this.selectorTimeout; }
  516       /**
  517        * The socket poller.
  518        */
  519       protected Poller[] pollers = null;
  520       protected AtomicInteger pollerRotater = new AtomicInteger(0);
  521       /**
  522        * Return an available poller in true round robin fashion
  523        * @return
  524        */
  525       public Poller getPoller0() {
  526           int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
  527           return pollers[idx];
  528       }
  529   
  530       /**
  531        * Dummy maxSpareThreads property.
  532        */
  533       public int getMaxSpareThreads() { return Math.min(getMaxThreads(),5); }
  534   
  535   
  536       /**
  537        * Dummy minSpareThreads property.
  538        */
  539       public int getMinSpareThreads() { return Math.min(getMaxThreads(),5); }
  540       
  541       /**
  542        * Generic properties, introspected
  543        */
  544       public boolean setProperty(String name, String value) {
  545           final String selectorPoolName = "selectorPool.";
  546           final String socketName = "socket.";
  547           try {
  548               if (name.startsWith(selectorPoolName)) {
  549                   return IntrospectionUtils.setProperty(selectorPool, name.substring(selectorPoolName.length()), value);
  550               } else if (name.startsWith(socketName)) {
  551                   return IntrospectionUtils.setProperty(socketProperties, name.substring(socketName.length()), value);
  552               } else {
  553                   return IntrospectionUtils.setProperty(this,name,value,false);
  554               }
  555           }catch ( Exception x ) {
  556               log.error("Unable to set attribute \""+name+"\" to \""+value+"\"",x);
  557               return false;
  558           }
  559       }
  560   
  561   
  562       public String adjustRelativePath(String path, String relativeTo) {
  563           File f = new File(path);
  564           if ( !f.isAbsolute()) {
  565               path = relativeTo + File.separator + path;
  566               f = new File(path);
  567           }
  568           if (!f.exists()) {
  569               log.warn("configured file:["+path+"] does not exist.");
  570           }
  571           return path;
  572       }
  573       
  574       public String defaultIfNull(String val, String defaultValue) {
  575           if (val==null) return defaultValue;
  576           else return val;
  577       }
  578       // --------------------  SSL related properties --------------------
  579       protected String truststoreFile = System.getProperty("javax.net.ssl.trustStore");
  580       public void setTruststoreFile(String s) {
  581           s = adjustRelativePath(s,System.getProperty("catalina.base"));
  582           this.truststoreFile = s;
  583       }
  584       public String getTruststoreFile() {return truststoreFile;}
  585       protected String truststorePass = System.getProperty("javax.net.ssl.trustStorePassword");
  586       public void setTruststorePass(String truststorePass) {this.truststorePass = truststorePass;}
  587       public String getTruststorePass() {return truststorePass;}
  588       protected String truststoreType = System.getProperty("javax.net.ssl.trustStoreType");
  589       public void setTruststoreType(String truststoreType) {this.truststoreType = truststoreType;}
  590       public String getTruststoreType() {return truststoreType;}
  591   
  592       protected String keystoreFile = System.getProperty("user.home")+"/.keystore";
  593       public String getKeystoreFile() { return keystoreFile;}
  594       public void setKeystoreFile(String s ) { 
  595           s = adjustRelativePath(s,System.getProperty("catalina.base"));
  596           this.keystoreFile = s; 
  597       }
  598       public void setKeystore(String s ) { setKeystoreFile(s);}
  599       public String getKeystore() { return getKeystoreFile();}
  600   
  601       String keyAlias = null;
  602       public String getKeyAlias() { return keyAlias;}
  603       public void setKeyAlias(String s ) { keyAlias = s;}
  604       
  605       
  606       protected String algorithm = "SunX509";
  607       public String getAlgorithm() { return algorithm;}
  608       public void setAlgorithm(String s ) { this.algorithm = s;}
  609   
  610       protected String clientAuth = "false";
  611       public String getClientAuth() { return clientAuth;}
  612       public void setClientAuth(String s ) { this.clientAuth = s;}
  613       
  614       protected String keystorePass = "changeit";
  615       public String getKeystorePass() { return keystorePass;}
  616       public void setKeystorePass(String s ) { this.keystorePass = s;}
  617       
  618       protected String keystoreType = "JKS";
  619       public String getKeystoreType() { return keystoreType;}
  620       public void setKeystoreType(String s ) { this.keystoreType = s;}
  621   
  622       protected String sslProtocol = "TLS"; 
  623       public String getSslProtocol() { return sslProtocol;}
  624       public void setSslProtocol(String s) { sslProtocol = s;}
  625       
  626       protected String sslEnabledProtocols=null; //"TLSv1,SSLv3,SSLv2Hello"
  627       protected String[] sslEnabledProtocolsarr =  new String[0];
  628       public void setSslEnabledProtocols(String s) {
  629           this.sslEnabledProtocols = s;
  630           StringTokenizer t = new StringTokenizer(s,",");
  631           sslEnabledProtocolsarr = new String[t.countTokens()];
  632           for (int i=0; i<sslEnabledProtocolsarr.length; i++ ) sslEnabledProtocolsarr[i] = t.nextToken();
  633       }
  634       
  635       protected String ciphers = null;
  636       protected String[] ciphersarr = new String[0];
  637       public String getCiphers() { return ciphers;}
  638       public void setCiphers(String s) { 
  639           ciphers = s;
  640           if ( s == null ) ciphersarr = new String[0];
  641           else {
  642               StringTokenizer t = new StringTokenizer(s,",");
  643               ciphersarr = new String[t.countTokens()];
  644               for (int i=0; i<ciphersarr.length; i++ ) ciphersarr[i] = t.nextToken();
  645           }
  646       }
  647   
  648       protected int sessionCacheSize = 0;
  649       public int getSessionCacheSize() { return sessionCacheSize;}
  650       public void setSessionCacheSize(int i) { sessionCacheSize = i;}
  651   
  652       protected int sessionCacheTimeout = 86400;
  653       public int getSessionCacheTimeout() { return sessionCacheTimeout;}
  654       public void setSessionCacheTimeout(int i) { sessionCacheTimeout = i;}
  655   
  656       /**
  657        * SSL engine.
  658        */
  659       protected boolean SSLEnabled = false;
  660       public boolean isSSLEnabled() { return SSLEnabled;}
  661       public void setSSLEnabled(boolean SSLEnabled) {this.SSLEnabled = SSLEnabled;}
  662   
  663       protected boolean secure = false;
  664       public boolean getSecure() { return secure;}
  665       public void setSecure(boolean b) { secure = b;}
  666   
  667       public void setSelectorPool(NioSelectorPool selectorPool) {
  668           this.selectorPool = selectorPool;
  669       }
  670   
  671       public void setSocketProperties(SocketProperties socketProperties) {
  672           this.socketProperties = socketProperties;
  673       }
  674   
  675       public void setUseSendfile(boolean useSendfile) {
  676           this.useSendfile = useSendfile;
  677       }
  678   
  679       public void setOomParachute(int oomParachute) {
  680           this.oomParachute = oomParachute;
  681       }
  682   
  683       public void setOomParachuteData(byte[] oomParachuteData) {
  684           this.oomParachuteData = oomParachuteData;
  685       }
  686   
  687   
  688       protected SSLContext sslContext = null;
  689       public SSLContext getSSLContext() { return sslContext;}
  690       public void setSSLContext(SSLContext c) { sslContext = c;}
  691       
  692       // --------------------------------------------------------- OOM Parachute Methods
  693   
  694       protected void checkParachute() {
  695           boolean para = reclaimParachute(false);
  696           if (!para && (System.currentTimeMillis()-lastParachuteCheck)>10000) {
  697               try {
  698                   log.fatal(oomParachuteMsg);
  699               }catch (Throwable t) {
  700                   System.err.println(oomParachuteMsg);
  701               }
  702               lastParachuteCheck = System.currentTimeMillis();
  703           }
  704       }
  705       
  706       protected boolean reclaimParachute(boolean force) {
  707           if ( oomParachuteData != null ) return true;
  708           if ( oomParachute > 0 && ( force || (Runtime.getRuntime().freeMemory() > (oomParachute*2))) )  
  709               oomParachuteData = new byte[oomParachute];
  710           return oomParachuteData != null;
  711       }
  712       
  713       protected void releaseCaches() {
  714           this.keyCache.clear();
  715           this.nioChannels.clear();
  716           this.processorCache.clear();
  717           if ( handler != null ) handler.releaseCaches();
  718           
  719       }
  720       
  721       // --------------------------------------------------------- Public Methods
  722       /**
  723        * Number of keepalive sockets.
  724        */
  725       public int getKeepAliveCount() {
  726           if (pollers == null) {
  727               return 0;
  728           } else {
  729               int sum = 0;
  730               for (int i=0; i<pollers.length; i++) {
  731                   sum += pollers[i].selector.keys().size();
  732               }
  733               return sum;
  734           }
  735       }
  736   
  737   
  738   
  739       /**
  740        * Return the amount of threads that are managed by the pool.
  741        *
  742        * @return the amount of threads that are managed by the pool
  743        */
  744       public int getCurrentThreadCount() {
  745           if (executor!=null) {
  746               if (executor instanceof ThreadPoolExecutor) {
  747                   return ((ThreadPoolExecutor)executor).getPoolSize();
  748               } else {
  749                   return -1;
  750               }
  751           } else {
  752               return curThreads;
  753           }
  754       }
  755   
  756       /**
  757        * Return the amount of threads that are in use 
  758        *
  759        * @return the amount of threads that are in use
  760        */
  761       public int getCurrentThreadsBusy() {
  762           if (executor!=null) {
  763               if (executor instanceof ThreadPoolExecutor) {
  764                   return ((ThreadPoolExecutor)executor).getActiveCount();
  765               } else {
  766                   return -1;
  767               }
  768           } else {
  769               return workers!=null?curThreads - workers.size():0;
  770           }
  771       }
  772   
  773       /**
  774        * Return the state of the endpoint.
  775        *
  776        * @return true if the endpoint is running, false otherwise
  777        */
  778       public boolean isRunning() {
  779           return running;
  780       }
  781   
  782   
  783       /**
  784        * Return the state of the endpoint.
  785        *
  786        * @return true if the endpoint is paused, false otherwise
  787        */
  788       public boolean isPaused() {
  789           return paused;
  790       }
  791   
  792   
  793       // ----------------------------------------------- Public Lifecycle Methods
  794   
  795   
  796       /**
  797        * Initialize the endpoint.
  798        */
  799       public void init()
  800           throws Exception {
  801   
  802           if (initialized)
  803               return;
  804   
  805           serverSock = ServerSocketChannel.open();
  806           serverSock.socket().setPerformancePreferences(socketProperties.getPerformanceConnectionTime(),
  807                                                         socketProperties.getPerformanceLatency(),
  808                                                         socketProperties.getPerformanceBandwidth());
  809           InetSocketAddress addr = (address!=null?new InetSocketAddress(address,port):new InetSocketAddress(port));
  810           serverSock.socket().bind(addr,backlog); 
  811           serverSock.configureBlocking(true); //mimic APR behavior
  812           serverSock.socket().setSoTimeout(getSocketProperties().getSoTimeout());
  813   
  814           // Initialize thread count defaults for acceptor, poller
  815           if (acceptorThreadCount == 0) {
  816               // FIXME: Doesn't seem to work that well with multiple accept threads
  817               acceptorThreadCount = 1;
  818           }
  819           if (pollerThreadCount <= 0) {
  820               //minimum one poller thread
  821               pollerThreadCount = 1;
  822           }
  823           stopLatch = new CountDownLatch(pollerThreadCount);
  824   
  825           // Initialize SSL if needed
  826           if (isSSLEnabled()) {
  827               // Initialize SSL
  828               char[] passphrase = getKeystorePass().toCharArray();
  829   
  830               char[] tpassphrase = (getTruststorePass()!=null)?getTruststorePass().toCharArray():passphrase;
  831               String ttype = (getTruststoreType()!=null)?getTruststoreType():getKeystoreType();
  832               
  833               KeyStore ks = KeyStore.getInstance(getKeystoreType());
  834               ks.load(new FileInputStream(getKeystoreFile()), passphrase);
  835               KeyStore ts = null;
  836               if (getTruststoreFile()==null) {
  837                   //no op, same as for BIO connector
  838               }else {
  839                   ts = KeyStore.getInstance(ttype);
  840                   ts.load(new FileInputStream(getTruststoreFile()), tpassphrase);
  841               }
  842   
  843               KeyManagerFactory kmf = KeyManagerFactory.getInstance(getAlgorithm());
  844               kmf.init(ks, passphrase);
  845   
  846               TrustManagerFactory tmf = TrustManagerFactory.getInstance(getAlgorithm());
  847               tmf.init(ts);
  848   
  849               sslContext = SSLContext.getInstance(getSslProtocol());
  850               sslContext.init(wrap(kmf.getKeyManagers()), tmf.getTrustManagers(), null);
  851               SSLSessionContext sessionContext =
  852                   sslContext.getServerSessionContext();
  853               if (sessionContext != null) {
  854                   sessionContext.setSessionCacheSize(sessionCacheSize);
  855                   sessionContext.setSessionTimeout(sessionCacheTimeout);
  856               }
  857           }
  858           
  859           if (oomParachute>0) reclaimParachute(true);
  860           selectorPool.open();
  861           initialized = true;
  862   
  863       }
  864       
  865       public KeyManager[] wrap(KeyManager[] managers) {
  866           if (managers==null) return null;
  867           KeyManager[] result = new KeyManager[managers.length];
  868           for (int i=0; i<result.length; i++) {
  869               if (managers[i] instanceof X509KeyManager && getKeyAlias()!=null) {
  870                   result[i] = new NioX509KeyManager((X509KeyManager)managers[i],getKeyAlias());
  871               } else {
  872                   result[i] = managers[i];
  873               }
  874           }
  875           return result;
  876       }
  877   
  878   
  879       /**
  880        * Start the NIO endpoint, creating acceptor, poller threads.
  881        */
  882       public void start()
  883           throws Exception {
  884           // Initialize socket if not done before
  885           if (!initialized) {
  886               init();
  887           }
  888           if (!running) {
  889               running = true;
  890               paused = false;
  891               
  892               // Create worker collection
  893               if (getUseExecutor()) {
  894                   if ( executor == null ) {
  895                       TaskQueue taskqueue = new TaskQueue();
  896                       TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-");
  897                       executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
  898                       taskqueue.setParent( (ThreadPoolExecutor) executor, this);
  899                   }
  900               } else if ( executor == null ) {//avoid two thread pools being created
  901                   workers = new WorkerStack(maxThreads);
  902               }
  903   
  904               // Start poller threads
  905               pollers = new Poller[getPollerThreadCount()];
  906               for (int i=0; i<pollers.length; i++) {
  907                   pollers[i] = new Poller();
  908                   Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
  909                   pollerThread.setPriority(threadPriority);
  910                   pollerThread.setDaemon(true);
  911                   pollerThread.start();
  912               }
  913   
  914               // Start acceptor threads
  915               for (int i = 0; i < acceptorThreadCount; i++) {
  916                   Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i);
  917                   acceptorThread.setPriority(threadPriority);
  918                   acceptorThread.setDaemon(daemon);
  919                   acceptorThread.start();
  920               }
  921           }
  922       }
  923   
  924   
  925       /**
  926        * Pause the endpoint, which will make it stop accepting new sockets.
  927        */
  928       public void pause() {
  929           if (running && !paused) {
  930               paused = true;
  931               unlockAccept();
  932           }
  933       }
  934   
  935   
  936       /**
  937        * Resume the endpoint, which will make it start accepting new sockets
  938        * again.
  939        */
  940       public void resume() {
  941           if (running) {
  942               paused = false;
  943           }
  944       }
  945   
  946   
  947       /**
  948        * Stop the endpoint. This will cause all processing threads to stop.
  949        */
  950       public void stop() {
  951           if (running) {
  952               running = false;
  953               unlockAccept();
  954               for (int i=0; pollers!=null && i<pollers.length; i++) {
  955                   if (pollers[i]==null) continue;
  956                   pollers[i].destroy();
  957                   pollers[i] = null;
  958               }
  959               try { stopLatch.await(selectorTimeout+100,TimeUnit.MILLISECONDS); } catch (InterruptedException ignore ) {}
  960           }
  961           eventCache.clear();
  962           keyCache.clear();
  963           nioChannels.clear();
  964           processorCache.clear();
  965           if ( executor!=null ) {
  966               if ( executor instanceof ThreadPoolExecutor ) {
  967                   //this is our internal one, so we need to shut it down
  968                   ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
  969                   tpe.shutdownNow();
  970                   TaskQueue queue = (TaskQueue) tpe.getQueue();
  971                   queue.setParent(null,null);
  972               }
  973               executor = null;
  974           }
  975           
  976       }
  977   
  978   
  979       /**
  980        * Deallocate NIO memory pools, and close server socket.
  981        */
  982       public void destroy() throws Exception {
  983           if (log.isDebugEnabled()) {
  984               log.debug("Destroy initiated for "+new InetSocketAddress(address,port));
  985           }
  986           if (running) {
  987               stop();
  988           }
  989           // Close server socket
  990           serverSock.socket().close();
  991           serverSock.close();
  992           serverSock = null;
  993           sslContext = null;
  994           initialized = false;
  995           releaseCaches();
  996           selectorPool.close();
  997           if (log.isDebugEnabled()) {
  998               log.debug("Destroy completed for "+new InetSocketAddress(address,port));
  999           }
 1000       }
 1001   
 1002   
 1003       // ------------------------------------------------------ Protected Methods
 1004   
 1005   
 1006       /**
 1007        * Get a sequence number used for thread naming.
 1008        */
 1009       protected int getSequence() {
 1010           return sequence++;
 1011       }
 1012   
 1013       public int getWriteBufSize() {
 1014           return socketProperties.getTxBufSize();
 1015       }
 1016   
 1017       public int getReadBufSize() {
 1018           return socketProperties.getRxBufSize();
 1019       }
 1020   
 1021       public NioSelectorPool getSelectorPool() {
 1022           return selectorPool;
 1023       }
 1024   
 1025       public SocketProperties getSocketProperties() {
 1026           return socketProperties;
 1027       }
 1028   
 1029       public boolean getUseSendfile() {
 1030           return useSendfile;
 1031       }
 1032   
 1033       public int getOomParachute() {
 1034           return oomParachute;
 1035       }
 1036   
 1037       public byte[] getOomParachuteData() {
 1038           return oomParachuteData;
 1039       }
 1040   
 1041   
 1042       /**
 1043        * Unlock the server socket accept using a bogus connection.
 1044        */
 1045       protected void unlockAccept() {
 1046           java.net.Socket s = null;
 1047           InetSocketAddress saddr = null;
 1048           try {
 1049               // Need to create a connection to unlock the accept();
 1050               if (address == null) {
 1051                    saddr = new InetSocketAddress("localhost", port);
 1052               } else {
 1053                    saddr = new InetSocketAddress(address,port);
 1054               }
 1055               s = new java.net.Socket();
 1056               s.setSoTimeout(getSocketProperties().getSoTimeout());
 1057               s.setSoLinger(getSocketProperties().getSoLingerOn(),getSocketProperties().getSoLingerTime());
 1058               if (log.isDebugEnabled()) {
 1059                   log.debug("About to unlock socket for:"+saddr);
 1060               }
 1061               s.connect(saddr,getSocketProperties().getUnlockTimeout());
 1062               if (log.isDebugEnabled()) {
 1063                   log.debug("Socket unlock completed for:"+saddr);
 1064               } 
 1065           } catch(Exception e) {
 1066               if (log.isDebugEnabled()) {
 1067                   log.debug(sm.getString("endpoint.debug.unlock", "" + port), e);
 1068               }
 1069           } finally {
 1070               if (s != null) {
 1071                   try {
 1072                       s.close();
 1073                   } catch (Exception e) {
 1074                       // Ignore
 1075                   }
 1076               }
 1077           }
 1078       }
 1079   
 1080   
 1081       /**
 1082        * Process the specified connection.
 1083        */
 1084       protected boolean setSocketOptions(SocketChannel socket) {
 1085           // Process the connection
 1086           try {
 1087               //disable blocking, APR style, we are gonna be polling it
 1088               socket.configureBlocking(false);
 1089               Socket sock = socket.socket();
 1090               socketProperties.setProperties(sock);
 1091   
 1092               NioChannel channel = nioChannels.poll();
 1093               if ( channel == null ) {
 1094                   // SSL setup
 1095                   if (sslContext != null) {
 1096                       SSLEngine engine = createSSLEngine();
 1097                       int appbufsize = engine.getSession().getApplicationBufferSize();
 1098                       NioBufferHandler bufhandler = new NioBufferHandler(Math.max(appbufsize,socketProperties.getAppReadBufSize()),
 1099                                                                          Math.max(appbufsize,socketProperties.getAppWriteBufSize()),
 1100                                                                          socketProperties.getDirectBuffer());
 1101                       channel = new SecureNioChannel(socket, engine, bufhandler, selectorPool);
 1102                   } else {
 1103                       // normal tcp setup
 1104                       NioBufferHandler bufhandler = new NioBufferHandler(socketProperties.getAppReadBufSize(),
 1105                                                                          socketProperties.getAppWriteBufSize(),
 1106                                                                          socketProperties.getDirectBuffer());
 1107   
 1108                       channel = new NioChannel(socket, bufhandler);
 1109                   }
 1110               } else {                
 1111                   channel.setIOChannel(socket);
 1112                   if ( channel instanceof SecureNioChannel ) {
 1113                       SSLEngine engine = createSSLEngine();
 1114                       ((SecureNioChannel)channel).reset(engine);
 1115                   } else {
 1116                       channel.reset();
 1117                   }
 1118               }
 1119               getPoller0().register(channel);
 1120           } catch (Throwable t) {
 1121               try {
 1122                   log.error("",t);
 1123               }catch ( Throwable tt){}
 1124               // Tell to close the socket
 1125               return false;
 1126           }
 1127           return true;
 1128       }
 1129   
 1130       protected SSLEngine createSSLEngine() {
 1131           SSLEngine engine = sslContext.createSSLEngine();
 1132           if ("false".equals(getClientAuth())) {
 1133               engine.setNeedClientAuth(false);
 1134               engine.setWantClientAuth(false);
 1135           } else if ("true".equals(getClientAuth()) || "yes".equals(getClientAuth())){
 1136               engine.setNeedClientAuth(true);
 1137           } else if ("want".equals(getClientAuth())) {
 1138               engine.setWantClientAuth(true);
 1139           }
 1140           engine.setUseClientMode(false);
 1141           if ( ciphersarr.length > 0 ) engine.setEnabledCipherSuites(ciphersarr);
 1142           if ( sslEnabledProtocolsarr.length > 0 ) engine.setEnabledProtocols(sslEnabledProtocolsarr);
 1143           
 1144           return engine;
 1145       }
 1146   
 1147   
 1148       /**
 1149        * Returns true if a worker thread is available for processing.
 1150        * @return boolean
 1151        */
 1152       protected boolean isWorkerAvailable() {
 1153           if ( executor != null ) {
 1154               return true;
 1155           } else {
 1156               if (workers.size() > 0) {
 1157                   return true;
 1158               }
 1159               if ( (maxThreads > 0) && (curThreads < maxThreads)) {
 1160                   return true;
 1161               } else {
 1162                   if (maxThreads < 0) {
 1163                       return true;
 1164                   } else {
 1165                       return false;
 1166                   }
 1167               }
 1168           }
 1169       }
 1170       /**
 1171        * Create (or allocate) and return an available processor for use in
 1172        * processing a specific HTTP request, if possible.  If the maximum
 1173        * allowed processors have already been created and are in use, return
 1174        * <code>null</code> instead.
 1175        */
 1176       protected Worker createWorkerThread() {
 1177   
 1178           synchronized (workers) {
 1179               if (workers.size() > 0) {
 1180                   curThreadsBusy++;
 1181                   return (workers.pop());
 1182               }
 1183               if ((maxThreads > 0) && (curThreads < maxThreads)) {
 1184                   curThreadsBusy++;
 1185                   if (curThreadsBusy == maxThreads) {
 1186                       log.info(sm.getString("endpoint.info.maxThreads",
 1187                               Integer.toString(maxThreads), address,
 1188                               Integer.toString(port)));
 1189                   }
 1190                   return (newWorkerThread());
 1191               } else {
 1192                   if (maxThreads < 0) {
 1193                       curThreadsBusy++;
 1194                       return (newWorkerThread());
 1195                   } else {
 1196                       return (null);
 1197                   }
 1198               }
 1199           }
 1200       }
 1201   
 1202   
 1203       /**
 1204        * Create and return a new processor suitable for processing HTTP
 1205        * requests and returning the corresponding responses.
 1206        */
 1207       protected Worker newWorkerThread() {
 1208   
 1209           Worker workerThread = new Worker();
 1210           workerThread.start();
 1211           return (workerThread);
 1212   
 1213       }
 1214   
 1215   
 1216       /**
 1217        * Return a new worker thread, and block while to worker is available.
 1218        */
 1219       protected Worker getWorkerThread() {
 1220           // Allocate a new worker thread
 1221           Worker workerThread = createWorkerThread();
 1222           while (workerThread == null) {
 1223               try {
 1224                   synchronized (workers) {
 1225                       workerThread = createWorkerThread();
 1226                       if ( workerThread == null ) workers.wait();
 1227                   }
 1228               } catch (InterruptedException e) {
 1229                   // Ignore
 1230               }
 1231               if ( workerThread == null ) workerThread = createWorkerThread();
 1232           }
 1233           return workerThread;
 1234       }
 1235   
 1236   
 1237       /**
 1238        * Recycle the specified Processor so that it can be used again.
 1239        *
 1240        * @param workerThread The processor to be recycled
 1241        */
 1242       protected void recycleWorkerThread(Worker workerThread) {
 1243           synchronized (workers) {
 1244               workers.push(workerThread);
 1245               curThreadsBusy--;
 1246               workers.notify();
 1247           }
 1248       }
 1249       /**
 1250        * Process given socket.
 1251        */
 1252       protected boolean processSocket(NioChannel socket) {
 1253           return processSocket(socket,null);
 1254       }
 1255   
 1256   
 1257       /**
 1258        * Process given socket for an event.
 1259        */
 1260       protected boolean processSocket(NioChannel socket, SocketStatus status) {
 1261           return processSocket(socket,status,true);
 1262       }
 1263       
 1264       protected boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
 1265           try {
 1266               KeyAttachment attachment = (KeyAttachment)socket.getAttachment(false);
 1267               attachment.setCometNotify(false); //will get reset upon next reg
 1268               if (executor == null) {
 1269                   getWorkerThread().assign(socket, status);
 1270               } else {
 1271                   SocketProcessor sc = processorCache.poll();
 1272                   if ( sc == null ) sc = new SocketProcessor(socket,status);
 1273                   else sc.reset(socket,status);
 1274                   if ( dispatch ) executor.execute(sc);
 1275                   else sc.run();
 1276               }
 1277           } catch (Throwable t) {
 1278               // This means we got an OOM or similar creating a thread, or that
 1279               // the pool and its queue are full
 1280               log.error(sm.getString("endpoint.process.fail"), t);
 1281               return false;
 1282           }
 1283           return true;
 1284       }
 1285   
 1286   
 1287       // --------------------------------------------------- Acceptor Inner Class
 1288   
 1289   
 1290       /**
 1291        * Server socket acceptor thread.
 1292        */
 1293       protected class Acceptor implements Runnable {
 1294           /**
 1295            * The background thread that listens for incoming TCP/IP connections and
 1296            * hands them off to an appropriate processor.
 1297            */
 1298           public void run() {
 1299               // Loop until we receive a shutdown command
 1300               while (running) {
 1301                   // Loop if endpoint is paused
 1302                   while (paused) {
 1303                       try {
 1304                           Thread.sleep(1000);
 1305                       } catch (InterruptedException e) {
 1306                           // Ignore
 1307                       }
 1308                   }
 1309                   try {
 1310                       // Accept the next incoming connection from the server socket
 1311                       SocketChannel socket = serverSock.accept();
 1312                       // Hand this socket off to an appropriate processor
 1313                       //TODO FIXME - this is currently a blocking call, meaning we will be blocking
 1314                       //further accepts until there is a thread available.
 1315                       if ( running && (!paused) && socket != null ) {
 1316                           //processSocket(socket);
 1317                           if (!setSocketOptions(socket)) {
 1318                               try {
 1319                                   socket.socket().close();
 1320                                   socket.close();
 1321                               } catch (IOException ix) {
 1322                                   if (log.isDebugEnabled())
 1323                                       log.debug("", ix);
 1324                               }
 1325                           } 
 1326                       }
 1327                   }catch (SocketTimeoutException sx) {
 1328                       //normal condition
 1329                   }catch ( IOException x ) {
 1330                       if ( running ) log.error(sm.getString("endpoint.accept.fail"), x);
 1331                   } catch (OutOfMemoryError oom) {
 1332                       try {
 1333                           oomParachuteData = null;
 1334                           releaseCaches();
 1335                           log.error("", oom);
 1336                       }catch ( Throwable oomt ) {
 1337                           try {
 1338                               try {
 1339                                   System.err.println(oomParachuteMsg);
 1340                                   oomt.printStackTrace();
 1341                               }catch (Throwable letsHopeWeDontGetHere){}
 1342                           }catch (Throwable letsHopeWeDontGetHere){}
 1343                       }
 1344                   } catch (Throwable t) {
 1345                       log.error(sm.getString("endpoint.accept.fail"), t);
 1346                   }
 1347               }//while
 1348           }//run
 1349       }
 1350   
 1351   
 1352       // ----------------------------------------------------- Poller Inner Classes
 1353   
 1354       /**
 1355        * 
 1356        * PollerEvent, cacheable object for poller events to avoid GC
 1357        */
 1358       public class PollerEvent implements Runnable {
 1359           
 1360           protected NioChannel socket;
 1361           protected int interestOps;
 1362           protected KeyAttachment key;
 1363           public PollerEvent(NioChannel ch, KeyAttachment k, int intOps) {
 1364               reset(ch, k, intOps);
 1365           }
 1366       
 1367           public void reset(NioChannel ch, KeyAttachment k, int intOps) {
 1368               socket = ch;
 1369               interestOps = intOps;
 1370               key = k;
 1371           }
 1372       
 1373           public void reset() {
 1374               reset(null, null, 0);
 1375           }
 1376       
 1377           public void run() {
 1378               if ( interestOps == OP_REGISTER ) {
 1379                   try {
 1380                       socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
 1381                   } catch (Exception x) {
 1382                       log.error("", x);
 1383                   }
 1384               } else {
 1385                   final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
 1386                   try {
 1387                       boolean cancel = false;
 1388                       if (key != null) {
 1389                           final KeyAttachment att = (KeyAttachment) key.attachment();
 1390                           if ( att!=null ) {
 1391                               //handle callback flag
 1392                               if (att.getComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK ) {
 1393                                   att.setCometNotify(true);
 1394                               } else {
 1395                                   att.setCometNotify(false);
 1396                               }
 1397                               interestOps = (interestOps & (~OP_CALLBACK));//remove the callback flag
 1398                               att.access();//to prevent timeout
 1399                               //we are registering the key to start with, reset the fairness counter.
 1400                               int ops = key.interestOps() | interestOps;
 1401                               att.interestOps(ops);
 1402                               key.interestOps(ops);
 1403                               att.setCometOps(ops);
 1404                           } else {
 1405                               cancel = true;
 1406                           }
 1407                       } else {
 1408                           cancel = true;
 1409                       }
 1410                       if ( cancel ) socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
 1411                   }catch (CancelledKeyException ckx) {
 1412                       try {
 1413                           socket.getPoller().cancelledKey(key,SocketStatus.DISCONNECT,true);
 1414                       }catch (Exception ignore) {}
 1415                   }
 1416               }//end if
 1417           }//run
 1418           
 1419           public String toString() {
 1420               return super.toString()+"[intOps="+this.interestOps+"]";
 1421           }
 1422       }
 1423       
 1424       /**
 1425        * Poller class.
 1426        */
 1427       public class Poller implements Runnable {
 1428   
 1429           protected Selector selector;
 1430           protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>();
 1431           
 1432           protected volatile boolean close = false;
 1433           protected long nextExpiration = 0;//optimize expiration handling
 1434           
 1435           protected AtomicLong wakeupCounter = new AtomicLong(0l);
 1436           
 1437           protected CountDownLatch stopLatch = new CountDownLatch(1);
 1438   
 1439   
 1440   
 1441           public Poller() throws IOException {
 1442               this.selector = Selector.open();
 1443           }
 1444           
 1445           public Selector getSelector() { return selector;}
 1446   
 1447           /**
 1448            * Destroy the poller.
 1449            */
 1450           protected void destroy() {
 1451               // Wait for polltime before doing anything, so that the poller threads
 1452               // exit, otherwise parallel closure of sockets which are still
 1453               // in the poller can cause problems
 1454               close = true;
 1455               events.clear();
 1456               selector.wakeup();
 1457           }
 1458           
 1459           public void addEvent(Runnable event) {
 1460               events.offer(event);
 1461               if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
 1462           }
 1463           
 1464           public void cometInterest(NioChannel socket) {
 1465               KeyAttachment att = (KeyAttachment)socket.getAttachment(false);
 1466               add(socket,att.getCometOps());
 1467               if ( (att.getCometOps()&OP_CALLBACK) == OP_CALLBACK ) {
 1468                   nextExpiration = 0; //force the check for faster callback
 1469                   selector.wakeup();
 1470               }
 1471           }
 1472           
 1473           public void wakeup() {
 1474               selector.wakeup();
 1475           }
 1476   
 1477           /**
 1478            * Add specified socket and associated pool to the poller. The socket will
 1479            * be added to a temporary array, and polled first after a maximum amount
 1480            * of time equal to pollTime (in most cases, latency will be much lower,
 1481            * however).
 1482            *
 1483            * @param socket to add to the poller
 1484            */
 1485           public void add(final NioChannel socket) {
 1486               add(socket,SelectionKey.OP_READ);
 1487           }
 1488           
 1489           public void add(final NioChannel socket, final int interestOps) {
 1490               PollerEvent r = eventCache.poll();
 1491               if ( r==null) r = new PollerEvent(socket,null,interestOps);
 1492               else r.reset(socket,null,interestOps);
 1493               addEvent(r);
 1494           }
 1495           
 1496           public boolean events() {
 1497               boolean result = false;
 1498               //synchronized (events) {
 1499                   Runnable r = null;
 1500                   result = (events.size() > 0);
 1501                   while ( (r = (Runnable)events.poll()) != null ) {
 1502                       try {
 1503                           r.run();
 1504                           if ( r instanceof PollerEvent ) {
 1505                               ((PollerEvent)r).reset();
 1506                               eventCache.offer((PollerEvent)r);
 1507                           }
 1508                       } catch ( Throwable x ) {
 1509                           log.error("",x);
 1510                       }
 1511                   }
 1512                   //events.clear();
 1513               //}
 1514               return result;
 1515           }
 1516           
 1517           public void register(final NioChannel socket)
 1518           {
 1519               socket.setPoller(this);
 1520               KeyAttachment key = keyCache.poll();
 1521               final KeyAttachment ka = key!=null?key:new KeyAttachment();
 1522               ka.reset(this,socket,getSocketProperties().getSoTimeout());
 1523               PollerEvent r = eventCache.poll();
 1524               ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
 1525               if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
 1526               else r.reset(socket,ka,OP_REGISTER);
 1527               addEvent(r);
 1528           }
 1529           public void cancelledKey(SelectionKey key, SocketStatus status, boolean dispatch) {
 1530               try {
 1531                   if ( key == null ) return;//nothing to do
 1532                   KeyAttachment ka = (KeyAttachment) key.attachment();
 1533                   if (ka != null && ka.getComet() && status != null) {
 1534                       //the comet event takes care of clean up
 1535                       //processSocket(ka.getChannel(), status, dispatch);
 1536                       ka.setComet(false);//to avoid a loop
 1537                       if (status == SocketStatus.TIMEOUT ) {
 1538                           processSocket(ka.getChannel(), status, true);
 1539                           return; // don't close on comet timeout
 1540                       } else {
 1541                           processSocket(ka.getChannel(), status, false); //don't dispatch if the lines below are cancelling the key
 1542                       }                    
 1543                   }
 1544                   
 1545                   key.attach(null);
 1546                   if (ka!=null) handler.release(ka.getChannel());
 1547                   if (key.isValid()) key.cancel();
 1548                   if (key.channel().isOpen()) try {key.channel().close();}catch (Exception ignore){}
 1549                   try {if (ka!=null) ka.channel.close(true);}catch (Exception ignore){}
 1550                   try {if (ka!=null && ka.getSendfileData()!=null && ka.getSendfileData().fchannel!=null && ka.getSendfileData().fchannel.isOpen()) ka.getSendfileData().fchannel.close();}catch (Exception ignore){}
 1551                   if (ka!=null) ka.reset();
 1552               } catch (Throwable e) {
 1553                   if ( log.isDebugEnabled() ) log.error("",e);
 1554                   // Ignore
 1555               }
 1556           }
 1557           /**
 1558            * The background thread that listens for incoming TCP/IP connections and
 1559            * hands them off to an appropriate processor.
 1560            */
 1561           public void run() {
 1562               // Loop until we receive a shutdown command
 1563               while (running) {
 1564                   try {
 1565                       // Loop if endpoint is paused
 1566                       while (paused && (!close) ) {
 1567                           try {
 1568                               Thread.sleep(100);
 1569                           } catch (InterruptedException e) {
 1570                               // Ignore
 1571                           }
 1572                       }
 1573                       boolean hasEvents = false;
 1574   
 1575                       hasEvents = (hasEvents | events());
 1576                       // Time to terminate?
 1577                       if (close) {
 1578                           timeout(0, false);
 1579                           break;
 1580                       }
 1581                       int keyCount = 0;
 1582                       try {
 1583                           if ( !close ) {
 1584                               if (wakeupCounter.get()>0) {
 1585                                   //if we are here, means we have other stuff to do
 1586                                   //do a non blocking select
 1587                                   keyCount = selector.selectNow();
 1588                               }else {
 1589                                   wakeupCounter.set( -1);
 1590                                   keyCount = selector.select(selectorTimeout);
 1591                               }
 1592                               wakeupCounter.set(0);
 1593                           }
 1594                           if (close) {
 1595                               timeout(0, false);
 1596                               selector.close(); 
 1597                               break; 
 1598                           }
 1599                       } catch ( NullPointerException x ) {
 1600                           //sun bug 5076772 on windows JDK 1.5
 1601                           if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);
 1602                           if ( wakeupCounter == null || selector == null ) throw x;
 1603                           continue;
 1604                       } catch ( CancelledKeyException x ) {
 1605                           //sun bug 5076772 on windows JDK 1.5
 1606                           if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);
 1607                           if ( wakeupCounter == null || selector == null ) throw x;
 1608                           continue;
 1609                       } catch (Throwable x) {
 1610                           log.error("",x);
 1611                           continue;
 1612                       }
 1613                       //either we timed out or we woke up, process events first
 1614                       if ( keyCount == 0 ) hasEvents = (hasEvents | events());
 1615   
 1616                       Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
 1617                       // Walk through the collection of ready keys and dispatch
 1618                       // any active event.
 1619                       while (iterator != null && iterator.hasNext()) {
 1620                           SelectionKey sk = (SelectionKey) iterator.next();
 1621                           KeyAttachment attachment = (KeyAttachment)sk.attachment();
 1622                           attachment.access();
 1623                           iterator.remove();
 1624                           processKey(sk, attachment);
 1625                       }//while
 1626   
 1627                       //process timeouts
 1628                       timeout(keyCount,hasEvents);
 1629                       if ( oomParachute > 0 && oomParachuteData == null ) checkParachute();
 1630                   } catch (OutOfMemoryError oom) {
 1631                       try {
 1632                           oomParachuteData = null;
 1633                           releaseCaches();
 1634                           log.error("", oom);
 1635                       }catch ( Throwable oomt ) {
 1636                           try {
 1637                               System.err.println(oomParachuteMsg);
 1638                               oomt.printStackTrace();
 1639                           }catch (Throwable letsHopeWeDontGetHere){}
 1640                       }
 1641                   }
 1642               }//while
 1643               synchronized (this) {
 1644                   this.notifyAll();
 1645               }
 1646               stopLatch.countDown();
 1647   
 1648           }
 1649           
 1650           protected boolean processKey(SelectionKey sk, KeyAttachment attachment) {
 1651               boolean result = true;
 1652               try {
 1653                   if ( close ) {
 1654                       cancelledKey(sk, SocketStatus.STOP, false);
 1655                   } else if ( sk.isValid() && attachment != null ) {
 1656                       attachment.access();//make sure we don't time out valid sockets
 1657                       sk.attach(attachment);//cant remember why this is here
 1658                       NioChannel channel = attachment.getChannel();
 1659                       if (sk.isReadable() || sk.isWritable() ) {
 1660                           if ( attachment.getSendfileData() != null ) {
 1661                               processSendfile(sk,attachment,true, false);
 1662                           } else if ( attachment.getComet() ) {
 1663                               //check if thread is available
 1664                               if ( isWorkerAvailable() ) {
 1665                                   //set interest ops to 0 so we don't get multiple
 1666                                   //invokations for both read and write on separate threads
 1667                                   reg(sk, attachment, 0);
 1668                                   //read goes before write
 1669                                   if (sk.isReadable()) {
 1670                                       //read notification
 1671                                       if (!processSocket(channel, SocketStatus.OPEN))
 1672                                           processSocket(channel, SocketStatus.DISCONNECT);
 1673                                   } else {
 1674                                       //future placement of a WRITE notif
 1675                                       if (!processSocket(channel, SocketStatus.OPEN))
 1676                                           processSocket(channel, SocketStatus.DISCONNECT);
 1677                                   }
 1678                               } else {
 1679                                   result = false;
 1680                               }
 1681                           } else {
 1682                               //later on, improve latch behavior
 1683                               if ( isWorkerAvailable() ) {
 1684                                   unreg(sk, attachment,sk.readyOps());
 1685                                   boolean close = (!processSocket(channel));
 1686                                   if (close) {
 1687                                       cancelledKey(sk,SocketStatus.DISCONNECT,false);
 1688                                   }
 1689                               } else {
 1690                                   result = false;
 1691                               }
 1692                           }
 1693                       } 
 1694                   } else {
 1695                       //invalid key
 1696                       cancelledKey(sk, SocketStatus.ERROR,false);
 1697                   }
 1698               } catch ( CancelledKeyException ckx ) {
 1699                   cancelledKey(sk, SocketStatus.ERROR,false);
 1700               } catch (Throwable t) {
 1701                   log.error("",t);
 1702               }
 1703               return result;
 1704           }
 1705           
 1706           public boolean processSendfile(SelectionKey sk, KeyAttachment attachment, boolean reg, boolean event) {
 1707               NioChannel sc = null;
 1708               try {
 1709                   //unreg(sk,attachment);//only do this if we do process send file on a separate thread
 1710                   SendfileData sd = attachment.getSendfileData();
 1711                   if ( sd.fchannel == null ) {
 1712                       File f = new File(sd.fileName);
 1713                       if ( !f.exists() ) {
 1714                           cancelledKey(sk,SocketStatus.ERROR,false);
 1715                           return false;
 1716                       }
 1717                       sd.fchannel = new FileInputStream(f).getChannel();
 1718                   }
 1719                   sc = attachment.getChannel();
 1720                   sc.setSendFile(true);
 1721                   WritableByteChannel wc =(WritableByteChannel) ((sc instanceof SecureNioChannel)?sc:sc.getIOChannel());
 1722                   
 1723                   if (sc.getOutboundRemaining()>0) {
 1724                       if (sc.flushOutbound()) {
 1725                           attachment.access();
 1726                       }
 1727                   } else {
 1728                       long written = sd.fchannel.transferTo(sd.pos,sd.length,wc);
 1729                       if ( written > 0 ) {
 1730                           sd.pos += written;
 1731                           sd.length -= written;
 1732                           attachment.access();
 1733                       }
 1734                   }
 1735                   if ( sd.length <= 0 && sc.getOutboundRemaining()<=0) {
 1736                       if (log.isDebugEnabled()) {
 1737                           log.debug("Send file complete for:"+sd.fileName);
 1738                       }
 1739                       attachment.setSendfileData(null);
 1740                       try {sd.fchannel.close();}catch(Exception ignore){}
 1741                       if ( sd.keepAlive ) {
 1742                           if (reg) {
 1743                               if (log.isDebugEnabled()) {
 1744                                   log.debug("Connection is keep alive, registering back for OP_READ");
 1745                               }
 1746                               if (event) {
 1747                                   this.add(attachment.getChannel(),SelectionKey.OP_READ);
 1748                               } else {
 1749                                   reg(sk,attachment,SelectionKey.OP_READ);
 1750                               }
 1751                           }
 1752                       } else {
 1753                           if (log.isDebugEnabled()) {
 1754                               log.debug("Send file connection is being closed");
 1755                           }
 1756                           cancelledKey(sk,SocketStatus.STOP,false);
 1757                       }
 1758                   } else if ( attachment.interestOps() == 0 && reg ) {
 1759                       if (log.isDebugEnabled()) {
 1760                           log.debug("OP_WRITE for sendilfe:"+sd.fileName);
 1761                       }
 1762                       if (event) {
 1763                           add(attachment.getChannel(),SelectionKey.OP_WRITE);
 1764                       } else {
 1765                           reg(sk,attachment,SelectionKey.OP_WRITE);
 1766                       }
 1767                   }
 1768               }catch ( IOException x ) {
 1769                   if ( log.isDebugEnabled() ) log.debug("Unable to complete sendfile request:", x);
 1770                   cancelledKey(sk,SocketStatus.ERROR,false);
 1771                   return false;
 1772               }catch ( Throwable t ) {
 1773                   log.error("",t);
 1774                   cancelledKey(sk, SocketStatus.ERROR, false);
 1775                   return false;
 1776               }finally {
 1777                   if (sc!=null) sc.setSendFile(false);
 1778               }
 1779               return true;
 1780           }
 1781   
 1782           protected void unreg(SelectionKey sk, KeyAttachment attachment, int readyOps) {
 1783               //this is a must, so that we don't have multiple threads messing with the socket
 1784               reg(sk,attachment,sk.interestOps()& (~readyOps));
 1785           }
 1786           
 1787           protected void reg(SelectionKey sk, KeyAttachment attachment, int intops) {
 1788               sk.interestOps(intops); 
 1789               attachment.interestOps(intops);
 1790               //attachment.setCometOps(intops);
 1791           }
 1792   
 1793           protected void timeout(int keyCount, boolean hasEvents) {
 1794               long now = System.currentTimeMillis();
 1795               //don't process timeouts too frequently, but if the selector simply timed out
 1796               //then we can check timeouts to avoid gaps
 1797               if ( ((keyCount>0 || hasEvents) ||(now < nextExpiration)) && (!close) ) {
 1798                   return;
 1799               }
 1800               long prevExp = nextExpiration; //for logging purposes only
 1801               nextExpiration = now + socketProperties.getTimeoutInterval();
 1802               //timeout
 1803               Set<SelectionKey> keys = selector.keys();
 1804               int keycount = 0;
 1805               for (Iterator<SelectionKey> iter = keys.iterator(); iter.hasNext(); ) {
 1806                   SelectionKey key = iter.next();
 1807                   keycount++;
 1808                   try {
 1809                       KeyAttachment ka = (KeyAttachment) key.attachment();
 1810                       if ( ka == null ) {
 1811                           cancelledKey(key, SocketStatus.ERROR,false); //we don't support any keys without attachments
 1812                       } else if ( ka.getError() ) {
 1813                           cancelledKey(key, SocketStatus.ERROR,true);//TODO this is not yet being used
 1814                       } else if (ka.getComet() && ka.getCometNotify() ) {
 1815                           ka.setCometNotify(false);
 1816                           reg(key,ka,0);//avoid multiple calls, this gets reregistered after invokation
 1817                           //if (!processSocket(ka.getChannel(), SocketStatus.OPEN_CALLBACK)) processSocket(ka.getChannel(), SocketStatus.DISCONNECT);
 1818                           if (!processSocket(ka.getChannel(), SocketStatus.OPEN)) processSocket(ka.getChannel(), SocketStatus.DISCONNECT);
 1819                       }else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ ||
 1820                       		  (ka.interestOps()&SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
 1821                           //only timeout sockets that we are waiting for a read from - or write (send file)
 1822                           long delta = now - ka.getLastAccess();
 1823                           long timeout = (ka.getTimeout()==-1)?((long) socketProperties.getSoTimeout()):(ka.getTimeout());
 1824                           boolean isTimedout = delta > timeout;
 1825                           if ( close ) {
 1826                               key.interestOps(0); 
 1827                               ka.interestOps(0); //avoid duplicate stop calls
 1828                               processKey(key,ka);
 1829                           } else if (isTimedout) {
 1830                               key.interestOps(0); 
 1831                               ka.interestOps(0); //avoid duplicate timeout calls
 1832                               cancelledKey(key, SocketStatus.TIMEOUT,true);
 1833                           } else {
 1834                               long nextTime = now+(timeout-delta);
 1835                               nextExpiration = (nextTime < nextExpiration)?nextTime:nextExpiration;
 1836                           }
 1837                       }//end if
 1838                   }catch ( CancelledKeyException ckx ) {
 1839                       cancelledKey(key, SocketStatus.ERROR,false);
 1840                   }
 1841               }//for
 1842               if ( log.isDebugEnabled() ) log.debug("timeout completed: keys processed="+keycount+"; now="+now+"; nextExpiration="+prevExp+"; "+
 1843                                                     "keyCount="+keyCount+"; hasEvents="+hasEvents +"; eval="+( (now < prevExp) && (keyCount>0 || hasEvents) && (!close) ));
 1844   
 1845           }
 1846       }
 1847   
 1848   // ----------------------------------------------------- Key Attachment Class   
 1849       public static class KeyAttachment {
 1850           
 1851           public KeyAttachment() {
 1852               
 1853           }
 1854           public void reset(Poller poller, NioChannel channel, long soTimeout) {
 1855               this.channel = channel;
 1856               this.poller = poller;
 1857               lastAccess = System.currentTimeMillis();
 1858               currentAccess = false;
 1859               comet = false;
 1860               timeout = soTimeout;
 1861               error = false;
 1862               lastRegistered = 0;
 1863               sendfileData = null;
 1864               if ( readLatch!=null ) try {for (int i=0; i<(int)readLatch.getCount();i++) readLatch.countDown();}catch (Exception ignore){}
 1865               readLatch = null;
 1866               if ( writeLatch!=null ) try {for (int i=0; i<(int)writeLatch.getCount();i++) writeLatch.countDown();}catch (Exception ignore){}
 1867               writeLatch = null;
 1868               cometNotify = false;
 1869               cometOps = SelectionKey.OP_READ;
 1870               sendfileData = null;
 1871           }
 1872           
 1873           public void reset() {
 1874               reset(null,null,-1);
 1875           }
 1876           
 1877           public Poller getPoller() { return poller;}
 1878           public void setPoller(Poller poller){this.poller = poller;}
 1879           public long getLastAccess() { return lastAccess; }
 1880           public void access() { access(System.currentTimeMillis()); }
 1881           public void access(long access) { lastAccess = access; }
 1882           public void setComet(boolean comet) { this.comet = comet; }
 1883           public boolean getComet() { return comet; }
 1884           public void setCometNotify(boolean notify) { this.cometNotify = notify; }
 1885           public boolean getCometNotify() { return cometNotify; }
 1886           public void setCometOps(int ops) { this.cometOps = ops; }
 1887           public int getCometOps() { return cometOps; }
 1888           public boolean getCurrentAccess() { return currentAccess; }
 1889           public void setCurrentAccess(boolean access) { currentAccess = access; }
 1890           public void setTimeout(long timeout) {this.timeout = timeout;}
 1891           public long getTimeout() {return this.timeout;}
 1892           public boolean getError() { return error; }
 1893           public void setError(boolean error) { this.error = error; }
 1894           public NioChannel getChannel() { return channel;}
 1895           public void setChannel(NioChannel channel) { this.channel = channel;}
 1896           protected Poller poller = null;
 1897           protected int interestOps = 0;
 1898           public int interestOps() { return interestOps;}
 1899           public int interestOps(int ops) { this.interestOps  = ops; return ops; }
 1900           public CountDownLatch getReadLatch() { return readLatch; }
 1901           public CountDownLatch getWriteLatch() { return writeLatch; }
 1902           protected CountDownLatch resetLatch(CountDownLatch latch) {
 1903               if ( latch==null || latch.getCount() == 0 ) return null;
 1904               else throw new IllegalStateException("Latch must be at count 0");
 1905           }
 1906           public void resetReadLatch() { readLatch = resetLatch(readLatch); }
 1907           public void resetWriteLatch() { writeLatch = resetLatch(writeLatch); }
 1908           
 1909           protected CountDownLatch startLatch(CountDownLatch latch, int cnt) { 
 1910               if ( latch == null || latch.getCount() == 0 ) {
 1911                   return new CountDownLatch(cnt);
 1912               }
 1913               else throw new IllegalStateException("Latch must be at count 0 or null.");
 1914           }
 1915           public void startReadLatch(int cnt) { readLatch = startLatch(readLatch,cnt);}
 1916           public void startWriteLatch(int cnt) { writeLatch = startLatch(writeLatch,cnt);}
 1917           
 1918           
 1919           protected void awaitLatch(CountDownLatch latch, long timeout, TimeUnit unit) throws InterruptedException {
 1920               if ( latch == null ) throw new IllegalStateException("Latch cannot be null");
 1921               latch.await(timeout,unit);
 1922           }
 1923           public void awaitReadLatch(long timeout, TimeUnit unit) throws InterruptedException { awaitLatch(readLatch,timeout,unit);}
 1924           public void awaitWriteLatch(long timeout, TimeUnit unit) throws InterruptedException { awaitLatch(writeLatch,timeout,unit);}
 1925           
 1926           public long getLastRegistered() { return lastRegistered; };
 1927           public void setLastRegistered(long reg) { lastRegistered = reg; }
 1928           
 1929           public void setSendfileData(SendfileData sf) { this.sendfileData = sf;}
 1930           public SendfileData getSendfileData() { return this.sendfileData;}
 1931           
 1932           protected long lastAccess = -1;
 1933           protected boolean currentAccess = false;
 1934           protected boolean comet = false;
 1935           protected int cometOps = SelectionKey.OP_READ;
 1936           protected boolean cometNotify = false;
 1937           protected long timeout = -1;
 1938           protected boolean error = false;
 1939           protected NioChannel channel = null;
 1940           protected CountDownLatch readLatch = null;
 1941           protected CountDownLatch writeLatch = null;
 1942           protected long lastRegistered = 0;
 1943           protected SendfileData sendfileData = null;
 1944       }
 1945       // ----------------------------------------------------- Worker Inner Class
 1946   
 1947   
 1948       /**
 1949        * Server processor class.
 1950        */
 1951       protected class Worker implements Runnable {
 1952   
 1953   
 1954           protected Thread thread = null;
 1955           protected boolean available = false;
 1956           protected Object socket = null;
 1957           protected SocketStatus status = null;
 1958   
 1959   
 1960           /**
 1961            * Process an incoming TCP/IP connection on the specified socket.  Any
 1962            * exception that occurs during processing must be logged and swallowed.
 1963            * <b>NOTE</b>:  This method is called from our Connector's thread.  We
 1964            * must assign it to our own thread so that multiple simultaneous
 1965            * requests can be handled.
 1966            *
 1967            * @param socket TCP socket to process
 1968            */
 1969           protected synchronized void assign(Object socket) {
 1970   
 1971               // Wait for the Processor to get the previous Socket
 1972               while (available) {
 1973                   try {
 1974                       wait();
 1975                   } catch (InterruptedException e) {
 1976                   }
 1977               }
 1978               // Store the newly available Socket and notify our thread
 1979               this.socket = socket;
 1980               status = null;
 1981               available = true;
 1982               notifyAll();
 1983   
 1984           }
 1985   
 1986   
 1987           protected synchronized void assign(Object socket, SocketStatus status) {
 1988   
 1989               // Wait for the Processor to get the previous Socket
 1990               while (available) {
 1991                   try {
 1992                       wait();
 1993                   } catch (InterruptedException e) {
 1994                   }
 1995               }
 1996   
 1997               // Store the newly available Socket and notify our thread
 1998               this.socket = socket;
 1999               this.status = status;
 2000               available = true;
 2001               notifyAll();
 2002           }
 2003   
 2004   
 2005           /**
 2006            * Await a newly assigned Socket from our Connector, or <code>null</code>
 2007            * if we are supposed to shut down.
 2008            */
 2009           protected synchronized Object await() {
 2010   
 2011               // Wait for the Connector to provide a new Socket
 2012               while (!available) {
 2013                   try {
 2014                       wait();
 2015                   } catch (InterruptedException e) {
 2016                   }
 2017               }
 2018   
 2019               // Notify the Connector that we have received this Socket
 2020               Object socket = this.socket;
 2021               available = false;
 2022               notifyAll();
 2023   
 2024               return (socket);
 2025   
 2026           }
 2027   
 2028   
 2029           /**
 2030            * The background thread that listens for incoming TCP/IP connections and
 2031            * hands them off to an appropriate processor.
 2032            */
 2033           public void run() {
 2034   
 2035               // Process requests until we receive a shutdown signal
 2036               while (running) {
 2037                   NioChannel socket = null;
 2038                   SelectionKey key = null;
 2039                   try {
 2040                       // Wait for the next socket to be assigned
 2041                       Object channel = await();
 2042                       if (channel == null)
 2043                           continue;
 2044   
 2045                       if ( channel instanceof SocketChannel) {
 2046                           SocketChannel sc = (SocketChannel)channel;
 2047                           if ( !setSocketOptions(sc) ) {
 2048                               try {
 2049                                   sc.socket().close();
 2050                                   sc.close();
 2051                               }catch ( IOException ix ) {
 2052                                   if ( log.isDebugEnabled() ) log.debug("",ix);
 2053                               }
 2054                           } else {
 2055                               //now we have it registered, remove it from the cache
 2056                               
 2057                           }
 2058                       } else {
 2059                           socket = (NioChannel)channel;
 2060                           SocketProcessor sc = processorCache.poll();
 2061                           if ( sc == null ) sc = new SocketProcessor(socket,status);
 2062                           else sc.reset(socket,status);
 2063                           sc.run();
 2064                       }
 2065                   }catch(CancelledKeyException cx) {
 2066                       if (socket!=null && key!=null) socket.getPoller().cancelledKey(key,null,false);
 2067                   } catch (OutOfMemoryError oom) {
 2068                       try {
 2069                           oomParachuteData = null;
 2070                           releaseCaches();
 2071                           log.error("", oom);
 2072                       }catch ( Throwable oomt ) {
 2073                           try {
 2074                               System.err.println(oomParachuteMsg);
 2075                               oomt.printStackTrace();
 2076                           }catch (Throwable letsHopeWeDontGetHere){}
 2077                       }
 2078                   } finally {
 2079                       //dereference socket to let GC do its job
 2080                       socket = null;
 2081                       // Finish up this request
 2082                       recycleWorkerThread(this);
 2083                   }
 2084               }
 2085           }
 2086   
 2087   
 2088           /**
 2089            * Start the background processing thread.
 2090            */
 2091           public void start() {
 2092               thread = new Thread(this);
 2093               thread.setName(getName() + "-" + (++curThreads));
 2094               thread.setDaemon(true);
 2095               thread.setPriority(getThreadPriority());
 2096               thread.start();
 2097           }
 2098   
 2099   
 2100       }
 2101   
 2102       // ------------------------------------------------ Application Buffer Handler
 2103       public class NioBufferHandler implements ApplicationBufferHandler {
 2104           protected ByteBuffer readbuf = null;
 2105           protected ByteBuffer writebuf = null;
 2106           
 2107           public NioBufferHandler(int readsize, int writesize, boolean direct) {
 2108               if ( direct ) {
 2109                   readbuf = ByteBuffer.allocateDirect(readsize);
 2110                   writebuf = ByteBuffer.allocateDirect(writesize);
 2111               }else {
 2112                   readbuf = ByteBuffer.allocate(readsize);
 2113                   writebuf = ByteBuffer.allocate(writesize);
 2114               }
 2115           }
 2116           
 2117           public ByteBuffer expand(ByteBuffer buffer, int remaining) {return buffer;}
 2118           public ByteBuffer getReadBuffer() {return readbuf;}
 2119           public ByteBuffer getWriteBuffer() {return writebuf;}
 2120   
 2121       }
 2122   
 2123       // ------------------------------------------------ Handler Inner Interface
 2124   
 2125   
 2126       /**
 2127        * Bare bones interface used for socket processing. Per thread data is to be
 2128        * stored in the ThreadWithAttributes extra folders, or alternately in
 2129        * thread local fields.
 2130        */
 2131       public interface Handler {
 2132           public enum SocketState {
 2133               OPEN, CLOSED, LONG
 2134           }
 2135           public SocketState process(NioChannel socket);
 2136           public SocketState event(NioChannel socket, SocketStatus status);
 2137           public void releaseCaches();
 2138           public void release(NioChannel socket);
 2139       }
 2140   
 2141   
 2142       // ------------------------------------------------- WorkerStack Inner Class
 2143   
 2144   
 2145       public class WorkerStack {
 2146   
 2147           protected Worker[] workers = null;
 2148           protected int end = 0;
 2149   
 2150           public WorkerStack(int size) {
 2151               workers = new Worker[size];
 2152           }
 2153   
 2154           /** 
 2155            * Put the object into the queue. If the queue is full (for example if
 2156            * the queue has been reduced in size) the object will be dropped.
 2157            * 
 2158            * @param   object  the object to be appended to the queue (first
 2159            *                  element).
 2160            */
 2161           public void push(Worker worker) {
 2162               if (end < workers.length) {
 2163                   workers[end++] = worker;
 2164               } else {
 2165                   curThreads--;
 2166               }
 2167           }
 2168   
 2169           /**
 2170            * Get the first object out of the queue. Return null if the queue
 2171            * is empty. 
 2172            */
 2173           public Worker pop() {
 2174               if (end > 0) {
 2175                   return workers[--end];
 2176               }
 2177               return null;
 2178           }
 2179   
 2180           /**
 2181            * Get the first object out of the queue, Return null if the queue
 2182            * is empty.
 2183            */
 2184           public Worker peek() {
 2185               return workers[end];
 2186           }
 2187   
 2188           /**
 2189            * Is the queue empty?
 2190            */
 2191           public boolean isEmpty() {
 2192               return (end == 0);
 2193           }
 2194   
 2195           /**
 2196            * How many elements are there in this queue?
 2197            */
 2198           public int size() {
 2199               return (end);
 2200           }
 2201           
 2202           /**
 2203            * Resize the queue. If there are too many objects in the queue for the
 2204            * new size, drop the excess.
 2205            * 
 2206            * @param newSize
 2207            */
 2208           public void resize(int newSize) {
 2209               Worker[] newWorkers = new Worker[newSize];
 2210               int len = workers.length;
 2211               if (newSize < len) {
 2212                   len = newSize;
 2213               }
 2214               System.arraycopy(workers, 0, newWorkers, 0, len);
 2215               workers = newWorkers;
 2216           }
 2217       }
 2218   
 2219   
 2220       // ---------------------------------------------- SocketProcessor Inner Class
 2221   
 2222   
 2223       /**
 2224        * This class is the equivalent of the Worker, but will simply use in an
 2225        * external Executor thread pool.
 2226        */
 2227       protected class SocketProcessor implements Runnable {
 2228   
 2229           protected NioChannel socket = null;
 2230           protected SocketStatus status = null; 
 2231   
 2232           public SocketProcessor(NioChannel socket, SocketStatus status) {
 2233               reset(socket,status);
 2234           }
 2235           
 2236           public void reset(NioChannel socket, SocketStatus status) {
 2237               this.socket = socket;
 2238               this.status = status;
 2239           }
 2240            
 2241           public void run() {
 2242               NioEndpoint.this.activeSocketProcessors.addAndGet(1);
 2243               SelectionKey key = null;
 2244               try {
 2245                   key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
 2246                   int handshake = -1;
 2247                   
 2248                   try {
 2249                       if (key!=null) handshake = socket.handshake(key.isReadable(), key.isWritable());
 2250                   }catch ( IOException x ) {
 2251                       handshake = -1;
 2252                       if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x);
 2253                   }catch ( CancelledKeyException ckx ) {
 2254                       handshake = -1;
 2255                   }
 2256                   if ( handshake == 0 ) {
 2257                       // Process the request from this socket
 2258                       boolean closed = (status==null)?(handler.process(socket)==Handler.SocketState.CLOSED) :
 2259                           (handler.event(socket,status)==Handler.SocketState.CLOSED);
 2260   
 2261                       if (closed) {
 2262                           // Close socket and pool
 2263                           try {
 2264                               KeyAttachment ka = null;
 2265                               if (key!=null) {
 2266                                   ka = (KeyAttachment) key.attachment();
 2267                                   if (ka!=null) ka.setComet(false);
 2268                                   socket.getPoller().cancelledKey(key, SocketStatus.ERROR, false);
 2269                               }
 2270                               if (socket!=null) nioChannels.offer(socket);
 2271                               socket = null;
 2272                               if ( ka!=null ) keyCache.offer(ka);
 2273                               ka = null;
 2274                           }catch ( Exception x ) {
 2275                               log.error("",x);
 2276                           }
 2277                       } 
 2278                   } else if (handshake == -1 ) {
 2279                       KeyAttachment ka = null;
 2280                       if (key!=null) {
 2281                           ka = (KeyAttachment) key.attachment();
 2282                           socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT, false);
 2283                       }
 2284                       if (socket!=null) nioChannels.offer(socket);
 2285                       socket = null;
 2286                       if ( ka!=null ) keyCache.offer(ka);
 2287                       ka = null;
 2288                   } else {
 2289                       final SelectionKey fk = key;
 2290                       final int intops = handshake;
 2291                       final KeyAttachment ka = (KeyAttachment)fk.attachment();
 2292                       ka.getPoller().add(socket,intops);
 2293                   }
 2294               }catch(CancelledKeyException cx) {
 2295                   socket.getPoller().cancelledKey(key,null,false);
 2296               } catch (OutOfMemoryError oom) {
 2297                   try {
 2298                       oomParachuteData = null;
 2299                       socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
 2300                       releaseCaches();
 2301                       log.error("", oom);
 2302                   }catch ( Throwable oomt ) {
 2303                       try {
 2304                           System.err.println(oomParachuteMsg);
 2305                           oomt.printStackTrace();
 2306                       }catch (Throwable letsHopeWeDontGetHere){}
 2307                   }
 2308               }catch ( Throwable t ) {
 2309                   log.error("",t);
 2310                   socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
 2311               } finally {
 2312                   socket = null;
 2313                   status = null;
 2314                   //return to cache
 2315                   processorCache.offer(this);
 2316                   NioEndpoint.this.activeSocketProcessors.addAndGet(-1);            }
 2317           }
 2318   
 2319       }
 2320       
 2321       // ---------------------------------------------- TaskQueue Inner Class
 2322       public static class TaskQueue extends LinkedBlockingQueue<Runnable> {
 2323           ThreadPoolExecutor parent = null;
 2324           NioEndpoint endpoint = null;
 2325           
 2326           public TaskQueue() {
 2327               super();
 2328           }
 2329   
 2330           public TaskQueue(int initialCapacity) {
 2331               super(initialCapacity);
 2332           }
 2333    
 2334           public TaskQueue(Collection<? extends Runnable> c) {
 2335               super(c);
 2336           }
 2337   
 2338           
 2339           public void setParent(ThreadPoolExecutor tp, NioEndpoint ep) {
 2340               parent = tp;
 2341               this.endpoint = ep;
 2342           }
 2343           
 2344           public boolean offer(Runnable o) {
 2345               //we can't do any checks
 2346               if (parent==null) return super.offer(o);
 2347               //we are maxed out on threads, simply queue the object
 2348               if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
 2349               //we have idle threads, just add it to the queue
 2350               //this is an approximation, so it could use some tuning
 2351               if (endpoint.activeSocketProcessors.get()<(parent.getPoolSize())) return super.offer(o);
 2352               //if we have less threads than maximum force creation of a new thread
 2353               if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
 2354               //if we reached here, we need to add it to the queue
 2355               return super.offer(o);
 2356           }
 2357       }
 2358   
 2359       // ---------------------------------------------- ThreadFactory Inner Class
 2360       class TaskThreadFactory implements ThreadFactory {
 2361           final ThreadGroup group;
 2362           final AtomicInteger threadNumber = new AtomicInteger(1);
 2363           final String namePrefix;
 2364   
 2365           TaskThreadFactory(String namePrefix) {
 2366               SecurityManager s = System.getSecurityManager();
 2367               group = (s != null)? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
 2368               this.namePrefix = namePrefix;
 2369           }
 2370   
 2371           public Thread newThread(Runnable r) {
 2372               Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement());
 2373               t.setDaemon(daemon);
 2374               t.setPriority(getThreadPriority());
 2375               return t;
 2376           }
 2377       }
 2378       
 2379       // ----------------------------------------------- SendfileData Inner Class
 2380   
 2381   
 2382       /**
 2383        * SendfileData class.
 2384        */
 2385       public static class SendfileData {
 2386           // File
 2387           public String fileName;
 2388           public FileChannel fchannel;
 2389           public long pos;
 2390           public long length;
 2391           // KeepAlive flag
 2392           public boolean keepAlive;
 2393       }
 2394   
 2395   }

Home » apache-tomcat-6.0.26-src » org.apache » tomcat » util » net » [javadoc | source]