Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: edu/emory/mathcs/util/concurrent/BoundedLinkedQueue.java


1   /*
2     File: BoundedLinkedQueue.java
3   
4     Originally written by Doug Lea and released into the public domain.
5     This may be used for any purposes whatsoever without acknowledgment.
6     Thanks for the assistance and support of Sun Microsystems Labs,
7     and everyone contributing, testing, and using this code.
8   
9     History:
10    Date       Who                What
11    11Jun1998  dl               Create public version
12    17Jul1998  dl               Simplified by eliminating wait counts
13    25aug1998  dl               added peek
14    10oct1999  dl               lock on node object to ensure visibility
15    27jan2000  dl               setCapacity forces immediate permit reconcile
16  */
17  
18  package edu.emory.mathcs.util.concurrent;
19  
20  import java.util.*;
21  import edu.emory.mathcs.util.concurrent.*;
22  import edu.emory.mathcs.util.*;
23  
24  /**
25   * A bounded variant of
26   * LinkedQueue
27   * class. This class may be
28   * preferable to
29   * BoundedBuffer
30   * because it allows a bit more
31   * concurency among puts and takes,  because it does not
32   * pre-allocate fixed storage for elements, and allows
33   * capacity to be dynamically reset.
34   * On the other hand, since it allocates a node object
35   * on each put, it can be slow on systems with slow
36   * allocation and GC.
37   * Also, it may be
38   * preferable to
39   * LinkedQueue
40   * when you need to limit
41   * the capacity to prevent resource exhaustion. This protection
42   * normally does not hurt much performance-wise: When the
43   * queue is not empty or full, most puts and
44   * takes are still usually able to execute concurrently.
45   * @see LinkedQueue
46   * @see BoundedBuffer
47   * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/edu/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p>
48   **/
49  
50  public class BoundedLinkedQueue extends AbstractQueue implements BlockingQueue {
51  
52    /*
53     * It might be a bit nicer if this were declared as
54     * a subclass of LinkedQueue, or a sibling class of
55     * a common abstract class. It shares much of the
56     * basic design and bookkeeping fields. But too
57     * many details differ to make this worth doing.
58     */
59  
60  
61  
62    /**
63     * Dummy header node of list. The first actual node, if it exists, is always
64     * at head_.next. After each take, the old first node becomes the head.
65     **/
66    protected LinkedNode head_;
67  
68    /**
69     * The last node of list. Put() appends to list, so modifies last_
70     **/
71    protected LinkedNode last_;
72  
73  
74    /**
75     * Helper monitor. Ensures that only one put at a time executes.
76     **/
77  
78    protected final Object putGuard_ = new Object();
79  
80    /**
81     * Helper monitor. Protects and provides wait queue for takes
82     **/
83  
84    protected final Object takeGuard_ = new Object();
85  
86  
87    /** Number of elements allowed **/
88    protected int capacity_;
89  
90  
91    /**
92     * One side of a split permit count.
93     * The counts represent permits to do a put. (The queue is full when zero).
94     * Invariant: putSidePutPermits_ + takeSidePutPermits_ = capacity_ - length.
95     * (The length is never separately recorded, so this cannot be
96     * checked explicitly.)
97     * To minimize contention between puts and takes, the
98     * put side uses up all of its permits before transfering them from
99     * the take side. The take side just increments the count upon each take.
100    * Thus, most puts and take can run independently of each other unless
101    * the queue is empty or full.
102    * Initial value is queue capacity.
103    **/
104 
105   protected int putSidePutPermits_;
106 
107   /** Number of takes since last reconcile **/
108   protected int takeSidePutPermits_ = 0;
109 
110 
111   /**
112    * Create a queue with the given capacity
113    * @exception IllegalArgumentException if capacity less or equal to zero
114    **/
115   public BoundedLinkedQueue(int capacity) {
116     if (capacity <= 0) throw new IllegalArgumentException();
117     capacity_ = capacity;
118     putSidePutPermits_ = capacity;
119     head_ =  new LinkedNode(null);
120     last_ = head_;
121   }
122 
123   /**
124    * Create a queue with the current default capacity
125    **/
126 
127   public BoundedLinkedQueue() {
128     this(DefaultChannelCapacity.get());
129   }
130 
131   /**
132    * Move put permits from take side to put side;
133    * return the number of put side permits that are available.
134    * Call only under synch on puGuard_ AND this.
135    **/
136   protected final int reconcilePutPermits() {
137     putSidePutPermits_ += takeSidePutPermits_;
138     takeSidePutPermits_ = 0;
139     return putSidePutPermits_;
140   }
141 
142 
143   /** Return the current capacity of this queue **/
144   public synchronized int capacity() { return capacity_; }
145 
146 
147   /**
148    * Return the number of elements in the queue.
149    * This is only a snapshot value, that may be in the midst
150    * of changing. The returned value will be unreliable in the presence of
151    * active puts and takes, and should only be used as a heuristic
152    * estimate, for example for resource monitoring purposes.
153    **/
154   public synchronized int size() {
155     /*
156       This should ideally synch on putGuard_, but
157       doing so would cause it to block waiting for an in-progress
158       put, which might be stuck. So we instead use whatever
159       value of putSidePutPermits_ that we happen to read.
160     */
161     return capacity_ - (takeSidePutPermits_ + putSidePutPermits_);
162   }
163 
164   public synchronized int remainingCapacity() {
165     /*
166       This should ideally synch on putGuard_, but
167       doing so would cause it to block waiting for an in-progress
168       put, which might be stuck. So we instead use whatever
169       value of putSidePutPermits_ that we happen to read.
170     */
171     return takeSidePutPermits_ + putSidePutPermits_;
172   }
173 
174   /**
175    * Reset the capacity of this queue.
176    * If the new capacity is less than the old capacity,
177    * existing elements are NOT removed, but
178    * incoming puts will not proceed until the number of elements
179    * is less than the new capacity.
180    * @exception IllegalArgumentException if capacity less or equal to zero
181    **/
182 
183   public void setCapacity(int newCapacity) {
184     if (newCapacity <= 0) throw new IllegalArgumentException();
185     synchronized (putGuard_) {
186       synchronized(this) {
187         takeSidePutPermits_ += (newCapacity - capacity_);
188         capacity_ = newCapacity;
189 
190         // Force immediate reconcilation.
191         reconcilePutPermits();
192         notifyAll();
193       }
194     }
195   }
196 
197 
198   /** Main mechanics for take/poll **/
199   protected synchronized Object extract() {
200     synchronized(head_) {
201       Object x = null;
202       LinkedNode first = head_.next;
203       if (first != null) {
204         x = first.value;
205         first.value = null;
206         head_ = first;
207         ++takeSidePutPermits_;
208         notify();
209       }
210       return x;
211     }
212   }
213 
214   public Object peek() {
215     synchronized(head_) {
216       LinkedNode first = head_.next;
217       if (first != null)
218         return first.value;
219       else
220         return null;
221     }
222   }
223 
224   public Object take() throws InterruptedException {
225     if (Thread.interrupted()) throw new InterruptedException();
226     Object x = extract();
227     if (x != null)
228       return x;
229     else {
230       synchronized(takeGuard_) {
231         try {
232           for (;;) {
233             x = extract();
234             if (x != null) {
235               return x;
236             }
237             else {
238               takeGuard_.wait();
239             }
240           }
241         }
242         catch(InterruptedException ex) {
243           takeGuard_.notify();
244           throw ex;
245         }
246       }
247     }
248   }
249 
250   public Object poll() {
251       return extract();
252   }
253 
254   public Object poll(long timeout, TimeUnit granularity) throws InterruptedException {
255     if (Thread.interrupted()) throw new InterruptedException();
256     long msecs = granularity.convert(timeout, TimeUnit.MILLISECONDS);
257     Object x = extract();
258     if (x != null)
259       return x;
260     else {
261       synchronized(takeGuard_) {
262         try {
263           long waitTime = msecs;
264           long start = (msecs <= 0)? 0: System.currentTimeMillis();
265           for (;;) {
266             x = extract();
267             if (x != null || waitTime <= 0) {
268               return x;
269             }
270             else {
271               takeGuard_.wait(waitTime);
272               waitTime = msecs - (System.currentTimeMillis() - start);
273             }
274           }
275         }
276         catch(InterruptedException ex) {
277           takeGuard_.notify();
278           throw ex;
279         }
280       }
281     }
282   }
283 
284   /** Notify a waiting take if needed **/
285   protected final void allowTake() {
286     synchronized(takeGuard_) {
287       takeGuard_.notify();
288     }
289   }
290 
291 
292   /**
293    * Create and insert a node.
294    * Call only under synch on putGuard_
295    **/
296   protected void insert(Object x) {
297     --putSidePutPermits_;
298     LinkedNode p = new LinkedNode(x);
299     synchronized(last_) {
300       last_.next = p;
301       last_ = p;
302     }
303   }
304 
305 
306   /*
307      put and offer(ms) differ only in policy before insert/allowTake
308   */
309 
310   public void put(Object x) throws InterruptedException {
311     if (x == null) throw new IllegalArgumentException();
312     if (Thread.interrupted()) throw new InterruptedException();
313 
314     synchronized(putGuard_) {
315 
316       if (putSidePutPermits_ <= 0) { // wait for permit.
317         synchronized(this) {
318           if (reconcilePutPermits() <= 0) {
319             try {
320               for(;;) {
321                 wait();
322                 if (reconcilePutPermits() > 0) {
323                   break;
324                 }
325               }
326             }
327             catch (InterruptedException ex) {
328               notify();
329               throw ex;
330             }
331           }
332         }
333       }
334       insert(x);
335     }
336     // call outside of lock to loosen put/take coupling
337     allowTake();
338   }
339 
340   public boolean offer(Object x) {
341       if (x == null) throw new IllegalArgumentException();
342       synchronized (putGuard_) {
343           if (putSidePutPermits_ <= 0) {
344               synchronized (this) {
345                   if (reconcilePutPermits() <= 0) return false;
346               }
347           }
348           insert(x);
349       }
350 
351       allowTake();
352       return true;
353   }
354 
355   public boolean offer(Object x, long timeout, TimeUnit granularity)
356       throws InterruptedException
357   {
358     if (x == null) throw new IllegalArgumentException();
359     if (Thread.interrupted()) throw new InterruptedException();
360     long msecs = granularity.convert(timeout, TimeUnit.MILLISECONDS);
361 
362     synchronized(putGuard_) {
363 
364       if (putSidePutPermits_ <= 0) {
365         synchronized(this) {
366           if (reconcilePutPermits() <= 0) {
367             if (msecs <= 0)
368               return false;
369             else {
370               try {
371                 long waitTime = msecs;
372                 long start = System.currentTimeMillis();
373 
374                 for(;;) {
375                   wait(waitTime);
376                   if (reconcilePutPermits() > 0) {
377                     break;
378                   }
379                   else {
380                     waitTime = msecs - (System.currentTimeMillis() - start);
381                     if (waitTime <= 0) {
382                       return false;
383                     }
384                   }
385                 }
386               }
387               catch (InterruptedException ex) {
388                 notify();
389                 throw ex;
390               }
391             }
392           }
393         }
394       }
395 
396       insert(x);
397     }
398 
399     allowTake();
400     return true;
401   }
402 
403   public boolean isEmpty() {
404     synchronized(head_) {
405       return head_.next == null;
406     }
407   }
408 
409   public Iterator iterator() {
410       throw new UnsupportedOperationException();
411   }
412 }