Source code: edu/emory/mathcs/util/concurrent/BoundedLinkedQueue.java
1 /*
2 File: BoundedLinkedQueue.java
3
4 Originally written by Doug Lea and released into the public domain.
5 This may be used for any purposes whatsoever without acknowledgment.
6 Thanks for the assistance and support of Sun Microsystems Labs,
7 and everyone contributing, testing, and using this code.
8
9 History:
10 Date Who What
11 11Jun1998 dl Create public version
12 17Jul1998 dl Simplified by eliminating wait counts
13 25aug1998 dl added peek
14 10oct1999 dl lock on node object to ensure visibility
15 27jan2000 dl setCapacity forces immediate permit reconcile
16 */
17
18 package edu.emory.mathcs.util.concurrent;
19
20 import java.util.*;
21 import edu.emory.mathcs.util.concurrent.*;
22 import edu.emory.mathcs.util.*;
23
24 /**
25 * A bounded variant of
26 * LinkedQueue
27 * class. This class may be
28 * preferable to
29 * BoundedBuffer
30 * because it allows a bit more
31 * concurency among puts and takes, because it does not
32 * pre-allocate fixed storage for elements, and allows
33 * capacity to be dynamically reset.
34 * On the other hand, since it allocates a node object
35 * on each put, it can be slow on systems with slow
36 * allocation and GC.
37 * Also, it may be
38 * preferable to
39 * LinkedQueue
40 * when you need to limit
41 * the capacity to prevent resource exhaustion. This protection
42 * normally does not hurt much performance-wise: When the
43 * queue is not empty or full, most puts and
44 * takes are still usually able to execute concurrently.
45 * @see LinkedQueue
46 * @see BoundedBuffer
47 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/edu/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p>
48 **/
49
50 public class BoundedLinkedQueue extends AbstractQueue implements BlockingQueue {
51
52 /*
53 * It might be a bit nicer if this were declared as
54 * a subclass of LinkedQueue, or a sibling class of
55 * a common abstract class. It shares much of the
56 * basic design and bookkeeping fields. But too
57 * many details differ to make this worth doing.
58 */
59
60
61
62 /**
63 * Dummy header node of list. The first actual node, if it exists, is always
64 * at head_.next. After each take, the old first node becomes the head.
65 **/
66 protected LinkedNode head_;
67
68 /**
69 * The last node of list. Put() appends to list, so modifies last_
70 **/
71 protected LinkedNode last_;
72
73
74 /**
75 * Helper monitor. Ensures that only one put at a time executes.
76 **/
77
78 protected final Object putGuard_ = new Object();
79
80 /**
81 * Helper monitor. Protects and provides wait queue for takes
82 **/
83
84 protected final Object takeGuard_ = new Object();
85
86
87 /** Number of elements allowed **/
88 protected int capacity_;
89
90
91 /**
92 * One side of a split permit count.
93 * The counts represent permits to do a put. (The queue is full when zero).
94 * Invariant: putSidePutPermits_ + takeSidePutPermits_ = capacity_ - length.
95 * (The length is never separately recorded, so this cannot be
96 * checked explicitly.)
97 * To minimize contention between puts and takes, the
98 * put side uses up all of its permits before transfering them from
99 * the take side. The take side just increments the count upon each take.
100 * Thus, most puts and take can run independently of each other unless
101 * the queue is empty or full.
102 * Initial value is queue capacity.
103 **/
104
105 protected int putSidePutPermits_;
106
107 /** Number of takes since last reconcile **/
108 protected int takeSidePutPermits_ = 0;
109
110
111 /**
112 * Create a queue with the given capacity
113 * @exception IllegalArgumentException if capacity less or equal to zero
114 **/
115 public BoundedLinkedQueue(int capacity) {
116 if (capacity <= 0) throw new IllegalArgumentException();
117 capacity_ = capacity;
118 putSidePutPermits_ = capacity;
119 head_ = new LinkedNode(null);
120 last_ = head_;
121 }
122
123 /**
124 * Create a queue with the current default capacity
125 **/
126
127 public BoundedLinkedQueue() {
128 this(DefaultChannelCapacity.get());
129 }
130
131 /**
132 * Move put permits from take side to put side;
133 * return the number of put side permits that are available.
134 * Call only under synch on puGuard_ AND this.
135 **/
136 protected final int reconcilePutPermits() {
137 putSidePutPermits_ += takeSidePutPermits_;
138 takeSidePutPermits_ = 0;
139 return putSidePutPermits_;
140 }
141
142
143 /** Return the current capacity of this queue **/
144 public synchronized int capacity() { return capacity_; }
145
146
147 /**
148 * Return the number of elements in the queue.
149 * This is only a snapshot value, that may be in the midst
150 * of changing. The returned value will be unreliable in the presence of
151 * active puts and takes, and should only be used as a heuristic
152 * estimate, for example for resource monitoring purposes.
153 **/
154 public synchronized int size() {
155 /*
156 This should ideally synch on putGuard_, but
157 doing so would cause it to block waiting for an in-progress
158 put, which might be stuck. So we instead use whatever
159 value of putSidePutPermits_ that we happen to read.
160 */
161 return capacity_ - (takeSidePutPermits_ + putSidePutPermits_);
162 }
163
164 public synchronized int remainingCapacity() {
165 /*
166 This should ideally synch on putGuard_, but
167 doing so would cause it to block waiting for an in-progress
168 put, which might be stuck. So we instead use whatever
169 value of putSidePutPermits_ that we happen to read.
170 */
171 return takeSidePutPermits_ + putSidePutPermits_;
172 }
173
174 /**
175 * Reset the capacity of this queue.
176 * If the new capacity is less than the old capacity,
177 * existing elements are NOT removed, but
178 * incoming puts will not proceed until the number of elements
179 * is less than the new capacity.
180 * @exception IllegalArgumentException if capacity less or equal to zero
181 **/
182
183 public void setCapacity(int newCapacity) {
184 if (newCapacity <= 0) throw new IllegalArgumentException();
185 synchronized (putGuard_) {
186 synchronized(this) {
187 takeSidePutPermits_ += (newCapacity - capacity_);
188 capacity_ = newCapacity;
189
190 // Force immediate reconcilation.
191 reconcilePutPermits();
192 notifyAll();
193 }
194 }
195 }
196
197
198 /** Main mechanics for take/poll **/
199 protected synchronized Object extract() {
200 synchronized(head_) {
201 Object x = null;
202 LinkedNode first = head_.next;
203 if (first != null) {
204 x = first.value;
205 first.value = null;
206 head_ = first;
207 ++takeSidePutPermits_;
208 notify();
209 }
210 return x;
211 }
212 }
213
214 public Object peek() {
215 synchronized(head_) {
216 LinkedNode first = head_.next;
217 if (first != null)
218 return first.value;
219 else
220 return null;
221 }
222 }
223
224 public Object take() throws InterruptedException {
225 if (Thread.interrupted()) throw new InterruptedException();
226 Object x = extract();
227 if (x != null)
228 return x;
229 else {
230 synchronized(takeGuard_) {
231 try {
232 for (;;) {
233 x = extract();
234 if (x != null) {
235 return x;
236 }
237 else {
238 takeGuard_.wait();
239 }
240 }
241 }
242 catch(InterruptedException ex) {
243 takeGuard_.notify();
244 throw ex;
245 }
246 }
247 }
248 }
249
250 public Object poll() {
251 return extract();
252 }
253
254 public Object poll(long timeout, TimeUnit granularity) throws InterruptedException {
255 if (Thread.interrupted()) throw new InterruptedException();
256 long msecs = granularity.convert(timeout, TimeUnit.MILLISECONDS);
257 Object x = extract();
258 if (x != null)
259 return x;
260 else {
261 synchronized(takeGuard_) {
262 try {
263 long waitTime = msecs;
264 long start = (msecs <= 0)? 0: System.currentTimeMillis();
265 for (;;) {
266 x = extract();
267 if (x != null || waitTime <= 0) {
268 return x;
269 }
270 else {
271 takeGuard_.wait(waitTime);
272 waitTime = msecs - (System.currentTimeMillis() - start);
273 }
274 }
275 }
276 catch(InterruptedException ex) {
277 takeGuard_.notify();
278 throw ex;
279 }
280 }
281 }
282 }
283
284 /** Notify a waiting take if needed **/
285 protected final void allowTake() {
286 synchronized(takeGuard_) {
287 takeGuard_.notify();
288 }
289 }
290
291
292 /**
293 * Create and insert a node.
294 * Call only under synch on putGuard_
295 **/
296 protected void insert(Object x) {
297 --putSidePutPermits_;
298 LinkedNode p = new LinkedNode(x);
299 synchronized(last_) {
300 last_.next = p;
301 last_ = p;
302 }
303 }
304
305
306 /*
307 put and offer(ms) differ only in policy before insert/allowTake
308 */
309
310 public void put(Object x) throws InterruptedException {
311 if (x == null) throw new IllegalArgumentException();
312 if (Thread.interrupted()) throw new InterruptedException();
313
314 synchronized(putGuard_) {
315
316 if (putSidePutPermits_ <= 0) { // wait for permit.
317 synchronized(this) {
318 if (reconcilePutPermits() <= 0) {
319 try {
320 for(;;) {
321 wait();
322 if (reconcilePutPermits() > 0) {
323 break;
324 }
325 }
326 }
327 catch (InterruptedException ex) {
328 notify();
329 throw ex;
330 }
331 }
332 }
333 }
334 insert(x);
335 }
336 // call outside of lock to loosen put/take coupling
337 allowTake();
338 }
339
340 public boolean offer(Object x) {
341 if (x == null) throw new IllegalArgumentException();
342 synchronized (putGuard_) {
343 if (putSidePutPermits_ <= 0) {
344 synchronized (this) {
345 if (reconcilePutPermits() <= 0) return false;
346 }
347 }
348 insert(x);
349 }
350
351 allowTake();
352 return true;
353 }
354
355 public boolean offer(Object x, long timeout, TimeUnit granularity)
356 throws InterruptedException
357 {
358 if (x == null) throw new IllegalArgumentException();
359 if (Thread.interrupted()) throw new InterruptedException();
360 long msecs = granularity.convert(timeout, TimeUnit.MILLISECONDS);
361
362 synchronized(putGuard_) {
363
364 if (putSidePutPermits_ <= 0) {
365 synchronized(this) {
366 if (reconcilePutPermits() <= 0) {
367 if (msecs <= 0)
368 return false;
369 else {
370 try {
371 long waitTime = msecs;
372 long start = System.currentTimeMillis();
373
374 for(;;) {
375 wait(waitTime);
376 if (reconcilePutPermits() > 0) {
377 break;
378 }
379 else {
380 waitTime = msecs - (System.currentTimeMillis() - start);
381 if (waitTime <= 0) {
382 return false;
383 }
384 }
385 }
386 }
387 catch (InterruptedException ex) {
388 notify();
389 throw ex;
390 }
391 }
392 }
393 }
394 }
395
396 insert(x);
397 }
398
399 allowTake();
400 return true;
401 }
402
403 public boolean isEmpty() {
404 synchronized(head_) {
405 return head_.next == null;
406 }
407 }
408
409 public Iterator iterator() {
410 throw new UnsupportedOperationException();
411 }
412 }