Save This Page
Home » apache-tomcat-6.0.16-src » org.apache » catalina » cluster » tcp » [javadoc | source]
    1   /*
    2    * Copyright 1999,2004 The Apache Software Foundation.
    3    * 
    4    * Licensed under the Apache License, Version 2.0 (the "License");
    5    * you may not use this file except in compliance with the License.
    6    * You may obtain a copy of the License at
    7    * 
    8    *      http://www.apache.org/licenses/LICENSE-2.0
    9    * 
   10    * Unless required by applicable law or agreed to in writing, software
   11    * distributed under the License is distributed on an "AS IS" BASIS,
   12    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   13    * See the License for the specific language governing permissions and
   14    * limitations under the License.
   15    */
   16   
   17   package org.apache.catalina.cluster.tcp;
   18   
   19   import java.io.IOException;
   20   import java.net.InetAddress;
   21   import java.util.LinkedList;
   22   
   23   /**
   24    * Send cluster messages with a pool of sockets (25).
   25    * 
   26    * FIXME support processing stats
   27    * 
   28    * @author Filip Hanik
   29    * @author Peter Rossbach
   30    * @version 1.2
   31    */
   32   
   33   public class PooledSocketSender extends DataSender {
   34   
   35       private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
   36               .getLog(org.apache.catalina.cluster.tcp.PooledSocketSender.class);
   37   
   38       /**
   39        * The descriptive information about this implementation.
   40        */
   41       private static final String info = "PooledSocketSender/2.0";
   42   
   43       // ----------------------------------------------------- Instance Variables
   44   
   45       private int maxPoolSocketLimit = 25;
   46   
   47       private SenderQueue senderQueue = null;
   48   
   49       //  ----------------------------------------------------- Constructor
   50   
   51      /**
   52       * @param domain replication cluster domain (session domain)
   53       * @param host replication node tcp address
   54       * @param port replication node tcp port
   55       */
   56       public PooledSocketSender(String domain,InetAddress host, int port) {
   57           super(domain,host, port);
   58           senderQueue = new SenderQueue(this, maxPoolSocketLimit);
   59       }
   60      
   61       //  ----------------------------------------------------- Public Properties
   62   
   63       /**
   64        * Return descriptive information about this implementation and the
   65        * corresponding version number, in the format
   66        * <code>&lt;description&gt;/&lt;version&gt;</code>.
   67        */
   68       public String getInfo() {
   69   
   70           return (info);
   71   
   72       }
   73   
   74       public void setMaxPoolSocketLimit(int limit) {
   75           maxPoolSocketLimit = limit;
   76           senderQueue.setLimit(limit);
   77       }
   78   
   79       public int getMaxPoolSocketLimit() {
   80           return maxPoolSocketLimit;
   81       }
   82   
   83       public int getInPoolSize() {
   84           return senderQueue.getInPoolSize();
   85       }
   86   
   87       public int getInUsePoolSize() {
   88           return senderQueue.getInUsePoolSize();
   89       }
   90   
   91       //  ----------------------------------------------------- Public Methode
   92   
   93       public synchronized void connect() throws java.io.IOException {
   94           //do nothing, happens in the socket sender itself
   95           senderQueue.open();
   96           setSocketConnected(true);
   97           connectCounter++;
   98       }
   99   
  100       public synchronized void disconnect() {
  101           senderQueue.close();
  102           setSocketConnected(false);
  103           disconnectCounter++;
  104       }
  105   
  106       /**
  107        * send message and use a pool of SocketSenders
  108        * 
  109        * @param messageId Message unique identifier
  110        * @param data Message data
  111        * @throws java.io.IOException
  112        */
  113       public void sendMessage(ClusterData data) throws IOException {
  114           //get a socket sender from the pool
  115           if(!isConnected()) {
  116               synchronized(this) {
  117                   if(!isConnected())
  118                       connect();
  119               }
  120           }
  121           SocketSender sender = senderQueue.getSender(0);
  122           if (sender == null) {
  123               log.warn(sm.getString("PoolSocketSender.noMoreSender", this.getAddress(), new Integer(this.getPort())));
  124               return;
  125           }
  126           //send the message
  127           try {
  128               sender.sendMessage(data);
  129           } finally {
  130               //return the connection to the pool
  131               senderQueue.returnSender(sender);
  132           }
  133           addStats(data.getMessage().length);
  134       }
  135   
  136       public String toString() {
  137           StringBuffer buf = new StringBuffer("PooledSocketSender[");
  138           buf.append(getAddress()).append(":").append(getPort()).append("]");
  139           return buf.toString();
  140       }
  141   
  142       //  ----------------------------------------------------- Inner Class
  143   
  144       private class SenderQueue {
  145           private int limit = 25;
  146   
  147           PooledSocketSender parent = null;
  148   
  149           private LinkedList queue = new LinkedList();
  150   
  151           private LinkedList inuse = new LinkedList();
  152   
  153           private Object mutex = new Object();
  154   
  155           private boolean isOpen = true;
  156   
  157           public SenderQueue(PooledSocketSender parent, int limit) {
  158               this.limit = limit;
  159               this.parent = parent;
  160           }
  161   
  162           /**
  163            * @return Returns the limit.
  164            */
  165           public int getLimit() {
  166               return limit;
  167           }
  168           /**
  169            * @param limit The limit to set.
  170            */
  171           public void setLimit(int limit) {
  172               this.limit = limit;
  173           }
  174           /**
  175            * @return
  176            */
  177           public int getInUsePoolSize() {
  178               return inuse.size();
  179           }
  180   
  181           /**
  182            * @return
  183            */
  184           public int getInPoolSize() {
  185               return queue.size();
  186           }
  187   
  188           public SocketSender getSender(long timeout) {
  189               SocketSender sender = null;
  190               long start = System.currentTimeMillis();
  191               long delta = 0;
  192               do {
  193                   synchronized (mutex) {
  194                       if (!isOpen)
  195                           throw new IllegalStateException(
  196                                   "Socket pool is closed.");
  197                       if (queue.size() > 0) {
  198                           sender = (SocketSender) queue.removeFirst();
  199                       } else if (inuse.size() < limit) {
  200                           sender = getNewSocketSender();
  201                       } else {
  202                           try {
  203                               mutex.wait(timeout);
  204                           } catch (Exception x) {
  205                               PooledSocketSender.log.warn(sm.getString("PoolSocketSender.senderQueue.sender.failed",parent.getAddress(),new Integer(parent.getPort())),x);
  206                           }//catch
  207                       }//end if
  208                       if (sender != null) {
  209                           inuse.add(sender);
  210                       }
  211                   }//synchronized
  212                   delta = System.currentTimeMillis() - start;
  213               } while ((isOpen) && (sender == null)
  214                       && (timeout == 0 ? true : (delta < timeout)));
  215               //to do
  216               return sender;
  217           }
  218   
  219           public void returnSender(SocketSender sender) {
  220               //to do
  221               synchronized (mutex) {
  222                   queue.add(sender);
  223                   inuse.remove(sender);
  224                   mutex.notify();
  225               }
  226           }
  227   
  228           private SocketSender getNewSocketSender() {
  229               //new SocketSender(
  230               SocketSender sender = new SocketSender(getDomain(),parent.getAddress(), parent
  231                       .getPort());
  232               sender.setKeepAliveMaxRequestCount(parent
  233                       .getKeepAliveMaxRequestCount());
  234               sender.setKeepAliveTimeout(parent.getKeepAliveTimeout());
  235               sender.setAckTimeout(parent.getAckTimeout());
  236               sender.setWaitForAck(parent.isWaitForAck());
  237               sender.setResend(parent.isResend());
  238               return sender;
  239   
  240           }
  241   
  242           public void close() {
  243               synchronized (mutex) {
  244                   for (int i = 0; i < queue.size(); i++) {
  245                       SocketSender sender = (SocketSender) queue.get(i);
  246                       sender.disconnect();
  247                   }//for
  248                   for (int i = 0; i < inuse.size(); i++) {
  249                       SocketSender sender = (SocketSender) inuse.get(i);
  250                       sender.disconnect();
  251                   }//for
  252                   queue.clear();
  253                   inuse.clear();
  254                   isOpen = false;
  255                   mutex.notifyAll();
  256               }
  257           }
  258   
  259           public void open() {
  260               synchronized (mutex) {
  261                   isOpen = true;
  262                   mutex.notifyAll();
  263               }
  264           }
  265       }
  266   }

Save This Page
Home » apache-tomcat-6.0.16-src » org.apache » catalina » cluster » tcp » [javadoc | source]