Home » openjdk-7 » java » util » concurrent » [javadoc | source]

    1   /*
    2    * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
    3    *
    4    * This code is free software; you can redistribute it and/or modify it
    5    * under the terms of the GNU General Public License version 2 only, as
    6    * published by the Free Software Foundation.  Oracle designates this
    7    * particular file as subject to the "Classpath" exception as provided
    8    * by Oracle in the LICENSE file that accompanied this code.
    9    *
   10    * This code is distributed in the hope that it will be useful, but WITHOUT
   11    * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
   12    * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
   13    * version 2 for more details (a copy is included in the LICENSE file that
   14    * accompanied this code).
   15    *
   16    * You should have received a copy of the GNU General Public License version
   17    * 2 along with this work; if not, write to the Free Software Foundation,
   18    * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
   19    *
   20    * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
   21    * or visit www.oracle.com if you need additional information or have any
   22    * questions.
   23    */
   24   
   25   /*
   26    * This file is available under and governed by the GNU General Public
   27    * License version 2 only, as published by the Free Software Foundation.
   28    * However, the following notice accompanied the original version of this
   29    * file:
   30    *
   31    * Written by Doug Lea with assistance from members of JCP JSR-166
   32    * Expert Group and released to the public domain, as explained at
   33    * http://creativecommons.org/publicdomain/zero/1.0/
   34    */
   35   
   36   
   37   package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.*;
   40   
   41   /**
   42    * An unbounded {@linkplain BlockingQueue blocking queue} of
   43    * <tt>Delayed</tt> elements, in which an element can only be taken
   44    * when its delay has expired.  The <em>head</em> of the queue is that
   45    * <tt>Delayed</tt> element whose delay expired furthest in the
   46    * past.  If no delay has expired there is no head and <tt>poll</tt>
   47    * will return <tt>null</tt>. Expiration occurs when an element's
   48    * <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less
   49    * than or equal to zero.  Even though unexpired elements cannot be
   50    * removed using <tt>take</tt> or <tt>poll</tt>, they are otherwise
   51    * treated as normal elements. For example, the <tt>size</tt> method
   52    * returns the count of both expired and unexpired elements.
   53    * This queue does not permit null elements.
   54    *
   55    * <p>This class and its iterator implement all of the
   56    * <em>optional</em> methods of the {@link Collection} and {@link
   57    * Iterator} interfaces.
   58    *
   59    * <p>This class is a member of the
   60    * <a href="{@docRoot}/../technotes/guides/collections/index.html">
   61    * Java Collections Framework</a>.
   62    *
   63    * @since 1.5
   64    * @author Doug Lea
   65    * @param <E> the type of elements held in this collection
   66    */
   67   
   68   public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
   69       implements BlockingQueue<E> {
   70   
   71       private transient final ReentrantLock lock = new ReentrantLock();
   72       private final PriorityQueue<E> q = new PriorityQueue<E>();
   73   
   74       /**
   75        * Thread designated to wait for the element at the head of
   76        * the queue.  This variant of the Leader-Follower pattern
   77        * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
   78        * minimize unnecessary timed waiting.  When a thread becomes
   79        * the leader, it waits only for the next delay to elapse, but
   80        * other threads await indefinitely.  The leader thread must
   81        * signal some other thread before returning from take() or
   82        * poll(...), unless some other thread becomes leader in the
   83        * interim.  Whenever the head of the queue is replaced with
   84        * an element with an earlier expiration time, the leader
   85        * field is invalidated by being reset to null, and some
   86        * waiting thread, but not necessarily the current leader, is
   87        * signalled.  So waiting threads must be prepared to acquire
   88        * and lose leadership while waiting.
   89        */
   90       private Thread leader = null;
   91   
   92       /**
   93        * Condition signalled when a newer element becomes available
   94        * at the head of the queue or a new thread may need to
   95        * become leader.
   96        */
   97       private final Condition available = lock.newCondition();
   98   
   99       /**
  100        * Creates a new <tt>DelayQueue</tt> that is initially empty.
  101        */
  102       public DelayQueue() {}
  103   
  104       /**
  105        * Creates a <tt>DelayQueue</tt> initially containing the elements of the
  106        * given collection of {@link Delayed} instances.
  107        *
  108        * @param c the collection of elements to initially contain
  109        * @throws NullPointerException if the specified collection or any
  110        *         of its elements are null
  111        */
  112       public DelayQueue(Collection<? extends E> c) {
  113           this.addAll(c);
  114       }
  115   
  116       /**
  117        * Inserts the specified element into this delay queue.
  118        *
  119        * @param e the element to add
  120        * @return <tt>true</tt> (as specified by {@link Collection#add})
  121        * @throws NullPointerException if the specified element is null
  122        */
  123       public boolean add(E e) {
  124           return offer(e);
  125       }
  126   
  127       /**
  128        * Inserts the specified element into this delay queue.
  129        *
  130        * @param e the element to add
  131        * @return <tt>true</tt>
  132        * @throws NullPointerException if the specified element is null
  133        */
  134       public boolean offer(E e) {
  135           final ReentrantLock lock = this.lock;
  136           lock.lock();
  137           try {
  138               q.offer(e);
  139               if (q.peek() == e) {
  140                   leader = null;
  141                   available.signal();
  142               }
  143               return true;
  144           } finally {
  145               lock.unlock();
  146           }
  147       }
  148   
  149       /**
  150        * Inserts the specified element into this delay queue. As the queue is
  151        * unbounded this method will never block.
  152        *
  153        * @param e the element to add
  154        * @throws NullPointerException {@inheritDoc}
  155        */
  156       public void put(E e) {
  157           offer(e);
  158       }
  159   
  160       /**
  161        * Inserts the specified element into this delay queue. As the queue is
  162        * unbounded this method will never block.
  163        *
  164        * @param e the element to add
  165        * @param timeout This parameter is ignored as the method never blocks
  166        * @param unit This parameter is ignored as the method never blocks
  167        * @return <tt>true</tt>
  168        * @throws NullPointerException {@inheritDoc}
  169        */
  170       public boolean offer(E e, long timeout, TimeUnit unit) {
  171           return offer(e);
  172       }
  173   
  174       /**
  175        * Retrieves and removes the head of this queue, or returns <tt>null</tt>
  176        * if this queue has no elements with an expired delay.
  177        *
  178        * @return the head of this queue, or <tt>null</tt> if this
  179        *         queue has no elements with an expired delay
  180        */
  181       public E poll() {
  182           final ReentrantLock lock = this.lock;
  183           lock.lock();
  184           try {
  185               E first = q.peek();
  186               if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
  187                   return null;
  188               else
  189                   return q.poll();
  190           } finally {
  191               lock.unlock();
  192           }
  193       }
  194   
  195       /**
  196        * Retrieves and removes the head of this queue, waiting if necessary
  197        * until an element with an expired delay is available on this queue.
  198        *
  199        * @return the head of this queue
  200        * @throws InterruptedException {@inheritDoc}
  201        */
  202       public E take() throws InterruptedException {
  203           final ReentrantLock lock = this.lock;
  204           lock.lockInterruptibly();
  205           try {
  206               for (;;) {
  207                   E first = q.peek();
  208                   if (first == null)
  209                       available.await();
  210                   else {
  211                       long delay = first.getDelay(TimeUnit.NANOSECONDS);
  212                       if (delay <= 0)
  213                           return q.poll();
  214                       else if (leader != null)
  215                           available.await();
  216                       else {
  217                           Thread thisThread = Thread.currentThread();
  218                           leader = thisThread;
  219                           try {
  220                               available.awaitNanos(delay);
  221                           } finally {
  222                               if (leader == thisThread)
  223                                   leader = null;
  224                           }
  225                       }
  226                   }
  227               }
  228           } finally {
  229               if (leader == null && q.peek() != null)
  230                   available.signal();
  231               lock.unlock();
  232           }
  233       }
  234   
  235       /**
  236        * Retrieves and removes the head of this queue, waiting if necessary
  237        * until an element with an expired delay is available on this queue,
  238        * or the specified wait time expires.
  239        *
  240        * @return the head of this queue, or <tt>null</tt> if the
  241        *         specified waiting time elapses before an element with
  242        *         an expired delay becomes available
  243        * @throws InterruptedException {@inheritDoc}
  244        */
  245       public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  246           long nanos = unit.toNanos(timeout);
  247           final ReentrantLock lock = this.lock;
  248           lock.lockInterruptibly();
  249           try {
  250               for (;;) {
  251                   E first = q.peek();
  252                   if (first == null) {
  253                       if (nanos <= 0)
  254                           return null;
  255                       else
  256                           nanos = available.awaitNanos(nanos);
  257                   } else {
  258                       long delay = first.getDelay(TimeUnit.NANOSECONDS);
  259                       if (delay <= 0)
  260                           return q.poll();
  261                       if (nanos <= 0)
  262                           return null;
  263                       if (nanos < delay || leader != null)
  264                           nanos = available.awaitNanos(nanos);
  265                       else {
  266                           Thread thisThread = Thread.currentThread();
  267                           leader = thisThread;
  268                           try {
  269                               long timeLeft = available.awaitNanos(delay);
  270                               nanos -= delay - timeLeft;
  271                           } finally {
  272                               if (leader == thisThread)
  273                                   leader = null;
  274                           }
  275                       }
  276                   }
  277               }
  278           } finally {
  279               if (leader == null && q.peek() != null)
  280                   available.signal();
  281               lock.unlock();
  282           }
  283       }
  284   
  285       /**
  286        * Retrieves, but does not remove, the head of this queue, or
  287        * returns <tt>null</tt> if this queue is empty.  Unlike
  288        * <tt>poll</tt>, if no expired elements are available in the queue,
  289        * this method returns the element that will expire next,
  290        * if one exists.
  291        *
  292        * @return the head of this queue, or <tt>null</tt> if this
  293        *         queue is empty.
  294        */
  295       public E peek() {
  296           final ReentrantLock lock = this.lock;
  297           lock.lock();
  298           try {
  299               return q.peek();
  300           } finally {
  301               lock.unlock();
  302           }
  303       }
  304   
  305       public int size() {
  306           final ReentrantLock lock = this.lock;
  307           lock.lock();
  308           try {
  309               return q.size();
  310           } finally {
  311               lock.unlock();
  312           }
  313       }
  314   
  315       /**
  316        * @throws UnsupportedOperationException {@inheritDoc}
  317        * @throws ClassCastException            {@inheritDoc}
  318        * @throws NullPointerException          {@inheritDoc}
  319        * @throws IllegalArgumentException      {@inheritDoc}
  320        */
  321       public int drainTo(Collection<? super E> c) {
  322           if (c == null)
  323               throw new NullPointerException();
  324           if (c == this)
  325               throw new IllegalArgumentException();
  326           final ReentrantLock lock = this.lock;
  327           lock.lock();
  328           try {
  329               int n = 0;
  330               for (;;) {
  331                   E first = q.peek();
  332                   if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
  333                       break;
  334                   c.add(q.poll());
  335                   ++n;
  336               }
  337               return n;
  338           } finally {
  339               lock.unlock();
  340           }
  341       }
  342   
  343       /**
  344        * @throws UnsupportedOperationException {@inheritDoc}
  345        * @throws ClassCastException            {@inheritDoc}
  346        * @throws NullPointerException          {@inheritDoc}
  347        * @throws IllegalArgumentException      {@inheritDoc}
  348        */
  349       public int drainTo(Collection<? super E> c, int maxElements) {
  350           if (c == null)
  351               throw new NullPointerException();
  352           if (c == this)
  353               throw new IllegalArgumentException();
  354           if (maxElements <= 0)
  355               return 0;
  356           final ReentrantLock lock = this.lock;
  357           lock.lock();
  358           try {
  359               int n = 0;
  360               while (n < maxElements) {
  361                   E first = q.peek();
  362                   if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
  363                       break;
  364                   c.add(q.poll());
  365                   ++n;
  366               }
  367               return n;
  368           } finally {
  369               lock.unlock();
  370           }
  371       }
  372   
  373       /**
  374        * Atomically removes all of the elements from this delay queue.
  375        * The queue will be empty after this call returns.
  376        * Elements with an unexpired delay are not waited for; they are
  377        * simply discarded from the queue.
  378        */
  379       public void clear() {
  380           final ReentrantLock lock = this.lock;
  381           lock.lock();
  382           try {
  383               q.clear();
  384           } finally {
  385               lock.unlock();
  386           }
  387       }
  388   
  389       /**
  390        * Always returns <tt>Integer.MAX_VALUE</tt> because
  391        * a <tt>DelayQueue</tt> is not capacity constrained.
  392        *
  393        * @return <tt>Integer.MAX_VALUE</tt>
  394        */
  395       public int remainingCapacity() {
  396           return Integer.MAX_VALUE;
  397       }
  398   
  399       /**
  400        * Returns an array containing all of the elements in this queue.
  401        * The returned array elements are in no particular order.
  402        *
  403        * <p>The returned array will be "safe" in that no references to it are
  404        * maintained by this queue.  (In other words, this method must allocate
  405        * a new array).  The caller is thus free to modify the returned array.
  406        *
  407        * <p>This method acts as bridge between array-based and collection-based
  408        * APIs.
  409        *
  410        * @return an array containing all of the elements in this queue
  411        */
  412       public Object[] toArray() {
  413           final ReentrantLock lock = this.lock;
  414           lock.lock();
  415           try {
  416               return q.toArray();
  417           } finally {
  418               lock.unlock();
  419           }
  420       }
  421   
  422       /**
  423        * Returns an array containing all of the elements in this queue; the
  424        * runtime type of the returned array is that of the specified array.
  425        * The returned array elements are in no particular order.
  426        * If the queue fits in the specified array, it is returned therein.
  427        * Otherwise, a new array is allocated with the runtime type of the
  428        * specified array and the size of this queue.
  429        *
  430        * <p>If this queue fits in the specified array with room to spare
  431        * (i.e., the array has more elements than this queue), the element in
  432        * the array immediately following the end of the queue is set to
  433        * <tt>null</tt>.
  434        *
  435        * <p>Like the {@link #toArray()} method, this method acts as bridge between
  436        * array-based and collection-based APIs.  Further, this method allows
  437        * precise control over the runtime type of the output array, and may,
  438        * under certain circumstances, be used to save allocation costs.
  439        *
  440        * <p>The following code can be used to dump a delay queue into a newly
  441        * allocated array of <tt>Delayed</tt>:
  442        *
  443        * <pre>
  444        *     Delayed[] a = q.toArray(new Delayed[0]);</pre>
  445        *
  446        * Note that <tt>toArray(new Object[0])</tt> is identical in function to
  447        * <tt>toArray()</tt>.
  448        *
  449        * @param a the array into which the elements of the queue are to
  450        *          be stored, if it is big enough; otherwise, a new array of the
  451        *          same runtime type is allocated for this purpose
  452        * @return an array containing all of the elements in this queue
  453        * @throws ArrayStoreException if the runtime type of the specified array
  454        *         is not a supertype of the runtime type of every element in
  455        *         this queue
  456        * @throws NullPointerException if the specified array is null
  457        */
  458       public <T> T[] toArray(T[] a) {
  459           final ReentrantLock lock = this.lock;
  460           lock.lock();
  461           try {
  462               return q.toArray(a);
  463           } finally {
  464               lock.unlock();
  465           }
  466       }
  467   
  468       /**
  469        * Removes a single instance of the specified element from this
  470        * queue, if it is present, whether or not it has expired.
  471        */
  472       public boolean remove(Object o) {
  473           final ReentrantLock lock = this.lock;
  474           lock.lock();
  475           try {
  476               return q.remove(o);
  477           } finally {
  478               lock.unlock();
  479           }
  480       }
  481   
  482       /**
  483        * Returns an iterator over all the elements (both expired and
  484        * unexpired) in this queue. The iterator does not return the
  485        * elements in any particular order.
  486        *
  487        * <p>The returned iterator is a "weakly consistent" iterator that
  488        * will never throw {@link java.util.ConcurrentModificationException
  489        * ConcurrentModificationException}, and guarantees to traverse
  490        * elements as they existed upon construction of the iterator, and
  491        * may (but is not guaranteed to) reflect any modifications
  492        * subsequent to construction.
  493        *
  494        * @return an iterator over the elements in this queue
  495        */
  496       public Iterator<E> iterator() {
  497           return new Itr(toArray());
  498       }
  499   
  500       /**
  501        * Snapshot iterator that works off copy of underlying q array.
  502        */
  503       private class Itr implements Iterator<E> {
  504           final Object[] array; // Array of all elements
  505           int cursor;           // index of next element to return;
  506           int lastRet;          // index of last element, or -1 if no such
  507   
  508           Itr(Object[] array) {
  509               lastRet = -1;
  510               this.array = array;
  511           }
  512   
  513           public boolean hasNext() {
  514               return cursor < array.length;
  515           }
  516   
  517           @SuppressWarnings("unchecked")
  518           public E next() {
  519               if (cursor >= array.length)
  520                   throw new NoSuchElementException();
  521               lastRet = cursor;
  522               return (E)array[cursor++];
  523           }
  524   
  525           public void remove() {
  526               if (lastRet < 0)
  527                   throw new IllegalStateException();
  528               Object x = array[lastRet];
  529               lastRet = -1;
  530               // Traverse underlying queue to find == element,
  531               // not just a .equals element.
  532               lock.lock();
  533               try {
  534                   for (Iterator it = q.iterator(); it.hasNext(); ) {
  535                       if (it.next() == x) {
  536                           it.remove();
  537                           return;
  538                       }
  539                   }
  540               } finally {
  541                   lock.unlock();
  542               }
  543           }
  544       }
  545   
  546   }

Home » openjdk-7 » java » util » concurrent » [javadoc | source]