Save This Page
Home » apache-tomcat-6.0.16-src » org.apache » catalina » tribes » transport » nio » [javadoc | source]
    1   /*
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   
   18   package org.apache.catalina.tribes.transport.nio;
   19   
   20   import java.io.IOException;
   21   import java.net.ServerSocket;
   22   import java.nio.channels.SelectableChannel;
   23   import java.nio.channels.SelectionKey;
   24   import java.nio.channels.Selector;
   25   import java.nio.channels.ServerSocketChannel;
   26   import java.nio.channels.SocketChannel;
   27   import java.util.Iterator;
   28   
   29   import org.apache.catalina.tribes.ChannelReceiver;
   30   import org.apache.catalina.tribes.io.ListenCallback;
   31   import org.apache.catalina.tribes.io.ObjectReader;
   32   import org.apache.catalina.tribes.transport.Constants;
   33   import org.apache.catalina.tribes.transport.ReceiverBase;
   34   import org.apache.catalina.tribes.transport.RxTaskPool;
   35   import org.apache.catalina.tribes.transport.AbstractRxTask;
   36   import org.apache.catalina.tribes.util.StringManager;
   37   import java.util.LinkedList;
   38   import java.util.Set;
   39   import java.nio.channels.CancelledKeyException;
   40   
   41   /**
   42    * @author Filip Hanik
   43    * @version $Revision: 538977 $ $Date: 2007-05-17 17:43:49 +0200 (jeu., 17 mai 2007) $
   44    */
   45   public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiver, ListenCallback {
   46   
   47       protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(NioReceiver.class);
   48   
   49       /**
   50        * The string manager for this package.
   51        */
   52       protected StringManager sm = StringManager.getManager(Constants.Package);
   53   
   54       /**
   55        * The descriptive information about this implementation.
   56        */
   57       private static final String info = "NioReceiver/1.0";
   58   
   59       private Selector selector = null;
   60       private ServerSocketChannel serverChannel = null;
   61   
   62       protected LinkedList events = new LinkedList();
   63   //    private Object interestOpsMutex = new Object();
   64   
   65       public NioReceiver() {
   66       }
   67   
   68       /**
   69        * Return descriptive information about this implementation and the
   70        * corresponding version number, in the format
   71        * <code>&lt;description&gt;/&lt;version&gt;</code>.
   72        */
   73       public String getInfo() {
   74           return (info);
   75       }
   76   
   77   //    public Object getInterestOpsMutex() {
   78   //        return interestOpsMutex;
   79   //    }
   80   
   81       public void stop() {
   82           this.stopListening();
   83           super.stop();
   84       }
   85   
   86       /**
   87        * start cluster receiver
   88        * @throws Exception
   89        * @see org.apache.catalina.tribes.ClusterReceiver#start()
   90        */
   91       public void start() throws IOException {
   92           super.start();
   93           try {
   94               setPool(new RxTaskPool(getMaxThreads(),getMinThreads(),this));
   95           } catch (Exception x) {
   96               log.fatal("ThreadPool can initilzed. Listener not started", x);
   97               if ( x instanceof IOException ) throw (IOException)x;
   98               else throw new IOException(x.getMessage());
   99           }
  100           try {
  101               getBind();
  102               bind();
  103               Thread t = new Thread(this, "NioReceiver");
  104               t.setDaemon(true);
  105               t.start();
  106           } catch (Exception x) {
  107               log.fatal("Unable to start cluster receiver", x);
  108               if ( x instanceof IOException ) throw (IOException)x;
  109               else throw new IOException(x.getMessage());
  110           }
  111       }
  112       
  113       public AbstractRxTask createRxTask() {
  114           NioReplicationTask thread = new NioReplicationTask(this,this);
  115           thread.setUseBufferPool(this.getUseBufferPool());
  116           thread.setRxBufSize(getRxBufSize());
  117           thread.setOptions(getWorkerThreadOptions());
  118           return thread;
  119       }
  120       
  121       
  122       
  123       protected void bind() throws IOException {
  124           // allocate an unbound server socket channel
  125           serverChannel = ServerSocketChannel.open();
  126           // Get the associated ServerSocket to bind it with
  127           ServerSocket serverSocket = serverChannel.socket();
  128           // create a new Selector for use below
  129           selector = Selector.open();
  130           // set the port the server channel will listen to
  131           //serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort()));
  132           bind(serverSocket,getTcpListenPort(),getAutoBind());
  133           // set non-blocking mode for the listening socket
  134           serverChannel.configureBlocking(false);
  135           // register the ServerSocketChannel with the Selector
  136           serverChannel.register(selector, SelectionKey.OP_ACCEPT);
  137           
  138       }
  139       
  140       public void addEvent(Runnable event) {
  141           if ( selector != null ) {
  142               synchronized (events) {
  143                   events.add(event);
  144               }
  145               if ( log.isTraceEnabled() ) log.trace("Adding event to selector:"+event);
  146               if ( isListening() && selector!=null ) selector.wakeup();
  147           }
  148       }
  149   
  150       public void events() {
  151           if ( events.size() == 0 ) return;
  152           synchronized (events) {
  153               Runnable r = null;
  154               while ( (events.size() > 0) && (r = (Runnable)events.removeFirst()) != null ) {
  155                   try {
  156                       if ( log.isTraceEnabled() ) log.trace("Processing event in selector:"+r);
  157                       r.run();
  158                   } catch ( Exception x ) {
  159                       log.error("",x);
  160                   }
  161               }
  162               events.clear();
  163           }
  164       }
  165       
  166       public static void cancelledKey(SelectionKey key) {
  167           ObjectReader reader = (ObjectReader)key.attachment();
  168           if ( reader != null ) {
  169               reader.setCancelled(true);
  170               reader.finish();
  171           }
  172           key.cancel(); 
  173           key.attach(null);
  174           try { ((SocketChannel)key.channel()).socket().close(); } catch (IOException e) { if (log.isDebugEnabled()) log.debug("", e); }
  175           try { key.channel().close(); } catch (IOException e) { if (log.isDebugEnabled()) log.debug("", e); }
  176           
  177       }
  178       protected long lastCheck = System.currentTimeMillis();
  179       protected void socketTimeouts() {
  180           long now = System.currentTimeMillis();
  181           if ( (now-lastCheck) < getSelectorTimeout() ) return;
  182           //timeout
  183           Selector tmpsel = selector;
  184           Set keys =  (isListening()&&tmpsel!=null)?tmpsel.keys():null;
  185           if ( keys == null ) return;
  186           for (Iterator iter = keys.iterator(); iter.hasNext(); ) {
  187               SelectionKey key = (SelectionKey) iter.next();
  188               try {
  189   //                if (key.interestOps() == SelectionKey.OP_READ) {
  190   //                    //only timeout sockets that we are waiting for a read from
  191   //                    ObjectReader ka = (ObjectReader) key.attachment();
  192   //                    long delta = now - ka.getLastAccess();
  193   //                    if (delta > (long) getTimeout()) {
  194   //                        cancelledKey(key);
  195   //                    }
  196   //                }
  197   //                else
  198                   if ( key.interestOps() == 0 ) {
  199                       //check for keys that didn't make it in.
  200                       ObjectReader ka = (ObjectReader) key.attachment();
  201                       if ( ka != null ) {
  202                           long delta = now - ka.getLastAccess();
  203                           if (delta > (long) getTimeout() && (!ka.isAccessed())) {
  204                               log.warn("Channel key is registered, but has had no interest ops for the last "+getTimeout()+" ms. (cancelled:"+ka.isCancelled()+"):"+key+" last access:"+new java.sql.Timestamp(ka.getLastAccess()));
  205   //                            System.out.println("Interest:"+key.interestOps());
  206   //                            System.out.println("Ready Ops:"+key.readyOps());
  207   //                            System.out.println("Valid:"+key.isValid());
  208                               ka.setLastAccess(now);
  209                               //key.interestOps(SelectionKey.OP_READ);
  210                           }//end if
  211                       } else {
  212                           cancelledKey(key);
  213                       }//end if
  214                   }//end if
  215               }catch ( CancelledKeyException ckx ) {
  216                   cancelledKey(key);
  217               }
  218           }
  219           lastCheck = System.currentTimeMillis();
  220       }
  221   
  222   
  223       /**
  224        * get data from channel and store in byte array
  225        * send it to cluster
  226        * @throws IOException
  227        * @throws java.nio.channels.ClosedChannelException
  228        */
  229       protected void listen() throws Exception {
  230           if (doListen()) {
  231               log.warn("ServerSocketChannel already started");
  232               return;
  233           }
  234           
  235           setListen(true);
  236   
  237           while (doListen() && selector != null) {
  238               // this may block for a long time, upon return the
  239               // selected set contains keys of the ready channels
  240               try {
  241                   events();
  242                   socketTimeouts();
  243                   int n = selector.select(getTcpSelectorTimeout());
  244                   if (n == 0) {
  245                       //there is a good chance that we got here
  246                       //because the TcpReplicationThread called
  247                       //selector wakeup().
  248                       //if that happens, we must ensure that that
  249                       //thread has enough time to call interestOps
  250   //                    synchronized (interestOpsMutex) {
  251                           //if we got the lock, means there are no
  252                           //keys trying to register for the
  253                           //interestOps method
  254   //                    }
  255                       continue; // nothing to do
  256                   }
  257                   // get an iterator over the set of selected keys
  258                   Iterator it = selector.selectedKeys().iterator();
  259                   // look at each key in the selected set
  260                   while (it.hasNext()) {
  261                       SelectionKey key = (SelectionKey) it.next();
  262                       // Is a new connection coming in?
  263                       if (key.isAcceptable()) {
  264                           ServerSocketChannel server = (ServerSocketChannel) key.channel();
  265                           SocketChannel channel = server.accept();
  266                           channel.socket().setReceiveBufferSize(getRxBufSize());
  267                           channel.socket().setSendBufferSize(getTxBufSize());
  268                           channel.socket().setTcpNoDelay(getTcpNoDelay());
  269                           channel.socket().setKeepAlive(getSoKeepAlive());
  270                           channel.socket().setOOBInline(getOoBInline());
  271                           channel.socket().setReuseAddress(getSoReuseAddress());
  272                           channel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime());
  273                           channel.socket().setTrafficClass(getSoTrafficClass());
  274                           channel.socket().setSoTimeout(getTimeout());
  275                           Object attach = new ObjectReader(channel);
  276                           registerChannel(selector,
  277                                           channel,
  278                                           SelectionKey.OP_READ,
  279                                           attach);
  280                       }
  281                       // is there data to read on this channel?
  282                       if (key.isReadable()) {
  283                           readDataFromSocket(key);
  284                       } else {
  285                           key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
  286                       }
  287   
  288                       // remove key from selected set, it's been handled
  289                       it.remove();
  290                   }
  291               } catch (java.nio.channels.ClosedSelectorException cse) {
  292                   // ignore is normal at shutdown or stop listen socket
  293               } catch (java.nio.channels.CancelledKeyException nx) {
  294                   log.warn("Replication client disconnected, error when polling key. Ignoring client.");
  295               } catch (Throwable x) {
  296                   try {
  297                       log.error("Unable to process request in NioReceiver", x);
  298                   }catch ( Throwable tx ) {
  299                       //in case an out of memory error, will affect the logging framework as well
  300                       tx.printStackTrace();
  301                   }
  302               }
  303   
  304           }
  305           serverChannel.close();
  306           if (selector != null)
  307               selector.close();
  308       }
  309   
  310       
  311   
  312       /**
  313        * Close Selector.
  314        *
  315        * @see org.apache.catalina.tribes.transport.ClusterReceiverBase#stopListening()
  316        */
  317       protected void stopListening() {
  318           setListen(false);
  319           if (selector != null) {
  320               try {
  321                   selector.wakeup();
  322                   selector.close();
  323               } catch (Exception x) {
  324                   log.error("Unable to close cluster receiver selector.", x);
  325               } finally {
  326                   selector = null;
  327               }
  328           }
  329       }
  330   
  331       // ----------------------------------------------------------
  332   
  333       /**
  334        * Register the given channel with the given selector for
  335        * the given operations of interest
  336        */
  337       protected void registerChannel(Selector selector,
  338                                      SelectableChannel channel,
  339                                      int ops,
  340                                      Object attach) throws Exception {
  341           if (channel == null)return; // could happen
  342           // set the new channel non-blocking
  343           channel.configureBlocking(false);
  344           // register it with the selector
  345           channel.register(selector, ops, attach);
  346       }
  347   
  348       /**
  349        * Start thread and listen
  350        */
  351       public void run() {
  352           try {
  353               listen();
  354           } catch (Exception x) {
  355               log.error("Unable to run replication listener.", x);
  356           }
  357       }
  358   
  359       // ----------------------------------------------------------
  360   
  361       /**
  362        * Sample data handler method for a channel with data ready to read.
  363        * @param key A SelectionKey object associated with a channel
  364        *  determined by the selector to be ready for reading.  If the
  365        *  channel returns an EOF condition, it is closed here, which
  366        *  automatically invalidates the associated key.  The selector
  367        *  will then de-register the channel on the next select call.
  368        */
  369       protected void readDataFromSocket(SelectionKey key) throws Exception {
  370           NioReplicationTask task = (NioReplicationTask) getTaskPool().getRxTask();
  371           if (task == null) {
  372               // No threads/tasks available, do nothing, the selection
  373               // loop will keep calling this method until a
  374               // thread becomes available, the thread pool itself has a waiting mechanism
  375               // so we will not wait here.
  376               if (log.isDebugEnabled()) log.debug("No TcpReplicationThread available");
  377           } else {
  378               // invoking this wakes up the worker thread then returns
  379               //add task to thread pool
  380               task.serviceChannel(key);
  381               getExecutor().execute(task);
  382           }
  383       }
  384   
  385   
  386   }

Save This Page
Home » apache-tomcat-6.0.16-src » org.apache » catalina » tribes » transport » nio » [javadoc | source]