Save This Page
Home » apache-tomcat-6.0.26-src » org.apache.jk » common » [javadoc | source]
    1   /*
    2    *  Licensed to the Apache Software Foundation (ASF) under one or more
    3    *  contributor license agreements.  See the NOTICE file distributed with
    4    *  this work for additional information regarding copyright ownership.
    5    *  The ASF licenses this file to You under the Apache License, Version 2.0
    6    *  (the "License"); you may not use this file except in compliance with
    7    *  the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    *  Unless required by applicable law or agreed to in writing, software
   12    *  distributed under the License is distributed on an "AS IS" BASIS,
   13    *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    *  See the License for the specific language governing permissions and
   15    *  limitations under the License.
   16    */
   17   
   18   package org.apache.jk.common;
   19   
   20   import java.io.BufferedInputStream;
   21   import java.io.BufferedOutputStream;
   22   import java.io.IOException;
   23   import java.io.InputStream;
   24   import java.io.OutputStream;
   25   import java.net.URLEncoder;
   26   import java.net.InetAddress;
   27   import java.net.ServerSocket;
   28   import java.net.Socket;
   29   import java.net.SocketException;
   30   
   31   import javax.management.ListenerNotFoundException;
   32   import javax.management.MBeanNotificationInfo;
   33   import javax.management.Notification;
   34   import javax.management.NotificationBroadcaster;
   35   import javax.management.NotificationBroadcasterSupport;
   36   import javax.management.NotificationFilter;
   37   import javax.management.NotificationListener;
   38   import javax.management.ObjectName;
   39   
   40   import org.apache.jk.core.JkHandler;
   41   import org.apache.jk.core.Msg;
   42   import org.apache.jk.core.MsgContext;
   43   import org.apache.jk.core.JkChannel;
   44   import org.apache.jk.core.WorkerEnv;
   45   import org.apache.coyote.Request;
   46   import org.apache.coyote.RequestGroupInfo;
   47   import org.apache.coyote.RequestInfo;
   48   import org.apache.coyote.ActionCode;
   49   import org.apache.tomcat.util.modeler.Registry;
   50   import org.apache.tomcat.util.threads.ThreadPool;
   51   import org.apache.tomcat.util.threads.ThreadPoolRunnable;
   52   
   53   /** 
   54    * Accept ( and send ) TCP messages.
   55    *
   56    * @author Costin Manolache
   57    * @author Bill Barker
   58    * jmx:mbean name="jk:service=ChannelNioSocket"
   59    *            description="Accept socket connections"
   60    * jmx:notification name="org.apache.coyote.INVOKE
   61    * jmx:notification-handler name="org.apache.jk.JK_SEND_PACKET
   62    * jmx:notification-handler name="org.apache.jk.JK_RECEIVE_PACKET
   63    * jmx:notification-handler name="org.apache.jk.JK_FLUSH
   64    *
   65    * Jk can use multiple protocols/transports.
   66    * Various container adapters should load this object ( as a bean ),
   67    * set configurations and use it. Note that the connector will handle
   68    * all incoming protocols - it's not specific to ajp1x. The protocol
   69    * is abstracted by MsgContext/Message/Channel.
   70    *
   71    * A lot of the 'original' behavior is hardcoded - this uses Ajp13 wire protocol,
   72    * TCP, Ajp14 API etc.
   73    * As we add other protocols/transports/APIs this will change, the current goal
   74    * is to get the same level of functionality as in the original jk connector.
   75    *
   76    * XXX Make the 'message type' pluggable
   77    */
   78   public class ChannelSocket extends JkHandler
   79       implements NotificationBroadcaster, JkChannel {
   80       private static org.apache.juli.logging.Log log =
   81           org.apache.juli.logging.LogFactory.getLog( ChannelSocket.class );
   82   
   83       private int startPort=8009;
   84       private int maxPort=8019; // 0 for backward compat.
   85       private int port=startPort;
   86       private InetAddress inet;
   87       private int serverTimeout;
   88       private boolean tcpNoDelay=true; // nodelay to true by default
   89       private int linger=100;
   90       private int socketTimeout;
   91       private int bufferSize = -1;
   92       private int packetSize = AjpConstants.MAX_PACKET_SIZE;
   93   
   94       private long requestCount=0;
   95       
   96       ThreadPool tp=ThreadPool.createThreadPool(true);
   97   
   98       /* ==================== Tcp socket options ==================== */
   99   
  100       /**
  101        * jmx:managed-constructor description="default constructor"
  102        */
  103       public ChannelSocket() {
  104           // This should be integrated with the  domain setup
  105       }
  106       
  107       public ThreadPool getThreadPool() {
  108           return tp;
  109       }
  110   
  111       public long getRequestCount() {
  112           return requestCount;
  113       }
  114       
  115       /** Set the port for the ajp13 channel.
  116        *  To support seemless load balancing and jni, we treat this
  117        *  as the 'base' port - we'll try up until we find one that is not
  118        *  used. We'll also provide the 'difference' to the main coyote
  119        *  handler - that will be our 'sessionID' and the position in
  120        *  the scoreboard and the suffix for the unix domain socket.
  121        *
  122        * jmx:managed-attribute description="Port to listen" access="READ_WRITE"
  123        */
  124       public void setPort( int port ) {
  125           this.startPort=port;
  126           this.port=port;
  127           this.maxPort=port+10;
  128       }
  129   
  130       public int getPort() {
  131           return port;
  132       }
  133   
  134       public void setAddress(InetAddress inet) {
  135           this.inet=inet;
  136       }
  137   
  138       /**
  139        * jmx:managed-attribute description="Bind on a specified address" access="READ_WRITE"
  140        */
  141       public void setAddress(String inet) {
  142           try {
  143               this.inet= InetAddress.getByName( inet );
  144           } catch( Exception ex ) {
  145               log.error("Error parsing "+inet,ex);
  146           }
  147       }
  148   
  149       public String getAddress() {
  150           if( inet!=null)
  151               return inet.toString();
  152           return "/0.0.0.0";
  153       }
  154   
  155       /**
  156        * Sets the timeout in ms of the server sockets created by this
  157        * server. This method allows the developer to make servers
  158        * more or less responsive to having their server sockets
  159        * shut down.
  160        *
  161        * <p>By default this value is 1000ms.
  162        */
  163       public void setServerTimeout(int timeout) {
  164   	this.serverTimeout = timeout;
  165       }
  166       public int getServerTimeout() {
  167           return serverTimeout;
  168       }
  169   
  170       public void setTcpNoDelay( boolean b ) {
  171   	tcpNoDelay=b;
  172       }
  173   
  174       public boolean getTcpNoDelay() {
  175           return tcpNoDelay;
  176       }
  177       
  178       public void setSoLinger( int i ) {
  179   	linger=i;
  180       }
  181   
  182       public int getSoLinger() {
  183           return linger;
  184       }
  185       
  186       public void setSoTimeout( int i ) {
  187   	socketTimeout=i;
  188       }
  189   
  190       public int getSoTimeout() {
  191   	return socketTimeout;
  192       }
  193   
  194       public void setMaxPort( int i ) {
  195           maxPort=i;
  196       }
  197   
  198       public int getMaxPort() {
  199           return maxPort;
  200       }
  201   
  202       public void setBufferSize(int bs) {
  203           bufferSize = bs;
  204       }
  205   
  206       public int getBufferSize() {
  207           return bufferSize;
  208       }
  209   
  210       public void setPacketSize(int ps) {
  211           if(ps < AjpConstants.MAX_PACKET_SIZE) {
  212               ps = AjpConstants.MAX_PACKET_SIZE;
  213           }
  214           packetSize = ps;
  215       }
  216   
  217       public int getPacketSize() {
  218           return packetSize;
  219       }
  220   
  221       /** At startup we'll look for the first free port in the range.
  222           The difference between this port and the beggining of the range
  223           is the 'id'.
  224           This is usefull for lb cases ( less config ).
  225       */
  226       public int getInstanceId() {
  227           return port-startPort;
  228       }
  229   
  230       /** If set to false, the thread pool will be created in
  231        *  non-daemon mode, and will prevent main from exiting
  232        */
  233       public void setDaemon( boolean b ) {
  234           tp.setDaemon( b );
  235       }
  236   
  237       public boolean getDaemon() {
  238           return tp.getDaemon();
  239       }
  240   
  241   
  242       public void setMaxThreads( int i ) {
  243           if( log.isDebugEnabled()) log.debug("Setting maxThreads " + i);
  244           tp.setMaxThreads(i);
  245       }
  246       
  247       public void setMinSpareThreads( int i ) {
  248           if( log.isDebugEnabled()) log.debug("Setting minSpareThreads " + i);
  249           tp.setMinSpareThreads(i);
  250       }
  251       
  252       public void setMaxSpareThreads( int i ) {
  253           if( log.isDebugEnabled()) log.debug("Setting maxSpareThreads " + i);
  254           tp.setMaxSpareThreads(i);
  255       }
  256   
  257       public int getMaxThreads() {
  258           return tp.getMaxThreads();   
  259       }
  260       
  261       public int getMinSpareThreads() {
  262           return tp.getMinSpareThreads();   
  263       }
  264   
  265       public int getMaxSpareThreads() {
  266           return tp.getMaxSpareThreads();   
  267       }
  268   
  269       public void setBacklog(int i) {
  270       }
  271       
  272       
  273       /* ==================== ==================== */
  274       ServerSocket sSocket;
  275       final int socketNote=1;
  276       final int isNote=2;
  277       final int osNote=3;
  278       final int notifNote=4;
  279       boolean paused = false;
  280   
  281       public void pause() throws Exception {
  282           synchronized(this) {
  283               paused = true;
  284               unLockSocket();
  285           }
  286       }
  287   
  288       public void resume() throws Exception {
  289           synchronized(this) {
  290               paused = false;
  291               notify();
  292           }
  293       }
  294   
  295   
  296       public void accept( MsgContext ep ) throws IOException {
  297           if( sSocket==null ) return;
  298           synchronized(this) {
  299               while(paused) {
  300                   try{ 
  301                       wait();
  302                   } catch(InterruptedException ie) {
  303                       //Ignore, since can't happen
  304                   }
  305               }
  306           }
  307           Socket s=sSocket.accept();
  308           ep.setNote( socketNote, s );
  309           if(log.isDebugEnabled() )
  310               log.debug("Accepted socket " + s );
  311   
  312           try {
  313               setSocketOptions(s);
  314           } catch(SocketException sex) {
  315               log.debug("Error initializing Socket Options", sex);
  316           }
  317           
  318           requestCount++;
  319   
  320           InputStream is=new BufferedInputStream(s.getInputStream());
  321           OutputStream os;
  322           if( bufferSize > 0 )
  323               os = new BufferedOutputStream( s.getOutputStream(), bufferSize);
  324           else
  325               os = s.getOutputStream();
  326           ep.setNote( isNote, is );
  327           ep.setNote( osNote, os );
  328           ep.setControl( tp );
  329       }
  330   
  331       private void setSocketOptions(Socket s) throws SocketException {
  332           if( socketTimeout > 0 ) 
  333               s.setSoTimeout( socketTimeout );
  334           
  335           s.setTcpNoDelay( tcpNoDelay ); // set socket tcpnodelay state
  336   
  337           if( linger > 0 )
  338               s.setSoLinger( true, linger);
  339       }
  340   
  341       public void resetCounters() {
  342           requestCount=0;
  343       }
  344   
  345       /** Called after you change some fields at runtime using jmx.
  346           Experimental for now.
  347       */
  348       public void reinit() throws IOException {
  349           destroy();
  350           init();
  351       }
  352   
  353       /**
  354        * jmx:managed-operation
  355        */
  356       public void init() throws IOException {
  357           // Find a port.
  358           if (startPort == 0) {
  359               port = 0;
  360               if(log.isInfoEnabled())
  361                   log.info("JK: ajp13 disabling channelSocket");
  362               running = true;
  363               return;
  364           }
  365           if (maxPort < startPort)
  366               maxPort = startPort;
  367           for( int i=startPort; i<=maxPort; i++ ) {
  368               try {
  369                   if( inet == null ) {
  370                       sSocket = new ServerSocket( i, 0 );
  371                   } else {
  372                       sSocket=new ServerSocket( i, 0, inet );
  373                   }
  374                   port=i;
  375                   break;
  376               } catch( IOException ex ) {
  377                   if(log.isInfoEnabled())
  378                       log.info("Port busy " + i + " " + ex.toString());
  379                   continue;
  380               }
  381           }
  382   
  383           if( sSocket==null ) {
  384               log.error("Can't find free port " + startPort + " " + maxPort );
  385               return;
  386           }
  387           if(log.isInfoEnabled())
  388               log.info("JK: ajp13 listening on " + getAddress() + ":" + port );
  389   
  390           // If this is not the base port and we are the 'main' channleSocket and
  391           // SHM didn't already set the localId - we'll set the instance id
  392           if( "channelSocket".equals( name ) &&
  393               port != startPort &&
  394               (wEnv.getLocalId()==0) ) {
  395               wEnv.setLocalId(  port - startPort );
  396           }
  397           if( serverTimeout > 0 )
  398               sSocket.setSoTimeout( serverTimeout );
  399   
  400           // XXX Reverse it -> this is a notification generator !!
  401           if( next==null && wEnv!=null ) {
  402               if( nextName!=null )
  403                   setNext( wEnv.getHandler( nextName ) );
  404               if( next==null )
  405                   next=wEnv.getHandler( "dispatch" );
  406               if( next==null )
  407                   next=wEnv.getHandler( "request" );
  408           }
  409           JMXRequestNote =wEnv.getNoteId( WorkerEnv.ENDPOINT_NOTE, "requestNote");
  410           running = true;
  411   
  412           // Run a thread that will accept connections.
  413           // XXX Try to find a thread first - not sure how...
  414           if( this.domain != null ) {
  415               try {
  416                   tpOName=new ObjectName(domain + ":type=ThreadPool,name=" + 
  417                                          getChannelName());
  418   
  419                   Registry.getRegistry(null, null)
  420                       .registerComponent(tp, tpOName, null);
  421   
  422                   rgOName = new ObjectName
  423                       (domain+":type=GlobalRequestProcessor,name=" + getChannelName());
  424                   Registry.getRegistry(null, null)
  425                       .registerComponent(global, rgOName, null);
  426               } catch (Exception e) {
  427                   log.error("Can't register threadpool" );
  428               }
  429           }
  430   
  431           tp.start();
  432           SocketAcceptor acceptAjp=new SocketAcceptor(  this );
  433           tp.runIt( acceptAjp);
  434   
  435       }
  436   
  437       ObjectName tpOName;
  438       ObjectName rgOName;
  439       RequestGroupInfo global=new RequestGroupInfo();
  440       int JMXRequestNote;
  441   
  442       public void start() throws IOException{
  443           if( sSocket==null )
  444               init();
  445       }
  446   
  447       public void stop() throws IOException {
  448           destroy();
  449       }
  450   
  451       public void registerRequest(Request req, MsgContext ep, int count) {
  452           if(this.domain != null) {
  453               try {
  454                   RequestInfo rp=req.getRequestProcessor();
  455                   rp.setGlobalProcessor(global);
  456                   ObjectName roname = new ObjectName
  457                       (getDomain() + ":type=RequestProcessor,worker="+
  458                        getChannelName()+",name=JkRequest" +count);
  459                   ep.setNote(JMXRequestNote, roname);
  460                           
  461                   Registry.getRegistry(null, null).registerComponent( rp, roname, null);
  462               } catch( Exception ex ) {
  463                   log.warn("Error registering request");
  464               }
  465           }
  466       }
  467   
  468       public void open(MsgContext ep) throws IOException {
  469       }
  470   
  471       
  472       public void close(MsgContext ep) throws IOException {
  473           Socket s=(Socket)ep.getNote( socketNote );
  474           s.close();
  475       }
  476   
  477       private void unLockSocket() throws IOException {
  478           // Need to create a connection to unlock the accept();
  479           Socket s;
  480           InetAddress ladr = inet;
  481   
  482           if(port == 0)
  483               return;
  484           if (ladr == null || "0.0.0.0".equals(ladr.getHostAddress())) {
  485               ladr = InetAddress.getLocalHost();
  486           }
  487           s=new Socket(ladr, port );
  488           // setting soLinger to a small value will help shutdown the
  489           // connection quicker
  490           s.setSoLinger(true, 0);
  491   
  492   	s.close();
  493       }
  494   
  495       public void destroy() throws IOException {
  496           running = false;
  497           try {
  498               /* If we disabled the channel return */
  499               if (port == 0)
  500                   return;
  501               tp.shutdown();
  502   
  503   	    if(!paused) {
  504   		unLockSocket();
  505   	    }
  506   
  507               sSocket.close(); // XXX?
  508               
  509               if( tpOName != null )  {
  510                   Registry.getRegistry(null, null).unregisterComponent(tpOName);
  511               }
  512               if( rgOName != null ) {
  513                   Registry.getRegistry(null, null).unregisterComponent(rgOName);
  514               }
  515           } catch(Exception e) {
  516               log.info("Error shutting down the channel " + port + " " +
  517                       e.toString());
  518               if( log.isDebugEnabled() ) log.debug("Trace", e);
  519           }
  520       }
  521   
  522       public int send( Msg msg, MsgContext ep)
  523           throws IOException    {
  524           msg.end(); // Write the packet header
  525           byte buf[]=msg.getBuffer();
  526           int len=msg.getLen();
  527           
  528           if(log.isTraceEnabled() )
  529               log.trace("send() " + len + " " + buf[4] );
  530   
  531           OutputStream os=(OutputStream)ep.getNote( osNote );
  532           os.write( buf, 0, len );
  533           return len;
  534       }
  535   
  536       public int flush( Msg msg, MsgContext ep)
  537           throws IOException    {
  538           if( bufferSize > 0 ) {
  539               OutputStream os=(OutputStream)ep.getNote( osNote );
  540               os.flush();
  541           }
  542           return 0;
  543       }
  544   
  545       public int receive( Msg msg, MsgContext ep )
  546           throws IOException    {
  547           if (log.isDebugEnabled()) {
  548               log.debug("receive() ");
  549           }
  550   
  551           byte buf[]=msg.getBuffer();
  552           int hlen=msg.getHeaderLength();
  553           
  554   	// XXX If the length in the packet header doesn't agree with the
  555   	// actual number of bytes read, it should probably return an error
  556   	// value.  Also, callers of this method never use the length
  557   	// returned -- should probably return true/false instead.
  558   
  559           int rd = this.read(ep, buf, 0, hlen );
  560           
  561           if(rd < 0) {
  562               // Most likely normal apache restart.
  563               // log.warn("Wrong message " + rd );
  564               return rd;
  565           }
  566   
  567           msg.processHeader();
  568   
  569           /* After processing the header we know the body
  570              length
  571           */
  572           int blen=msg.getLen();
  573           
  574   	// XXX check if enough space - it's assert()-ed !!!
  575           
  576    	int total_read = 0;
  577           
  578           total_read = this.read(ep, buf, hlen, blen);
  579           
  580           if ((total_read <= 0) && (blen > 0)) {
  581               log.warn("can't read body, waited #" + blen);
  582               return  -1;
  583           }
  584           
  585           if (total_read != blen) {
  586                log.warn( "incomplete read, waited #" + blen +
  587                           " got only " + total_read);
  588               return -2;
  589           }
  590           
  591   	return total_read;
  592       }
  593       
  594       /**
  595        * Read N bytes from the InputStream, and ensure we got them all
  596        * Under heavy load we could experience many fragmented packets
  597        * just read Unix Network Programming to recall that a call to
  598        * read didn't ensure you got all the data you want
  599        *
  600        * from read() Linux manual
  601        *
  602        * On success, the number of bytes read is returned (zero indicates end
  603        * of file),and the file position is advanced by this number.
  604        * It is not an error if this number is smaller than the number of bytes
  605        * requested; this may happen for example because fewer bytes
  606        * are actually available right now (maybe because we were close to
  607        * end-of-file, or because we are reading from a pipe, or  from  a
  608        * terminal),  or  because  read()  was interrupted by a signal.
  609        * On error, -1 is returned, and errno is set appropriately. In this
  610        * case it is left unspecified whether the file position (if any) changes.
  611        *
  612        **/
  613       public int read( MsgContext ep, byte[] b, int offset, int len)
  614           throws IOException    {
  615           InputStream is=(InputStream)ep.getNote( isNote );
  616           int pos = 0;
  617           int got;
  618   
  619           while(pos < len) {
  620               try {
  621                   got = is.read(b, pos + offset, len - pos);
  622               } catch(SocketException sex) {
  623                   if(pos > 0) {
  624                       log.info("Error reading data after "+pos+"bytes",sex);
  625                   } else {
  626                       log.debug("Error reading data", sex);
  627                   }
  628                   got = -1;
  629               }
  630               if (log.isTraceEnabled()) {
  631                   log.trace("read() " + b + " " + (b==null ? 0: b.length) + " " +
  632                             offset + " " + len + " = " + got );
  633               }
  634   
  635               // connection just closed by remote. 
  636               if (got <= 0) {
  637                   // This happens periodically, as apache restarts
  638                   // periodically.
  639                   // It should be more gracefull ! - another feature for Ajp14
  640                   // log.warn( "server has closed the current connection (-1)" );
  641                   return -3;
  642               }
  643   
  644               pos += got;
  645           }
  646           return pos;
  647       }
  648       
  649       protected boolean running=true;
  650       
  651       /** Accept incoming connections, dispatch to the thread pool
  652        */
  653       void acceptConnections() {
  654           if( log.isDebugEnabled() )
  655               log.debug("Accepting ajp connections on " + port);
  656           while( running ) {
  657   	    try{
  658                   MsgContext ep=createMsgContext(packetSize);
  659                   ep.setSource(this);
  660                   ep.setWorkerEnv( wEnv );
  661                   this.accept(ep);
  662   
  663                   if( !running ) break;
  664                   
  665                   // Since this is a long-running connection, we don't care
  666                   // about the small GC
  667                   SocketConnection ajpConn=
  668                       new SocketConnection(this, ep);
  669                   tp.runIt( ajpConn );
  670   	    }catch(Exception ex) {
  671                   if (running)
  672                       log.warn("Exception executing accept" ,ex);
  673   	    }
  674           }
  675       }
  676   
  677       /** Process a single ajp connection.
  678        */
  679       void processConnection(MsgContext ep) {
  680           try {
  681               MsgAjp recv=new MsgAjp(packetSize);
  682               while( running ) {
  683                   if(paused) { // Drop the connection on pause
  684                       break;
  685                   }
  686                   int status= this.receive( recv, ep );
  687                   if( status <= 0 ) {
  688                       if( status==-3)
  689                           log.debug( "server has been restarted or reset this connection" );
  690                       else 
  691                           log.warn("Closing ajp connection " + status );
  692                       break;
  693                   }
  694                   ep.setLong( MsgContext.TIMER_RECEIVED, System.currentTimeMillis());
  695                   
  696                   ep.setType( 0 );
  697                   // Will call next
  698                   status= this.invoke( recv, ep );
  699                   if( status!= JkHandler.OK ) {
  700                       log.warn("processCallbacks status " + status );
  701                       ep.action(ActionCode.ACTION_CLOSE, ep.getRequest().getResponse());
  702                       break;
  703                   }
  704               }
  705           } catch( Exception ex ) {
  706               String msg = ex.getMessage();
  707               if( msg != null && msg.indexOf( "Connection reset" ) >= 0)
  708                   log.debug( "Server has been restarted or reset this connection");
  709               else if (msg != null && msg.indexOf( "Read timed out" ) >=0 )
  710                   log.debug( "connection timeout reached");            
  711               else
  712                   log.error( "Error, processing connection", ex);
  713           } finally {
  714   	    	/*
  715   	    	 * Whatever happened to this connection (remote closed it, timeout, read error)
  716   	    	 * the socket SHOULD be closed, or we may be in situation where the webserver
  717   	    	 * will continue to think the socket is still open and will forward request
  718   	    	 * to tomcat without receiving ever a reply
  719   	    	 */
  720               try {
  721                   this.close( ep );
  722               }
  723               catch( Exception e) {
  724                   log.error( "Error, closing connection", e);
  725               }
  726               try{
  727                   Request req = (Request)ep.getRequest();
  728                   if( req != null ) {
  729                       ObjectName roname = (ObjectName)ep.getNote(JMXRequestNote);
  730                       if( roname != null ) {
  731                           Registry.getRegistry(null, null).unregisterComponent(roname);
  732                       }
  733                       req.getRequestProcessor().setGlobalProcessor(null);
  734                   }
  735               } catch( Exception ee) {
  736                   log.error( "Error, releasing connection",ee);
  737               }
  738           }
  739       }
  740   
  741       // XXX This should become handleNotification
  742       public int invoke( Msg msg, MsgContext ep ) throws IOException {
  743           int type=ep.getType();
  744   
  745           switch( type ) {
  746           case JkHandler.HANDLE_RECEIVE_PACKET:
  747               if( log.isDebugEnabled()) log.debug("RECEIVE_PACKET ?? ");
  748               return receive( msg, ep );
  749           case JkHandler.HANDLE_SEND_PACKET:
  750               return send( msg, ep );
  751           case JkHandler.HANDLE_FLUSH:
  752               return flush( msg, ep );
  753           }
  754   
  755           if( log.isDebugEnabled() )
  756               log.debug("Call next " + type + " " + next);
  757   
  758           // Send notification
  759           if( nSupport!=null ) {
  760               Notification notif=(Notification)ep.getNote(notifNote);
  761               if( notif==null ) {
  762                   notif=new Notification("channelSocket.message", ep, requestCount );
  763                   ep.setNote( notifNote, notif);
  764               }
  765               nSupport.sendNotification(notif);
  766           }
  767   
  768           if( next != null ) {
  769               return next.invoke( msg, ep );
  770           } else {
  771               log.info("No next ");
  772           }
  773   
  774           return OK;
  775       }
  776       
  777       public boolean isSameAddress(MsgContext ep) {
  778           Socket s=(Socket)ep.getNote( socketNote );
  779           return isSameAddress( s.getLocalAddress(), s.getInetAddress());
  780       }
  781       
  782       public String getChannelName() {
  783           String encodedAddr = "";
  784           if (inet != null && !"0.0.0.0".equals(inet.getHostAddress())) {
  785               encodedAddr = getAddress();
  786               if (encodedAddr.startsWith("/"))
  787                   encodedAddr = encodedAddr.substring(1);
  788   	    encodedAddr = URLEncoder.encode(encodedAddr) + "-";
  789           }
  790           return ("jk-" + encodedAddr + port);
  791       }
  792       
  793       /**
  794        * Return <code>true</code> if the specified client and server addresses
  795        * are the same.  This method works around a bug in the IBM 1.1.8 JVM on
  796        * Linux, where the address bytes are returned reversed in some
  797        * circumstances.
  798        *
  799        * @param server The server's InetAddress
  800        * @param client The client's InetAddress
  801        */
  802       public static boolean isSameAddress(InetAddress server, InetAddress client)
  803       {
  804   	// Compare the byte array versions of the two addresses
  805   	byte serverAddr[] = server.getAddress();
  806   	byte clientAddr[] = client.getAddress();
  807   	if (serverAddr.length != clientAddr.length)
  808   	    return (false);
  809   	boolean match = true;
  810   	for (int i = 0; i < serverAddr.length; i++) {
  811   	    if (serverAddr[i] != clientAddr[i]) {
  812   		match = false;
  813   		break;
  814   	    }
  815   	}
  816   	if (match)
  817   	    return (true);
  818   
  819   	// Compare the reversed form of the two addresses
  820   	for (int i = 0; i < serverAddr.length; i++) {
  821   	    if (serverAddr[i] != clientAddr[(serverAddr.length-1)-i])
  822   		return (false);
  823   	}
  824   	return (true);
  825       }
  826   
  827       public void sendNewMessageNotification(Notification notification) {
  828           if( nSupport!= null )
  829               nSupport.sendNotification(notification);
  830       }
  831   
  832       private NotificationBroadcasterSupport nSupport= null;
  833   
  834       public void addNotificationListener(NotificationListener listener,
  835                                           NotificationFilter filter,
  836                                           Object handback)
  837               throws IllegalArgumentException
  838       {
  839           if( nSupport==null ) nSupport=new NotificationBroadcasterSupport();
  840           nSupport.addNotificationListener(listener, filter, handback);
  841       }
  842   
  843       public void removeNotificationListener(NotificationListener listener)
  844               throws ListenerNotFoundException
  845       {
  846           if( nSupport!=null)
  847               nSupport.removeNotificationListener(listener);
  848       }
  849   
  850       MBeanNotificationInfo notifInfo[]=new MBeanNotificationInfo[0];
  851   
  852       public void setNotificationInfo( MBeanNotificationInfo info[]) {
  853           this.notifInfo=info;
  854       }
  855   
  856       public MBeanNotificationInfo[] getNotificationInfo() {
  857           return notifInfo;
  858       }
  859   
  860       static class SocketAcceptor implements ThreadPoolRunnable {
  861   	ChannelSocket wajp;
  862       
  863   	SocketAcceptor(ChannelSocket wajp ) {
  864   	    this.wajp=wajp;
  865   	}
  866   	
  867   	public Object[] getInitData() {
  868   	    return null;
  869   	}
  870   	
  871   	public void runIt(Object thD[]) {
  872   	    wajp.acceptConnections();
  873   	}
  874       }
  875   
  876       static class SocketConnection implements ThreadPoolRunnable {
  877   	ChannelSocket wajp;
  878   	MsgContext ep;
  879   
  880   	SocketConnection(ChannelSocket wajp, MsgContext ep) {
  881   	    this.wajp=wajp;
  882   	    this.ep=ep;
  883   	}
  884   
  885   
  886   	public Object[] getInitData() {
  887   	    return null;
  888   	}
  889   	
  890   	public void runIt(Object perTh[]) {
  891   	    wajp.processConnection(ep);
  892   	    ep = null;
  893   	}
  894       }
  895   
  896   }
  897   

Save This Page
Home » apache-tomcat-6.0.26-src » org.apache.jk » common » [javadoc | source]