Save This Page
Home » apache-tomcat-6.0.16-src » org.apache » catalina » cluster » tcp » [javadoc | source]
    1   /*
    2    * Copyright 1999,2004 The Apache Software Foundation.
    3    * 
    4    * Licensed under the Apache License, Version 2.0 (the "License");
    5    * you may not use this file except in compliance with the License.
    6    * You may obtain a copy of the License at
    7    * 
    8    *      http://www.apache.org/licenses/LICENSE-2.0
    9    * 
   10    * Unless required by applicable law or agreed to in writing, software
   11    * distributed under the License is distributed on an "AS IS" BASIS,
   12    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   13    * See the License for the specific language governing permissions and
   14    * limitations under the License.
   15    */
   16   
   17   package org.apache.catalina.cluster.tcp;
   18   import java.io.IOException;
   19   import java.nio.ByteBuffer;
   20   import java.nio.channels.SelectionKey;
   21   import java.nio.channels.SocketChannel;
   22   
   23   import org.apache.catalina.cluster.io.ObjectReader;
   24   
   25   /**
   26    * A worker thread class which can drain channels and echo-back the input. Each
   27    * instance is constructed with a reference to the owning thread pool object.
   28    * When started, the thread loops forever waiting to be awakened to service the
   29    * channel associated with a SelectionKey object. The worker is tasked by
   30    * calling its serviceChannel() method with a SelectionKey object. The
   31    * serviceChannel() method stores the key reference in the thread object then
   32    * calls notify() to wake it up. When the channel has been drained, the worker
   33    * thread returns itself to its parent pool.
   34    * 
   35    * @author Filip Hanik
   36    * 
   37    * @version $Revision: 303987 $, $Date: 2005-07-08 16:50:30 -0400 (Fri, 08 Jul 2005) $
   38    */
   39   public class TcpReplicationThread extends WorkerThread {
   40       public static final byte[] ACK_COMMAND = new byte[] {6, 2, 3};
   41       private static org.apache.commons.logging.Log log =
   42           org.apache.commons.logging.LogFactory.getLog( TcpReplicationThread.class );
   43       private ByteBuffer buffer = ByteBuffer.allocate (1024);
   44       private SelectionKey key;
   45       private boolean sendAck=true;
   46   
   47       
   48       TcpReplicationThread ()
   49       {
   50       }
   51   
   52       // loop forever waiting for work to do
   53       public synchronized void run()
   54       {
   55           while (doRun) {
   56               try {
   57                   // sleep and release object lock
   58                   this.wait();
   59               } catch (InterruptedException e) {
   60                   if(log.isInfoEnabled())
   61                       log.info("TCP worker thread interrupted in cluster",e);
   62                   // clear interrupt status
   63                   Thread.interrupted();
   64               }
   65               if (key == null) {
   66                   continue;	// just in case
   67               }
   68               try {
   69                   drainChannel (key);
   70               } catch (Exception e) {
   71                   log.error ("TCP Worker thread in cluster caught '"
   72                       + e + "' closing channel", e);
   73   
   74                   // close channel and nudge selector
   75                   try {
   76                       key.channel().close();
   77                   } catch (IOException ex) {
   78                       log.error("Unable to close channel.",ex);
   79                   }
   80                   key.selector().wakeup();
   81               }
   82               key = null;
   83               // done, ready for more, return to pool
   84               this.pool.returnWorker (this);
   85           }
   86       }
   87   
   88       /**
   89        * Called to initiate a unit of work by this worker thread
   90        * on the provided SelectionKey object.  This method is
   91        * synchronized, as is the run() method, so only one key
   92        * can be serviced at a given time.
   93        * Before waking the worker thread, and before returning
   94        * to the main selection loop, this key's interest set is
   95        * updated to remove OP_READ.  This will cause the selector
   96        * to ignore read-readiness for this channel while the
   97        * worker thread is servicing it.
   98        */
   99       synchronized void serviceChannel (SelectionKey key, boolean sendAck)
  100       {
  101           this.key = key;
  102           this.sendAck=sendAck;
  103           key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
  104           key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE));
  105           this.notify();		// awaken the thread
  106       }
  107   
  108       /**
  109        * The actual code which drains the channel associated with
  110        * the given key.  This method assumes the key has been
  111        * modified prior to invocation to turn off selection
  112        * interest in OP_READ.  When this method completes it
  113        * re-enables OP_READ and calls wakeup() on the selector
  114        * so the selector will resume watching this channel.
  115        */
  116       protected void drainChannel (SelectionKey key)
  117           throws Exception
  118       {
  119           boolean packetReceived=false;
  120           SocketChannel channel = (SocketChannel) key.channel();
  121           int count;
  122           buffer.clear();			// make buffer empty
  123           ObjectReader reader = (ObjectReader)key.attachment();
  124           // loop while data available, channel is non-blocking
  125           while ((count = channel.read (buffer)) > 0) {
  126               buffer.flip();		// make buffer readable
  127               reader.append(buffer.array(),0,count);
  128               buffer.clear();		// make buffer empty
  129           }
  130           //check to see if any data is available
  131           int pkgcnt = reader.execute();
  132           if (log.isTraceEnabled()) {
  133               log.trace("sending " + pkgcnt + " ack packages to " + channel.socket().getLocalPort() );
  134           }
  135           
  136           if (sendAck) {
  137               while ( pkgcnt > 0 ) {
  138                   sendAck(key,channel);
  139                   pkgcnt--;
  140               }
  141           }
  142           
  143           if (count < 0) {
  144               // close channel on EOF, invalidates the key
  145               channel.close();
  146               return;
  147           }
  148           
  149           //acquire the interestOps mutex
  150           Object mutex = this.getPool().getInterestOpsMutex();
  151           synchronized (mutex) {
  152               // cycle the selector so this key is active again
  153               key.selector().wakeup();
  154               // resume interest in OP_READ, OP_WRITE
  155               int resumeOps = key.interestOps() | SelectionKey.OP_READ;
  156               key.interestOps(resumeOps);
  157           }
  158           
  159       }
  160   
  161       /**
  162        * send a reply-acknowledgement (6,2,3)
  163        * @param key
  164        * @param channel
  165        */
  166       protected void sendAck(SelectionKey key, SocketChannel channel) {
  167           
  168           try {
  169               channel.write(ByteBuffer.wrap(ACK_COMMAND));
  170               if (log.isTraceEnabled()) {
  171                   log.trace("ACK sent to " + channel.socket().getPort());
  172               }
  173           } catch ( java.io.IOException x ) {
  174               log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
  175           }
  176       }
  177   }

Save This Page
Home » apache-tomcat-6.0.16-src » org.apache » catalina » cluster » tcp » [javadoc | source]