Save This Page
Home » openjdk-7 » java » util » concurrent » [javadoc | source]
    1   /*
    2    * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
    3    *
    4    * This code is free software; you can redistribute it and/or modify it
    5    * under the terms of the GNU General Public License version 2 only, as
    6    * published by the Free Software Foundation.  Sun designates this
    7    * particular file as subject to the "Classpath" exception as provided
    8    * by Sun in the LICENSE file that accompanied this code.
    9    *
   10    * This code is distributed in the hope that it will be useful, but WITHOUT
   11    * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
   12    * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
   13    * version 2 for more details (a copy is included in the LICENSE file that
   14    * accompanied this code).
   15    *
   16    * You should have received a copy of the GNU General Public License version
   17    * 2 along with this work; if not, write to the Free Software Foundation,
   18    * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
   19    *
   20    * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
   21    * CA 95054 USA or visit www.sun.com if you need additional information or
   22    * have any questions.
   23    */
   24   
   25   /*
   26    * This file is available under and governed by the GNU General Public
   27    * License version 2 only, as published by the Free Software Foundation.
   28    * However, the following notice accompanied the original version of this
   29    * file:
   30    *
   31    * Written by Doug Lea with assistance from members of JCP JSR-166
   32    * Expert Group and released to the public domain, as explained at
   33    * http://creativecommons.org/licenses/publicdomain
   34    */
   35   
   36   package java.util.concurrent;
   37   
   38   import java.util.concurrent.locks;
   39   import java.util;
   40   
   41   /**
   42    * An unbounded {@linkplain BlockingQueue blocking queue} that uses
   43    * the same ordering rules as class {@link PriorityQueue} and supplies
   44    * blocking retrieval operations.  While this queue is logically
   45    * unbounded, attempted additions may fail due to resource exhaustion
   46    * (causing <tt>OutOfMemoryError</tt>). This class does not permit
   47    * <tt>null</tt> elements.  A priority queue relying on {@linkplain
   48    * Comparable natural ordering} also does not permit insertion of
   49    * non-comparable objects (doing so results in
   50    * <tt>ClassCastException</tt>).
   51    *
   52    * <p>This class and its iterator implement all of the
   53    * <em>optional</em> methods of the {@link Collection} and {@link
   54    * Iterator} interfaces.  The Iterator provided in method {@link
   55    * #iterator()} is <em>not</em> guaranteed to traverse the elements of
   56    * the PriorityBlockingQueue in any particular order. If you need
   57    * ordered traversal, consider using
   58    * <tt>Arrays.sort(pq.toArray())</tt>.  Also, method <tt>drainTo</tt>
   59    * can be used to <em>remove</em> some or all elements in priority
   60    * order and place them in another collection.
   61    *
   62    * <p>Operations on this class make no guarantees about the ordering
   63    * of elements with equal priority. If you need to enforce an
   64    * ordering, you can define custom classes or comparators that use a
   65    * secondary key to break ties in primary priority values.  For
   66    * example, here is a class that applies first-in-first-out
   67    * tie-breaking to comparable elements. To use it, you would insert a
   68    * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object.
   69    *
   70    * <pre>
   71    * class FIFOEntry&lt;E extends Comparable&lt;? super E&gt;&gt;
   72    *     implements Comparable&lt;FIFOEntry&lt;E&gt;&gt; {
   73    *   final static AtomicLong seq = new AtomicLong();
   74    *   final long seqNum;
   75    *   final E entry;
   76    *   public FIFOEntry(E entry) {
   77    *     seqNum = seq.getAndIncrement();
   78    *     this.entry = entry;
   79    *   }
   80    *   public E getEntry() { return entry; }
   81    *   public int compareTo(FIFOEntry&lt;E&gt; other) {
   82    *     int res = entry.compareTo(other.entry);
   83    *     if (res == 0 &amp;&amp; other.entry != this.entry)
   84    *       res = (seqNum &lt; other.seqNum ? -1 : 1);
   85    *     return res;
   86    *   }
   87    * }</pre>
   88    *
   89    * <p>This class is a member of the
   90    * <a href="{@docRoot}/../technotes/guides/collections/index.html">
   91    * Java Collections Framework</a>.
   92    *
   93    * @since 1.5
   94    * @author Doug Lea
   95    * @param <E> the type of elements held in this collection
   96    */
   97   public class PriorityBlockingQueue<E> extends AbstractQueue<E>
   98       implements BlockingQueue<E>, java.io.Serializable {
   99       private static final long serialVersionUID = 5595510919245408276L;
  100   
  101       private final PriorityQueue<E> q;
  102       private final ReentrantLock lock = new ReentrantLock(true);
  103       private final Condition notEmpty = lock.newCondition();
  104   
  105       /**
  106        * Creates a <tt>PriorityBlockingQueue</tt> with the default
  107        * initial capacity (11) that orders its elements according to
  108        * their {@linkplain Comparable natural ordering}.
  109        */
  110       public PriorityBlockingQueue() {
  111           q = new PriorityQueue<E>();
  112       }
  113   
  114       /**
  115        * Creates a <tt>PriorityBlockingQueue</tt> with the specified
  116        * initial capacity that orders its elements according to their
  117        * {@linkplain Comparable natural ordering}.
  118        *
  119        * @param initialCapacity the initial capacity for this priority queue
  120        * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
  121        *         than 1
  122        */
  123       public PriorityBlockingQueue(int initialCapacity) {
  124           q = new PriorityQueue<E>(initialCapacity, null);
  125       }
  126   
  127       /**
  128        * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
  129        * capacity that orders its elements according to the specified
  130        * comparator.
  131        *
  132        * @param initialCapacity the initial capacity for this priority queue
  133        * @param  comparator the comparator that will be used to order this
  134        *         priority queue.  If {@code null}, the {@linkplain Comparable
  135        *         natural ordering} of the elements will be used.
  136        * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
  137        *         than 1
  138        */
  139       public PriorityBlockingQueue(int initialCapacity,
  140                                    Comparator<? super E> comparator) {
  141           q = new PriorityQueue<E>(initialCapacity, comparator);
  142       }
  143   
  144       /**
  145        * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
  146        * in the specified collection.  If the specified collection is a
  147        * {@link SortedSet} or a {@link PriorityQueue},  this
  148        * priority queue will be ordered according to the same ordering.
  149        * Otherwise, this priority queue will be ordered according to the
  150        * {@linkplain Comparable natural ordering} of its elements.
  151        *
  152        * @param  c the collection whose elements are to be placed
  153        *         into this priority queue
  154        * @throws ClassCastException if elements of the specified collection
  155        *         cannot be compared to one another according to the priority
  156        *         queue's ordering
  157        * @throws NullPointerException if the specified collection or any
  158        *         of its elements are null
  159        */
  160       public PriorityBlockingQueue(Collection<? extends E> c) {
  161           q = new PriorityQueue<E>(c);
  162       }
  163   
  164       /**
  165        * Inserts the specified element into this priority queue.
  166        *
  167        * @param e the element to add
  168        * @return <tt>true</tt> (as specified by {@link Collection#add})
  169        * @throws ClassCastException if the specified element cannot be compared
  170        *         with elements currently in the priority queue according to the
  171        *         priority queue's ordering
  172        * @throws NullPointerException if the specified element is null
  173        */
  174       public boolean add(E e) {
  175           return offer(e);
  176       }
  177   
  178       /**
  179        * Inserts the specified element into this priority queue.
  180        *
  181        * @param e the element to add
  182        * @return <tt>true</tt> (as specified by {@link Queue#offer})
  183        * @throws ClassCastException if the specified element cannot be compared
  184        *         with elements currently in the priority queue according to the
  185        *         priority queue's ordering
  186        * @throws NullPointerException if the specified element is null
  187        */
  188       public boolean offer(E e) {
  189           final ReentrantLock lock = this.lock;
  190           lock.lock();
  191           try {
  192               boolean ok = q.offer(e);
  193               assert ok;
  194               notEmpty.signal();
  195               return true;
  196           } finally {
  197               lock.unlock();
  198           }
  199       }
  200   
  201       /**
  202        * Inserts the specified element into this priority queue. As the queue is
  203        * unbounded this method will never block.
  204        *
  205        * @param e the element to add
  206        * @throws ClassCastException if the specified element cannot be compared
  207        *         with elements currently in the priority queue according to the
  208        *         priority queue's ordering
  209        * @throws NullPointerException if the specified element is null
  210        */
  211       public void put(E e) {
  212           offer(e); // never need to block
  213       }
  214   
  215       /**
  216        * Inserts the specified element into this priority queue. As the queue is
  217        * unbounded this method will never block.
  218        *
  219        * @param e the element to add
  220        * @param timeout This parameter is ignored as the method never blocks
  221        * @param unit This parameter is ignored as the method never blocks
  222        * @return <tt>true</tt>
  223        * @throws ClassCastException if the specified element cannot be compared
  224        *         with elements currently in the priority queue according to the
  225        *         priority queue's ordering
  226        * @throws NullPointerException if the specified element is null
  227        */
  228       public boolean offer(E e, long timeout, TimeUnit unit) {
  229           return offer(e); // never need to block
  230       }
  231   
  232       public E poll() {
  233           final ReentrantLock lock = this.lock;
  234           lock.lock();
  235           try {
  236               return q.poll();
  237           } finally {
  238               lock.unlock();
  239           }
  240       }
  241   
  242       public E take() throws InterruptedException {
  243           final ReentrantLock lock = this.lock;
  244           lock.lockInterruptibly();
  245           try {
  246               try {
  247                   while (q.size() == 0)
  248                       notEmpty.await();
  249               } catch (InterruptedException ie) {
  250                   notEmpty.signal(); // propagate to non-interrupted thread
  251                   throw ie;
  252               }
  253               E x = q.poll();
  254               assert x != null;
  255               return x;
  256           } finally {
  257               lock.unlock();
  258           }
  259       }
  260   
  261       public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  262           long nanos = unit.toNanos(timeout);
  263           final ReentrantLock lock = this.lock;
  264           lock.lockInterruptibly();
  265           try {
  266               for (;;) {
  267                   E x = q.poll();
  268                   if (x != null)
  269                       return x;
  270                   if (nanos <= 0)
  271                       return null;
  272                   try {
  273                       nanos = notEmpty.awaitNanos(nanos);
  274                   } catch (InterruptedException ie) {
  275                       notEmpty.signal(); // propagate to non-interrupted thread
  276                       throw ie;
  277                   }
  278               }
  279           } finally {
  280               lock.unlock();
  281           }
  282       }
  283   
  284       public E peek() {
  285           final ReentrantLock lock = this.lock;
  286           lock.lock();
  287           try {
  288               return q.peek();
  289           } finally {
  290               lock.unlock();
  291           }
  292       }
  293   
  294       /**
  295        * Returns the comparator used to order the elements in this queue,
  296        * or <tt>null</tt> if this queue uses the {@linkplain Comparable
  297        * natural ordering} of its elements.
  298        *
  299        * @return the comparator used to order the elements in this queue,
  300        *         or <tt>null</tt> if this queue uses the natural
  301        *         ordering of its elements
  302        */
  303       public Comparator<? super E> comparator() {
  304           return q.comparator();
  305       }
  306   
  307       public int size() {
  308           final ReentrantLock lock = this.lock;
  309           lock.lock();
  310           try {
  311               return q.size();
  312           } finally {
  313               lock.unlock();
  314           }
  315       }
  316   
  317       /**
  318        * Always returns <tt>Integer.MAX_VALUE</tt> because
  319        * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
  320        * @return <tt>Integer.MAX_VALUE</tt>
  321        */
  322       public int remainingCapacity() {
  323           return Integer.MAX_VALUE;
  324       }
  325   
  326       /**
  327        * Removes a single instance of the specified element from this queue,
  328        * if it is present.  More formally, removes an element {@code e} such
  329        * that {@code o.equals(e)}, if this queue contains one or more such
  330        * elements.  Returns {@code true} if and only if this queue contained
  331        * the specified element (or equivalently, if this queue changed as a
  332        * result of the call).
  333        *
  334        * @param o element to be removed from this queue, if present
  335        * @return <tt>true</tt> if this queue changed as a result of the call
  336        */
  337       public boolean remove(Object o) {
  338           final ReentrantLock lock = this.lock;
  339           lock.lock();
  340           try {
  341               return q.remove(o);
  342           } finally {
  343               lock.unlock();
  344           }
  345       }
  346   
  347       /**
  348        * Returns {@code true} if this queue contains the specified element.
  349        * More formally, returns {@code true} if and only if this queue contains
  350        * at least one element {@code e} such that {@code o.equals(e)}.
  351        *
  352        * @param o object to be checked for containment in this queue
  353        * @return <tt>true</tt> if this queue contains the specified element
  354        */
  355       public boolean contains(Object o) {
  356           final ReentrantLock lock = this.lock;
  357           lock.lock();
  358           try {
  359               return q.contains(o);
  360           } finally {
  361               lock.unlock();
  362           }
  363       }
  364   
  365       /**
  366        * Returns an array containing all of the elements in this queue.
  367        * The returned array elements are in no particular order.
  368        *
  369        * <p>The returned array will be "safe" in that no references to it are
  370        * maintained by this queue.  (In other words, this method must allocate
  371        * a new array).  The caller is thus free to modify the returned array.
  372        *
  373        * <p>This method acts as bridge between array-based and collection-based
  374        * APIs.
  375        *
  376        * @return an array containing all of the elements in this queue
  377        */
  378       public Object[] toArray() {
  379           final ReentrantLock lock = this.lock;
  380           lock.lock();
  381           try {
  382               return q.toArray();
  383           } finally {
  384               lock.unlock();
  385           }
  386       }
  387   
  388   
  389       public String toString() {
  390           final ReentrantLock lock = this.lock;
  391           lock.lock();
  392           try {
  393               return q.toString();
  394           } finally {
  395               lock.unlock();
  396           }
  397       }
  398   
  399       /**
  400        * @throws UnsupportedOperationException {@inheritDoc}
  401        * @throws ClassCastException            {@inheritDoc}
  402        * @throws NullPointerException          {@inheritDoc}
  403        * @throws IllegalArgumentException      {@inheritDoc}
  404        */
  405       public int drainTo(Collection<? super E> c) {
  406           if (c == null)
  407               throw new NullPointerException();
  408           if (c == this)
  409               throw new IllegalArgumentException();
  410           final ReentrantLock lock = this.lock;
  411           lock.lock();
  412           try {
  413               int n = 0;
  414               E e;
  415               while ( (e = q.poll()) != null) {
  416                   c.add(e);
  417                   ++n;
  418               }
  419               return n;
  420           } finally {
  421               lock.unlock();
  422           }
  423       }
  424   
  425       /**
  426        * @throws UnsupportedOperationException {@inheritDoc}
  427        * @throws ClassCastException            {@inheritDoc}
  428        * @throws NullPointerException          {@inheritDoc}
  429        * @throws IllegalArgumentException      {@inheritDoc}
  430        */
  431       public int drainTo(Collection<? super E> c, int maxElements) {
  432           if (c == null)
  433               throw new NullPointerException();
  434           if (c == this)
  435               throw new IllegalArgumentException();
  436           if (maxElements <= 0)
  437               return 0;
  438           final ReentrantLock lock = this.lock;
  439           lock.lock();
  440           try {
  441               int n = 0;
  442               E e;
  443               while (n < maxElements && (e = q.poll()) != null) {
  444                   c.add(e);
  445                   ++n;
  446               }
  447               return n;
  448           } finally {
  449               lock.unlock();
  450           }
  451       }
  452   
  453       /**
  454        * Atomically removes all of the elements from this queue.
  455        * The queue will be empty after this call returns.
  456        */
  457       public void clear() {
  458           final ReentrantLock lock = this.lock;
  459           lock.lock();
  460           try {
  461               q.clear();
  462           } finally {
  463               lock.unlock();
  464           }
  465       }
  466   
  467       /**
  468        * Returns an array containing all of the elements in this queue; the
  469        * runtime type of the returned array is that of the specified array.
  470        * The returned array elements are in no particular order.
  471        * If the queue fits in the specified array, it is returned therein.
  472        * Otherwise, a new array is allocated with the runtime type of the
  473        * specified array and the size of this queue.
  474        *
  475        * <p>If this queue fits in the specified array with room to spare
  476        * (i.e., the array has more elements than this queue), the element in
  477        * the array immediately following the end of the queue is set to
  478        * <tt>null</tt>.
  479        *
  480        * <p>Like the {@link #toArray()} method, this method acts as bridge between
  481        * array-based and collection-based APIs.  Further, this method allows
  482        * precise control over the runtime type of the output array, and may,
  483        * under certain circumstances, be used to save allocation costs.
  484        *
  485        * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
  486        * The following code can be used to dump the queue into a newly
  487        * allocated array of <tt>String</tt>:
  488        *
  489        * <pre>
  490        *     String[] y = x.toArray(new String[0]);</pre>
  491        *
  492        * Note that <tt>toArray(new Object[0])</tt> is identical in function to
  493        * <tt>toArray()</tt>.
  494        *
  495        * @param a the array into which the elements of the queue are to
  496        *          be stored, if it is big enough; otherwise, a new array of the
  497        *          same runtime type is allocated for this purpose
  498        * @return an array containing all of the elements in this queue
  499        * @throws ArrayStoreException if the runtime type of the specified array
  500        *         is not a supertype of the runtime type of every element in
  501        *         this queue
  502        * @throws NullPointerException if the specified array is null
  503        */
  504       public <T> T[] toArray(T[] a) {
  505           final ReentrantLock lock = this.lock;
  506           lock.lock();
  507           try {
  508               return q.toArray(a);
  509           } finally {
  510               lock.unlock();
  511           }
  512       }
  513   
  514       /**
  515        * Returns an iterator over the elements in this queue. The
  516        * iterator does not return the elements in any particular order.
  517        * The returned <tt>Iterator</tt> is a "weakly consistent"
  518        * iterator that will never throw {@link
  519        * ConcurrentModificationException}, and guarantees to traverse
  520        * elements as they existed upon construction of the iterator, and
  521        * may (but is not guaranteed to) reflect any modifications
  522        * subsequent to construction.
  523        *
  524        * @return an iterator over the elements in this queue
  525        */
  526       public Iterator<E> iterator() {
  527           return new Itr(toArray());
  528       }
  529   
  530       /**
  531        * Snapshot iterator that works off copy of underlying q array.
  532        */
  533       private class Itr implements Iterator<E> {
  534           final Object[] array; // Array of all elements
  535           int cursor;           // index of next element to return;
  536           int lastRet;          // index of last element, or -1 if no such
  537   
  538           Itr(Object[] array) {
  539               lastRet = -1;
  540               this.array = array;
  541           }
  542   
  543           public boolean hasNext() {
  544               return cursor < array.length;
  545           }
  546   
  547           public E next() {
  548               if (cursor >= array.length)
  549                   throw new NoSuchElementException();
  550               lastRet = cursor;
  551               return (E)array[cursor++];
  552           }
  553   
  554           public void remove() {
  555               if (lastRet < 0)
  556                   throw new IllegalStateException();
  557               Object x = array[lastRet];
  558               lastRet = -1;
  559               // Traverse underlying queue to find == element,
  560               // not just a .equals element.
  561               lock.lock();
  562               try {
  563                   for (Iterator it = q.iterator(); it.hasNext(); ) {
  564                       if (it.next() == x) {
  565                           it.remove();
  566                           return;
  567                       }
  568                   }
  569               } finally {
  570                   lock.unlock();
  571               }
  572           }
  573       }
  574   
  575       /**
  576        * Saves the state to a stream (that is, serializes it).  This
  577        * merely wraps default serialization within lock.  The
  578        * serialization strategy for items is left to underlying
  579        * Queue. Note that locking is not needed on deserialization, so
  580        * readObject is not defined, just relying on default.
  581        */
  582       private void writeObject(java.io.ObjectOutputStream s)
  583           throws java.io.IOException {
  584           lock.lock();
  585           try {
  586               s.defaultWriteObject();
  587           } finally {
  588               lock.unlock();
  589           }
  590       }
  591   
  592   }

Save This Page
Home » openjdk-7 » java » util » concurrent » [javadoc | source]