Save This Page
Home » apache-tomcat-6.0.16-src » org.apache.jk » common » [javadoc | source]
    1   /*
    2    *  Licensed to the Apache Software Foundation (ASF) under one or more
    3    *  contributor license agreements.  See the NOTICE file distributed with
    4    *  this work for additional information regarding copyright ownership.
    5    *  The ASF licenses this file to You under the Apache License, Version 2.0
    6    *  (the "License"); you may not use this file except in compliance with
    7    *  the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    *  Unless required by applicable law or agreed to in writing, software
   12    *  distributed under the License is distributed on an "AS IS" BASIS,
   13    *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    *  See the License for the specific language governing permissions and
   15    *  limitations under the License.
   16    */
   17   
   18   package org.apache.jk.common;
   19   
   20   import java.net.URLEncoder;
   21   import java.io.File;
   22   import java.io.FileOutputStream;
   23   import java.io.IOException;
   24   import javax.management.ObjectName;
   25   
   26   import org.apache.jk.core.JkHandler;
   27   import org.apache.jk.core.Msg;
   28   import org.apache.jk.core.MsgContext;
   29   import org.apache.jk.core.JkChannel;
   30   import org.apache.jk.core.WorkerEnv;
   31   import org.apache.coyote.Request;
   32   import org.apache.coyote.RequestGroupInfo;
   33   import org.apache.coyote.RequestInfo;
   34   import org.apache.tomcat.util.modeler.Registry;
   35   import org.apache.tomcat.util.threads.ThreadPool;
   36   import org.apache.tomcat.util.threads.ThreadPoolRunnable;
   37   
   38   
   39   /** Pass messages using unix domain sockets.
   40    *
   41    * @author Costin Manolache
   42    */
   43   public class ChannelUn extends JniHandler implements JkChannel {
   44       static final int CH_OPEN=4;
   45       static final int CH_CLOSE=5;
   46       static final int CH_READ=6;
   47       static final int CH_WRITE=7;
   48   
   49       String file;
   50       ThreadPool tp = ThreadPool.createThreadPool(true);
   51   
   52       /* ==================== Tcp socket options ==================== */
   53   
   54       public ThreadPool getThreadPool() {
   55           return tp;
   56       }
   57       
   58       public void setFile( String f ) {
   59           file=f;
   60       }
   61       
   62       public String getFile() {
   63           return file;
   64       }
   65       
   66       /* ==================== ==================== */
   67       int socketNote=1;
   68       int isNote=2;
   69       int osNote=3;
   70       
   71       int localId=0;
   72       
   73       public void init() throws IOException {
   74           if( file==null ) {
   75               log.debug("No file, disabling unix channel");
   76               return;
   77               //throw new IOException( "No file for the unix socket channel");
   78           }
   79           if( wEnv!=null && wEnv.getLocalId() != 0 ) {
   80               localId=wEnv.getLocalId();
   81           }
   82   
   83           if( localId != 0 ) {
   84               file=file+ localId;
   85           }
   86           File socketFile=new File( file );
   87           if( !socketFile.isAbsolute() ) {
   88               String home=wEnv.getJkHome();
   89               if( home==null ) {
   90                   log.debug("No jkhome");
   91               } else {
   92                   File homef=new File( home );
   93                   socketFile=new File( homef, file );
   94                   log.debug( "Making the file absolute " +socketFile);
   95               }
   96           }
   97           
   98           if( ! socketFile.exists() ) {
   99               try {
  100                   FileOutputStream fos=new FileOutputStream(socketFile);
  101                   fos.write( 1 );
  102                   fos.close();
  103               } catch( Throwable t ) {
  104                   log.error("Attempting to create the file failed, disabling channel" 
  105                           + socketFile);
  106                   return;
  107               }
  108           }
  109           // The socket file cannot be removed ...
  110           if (!socketFile.delete()) {
  111               log.error( "Can't remove socket file " + socketFile);
  112               return;
  113           }
  114           
  115   
  116           super.initNative( "channel.un:" + file );
  117   
  118           if( apr==null || ! apr.isLoaded() ) {
  119               log.debug("Apr is not available, disabling unix channel ");
  120               apr=null;
  121               return;
  122           }
  123           
  124           // Set properties and call init.
  125           setNativeAttribute( "file", file );
  126           // unixListenSocket=apr.unSocketListen( file, 10 );
  127   
  128           setNativeAttribute( "listen", "10" );
  129           // setNativeAttribute( "debug", "10" );
  130   
  131           // Initialize the thread pool and execution chain
  132           if( next==null && wEnv!=null ) {
  133               if( nextName!=null ) 
  134                   setNext( wEnv.getHandler( nextName ) );
  135               if( next==null )
  136                   next=wEnv.getHandler( "dispatch" );
  137               if( next==null )
  138                   next=wEnv.getHandler( "request" );
  139           }
  140   
  141           super.initJkComponent();
  142           JMXRequestNote =wEnv.getNoteId( WorkerEnv.ENDPOINT_NOTE, "requestNote");        
  143           // Run a thread that will accept connections.
  144           if( this.domain != null ) {
  145               try {
  146                   tpOName=new ObjectName(domain + ":type=ThreadPool,name=" + 
  147   				       getChannelName());
  148   
  149                   Registry.getRegistry(null, null)
  150   		    .registerComponent(tp, tpOName, null);
  151   
  152   		rgOName = new ObjectName
  153   		    (domain+":type=GlobalRequestProcessor,name=" + getChannelName());
  154   		Registry.getRegistry(null, null)
  155   		    .registerComponent(global, rgOName, null);
  156               } catch (Exception e) {
  157                   log.error("Can't register threadpool" );
  158               }
  159           }
  160           tp.start();
  161           AprAcceptor acceptAjp=new AprAcceptor(  this );
  162           tp.runIt( acceptAjp);
  163           log.info("JK: listening on unix socket: " + file );
  164           
  165       }
  166   
  167       ObjectName tpOName;
  168       ObjectName rgOName;
  169       RequestGroupInfo global=new RequestGroupInfo();
  170       int count = 0;
  171       int JMXRequestNote;
  172   
  173       public void start() throws IOException {
  174       }
  175   
  176       public void destroy() throws IOException {
  177           if( apr==null ) return;
  178           try {
  179               if( tp != null )
  180                   tp.shutdown();
  181               
  182               //apr.unSocketClose( unixListenSocket,3);
  183               super.destroyJkComponent();
  184   
  185               if(tpOName != null) {
  186   		Registry.getRegistry(null, null).unregisterComponent(tpOName);
  187   	    }
  188   	    if(rgOName != null) {
  189   		Registry.getRegistry(null, null).unregisterComponent(rgOName);
  190   	    }
  191           } catch(Exception e) {
  192               log.error("Error in destroy",e);
  193           }
  194       }
  195   
  196       public void registerRequest(Request req, MsgContext ep, int count) {
  197   	if(this.domain != null) {
  198   	    try {
  199   
  200   		RequestInfo rp=req.getRequestProcessor();
  201   		rp.setGlobalProcessor(global);
  202   		ObjectName roname = new ObjectName
  203   		    (getDomain() + ":type=RequestProcessor,worker="+
  204   		     getChannelName()+",name=JkRequest" +count);
  205   		ep.setNote(JMXRequestNote, roname);
  206                           
  207   		Registry.getRegistry(null, null).registerComponent( rp, roname, null);
  208   	    } catch( Exception ex ) {
  209   		log.warn("Error registering request");
  210   	    }
  211   	}
  212       }
  213   
  214   
  215       /** Open a connection - since we're listening that will block in
  216           accept
  217       */
  218       public int open(MsgContext ep) throws IOException {
  219           // Will associate a jk_endpoint with ep and call open() on it.
  220           // jk_channel_un will accept a connection and set the socket info
  221           // in the endpoint. MsgContext will represent an active connection.
  222           return super.nativeDispatch( ep.getMsg(0), ep, CH_OPEN, 1 );
  223       }
  224       
  225       public void close(MsgContext ep) throws IOException {
  226           super.nativeDispatch( ep.getMsg(0), ep, CH_CLOSE, 1 );
  227       }
  228   
  229       public int send( Msg msg, MsgContext ep)
  230           throws IOException
  231       {
  232           return super.nativeDispatch( msg, ep, CH_WRITE, 0 );
  233       }
  234   
  235       public int receive( Msg msg, MsgContext ep )
  236           throws IOException
  237       {
  238           int rc=super.nativeDispatch( msg, ep, CH_READ, 1 );
  239   
  240           if( rc!=0 ) {
  241               log.error("receive error:   " + rc, new Throwable());
  242               return -1;
  243           }
  244           
  245           msg.processHeader();
  246           
  247           if (log.isDebugEnabled())
  248                log.debug("receive:  total read = " + msg.getLen());
  249   
  250   	return msg.getLen();
  251       }
  252   
  253       public int flush( Msg msg, MsgContext ep) throws IOException {
  254   	return OK;
  255       }
  256   
  257       public boolean isSameAddress( MsgContext ep ) {
  258   	return false; // Not supporting shutdown on this channel.
  259       }
  260   
  261       boolean running=true;
  262       
  263       /** Accept incoming connections, dispatch to the thread pool
  264        */
  265       void acceptConnections() {
  266           if( apr==null ) return;
  267   
  268           if( log.isDebugEnabled() )
  269               log.debug("Accepting ajp connections on " + file);
  270           
  271           while( running ) {
  272               try {
  273                   MsgContext ep=this.createMsgContext();
  274   
  275                   // blocking - opening a server connection.
  276                   int status=this.open(ep);
  277                   if( status != 0 && status != 2 ) {
  278                       log.error( "Error acceptin connection on " + file );
  279                       break;
  280                   }
  281   
  282                   //    if( log.isDebugEnabled() )
  283                   //     log.debug("Accepted ajp connections ");
  284           
  285                   AprConnection ajpConn= new AprConnection(this, ep);
  286                   tp.runIt( ajpConn );
  287               } catch( Exception ex ) {
  288                   ex.printStackTrace();
  289               }
  290           }
  291       }
  292   
  293       /** Process a single ajp connection.
  294        */
  295       void processConnection(MsgContext ep) {
  296           if( log.isDebugEnabled() )
  297               log.debug( "New ajp connection ");
  298           try {
  299               MsgAjp recv=new MsgAjp();
  300               while( running ) {
  301                   int res=this.receive( recv, ep );
  302                   if( res<0 ) {
  303                       // EOS
  304                       break;
  305                   }
  306                   ep.setType(0);
  307                   log.debug( "Process msg ");
  308                   int status=next.invoke( recv, ep );
  309               }
  310               if( log.isDebugEnabled() )
  311                   log.debug( "Closing un channel");
  312               try{
  313                   Request req = (Request)ep.getRequest();
  314                   if( req != null ) {
  315                       ObjectName roname = (ObjectName)ep.getNote(JMXRequestNote);
  316                       if( roname != null ) {
  317                           Registry.getRegistry(null, null).unregisterComponent(roname);
  318                       }
  319                       req.getRequestProcessor().setGlobalProcessor(null);
  320                   }
  321               } catch( Exception ee) {
  322                   log.error( "Error, releasing connection",ee);
  323               }
  324               this.close( ep );
  325           } catch( Exception ex ) {
  326               ex.printStackTrace();
  327           }
  328       }
  329   
  330       public int invoke( Msg msg, MsgContext ep ) throws IOException {
  331           int type=ep.getType();
  332   
  333           switch( type ) {
  334           case JkHandler.HANDLE_RECEIVE_PACKET:
  335               return receive( msg, ep );
  336           case JkHandler.HANDLE_SEND_PACKET:
  337               return send( msg, ep );
  338           case JkHandler.HANDLE_FLUSH:
  339               return flush( msg, ep );
  340           }
  341   
  342           // return next.invoke( msg, ep );
  343           return OK;
  344       }
  345   
  346       public String getChannelName() {
  347           String encodedAddr = "";
  348           String address = file;
  349           if (address != null) {
  350               encodedAddr = "" + address;
  351               if (encodedAddr.startsWith("/"))
  352                   encodedAddr = encodedAddr.substring(1);
  353               encodedAddr = URLEncoder.encode(encodedAddr) ;
  354           }
  355           return ("jk-" + encodedAddr);
  356       }
  357   
  358       private static org.apache.juli.logging.Log log=
  359           org.apache.juli.logging.LogFactory.getLog( ChannelUn.class );
  360   }
  361   
  362   class AprAcceptor implements ThreadPoolRunnable {
  363       ChannelUn wajp;
  364       
  365       AprAcceptor(ChannelUn wajp ) {
  366           this.wajp=wajp;
  367       }
  368   
  369       public Object[] getInitData() {
  370           return null;
  371       }
  372   
  373       public void runIt(Object thD[]) {
  374           wajp.acceptConnections();
  375       }
  376   }
  377   
  378   class AprConnection implements ThreadPoolRunnable {
  379       ChannelUn wajp;
  380       MsgContext ep;
  381   
  382       AprConnection(ChannelUn wajp, MsgContext ep) {
  383           this.wajp=wajp;
  384           this.ep=ep;
  385       }
  386   
  387   
  388       public Object[] getInitData() {
  389           return null;
  390       }
  391       
  392       public void runIt(Object perTh[]) {
  393           wajp.processConnection(ep);
  394       }
  395   }

Save This Page
Home » apache-tomcat-6.0.16-src » org.apache.jk » common » [javadoc | source]