Save This Page
Home » apache-tomcat-6.0.16-src » org.apache.jk » common » [javadoc | source]
    1   /*
    2    *  Licensed to the Apache Software Foundation (ASF) under one or more
    3    *  contributor license agreements.  See the NOTICE file distributed with
    4    *  this work for additional information regarding copyright ownership.
    5    *  The ASF licenses this file to You under the Apache License, Version 2.0
    6    *  (the "License"); you may not use this file except in compliance with
    7    *  the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    *  Unless required by applicable law or agreed to in writing, software
   12    *  distributed under the License is distributed on an "AS IS" BASIS,
   13    *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    *  See the License for the specific language governing permissions and
   15    *  limitations under the License.
   16    */
   17   
   18   package org.apache.jk.common;
   19   
   20   import java.io.BufferedInputStream;
   21   import java.io.BufferedOutputStream;
   22   import java.io.IOException;
   23   import java.io.InputStream;
   24   import java.io.OutputStream;
   25   import java.net.URLEncoder;
   26   import java.net.InetAddress;
   27   import java.net.ServerSocket;
   28   import java.net.Socket;
   29   import java.net.SocketException;
   30   
   31   import javax.management.ListenerNotFoundException;
   32   import javax.management.MBeanNotificationInfo;
   33   import javax.management.Notification;
   34   import javax.management.NotificationBroadcaster;
   35   import javax.management.NotificationBroadcasterSupport;
   36   import javax.management.NotificationFilter;
   37   import javax.management.NotificationListener;
   38   import javax.management.ObjectName;
   39   
   40   import org.apache.jk.core.JkHandler;
   41   import org.apache.jk.core.Msg;
   42   import org.apache.jk.core.MsgContext;
   43   import org.apache.jk.core.JkChannel;
   44   import org.apache.jk.core.WorkerEnv;
   45   import org.apache.coyote.Request;
   46   import org.apache.coyote.RequestGroupInfo;
   47   import org.apache.coyote.RequestInfo;
   48   import org.apache.tomcat.util.modeler.Registry;
   49   import org.apache.tomcat.util.threads.ThreadPool;
   50   import org.apache.tomcat.util.threads.ThreadPoolRunnable;
   51   
   52   /** 
   53    * Accept ( and send ) TCP messages.
   54    *
   55    * @author Costin Manolache
   56    * @author Bill Barker
   57    * jmx:mbean name="jk:service=ChannelNioSocket"
   58    *            description="Accept socket connections"
   59    * jmx:notification name="org.apache.coyote.INVOKE
   60    * jmx:notification-handler name="org.apache.jk.JK_SEND_PACKET
   61    * jmx:notification-handler name="org.apache.jk.JK_RECEIVE_PACKET
   62    * jmx:notification-handler name="org.apache.jk.JK_FLUSH
   63    *
   64    * Jk can use multiple protocols/transports.
   65    * Various container adapters should load this object ( as a bean ),
   66    * set configurations and use it. Note that the connector will handle
   67    * all incoming protocols - it's not specific to ajp1x. The protocol
   68    * is abstracted by MsgContext/Message/Channel.
   69    *
   70    * A lot of the 'original' behavior is hardcoded - this uses Ajp13 wire protocol,
   71    * TCP, Ajp14 API etc.
   72    * As we add other protocols/transports/APIs this will change, the current goal
   73    * is to get the same level of functionality as in the original jk connector.
   74    *
   75    * XXX Make the 'message type' pluggable
   76    */
   77   public class ChannelSocket extends JkHandler
   78       implements NotificationBroadcaster, JkChannel {
   79       private static org.apache.juli.logging.Log log =
   80           org.apache.juli.logging.LogFactory.getLog( ChannelSocket.class );
   81   
   82       private int startPort=8009;
   83       private int maxPort=8019; // 0 for backward compat.
   84       private int port=startPort;
   85       private InetAddress inet;
   86       private int serverTimeout;
   87       private boolean tcpNoDelay=true; // nodelay to true by default
   88       private int linger=100;
   89       private int socketTimeout;
   90       private int bufferSize = -1;
   91       private int packetSize = 8*1024;
   92   
   93       private long requestCount=0;
   94       
   95       ThreadPool tp=ThreadPool.createThreadPool(true);
   96   
   97       /* ==================== Tcp socket options ==================== */
   98   
   99       /**
  100        * jmx:managed-constructor description="default constructor"
  101        */
  102       public ChannelSocket() {
  103           // This should be integrated with the  domain setup
  104       }
  105       
  106       public ThreadPool getThreadPool() {
  107           return tp;
  108       }
  109   
  110       public long getRequestCount() {
  111           return requestCount;
  112       }
  113       
  114       /** Set the port for the ajp13 channel.
  115        *  To support seemless load balancing and jni, we treat this
  116        *  as the 'base' port - we'll try up until we find one that is not
  117        *  used. We'll also provide the 'difference' to the main coyote
  118        *  handler - that will be our 'sessionID' and the position in
  119        *  the scoreboard and the suffix for the unix domain socket.
  120        *
  121        * jmx:managed-attribute description="Port to listen" access="READ_WRITE"
  122        */
  123       public void setPort( int port ) {
  124           this.startPort=port;
  125           this.port=port;
  126           this.maxPort=port+10;
  127       }
  128   
  129       public int getPort() {
  130           return port;
  131       }
  132   
  133       public void setAddress(InetAddress inet) {
  134           this.inet=inet;
  135       }
  136   
  137       /**
  138        * jmx:managed-attribute description="Bind on a specified address" access="READ_WRITE"
  139        */
  140       public void setAddress(String inet) {
  141           try {
  142               this.inet= InetAddress.getByName( inet );
  143           } catch( Exception ex ) {
  144               log.error("Error parsing "+inet,ex);
  145           }
  146       }
  147   
  148       public String getAddress() {
  149           if( inet!=null)
  150               return inet.toString();
  151           return "/0.0.0.0";
  152       }
  153   
  154       /**
  155        * Sets the timeout in ms of the server sockets created by this
  156        * server. This method allows the developer to make servers
  157        * more or less responsive to having their server sockets
  158        * shut down.
  159        *
  160        * <p>By default this value is 1000ms.
  161        */
  162       public void setServerTimeout(int timeout) {
  163   	this.serverTimeout = timeout;
  164       }
  165       public int getServerTimeout() {
  166           return serverTimeout;
  167       }
  168   
  169       public void setTcpNoDelay( boolean b ) {
  170   	tcpNoDelay=b;
  171       }
  172   
  173       public boolean getTcpNoDelay() {
  174           return tcpNoDelay;
  175       }
  176       
  177       public void setSoLinger( int i ) {
  178   	linger=i;
  179       }
  180   
  181       public int getSoLinger() {
  182           return linger;
  183       }
  184       
  185       public void setSoTimeout( int i ) {
  186   	socketTimeout=i;
  187       }
  188   
  189       public int getSoTimeout() {
  190   	return socketTimeout;
  191       }
  192   
  193       public void setMaxPort( int i ) {
  194           maxPort=i;
  195       }
  196   
  197       public int getMaxPort() {
  198           return maxPort;
  199       }
  200   
  201       public void setBufferSize(int bs) {
  202           bufferSize = bs;
  203       }
  204   
  205       public int getBufferSize() {
  206           return bufferSize;
  207       }
  208   
  209       public void setPacketSize(int ps) {
  210           if(ps < 8*1024) {
  211               ps = 8*1024;
  212           }
  213           packetSize = ps;
  214       }
  215   
  216       public int getPacketSize() {
  217           return packetSize;
  218       }
  219   
  220       /** At startup we'll look for the first free port in the range.
  221           The difference between this port and the beggining of the range
  222           is the 'id'.
  223           This is usefull for lb cases ( less config ).
  224       */
  225       public int getInstanceId() {
  226           return port-startPort;
  227       }
  228   
  229       /** If set to false, the thread pool will be created in
  230        *  non-daemon mode, and will prevent main from exiting
  231        */
  232       public void setDaemon( boolean b ) {
  233           tp.setDaemon( b );
  234       }
  235   
  236       public boolean getDaemon() {
  237           return tp.getDaemon();
  238       }
  239   
  240   
  241       public void setMaxThreads( int i ) {
  242           if( log.isDebugEnabled()) log.debug("Setting maxThreads " + i);
  243           tp.setMaxThreads(i);
  244       }
  245       
  246       public void setMinSpareThreads( int i ) {
  247           if( log.isDebugEnabled()) log.debug("Setting minSpareThreads " + i);
  248           tp.setMinSpareThreads(i);
  249       }
  250       
  251       public void setMaxSpareThreads( int i ) {
  252           if( log.isDebugEnabled()) log.debug("Setting maxSpareThreads " + i);
  253           tp.setMaxSpareThreads(i);
  254       }
  255   
  256       public int getMaxThreads() {
  257           return tp.getMaxThreads();   
  258       }
  259       
  260       public int getMinSpareThreads() {
  261           return tp.getMinSpareThreads();   
  262       }
  263   
  264       public int getMaxSpareThreads() {
  265           return tp.getMaxSpareThreads();   
  266       }
  267   
  268       public void setBacklog(int i) {
  269       }
  270       
  271       
  272       /* ==================== ==================== */
  273       ServerSocket sSocket;
  274       final int socketNote=1;
  275       final int isNote=2;
  276       final int osNote=3;
  277       final int notifNote=4;
  278       boolean paused = false;
  279   
  280       public void pause() throws Exception {
  281           synchronized(this) {
  282               paused = true;
  283               unLockSocket();
  284           }
  285       }
  286   
  287       public void resume() throws Exception {
  288           synchronized(this) {
  289               paused = false;
  290               notify();
  291           }
  292       }
  293   
  294   
  295       public void accept( MsgContext ep ) throws IOException {
  296           if( sSocket==null ) return;
  297           synchronized(this) {
  298               while(paused) {
  299                   try{ 
  300                       wait();
  301                   } catch(InterruptedException ie) {
  302                       //Ignore, since can't happen
  303                   }
  304               }
  305           }
  306           Socket s=sSocket.accept();
  307           ep.setNote( socketNote, s );
  308           if(log.isDebugEnabled() )
  309               log.debug("Accepted socket " + s );
  310   
  311           try {
  312               setSocketOptions(s);
  313           } catch(SocketException sex) {
  314               log.debug("Error initializing Socket Options", sex);
  315           }
  316           
  317           requestCount++;
  318   
  319           InputStream is=new BufferedInputStream(s.getInputStream());
  320           OutputStream os;
  321           if( bufferSize > 0 )
  322               os = new BufferedOutputStream( s.getOutputStream(), bufferSize);
  323           else
  324               os = s.getOutputStream();
  325           ep.setNote( isNote, is );
  326           ep.setNote( osNote, os );
  327           ep.setControl( tp );
  328       }
  329   
  330       private void setSocketOptions(Socket s) throws SocketException {
  331           if( socketTimeout > 0 ) 
  332               s.setSoTimeout( socketTimeout );
  333           
  334           s.setTcpNoDelay( tcpNoDelay ); // set socket tcpnodelay state
  335   
  336           if( linger > 0 )
  337               s.setSoLinger( true, linger);
  338       }
  339   
  340       public void resetCounters() {
  341           requestCount=0;
  342       }
  343   
  344       /** Called after you change some fields at runtime using jmx.
  345           Experimental for now.
  346       */
  347       public void reinit() throws IOException {
  348           destroy();
  349           init();
  350       }
  351   
  352       /**
  353        * jmx:managed-operation
  354        */
  355       public void init() throws IOException {
  356           // Find a port.
  357           if (startPort == 0) {
  358               port = 0;
  359               if(log.isInfoEnabled())
  360                   log.info("JK: ajp13 disabling channelSocket");
  361               running = true;
  362               return;
  363           }
  364           if (maxPort < startPort)
  365               maxPort = startPort;
  366           for( int i=startPort; i<=maxPort; i++ ) {
  367               try {
  368                   if( inet == null ) {
  369                       sSocket = new ServerSocket( i, 0 );
  370                   } else {
  371                       sSocket=new ServerSocket( i, 0, inet );
  372                   }
  373                   port=i;
  374                   break;
  375               } catch( IOException ex ) {
  376                   if(log.isInfoEnabled())
  377                       log.info("Port busy " + i + " " + ex.toString());
  378                   continue;
  379               }
  380           }
  381   
  382           if( sSocket==null ) {
  383               log.error("Can't find free port " + startPort + " " + maxPort );
  384               return;
  385           }
  386           if(log.isInfoEnabled())
  387               log.info("JK: ajp13 listening on " + getAddress() + ":" + port );
  388   
  389           // If this is not the base port and we are the 'main' channleSocket and
  390           // SHM didn't already set the localId - we'll set the instance id
  391           if( "channelSocket".equals( name ) &&
  392               port != startPort &&
  393               (wEnv.getLocalId()==0) ) {
  394               wEnv.setLocalId(  port - startPort );
  395           }
  396           if( serverTimeout > 0 )
  397               sSocket.setSoTimeout( serverTimeout );
  398   
  399           // XXX Reverse it -> this is a notification generator !!
  400           if( next==null && wEnv!=null ) {
  401               if( nextName!=null )
  402                   setNext( wEnv.getHandler( nextName ) );
  403               if( next==null )
  404                   next=wEnv.getHandler( "dispatch" );
  405               if( next==null )
  406                   next=wEnv.getHandler( "request" );
  407           }
  408           JMXRequestNote =wEnv.getNoteId( WorkerEnv.ENDPOINT_NOTE, "requestNote");
  409           running = true;
  410   
  411           // Run a thread that will accept connections.
  412           // XXX Try to find a thread first - not sure how...
  413           if( this.domain != null ) {
  414               try {
  415                   tpOName=new ObjectName(domain + ":type=ThreadPool,name=" + 
  416                                          getChannelName());
  417   
  418                   Registry.getRegistry(null, null)
  419                       .registerComponent(tp, tpOName, null);
  420   
  421                   rgOName = new ObjectName
  422                       (domain+":type=GlobalRequestProcessor,name=" + getChannelName());
  423                   Registry.getRegistry(null, null)
  424                       .registerComponent(global, rgOName, null);
  425               } catch (Exception e) {
  426                   log.error("Can't register threadpool" );
  427               }
  428           }
  429   
  430           tp.start();
  431           SocketAcceptor acceptAjp=new SocketAcceptor(  this );
  432           tp.runIt( acceptAjp);
  433   
  434       }
  435   
  436       ObjectName tpOName;
  437       ObjectName rgOName;
  438       RequestGroupInfo global=new RequestGroupInfo();
  439       int JMXRequestNote;
  440   
  441       public void start() throws IOException{
  442           if( sSocket==null )
  443               init();
  444       }
  445   
  446       public void stop() throws IOException {
  447           destroy();
  448       }
  449   
  450       public void registerRequest(Request req, MsgContext ep, int count) {
  451           if(this.domain != null) {
  452               try {
  453                   RequestInfo rp=req.getRequestProcessor();
  454                   rp.setGlobalProcessor(global);
  455                   ObjectName roname = new ObjectName
  456                       (getDomain() + ":type=RequestProcessor,worker="+
  457                        getChannelName()+",name=JkRequest" +count);
  458                   ep.setNote(JMXRequestNote, roname);
  459                           
  460                   Registry.getRegistry(null, null).registerComponent( rp, roname, null);
  461               } catch( Exception ex ) {
  462                   log.warn("Error registering request");
  463               }
  464           }
  465       }
  466   
  467       public void open(MsgContext ep) throws IOException {
  468       }
  469   
  470       
  471       public void close(MsgContext ep) throws IOException {
  472           Socket s=(Socket)ep.getNote( socketNote );
  473           s.close();
  474       }
  475   
  476       private void unLockSocket() throws IOException {
  477           // Need to create a connection to unlock the accept();
  478           Socket s;
  479           InetAddress ladr = inet;
  480   
  481           if(port == 0)
  482               return;
  483           if (ladr == null || "0.0.0.0".equals(ladr.getHostAddress())) {
  484               ladr = InetAddress.getLocalHost();
  485           }
  486           s=new Socket(ladr, port );
  487           // setting soLinger to a small value will help shutdown the
  488           // connection quicker
  489           s.setSoLinger(true, 0);
  490   
  491   	s.close();
  492       }
  493   
  494       public void destroy() throws IOException {
  495           running = false;
  496           try {
  497               /* If we disabled the channel return */
  498               if (port == 0)
  499                   return;
  500               tp.shutdown();
  501   
  502   	    if(!paused) {
  503   		unLockSocket();
  504   	    }
  505   
  506               sSocket.close(); // XXX?
  507               
  508               if( tpOName != null )  {
  509                   Registry.getRegistry(null, null).unregisterComponent(tpOName);
  510               }
  511               if( rgOName != null ) {
  512                   Registry.getRegistry(null, null).unregisterComponent(rgOName);
  513               }
  514           } catch(Exception e) {
  515               log.info("Error shutting down the channel " + port + " " +
  516                       e.toString());
  517               if( log.isDebugEnabled() ) log.debug("Trace", e);
  518           }
  519       }
  520   
  521       public int send( Msg msg, MsgContext ep)
  522           throws IOException    {
  523           msg.end(); // Write the packet header
  524           byte buf[]=msg.getBuffer();
  525           int len=msg.getLen();
  526           
  527           if(log.isTraceEnabled() )
  528               log.trace("send() " + len + " " + buf[4] );
  529   
  530           OutputStream os=(OutputStream)ep.getNote( osNote );
  531           os.write( buf, 0, len );
  532           return len;
  533       }
  534   
  535       public int flush( Msg msg, MsgContext ep)
  536           throws IOException    {
  537           if( bufferSize > 0 ) {
  538               OutputStream os=(OutputStream)ep.getNote( osNote );
  539               os.flush();
  540           }
  541           return 0;
  542       }
  543   
  544       public int receive( Msg msg, MsgContext ep )
  545           throws IOException    {
  546           if (log.isDebugEnabled()) {
  547               log.debug("receive() ");
  548           }
  549   
  550           byte buf[]=msg.getBuffer();
  551           int hlen=msg.getHeaderLength();
  552           
  553   	// XXX If the length in the packet header doesn't agree with the
  554   	// actual number of bytes read, it should probably return an error
  555   	// value.  Also, callers of this method never use the length
  556   	// returned -- should probably return true/false instead.
  557   
  558           int rd = this.read(ep, buf, 0, hlen );
  559           
  560           if(rd < 0) {
  561               // Most likely normal apache restart.
  562               // log.warn("Wrong message " + rd );
  563               return rd;
  564           }
  565   
  566           msg.processHeader();
  567   
  568           /* After processing the header we know the body
  569              length
  570           */
  571           int blen=msg.getLen();
  572           
  573   	// XXX check if enough space - it's assert()-ed !!!
  574           
  575    	int total_read = 0;
  576           
  577           total_read = this.read(ep, buf, hlen, blen);
  578           
  579           if ((total_read <= 0) && (blen > 0)) {
  580               log.warn("can't read body, waited #" + blen);
  581               return  -1;
  582           }
  583           
  584           if (total_read != blen) {
  585                log.warn( "incomplete read, waited #" + blen +
  586                           " got only " + total_read);
  587               return -2;
  588           }
  589           
  590   	return total_read;
  591       }
  592       
  593       /**
  594        * Read N bytes from the InputStream, and ensure we got them all
  595        * Under heavy load we could experience many fragmented packets
  596        * just read Unix Network Programming to recall that a call to
  597        * read didn't ensure you got all the data you want
  598        *
  599        * from read() Linux manual
  600        *
  601        * On success, the number of bytes read is returned (zero indicates end
  602        * of file),and the file position is advanced by this number.
  603        * It is not an error if this number is smaller than the number of bytes
  604        * requested; this may happen for example because fewer bytes
  605        * are actually available right now (maybe because we were close to
  606        * end-of-file, or because we are reading from a pipe, or  from  a
  607        * terminal),  or  because  read()  was interrupted by a signal.
  608        * On error, -1 is returned, and errno is set appropriately. In this
  609        * case it is left unspecified whether the file position (if any) changes.
  610        *
  611        **/
  612       public int read( MsgContext ep, byte[] b, int offset, int len)
  613           throws IOException    {
  614           InputStream is=(InputStream)ep.getNote( isNote );
  615           int pos = 0;
  616           int got;
  617   
  618           while(pos < len) {
  619               try {
  620                   got = is.read(b, pos + offset, len - pos);
  621               } catch(SocketException sex) {
  622                   if(pos > 0) {
  623                       log.info("Error reading data after "+pos+"bytes",sex);
  624                   } else {
  625                       log.debug("Error reading data", sex);
  626                   }
  627                   got = -1;
  628               }
  629               if (log.isTraceEnabled()) {
  630                   log.trace("read() " + b + " " + (b==null ? 0: b.length) + " " +
  631                             offset + " " + len + " = " + got );
  632               }
  633   
  634               // connection just closed by remote. 
  635               if (got <= 0) {
  636                   // This happens periodically, as apache restarts
  637                   // periodically.
  638                   // It should be more gracefull ! - another feature for Ajp14
  639                   // log.warn( "server has closed the current connection (-1)" );
  640                   return -3;
  641               }
  642   
  643               pos += got;
  644           }
  645           return pos;
  646       }
  647       
  648       protected boolean running=true;
  649       
  650       /** Accept incoming connections, dispatch to the thread pool
  651        */
  652       void acceptConnections() {
  653           if( log.isDebugEnabled() )
  654               log.debug("Accepting ajp connections on " + port);
  655           while( running ) {
  656   	    try{
  657                   MsgContext ep=createMsgContext(packetSize);
  658                   ep.setSource(this);
  659                   ep.setWorkerEnv( wEnv );
  660                   this.accept(ep);
  661   
  662                   if( !running ) break;
  663                   
  664                   // Since this is a long-running connection, we don't care
  665                   // about the small GC
  666                   SocketConnection ajpConn=
  667                       new SocketConnection(this, ep);
  668                   tp.runIt( ajpConn );
  669   	    }catch(Exception ex) {
  670                   if (running)
  671                       log.warn("Exception executing accept" ,ex);
  672   	    }
  673           }
  674       }
  675   
  676       /** Process a single ajp connection.
  677        */
  678       void processConnection(MsgContext ep) {
  679           try {
  680               MsgAjp recv=new MsgAjp(packetSize);
  681               while( running ) {
  682                   if(paused) { // Drop the connection on pause
  683                       break;
  684                   }
  685                   int status= this.receive( recv, ep );
  686                   if( status <= 0 ) {
  687                       if( status==-3)
  688                           log.debug( "server has been restarted or reset this connection" );
  689                       else 
  690                           log.warn("Closing ajp connection " + status );
  691                       break;
  692                   }
  693                   ep.setLong( MsgContext.TIMER_RECEIVED, System.currentTimeMillis());
  694                   
  695                   ep.setType( 0 );
  696                   // Will call next
  697                   status= this.invoke( recv, ep );
  698                   if( status!= JkHandler.OK ) {
  699                       log.warn("processCallbacks status " + status );
  700                       break;
  701                   }
  702               }
  703           } catch( Exception ex ) {
  704               String msg = ex.getMessage();
  705               if( msg != null && msg.indexOf( "Connection reset" ) >= 0)
  706                   log.debug( "Server has been restarted or reset this connection");
  707               else if (msg != null && msg.indexOf( "Read timed out" ) >=0 )
  708                   log.debug( "connection timeout reached");            
  709               else
  710                   log.error( "Error, processing connection", ex);
  711           } finally {
  712   	    	/*
  713   	    	 * Whatever happened to this connection (remote closed it, timeout, read error)
  714   	    	 * the socket SHOULD be closed, or we may be in situation where the webserver
  715   	    	 * will continue to think the socket is still open and will forward request
  716   	    	 * to tomcat without receiving ever a reply
  717   	    	 */
  718               try {
  719                   this.close( ep );
  720               }
  721               catch( Exception e) {
  722                   log.error( "Error, closing connection", e);
  723               }
  724               try{
  725                   Request req = (Request)ep.getRequest();
  726                   if( req != null ) {
  727                       ObjectName roname = (ObjectName)ep.getNote(JMXRequestNote);
  728                       if( roname != null ) {
  729                           Registry.getRegistry(null, null).unregisterComponent(roname);
  730                       }
  731                       req.getRequestProcessor().setGlobalProcessor(null);
  732                   }
  733               } catch( Exception ee) {
  734                   log.error( "Error, releasing connection",ee);
  735               }
  736           }
  737       }
  738   
  739       // XXX This should become handleNotification
  740       public int invoke( Msg msg, MsgContext ep ) throws IOException {
  741           int type=ep.getType();
  742   
  743           switch( type ) {
  744           case JkHandler.HANDLE_RECEIVE_PACKET:
  745               if( log.isDebugEnabled()) log.debug("RECEIVE_PACKET ?? ");
  746               return receive( msg, ep );
  747           case JkHandler.HANDLE_SEND_PACKET:
  748               return send( msg, ep );
  749           case JkHandler.HANDLE_FLUSH:
  750               return flush( msg, ep );
  751           }
  752   
  753           if( log.isDebugEnabled() )
  754               log.debug("Call next " + type + " " + next);
  755   
  756           // Send notification
  757           if( nSupport!=null ) {
  758               Notification notif=(Notification)ep.getNote(notifNote);
  759               if( notif==null ) {
  760                   notif=new Notification("channelSocket.message", ep, requestCount );
  761                   ep.setNote( notifNote, notif);
  762               }
  763               nSupport.sendNotification(notif);
  764           }
  765   
  766           if( next != null ) {
  767               return next.invoke( msg, ep );
  768           } else {
  769               log.info("No next ");
  770           }
  771   
  772           return OK;
  773       }
  774       
  775       public boolean isSameAddress(MsgContext ep) {
  776           Socket s=(Socket)ep.getNote( socketNote );
  777           return isSameAddress( s.getLocalAddress(), s.getInetAddress());
  778       }
  779       
  780       public String getChannelName() {
  781           String encodedAddr = "";
  782           if (inet != null && !"0.0.0.0".equals(inet.getHostAddress())) {
  783               encodedAddr = getAddress();
  784               if (encodedAddr.startsWith("/"))
  785                   encodedAddr = encodedAddr.substring(1);
  786   	    encodedAddr = URLEncoder.encode(encodedAddr) + "-";
  787           }
  788           return ("jk-" + encodedAddr + port);
  789       }
  790       
  791       /**
  792        * Return <code>true</code> if the specified client and server addresses
  793        * are the same.  This method works around a bug in the IBM 1.1.8 JVM on
  794        * Linux, where the address bytes are returned reversed in some
  795        * circumstances.
  796        *
  797        * @param server The server's InetAddress
  798        * @param client The client's InetAddress
  799        */
  800       public static boolean isSameAddress(InetAddress server, InetAddress client)
  801       {
  802   	// Compare the byte array versions of the two addresses
  803   	byte serverAddr[] = server.getAddress();
  804   	byte clientAddr[] = client.getAddress();
  805   	if (serverAddr.length != clientAddr.length)
  806   	    return (false);
  807   	boolean match = true;
  808   	for (int i = 0; i < serverAddr.length; i++) {
  809   	    if (serverAddr[i] != clientAddr[i]) {
  810   		match = false;
  811   		break;
  812   	    }
  813   	}
  814   	if (match)
  815   	    return (true);
  816   
  817   	// Compare the reversed form of the two addresses
  818   	for (int i = 0; i < serverAddr.length; i++) {
  819   	    if (serverAddr[i] != clientAddr[(serverAddr.length-1)-i])
  820   		return (false);
  821   	}
  822   	return (true);
  823       }
  824   
  825       public void sendNewMessageNotification(Notification notification) {
  826           if( nSupport!= null )
  827               nSupport.sendNotification(notification);
  828       }
  829   
  830       private NotificationBroadcasterSupport nSupport= null;
  831   
  832       public void addNotificationListener(NotificationListener listener,
  833                                           NotificationFilter filter,
  834                                           Object handback)
  835               throws IllegalArgumentException
  836       {
  837           if( nSupport==null ) nSupport=new NotificationBroadcasterSupport();
  838           nSupport.addNotificationListener(listener, filter, handback);
  839       }
  840   
  841       public void removeNotificationListener(NotificationListener listener)
  842               throws ListenerNotFoundException
  843       {
  844           if( nSupport!=null)
  845               nSupport.removeNotificationListener(listener);
  846       }
  847   
  848       MBeanNotificationInfo notifInfo[]=new MBeanNotificationInfo[0];
  849   
  850       public void setNotificationInfo( MBeanNotificationInfo info[]) {
  851           this.notifInfo=info;
  852       }
  853   
  854       public MBeanNotificationInfo[] getNotificationInfo() {
  855           return notifInfo;
  856       }
  857   
  858       static class SocketAcceptor implements ThreadPoolRunnable {
  859   	ChannelSocket wajp;
  860       
  861   	SocketAcceptor(ChannelSocket wajp ) {
  862   	    this.wajp=wajp;
  863   	}
  864   	
  865   	public Object[] getInitData() {
  866   	    return null;
  867   	}
  868   	
  869   	public void runIt(Object thD[]) {
  870   	    wajp.acceptConnections();
  871   	}
  872       }
  873   
  874       static class SocketConnection implements ThreadPoolRunnable {
  875   	ChannelSocket wajp;
  876   	MsgContext ep;
  877   
  878   	SocketConnection(ChannelSocket wajp, MsgContext ep) {
  879   	    this.wajp=wajp;
  880   	    this.ep=ep;
  881   	}
  882   
  883   
  884   	public Object[] getInitData() {
  885   	    return null;
  886   	}
  887   	
  888   	public void runIt(Object perTh[]) {
  889   	    wajp.processConnection(ep);
  890   	    ep = null;
  891   	}
  892       }
  893   
  894   }
  895   

Save This Page
Home » apache-tomcat-6.0.16-src » org.apache.jk » common » [javadoc | source]