Save This Page
Home » apache-tomcat-6.0.16-src » org.apache » catalina » cluster » util » [javadoc | source]
    1   /*
    2    * Copyright 1999,2004-2005 The Apache Software Foundation.
    3    * 
    4    * Licensed under the Apache License, Version 2.0 (the "License");
    5    * you may not use this file except in compliance with the License.
    6    * You may obtain a copy of the License at
    7    * 
    8    *      http://www.apache.org/licenses/LICENSE-2.0
    9    * 
   10    * Unless required by applicable law or agreed to in writing, software
   11    * distributed under the License is distributed on an "AS IS" BASIS,
   12    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   13    * See the License for the specific language governing permissions and
   14    * limitations under the License.
   15    */
   16   
   17   package org.apache.catalina.cluster.util;
   18   
   19   /**
   20    * A fast queue that remover thread lock the adder thread. <br/>Limit the queue
   21    * length when you have strange producer thread problemes.
   22    * 
   23    * FIXME add i18n support to log messages
   24    * @author Rainer Jung
   25    * @author Peter Rossbach
   26    * @version $Revision: 345567 $ $Date: 2005-11-18 16:07:23 -0500 (Fri, 18 Nov 2005) $
   27    */
   28   public class FastQueue implements IQueue {
   29   
   30       private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
   31               .getLog(FastQueue.class);
   32   
   33       /**
   34        * This is the actual queue
   35        */
   36       private SingleRemoveSynchronizedAddLock lock = null;
   37   
   38       /**
   39        * First Object at queue (consumer message)
   40        */
   41       private LinkObject first = null;
   42   
   43       /**
   44        * Last object in queue (producer Object)
   45        */
   46       private LinkObject last = null;
   47   
   48       /**
   49        * Current Queue elements size
   50        */
   51       private int size = 0;
   52   
   53       /**
   54        * check lock to detect strange threadings things
   55        */
   56       private boolean checkLock = false;
   57   
   58       /**
   59        * protocol the thread wait times
   60        */
   61       private boolean timeWait = false;
   62   
   63       /**
   64        * calc stats data
   65        */
   66       private boolean doStats = false;
   67   
   68       private boolean inAdd = false;
   69   
   70       private boolean inRemove = false;
   71   
   72       private boolean inMutex = false;
   73   
   74       /**
   75        * limit the queue legnth ( default is unlimited)
   76        */
   77       private int maxQueueLength = 0;
   78   
   79       /**
   80        * addWaitTimeout for producer
   81        */
   82       private long addWaitTimeout = 10000L;
   83   
   84       
   85       /**
   86        * removeWaitTimeout for consumer
   87        */
   88       private long removeWaitTimeout = 30000L;
   89   
   90       /**
   91        * enabled the queue
   92        */
   93       private boolean enabled = true;
   94   
   95       /**
   96        * calc all add objects
   97        */
   98       private long addCounter = 0;
   99   
  100       /**
  101        * calc all add objetcs in error state ( see limit queue length)
  102        */
  103       private long addErrorCounter = 0;
  104   
  105       /**
  106        * calc all remove objects
  107        */
  108       private long removeCounter = 0;
  109   
  110       /**
  111        * calc all remove objects failures (hupps probleme detection)
  112        */
  113       private long removeErrorCounter = 0;
  114   
  115       /**
  116        * Calc wait time thread
  117        */
  118       private long addWait = 0;
  119   
  120       /**
  121        * Calc remove time threads
  122        */
  123       private long removeWait = 0;
  124   
  125       /**
  126        *  max queue size
  127        */
  128       private int maxSize = 0;
  129   
  130       /**
  131        * avg queue size
  132        */
  133       private long avgSize = 0;
  134   
  135       private int maxSizeSample = 0;
  136   
  137       private long avgSizeSample = 0;
  138   
  139       /**
  140        *  avg size sample interval
  141        */
  142       private int sampleInterval = 100;
  143   
  144       /**
  145        * Generate Queue SingleRemoveSynchronizedAddLock and set add and wait
  146        * Timeouts
  147        */
  148       public FastQueue() {
  149           lock = new SingleRemoveSynchronizedAddLock();
  150           lock.setAddWaitTimeout(addWaitTimeout);
  151           lock.setRemoveWaitTimeout(removeWaitTimeout);
  152       }
  153   
  154       /**
  155        * get current add wait timeout
  156        * 
  157        * @return current wait timeout
  158        */
  159       public long getAddWaitTimeout() {
  160           addWaitTimeout = lock.getAddWaitTimeout();
  161           return addWaitTimeout;
  162       }
  163   
  164       /**
  165        * Set add wait timeout (default 10000 msec)
  166        * 
  167        * @param timeout
  168        */
  169       public void setAddWaitTimeout(long timeout) {
  170           addWaitTimeout = timeout;
  171           lock.setAddWaitTimeout(addWaitTimeout);
  172       }
  173   
  174       /**
  175        * get current remove wait timeout
  176        * 
  177        * @return The timeout
  178        */
  179       public long getRemoveWaitTimeout() {
  180           removeWaitTimeout = lock.getRemoveWaitTimeout();
  181           return removeWaitTimeout;
  182       }
  183   
  184       /**
  185        * set remove wait timeout ( default 30000 msec)
  186        * 
  187        * @param timeout
  188        */
  189       public void setRemoveWaitTimeout(long timeout) {
  190           removeWaitTimeout = timeout;
  191           lock.setRemoveWaitTimeout(removeWaitTimeout);
  192       }
  193   
  194       /**
  195        * get Max Queue length
  196        * 
  197        * @see org.apache.catalina.cluster.util.IQueue#getMaxQueueLength()
  198        */
  199       public int getMaxQueueLength() {
  200           return maxQueueLength;
  201       }
  202   
  203       public void setMaxQueueLength(int length) {
  204           maxQueueLength = length;
  205       }
  206   
  207       public boolean isEnabled() {
  208           return enabled;
  209       }
  210   
  211       public void setEnabled(boolean enable) {
  212           enabled = enable;
  213           if (!enabled) {
  214               lock.abortRemove();
  215           }
  216       }
  217   
  218       /**
  219        * @return Returns the checkLock.
  220        */
  221       public boolean isCheckLock() {
  222           return checkLock;
  223       }
  224   
  225       /**
  226        * @param checkLock The checkLock to set.
  227        */
  228       public void setCheckLock(boolean checkLock) {
  229           this.checkLock = checkLock;
  230       }
  231   
  232       /**
  233        * @return Returns the doStats.
  234        */
  235       public boolean isDoStats() {
  236           return doStats;
  237       }
  238   
  239       /**
  240        * @param doStats The doStats to set.
  241        */
  242       public void setDoStats(boolean doStats) {
  243           this.doStats = doStats;
  244       }
  245   
  246       /**
  247        * @return Returns the timeWait.
  248        */
  249       public boolean isTimeWait() {
  250           return timeWait;
  251       }
  252   
  253       /**
  254        * @param timeWait The timeWait to set.
  255        */
  256       public void setTimeWait(boolean timeWait) {
  257           this.timeWait = timeWait;
  258       }
  259   
  260       public int getSampleInterval() {
  261           return sampleInterval;
  262       }
  263   
  264       public void setSampleInterval(int interval) {
  265           sampleInterval = interval;
  266       }
  267   
  268       public long getAddCounter() {
  269           return addCounter;
  270       }
  271   
  272       public void setAddCounter(long counter) {
  273           addCounter = counter;
  274       }
  275   
  276       public long getAddErrorCounter() {
  277           return addErrorCounter;
  278       }
  279   
  280       public void setAddErrorCounter(long counter) {
  281           addErrorCounter = counter;
  282       }
  283   
  284       public long getRemoveCounter() {
  285           return removeCounter;
  286       }
  287   
  288       public void setRemoveCounter(long counter) {
  289           removeCounter = counter;
  290       }
  291   
  292       public long getRemoveErrorCounter() {
  293           return removeErrorCounter;
  294       }
  295   
  296       public void setRemoveErrorCounter(long counter) {
  297           removeErrorCounter = counter;
  298       }
  299   
  300       public long getAddWait() {
  301           return addWait;
  302       }
  303   
  304       public void setAddWait(long wait) {
  305           addWait = wait;
  306       }
  307   
  308       public long getRemoveWait() {
  309           return removeWait;
  310       }
  311   
  312       public void setRemoveWait(long wait) {
  313           removeWait = wait;
  314       }
  315   
  316       /**
  317        * @return The max size
  318        */
  319       public int getMaxSize() {
  320           return maxSize;
  321       }
  322   
  323       /**
  324        * @param size
  325        */
  326       public void setMaxSize(int size) {
  327           maxSize = size;
  328       }
  329   
  330       
  331       /**
  332        * Avg queue size
  333        * @return The average queue size
  334        */
  335       public long getAvgSize() {
  336           if (addCounter > 0) {
  337               return avgSize / addCounter;
  338           } else {
  339               return 0;
  340           }
  341       }
  342   
  343       /**
  344        * reset all stats data 
  345        */
  346       public void resetStatistics() {
  347           addCounter = 0;
  348           addErrorCounter = 0;
  349           removeCounter = 0;
  350           removeErrorCounter = 0;
  351           avgSize = 0;
  352           maxSize = 0;
  353           addWait = 0;
  354           removeWait = 0;
  355       }
  356   
  357       /**
  358        * unlock queue for next add 
  359        */
  360       public void unlockAdd() {
  361           lock.unlockAdd(size > 0 ? true : false);
  362       }
  363   
  364       /**
  365        * unlock queue for next remove 
  366        */
  367       public void unlockRemove() {
  368           lock.unlockRemove();
  369       }
  370   
  371       /**
  372        * start queuing
  373        */
  374       public void start() {
  375           setEnabled(true);
  376       }
  377   
  378       /**
  379        * start queuing
  380        */
  381       public void stop() {
  382           setEnabled(false);
  383       }
  384   
  385       public long getSample() {
  386           return addCounter % sampleInterval;
  387       }
  388   
  389       public int getMaxSizeSample() {
  390           return maxSizeSample;
  391       }
  392   
  393       public void setMaxSizeSample(int size) {
  394           maxSizeSample = size;
  395       }
  396   
  397       public long getAvgSizeSample() {
  398           long sample = addCounter % sampleInterval;
  399           if (sample > 0) {
  400               return avgSizeSample / sample;
  401           } else if (addCounter > 0) {
  402               return avgSizeSample / sampleInterval;
  403           } else {
  404               return 0;
  405           }
  406       }
  407   
  408       public int getSize() {
  409           int sz;
  410           sz = size;
  411           return sz;
  412       }
  413   
  414       /**
  415        * Add new data to the queue
  416        * @see org.apache.catalina.cluster.util.IQueue#add(java.lang.String, java.lang.Object)
  417        * FIXME extract some method
  418        */
  419       public boolean add(String key, Object data) {
  420           boolean ok = true;
  421           long time = 0;
  422   
  423           if (!enabled) {
  424               if (log.isInfoEnabled())
  425                   log.info("FastQueue.add: queue disabled, add aborted");
  426               return false;
  427           }
  428   
  429           if (timeWait) {
  430               time = System.currentTimeMillis();
  431           }
  432           lock.lockAdd();
  433           try {
  434               if (timeWait) {
  435                   addWait += (System.currentTimeMillis() - time);
  436               }
  437   
  438               if (log.isTraceEnabled()) {
  439                   log.trace("FastQueue.add: starting with size " + size);
  440               }
  441               if (checkLock) {
  442                   if (inAdd)
  443                       log.warn("FastQueue.add: Detected other add");
  444                   inAdd = true;
  445                   if (inMutex)
  446                       log.warn("FastQueue.add: Detected other mutex in add");
  447                   inMutex = true;
  448               }
  449   
  450               if ((maxQueueLength > 0) && (size >= maxQueueLength)) {
  451                   ok = false;
  452                   if (log.isTraceEnabled()) {
  453                       log.trace("FastQueue.add: Could not add, since queue is full ("
  454                               + size + ">=" + maxQueueLength + ")");
  455                   }
  456   
  457               } else {
  458                   LinkObject element = new LinkObject(key, data);
  459                   if (size == 0) {
  460                       first = last = element;
  461                       size = 1;
  462                   } else {
  463                       if (last == null) {
  464                           ok = false;
  465                           log
  466                                   .error("FastQueue.add: Could not add, since last is null although size is "
  467                                           + size + " (>0)");
  468                       } else {
  469                           last.append(element);
  470                           last = element;
  471                           size++;
  472                       }
  473                   }
  474   
  475               }
  476   
  477               if (doStats) {
  478                   if (ok) {
  479                       if (addCounter % sampleInterval == 0) {
  480                           maxSizeSample = 0;
  481                           avgSizeSample = 0;
  482                       }
  483                       addCounter++;
  484                       if (size > maxSize) {
  485                           maxSize = size;
  486                       }
  487                       if (size > maxSizeSample) {
  488                           maxSizeSample = size;
  489                       }
  490                       avgSize += size;
  491                       avgSizeSample += size;
  492                   } else {
  493                       addErrorCounter++;
  494                   }
  495               }
  496   
  497               if (first == null) {
  498                   log.error("FastQueue.add: first is null, size is " + size
  499                           + " at end of add");
  500               }
  501               if (last == null) {
  502                   log.error("FastQueue.add: last is null, size is " + size
  503                           + " at end of add");
  504               }
  505   
  506               if (checkLock) {
  507                   if (!inMutex)
  508                       log.warn("FastQueue.add: Cancelled by other mutex in add");
  509                   inMutex = false;
  510                   if (!inAdd)
  511                       log.warn("FastQueue.add: Cancelled by other add");
  512                   inAdd = false;
  513               }
  514               if (log.isTraceEnabled()) {
  515                   log.trace("FastQueue.add: add ending with size " + size);
  516               }
  517   
  518               if (timeWait) {
  519                   time = System.currentTimeMillis();
  520               }
  521           } finally {
  522               lock.unlockAdd(true);
  523           }
  524           if (timeWait) {
  525               addWait += (System.currentTimeMillis() - time);
  526           }
  527           return ok;
  528       }
  529   
  530       /**
  531        * remove the complete queued object list
  532        * @see org.apache.catalina.cluster.util.IQueue#remove()
  533        * FIXME extract some method
  534        */
  535       public LinkObject remove() {
  536           LinkObject element;
  537           boolean gotLock;
  538           long time = 0;
  539   
  540           if (!enabled) {
  541               if (log.isInfoEnabled())
  542                   log.info("FastQueue.remove: queue disabled, remove aborted");
  543               return null;
  544           }
  545   
  546           if (timeWait) {
  547               time = System.currentTimeMillis();
  548           }
  549           gotLock = lock.lockRemove();
  550           try {
  551   
  552               if (!gotLock) {
  553                   if (enabled) {
  554                       if (timeWait) {
  555                           removeWait += (System.currentTimeMillis() - time);
  556                       }
  557                       if (doStats) {
  558                           removeErrorCounter++;
  559                       }
  560                       if (log.isInfoEnabled())
  561                           log.info("FastQueue.remove: Remove aborted although queue enabled");
  562                   } else {
  563                       if (log.isInfoEnabled())
  564                           log.info("FastQueue.remove: queue disabled, remove aborted");
  565                   }
  566                   return null;
  567               }
  568   
  569               if (timeWait) {
  570                   removeWait += (System.currentTimeMillis() - time);
  571               }
  572   
  573               if (log.isTraceEnabled()) {
  574                   log.trace("FastQueue.remove: remove starting with size " + size);
  575               }
  576               if (checkLock) {
  577                   if (inRemove)
  578                       log.warn("FastQueue.remove: Detected other remove");
  579                   inRemove = true;
  580                   if (inMutex)
  581                       log.warn("FastQueue.remove: Detected other mutex in remove");
  582                   inMutex = true;
  583               }
  584   
  585               element = first;
  586   
  587               if (doStats) {
  588                   if (element != null) {
  589                       removeCounter++;
  590                   } else {
  591                       removeErrorCounter++;
  592                       log
  593                               .error("FastQueue.remove: Could not remove, since first is null although size is "
  594                                       + size + " (>0)");
  595                   }
  596               }
  597   
  598               first = last = null;
  599               size = 0;
  600   
  601               if (checkLock) {
  602                   if (!inMutex)
  603                       log.warn("FastQueue.remove: Cancelled by other mutex in remove");
  604                   inMutex = false;
  605                   if (!inRemove)
  606                       log.warn("FastQueue.remove: Cancelled by other remove");
  607                   inRemove = false;
  608               }
  609               if (log.isTraceEnabled()) {
  610                   log.trace("FastQueue.remove: remove ending with size " + size);
  611               }
  612   
  613               if (timeWait) {
  614                   time = System.currentTimeMillis();
  615               }
  616           } finally {
  617               lock.unlockRemove();
  618           }
  619           if (timeWait) {
  620               removeWait += (System.currentTimeMillis() - time);
  621           }
  622           return element;
  623       }
  624   
  625   }

Save This Page
Home » apache-tomcat-6.0.16-src » org.apache » catalina » cluster » util » [javadoc | source]