Save This Page
Home » apache-tomcat-6.0.16-src » org.apache » catalina » cluster » tcp » [javadoc | source]
    1   /*
    2    * Copyright 1999,2004-2005 The Apache Software Foundation.
    3    * 
    4    * Licensed under the Apache License, Version 2.0 (the "License");
    5    * you may not use this file except in compliance with the License.
    6    * You may obtain a copy of the License at
    7    * 
    8    *      http://www.apache.org/licenses/LICENSE-2.0
    9    * 
   10    * Unless required by applicable law or agreed to in writing, software
   11    * distributed under the License is distributed on an "AS IS" BASIS,
   12    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   13    * See the License for the specific language governing permissions and
   14    * limitations under the License.
   15    */
   16   
   17   package org.apache.catalina.cluster.tcp;
   18   
   19   
   20   import java.net.InetSocketAddress;
   21   import java.net.ServerSocket;
   22   import java.nio.channels.SelectableChannel;
   23   import java.nio.channels.SelectionKey;
   24   import java.nio.channels.Selector;
   25   import java.nio.channels.ServerSocketChannel;
   26   import java.nio.channels.SocketChannel;
   27   import java.util.Iterator;
   28   
   29   import org.apache.catalina.cluster.io.ObjectReader;
   30   
   31   /**
   32   * FIXME i18n log messages
   33   * FIXME jmx support
   34   * @author Peter Rossbach
   35   * @author Filip Hanik
   36   * @version $Revision: 349504 $ $Date: 2005-11-28 16:05:40 -0500 (Mon, 28 Nov 2005) $
   37   */
   38   public class ReplicationListener extends ClusterReceiverBase
   39   {
   40   
   41       /**
   42        * The descriptive information about this implementation.
   43        */
   44       private static final String info = "ReplicationListener/1.2";
   45       
   46       private ThreadPool pool = null;
   47       private int tcpThreadCount;
   48       private long tcpSelectorTimeout;    
   49       private Selector selector = null;
   50       
   51       private Object interestOpsMutex = new Object();
   52       
   53       public ReplicationListener() {
   54       }
   55       
   56       /**
   57        * Return descriptive information about this implementation and the
   58        * corresponding version number, in the format
   59        * <code>&lt;description&gt;/&lt;version&gt;</code>.
   60        */
   61       public String getInfo() {
   62   
   63           return (info);
   64   
   65       }
   66    
   67       public long getTcpSelectorTimeout() {
   68           return tcpSelectorTimeout;
   69       }
   70       public void setTcpSelectorTimeout(long tcpSelectorTimeout) {
   71           this.tcpSelectorTimeout = tcpSelectorTimeout;
   72       }
   73       public int getTcpThreadCount() {
   74           return tcpThreadCount;
   75       }
   76       public void setTcpThreadCount(int tcpThreadCount) {
   77           this.tcpThreadCount = tcpThreadCount;
   78       }
   79       public Object getInterestOpsMutex() {
   80           return interestOpsMutex;
   81       }
   82   
   83       /**
   84        * start cluster receiver
   85        * @throws Exception
   86        * @see org.apache.catalina.cluster.ClusterReceiver#start()
   87        */
   88       public void start() {
   89               try {
   90                   pool = new ThreadPool(tcpThreadCount, TcpReplicationThread.class, interestOpsMutex);
   91               } catch (Exception e) {
   92                   log.error("ThreadPool can initilzed. Listener not started",e);
   93                   return ;
   94               }
   95               super.start() ;
   96        }
   97       
   98   
   99       /**
  100        * get data from channel and store in byte array
  101        * send it to cluster
  102        * @throws IOException
  103        * @throws java.nio.channels.ClosedChannelException
  104        */
  105       protected void listen ()
  106           throws Exception
  107       {
  108           if (doListen) {
  109               log.warn("ServerSocketChannel allready started");
  110               return;
  111           }
  112           doListen=true;
  113           // allocate an unbound server socket channel
  114           ServerSocketChannel serverChannel = ServerSocketChannel.open();
  115           // Get the associated ServerSocket to bind it with
  116           ServerSocket serverSocket = serverChannel.socket();
  117           // create a new Selector for use below
  118           selector = Selector.open();
  119           // set the port the server channel will listen to
  120           serverSocket.bind (new InetSocketAddress (getBind(),getTcpListenPort()));
  121           // set non-blocking mode for the listening socket
  122           serverChannel.configureBlocking (false);
  123           // register the ServerSocketChannel with the Selector
  124           serverChannel.register (selector, SelectionKey.OP_ACCEPT);
  125           while (doListen && selector != null) {
  126               // this may block for a long time, upon return the
  127               // selected set contains keys of the ready channels
  128               try {
  129   
  130                   int n = selector.select(tcpSelectorTimeout);
  131                   if (n == 0) {
  132                       //there is a good chance that we got here 
  133                       //because the TcpReplicationThread called
  134                       //selector wakeup().
  135                       //if that happens, we must ensure that that
  136                       //thread has enough time to call interestOps
  137                       synchronized (interestOpsMutex) {
  138                           //if we got the lock, means there are no
  139                           //keys trying to register for the 
  140                           //interestOps method
  141                       }
  142                       continue; // nothing to do
  143                   }
  144                   // get an iterator over the set of selected keys
  145                   Iterator it = selector.selectedKeys().iterator();
  146                   // look at each key in the selected set
  147                   while (it.hasNext()) {
  148                       SelectionKey key = (SelectionKey) it.next();
  149                       // Is a new connection coming in?
  150                       if (key.isAcceptable()) {
  151                           ServerSocketChannel server =
  152                               (ServerSocketChannel) key.channel();
  153                           SocketChannel channel = server.accept();
  154                           Object attach = new ObjectReader(channel, selector,
  155                                       this) ;
  156                           registerChannel(selector,
  157                                           channel,
  158                                           SelectionKey.OP_READ,
  159                                           attach);
  160                       }
  161                       // is there data to read on this channel?
  162                       if (key.isReadable()) {
  163                           readDataFromSocket(key);
  164                       } else {
  165                           key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
  166                       }
  167   
  168                       // remove key from selected set, it's been handled
  169                       it.remove();
  170                   }
  171               } catch (java.nio.channels.ClosedSelectorException cse) {
  172                   // ignore is normal at shutdown or stop listen socket
  173               } catch (java.nio.channels.CancelledKeyException nx) {
  174                   log.warn(
  175                       "Replication client disconnected, error when polling key. Ignoring client.");
  176               } catch (Exception x) {
  177                   log.error("Unable to process request in ReplicationListener", x);
  178               }
  179   
  180           }
  181           serverChannel.close();
  182           if(selector != null)
  183               selector.close();
  184       }
  185   
  186       /**
  187        * Close Selector.
  188        *
  189        * @see org.apache.catalina.cluster.tcp.ClusterReceiverBase#stopListening()
  190        */
  191       protected void stopListening() {
  192           // Bugzilla 37529: http://issues.apache.org/bugzilla/show_bug.cgi?id=37529
  193           doListen = false;
  194           if ( selector != null ) {
  195               try {
  196                   for(int i = 0; i < getTcpThreadCount(); i++) {
  197                       selector.wakeup();
  198                   }
  199                   selector.close();
  200               } catch ( Exception x ) {
  201                   log.error("Unable to close cluster receiver selector.",x);
  202               } finally {
  203                   selector = null;                
  204               }
  205           }
  206      }
  207           
  208       // ----------------------------------------------------------
  209   
  210       /**
  211        * Register the given channel with the given selector for
  212        * the given operations of interest
  213        */
  214       protected void registerChannel (Selector selector,
  215                                       SelectableChannel channel,
  216                                       int ops,
  217                                       Object attach)
  218       throws Exception {
  219           if (channel == null) return; // could happen
  220           // set the new channel non-blocking
  221           channel.configureBlocking (false);
  222           // register it with the selector
  223           channel.register (selector, ops, attach);
  224       }
  225   
  226       // ----------------------------------------------------------
  227   
  228       /**
  229        * Sample data handler method for a channel with data ready to read.
  230        * @param key A SelectionKey object associated with a channel
  231        *  determined by the selector to be ready for reading.  If the
  232        *  channel returns an EOF condition, it is closed here, which
  233        *  automatically invalidates the associated key.  The selector
  234        *  will then de-register the channel on the next select call.
  235        */
  236       protected void readDataFromSocket (SelectionKey key)
  237           throws Exception
  238       {
  239           TcpReplicationThread worker = (TcpReplicationThread)pool.getWorker();
  240           if (worker == null) {
  241               // No threads available, do nothing, the selection
  242               // loop will keep calling this method until a
  243               // thread becomes available.
  244               // FIXME: This design could be improved.
  245               if(log.isDebugEnabled())
  246                   log.debug("No TcpReplicationThread available");
  247           } else {
  248               // invoking this wakes up the worker thread then returns
  249               worker.serviceChannel(key, isSendAck());
  250           }
  251       }
  252   
  253       
  254   }

Save This Page
Home » apache-tomcat-6.0.16-src » org.apache » catalina » cluster » tcp » [javadoc | source]