Source code: edu/emory/mathcs/util/concurrent/DynamicArrayBlockingQueue.java
1 /* ***** BEGIN LICENSE BLOCK *****
2 * Version: MPL 1.1/GPL 2.0/LGPL 2.1
3 *
4 * The contents of this file are subject to the Mozilla Public License Version
5 * 1.1 (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
7 * http://www.mozilla.org/MPL/
8 *
9 * Software distributed under the License is distributed on an "AS IS" basis,
10 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
11 * for the specific language governing rights and limitations under the
12 * License.
13 *
14 * The Original Code is the Emory Utilities.
15 *
16 * The Initial Developer of the Original Code is
17 * The Distributed Computing Laboratory, Emory University.
18 * Portions created by the Initial Developer are Copyright (C) 2002
19 * the Initial Developer. All Rights Reserved.
20 *
21 * Alternatively, the contents of this file may be used under the terms of
22 * either the GNU General Public License Version 2 or later (the "GPL"), or
23 * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
24 * in which case the provisions of the GPL or the LGPL are applicable instead
25 * of those above. If you wish to allow use of your version of this file only
26 * under the terms of either the GPL or the LGPL, and not to allow others to
27 * use your version of this file under the terms of the MPL, indicate your
28 * decision by deleting the provisions above and replace them with the notice
29 * and other provisions required by the GPL or the LGPL. If you do not delete
30 * the provisions above, a recipient may use your version of this file under
31 * the terms of any one of the MPL, the GPL or the LGPL.
32 *
33 * ***** END LICENSE BLOCK ***** */
34
35 package edu.emory.mathcs.util.concurrent;
36
37 import edu.emory.mathcs.util.*;
38 import java.util.*;
39
/**
 * A blocking queue of objects backed by a circular array that grows on
 * demand. Elements are normally appended at the tail by <code>put</code>,
 * <code>offer</code> and related operations, and removed from the head by
 * <code>take</code>, <code>poll</code> and related operations. The
 * <code>putAtHead</code> / <code>offerAtHead</code> family additionally
 * allows inserting an element at the head of the queue.
 * <p>
 * Because the storage is an array, queue operations allocate memory only
 * when the array has to be enlarged. The initial and the maximum capacity
 * can be specified at construction time. A maximum capacity less than or
 * equal to 0 means "unbounded", in which case insertions never block.
 * <p>
 * All public operations synchronize on the queue instance itself.
 * <code>null</code> elements are accepted, but they cannot be told apart
 * from the <code>null</code> that <code>poll</code> and <code>peek</code>
 * return for an empty queue.
 *
 * @author Dawid Kurzyniec
 * @version 1.0
 *
 * @see BlockingQueue
 */
public class DynamicArrayBlockingQueue extends AbstractQueue implements BlockingQueue {
    /** Circular buffer holding the queued elements. */
    Object[] buf;

    /**
     * <code>beg</code> is the index of the head element, <code>end</code>
     * the index of the next free tail slot, <code>length</code> the number
     * of queued elements.
     */
    int beg, end, length;

    /** Maximum capacity; a value less than or equal to 0 means unbounded. */
    int maxcap;

    /**
     * Creates a new queue with an initial capacity of 16 and unlimited
     * maximum capacity.
     */
    public DynamicArrayBlockingQueue() {
        this(16);
    }

    /**
     * Creates a new queue with the specified initial capacity and unlimited
     * maximum capacity.
     *
     * @param initcap initial capacity of the queue.
     *
     * @throws IllegalArgumentException if initial capacity is not greater
     *         than 0.
     */
    public DynamicArrayBlockingQueue(int initcap) {
        this(initcap, 0);
    }

    /**
     * Creates a new queue with the specified initial and maximum capacity.
     * A maximum capacity less than or equal to 0 indicates no limit;
     * a positive maximum capacity must be greater than or equal to the
     * initial capacity.
     *
     * @param initcap initial capacity of the queue.
     * @param maxcap maximum capacity of the queue.
     *
     * @throws IllegalArgumentException if initial capacity is not greater
     *         than 0, or maximum capacity is a positive number but less
     *         than the initial capacity.
     */
    public DynamicArrayBlockingQueue(int initcap, int maxcap) {
        if (initcap <= 0) {
            throw new IllegalArgumentException("Invalid initial capacity: " + initcap);
        }
        if (maxcap > 0 && maxcap < initcap) {
            throw new IllegalArgumentException("Invalid max capacity: " + maxcap);
        }
        buf = new Object[initcap];
        beg = end = length = 0;
        this.maxcap = maxcap;
    }

    /**
     * Returns the number of items in this queue.
     */
    public synchronized int size() {
        return length;
    }

    /**
     * Returns the number of elements that can still be inserted without
     * blocking, or <code>Integer.MAX_VALUE</code> if the queue is unbounded.
     */
    public int remainingCapacity() {
        if (maxcap <= 0) {
            return Integer.MAX_VALUE;
        }
        synchronized (this) {
            return maxcap - length;
        }
    }

    /**
     * Iteration is not supported by this queue.
     *
     * @throws UnsupportedOperationException always.
     */
    public Iterator iterator() {
        throw new UnsupportedOperationException();
    }

    /**
     * Appends the specified object at the tail of the queue if that is
     * possible without blocking.
     *
     * @param o object to be put at the tail of the queue.
     * @return true if the object was added, false if the queue has reached
     *         its maximum capacity.
     */
    public synchronized boolean offer(Object o) {
        if (full() && !grow(0)) {
            return false;
        }
        put0(o);
        return true;
    }

    /**
     * Appends the specified object at the tail of the queue, waiting up to
     * the specified time for space to become available if the queue has
     * reached its maximum capacity. A timeout of 0 means "do not wait";
     * a negative timeout means "wait indefinitely".
     *
     * @param o object to be put at the tail of the queue.
     * @param timeout how long to wait for available space, in units of
     *        <code>granularity</code>.
     * @param granularity the time unit of <code>timeout</code>.
     * @return true if the operation succeeded, false if it was abandoned
     *         due to the timeout.
     *
     * @throws InterruptedException if interrupted while waiting.
     */
    public synchronized boolean offer(Object o, long timeout, TimeUnit granularity)
        throws InterruptedException
    {
        if (full() && !grow(0) && !await(true, timeout, granularity)) {
            return false;
        }
        put0(o);
        return true;
    }

    /**
     * Appends the specified object at the tail of the queue, waiting as
     * long as necessary for space to become available.
     *
     * @param o object to be put at the tail of the queue.
     * @throws InterruptedException if interrupted while waiting.
     */
    public synchronized void put(Object o) throws InterruptedException {
        if (full() && !grow(0)) {
            // grow() only fails for a bounded queue, so maxcap is positive here
            while (length >= maxcap) {
                wait();
            }
        }
        put0(o);
    }

    /**
     * Inserts the specified object at the head of the queue if that is
     * possible without blocking, so that the next call to
     * <code>poll()</code> or <code>take()</code> would return this object.
     *
     * @param o object to be inserted at the head of the queue.
     * @return true if the object was inserted, false if the queue has
     *         reached its maximum capacity.
     */
    public synchronized boolean offerAtHead(Object o) {
        if (full() && !grow(1)) {
            return false;
        }
        putAtHead0(o);
        return true;
    }

    /**
     * Inserts the specified object at the head of the queue, so that the
     * next call to <code>poll()</code> or <code>take()</code> would return
     * this object. If the queue has reached its maximum capacity, this
     * method blocks until space becomes available.
     *
     * @param o object to be put at the head of the queue.
     * @throws InterruptedException if interrupted while waiting.
     */
    public synchronized void putAtHead(Object o) throws InterruptedException {
        if (full() && !grow(1)) {
            // grow() only fails for a bounded queue, so maxcap is positive here
            while (length >= maxcap) {
                wait();
            }
        }
        putAtHead0(o);
    }

    /**
     * Inserts the specified object at the head of the queue, so that the
     * next call to <code>poll()</code> or <code>take()</code> would return
     * this object, waiting up to the specified time for space to become
     * available if the queue has reached its maximum capacity. A timeout
     * of 0 means "do not wait"; a negative timeout means "wait
     * indefinitely".
     *
     * @param o object to be inserted at the head of the queue.
     * @param timeout how long to wait for available space, in units of
     *        <code>granularity</code>.
     * @param granularity the time unit of <code>timeout</code>.
     * @return true if the operation succeeded, false if it was abandoned
     *         due to the timeout.
     * @throws InterruptedException if interrupted while waiting.
     */
    public synchronized boolean offerAtHead(Object o, int timeout, TimeUnit granularity)
        throws InterruptedException
    {
        if (full() && !grow(1) && !await(true, timeout, granularity)) {
            return false;
        }
        putAtHead0(o);
        return true;
    }

    /**
     * Retrieves and removes the head of the queue, waiting as long as
     * necessary for an element to become available.
     *
     * @return the former head of the queue.
     * @throws InterruptedException if interrupted while waiting.
     */
    public synchronized Object take() throws InterruptedException {
        while (length <= 0) {
            wait();
        }
        return take0();
    }

    /**
     * Retrieves and removes the head of the queue, waiting up to the
     * specified time for an element to become available if the queue is
     * empty. A timeout of 0 means "do not wait"; a negative timeout means
     * "wait indefinitely".
     *
     * @param timeout how long to wait for an element, in units of
     *        <code>granularity</code>.
     * @param granularity the time unit of <code>timeout</code>.
     * @return the former head of the queue, or null on timeout.
     * @throws InterruptedException if interrupted while waiting.
     */
    public synchronized Object poll(long timeout, TimeUnit granularity)
        throws InterruptedException
    {
        if (length <= 0 && !await(false, timeout, granularity)) {
            return null;
        }
        return take0();
    }

    /**
     * Returns, without removing it, the head of the queue.
     *
     * @return the head of the queue, or <tt>null</tt> if the queue is empty.
     */
    public synchronized Object peek() {
        if (length <= 0) {
            return null;
        }
        return buf[beg];
    }

    /**
     * Removes and returns the head of the queue if one is available.
     *
     * @return the former head of the queue, or <tt>null</tt> if the
     *         queue is empty.
     */
    public synchronized Object poll() {
        if (length <= 0) {
            return null;
        }
        return take0();
    }

    /**
     * Removes all available elements from this queue and adds them, in
     * queue order, to the given collection.
     *
     * @param c the collection to transfer elements into.
     * @return the number of elements transferred.
     * @throws NullPointerException if <code>c</code> is null.
     * @throws IllegalArgumentException if <code>c</code> is this queue.
     */
    public int drainTo(Collection c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of available elements from this
     * queue and adds them, in queue order, to the given collection.
     *
     * @param c the collection to transfer elements into.
     * @param maxElements the maximum number of elements to transfer.
     * @return the number of elements transferred.
     * @throws NullPointerException if <code>c</code> is null.
     * @throws IllegalArgumentException if <code>c</code> is this queue.
     */
    public synchronized int drainTo(Collection c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        int moved = 0;
        while (moved < maxElements && length > 0) {
            // add first, so an exception from c.add leaves the element queued
            c.add(buf[beg]);
            take0();
            moved++;
        }
        return moved;
    }

    /**
     * Returns a diagnostic string showing capacity, length, head and tail
     * indices, and a map of the buffer in which <code>*</code> marks a
     * non-null slot and <code>.</code> a null slot.
     */
    public synchronized String toString() {
        StringBuffer s = new StringBuffer();
        s.append("(cap=").append(buf.length);
        s.append(", len=").append(length);
        s.append(", beg=").append(beg);
        s.append(", end=").append(end);
        s.append(": |");
        for (int i = 0; i < buf.length; i++) {
            s.append(buf[i] == null ? '.' : '*');
        }
        s.append("|)");
        return s.toString();
    }

    /**
     * Waits, with the monitor held by the caller, until the queue has room
     * (<code>forSpace</code> is true; only meaningful for a bounded queue)
     * or has an element (<code>forSpace</code> is false).
     *
     * @return true if the awaited condition holds, false if the timeout
     *         elapsed first.
     */
    private boolean await(boolean forSpace, long timeout, TimeUnit granularity)
        throws InterruptedException
    {
        if (timeout < 0) {
            // negative timeout: wait indefinitely
            while (blocked(forSpace)) {
                wait();
            }
            return true;
        }
        // convert the caller's duration INTO milliseconds
        long msecs = TimeUnit.MILLISECONDS.convert(timeout, granularity);
        if (timeout > 0 && msecs == 0) {
            // do not turn a positive sub-millisecond timeout into "no wait"
            msecs = 1;
        }
        long startTime = System.currentTimeMillis();
        long waitTime = msecs;
        while (blocked(forSpace)) {
            // wait(0) would block forever, so bail out before calling it
            if (waitTime <= 0) {
                return false;
            }
            wait(waitTime);
            waitTime = msecs - (System.currentTimeMillis() - startTime);
        }
        return true;
    }

    /** Tells whether a producer (or, if not forSpace, a consumer) must keep waiting. */
    private boolean blocked(boolean forSpace) {
        return forSpace ? length >= maxcap : length <= 0;
    }

    /**
     * Removes and returns the head element. The queue must not be empty.
     * Wakes up waiting threads, so that producers blocked on a full
     * bounded queue notice the freed slot.
     */
    private Object take0() {
        Object ret = buf[beg];
        buf[beg] = null; // drop the reference so it can be collected
        beg++;
        if (beg >= buf.length) {
            beg = 0;
        }
        length--;
        notifyAll();
        return ret;
    }

    /** Stores the object in the tail slot, which must be free, and wakes up waiters. */
    private void put0(Object o) {
        buf[end++] = o;
        if (end >= buf.length) {
            end = 0;
        }
        length++;
        notifyAll();
    }

    /** Stores the object in the slot before the head, which must be free, and wakes up waiters. */
    private void putAtHead0(Object o) {
        beg--;
        if (beg < 0) {
            beg = buf.length - 1;
        }
        buf[beg] = o;
        length++;
        notifyAll();
    }

    /**
     * Doubles the buffer (never beyond the maximum capacity) and copies
     * the elements, in queue order, into the new buffer starting at index
     * <code>headroom</code>. Callers pass 0 before appending at the tail
     * and 1 before inserting at the head, and call this only when the
     * buffer is full.
     *
     * @return true if the buffer was enlarged, false if the maximum
     *         capacity has already been reached.
     */
    private boolean grow(int headroom) {
        int oldcap = buf.length;
        if (maxcap > 0 && oldcap >= maxcap) {
            return false;
        }
        // computed in long so that doubling cannot overflow
        long limit = maxcap > 0 ? maxcap : Integer.MAX_VALUE;
        int newcap = (int) Math.min(2L * oldcap, limit);
        Object[] newbuf = new Object[newcap];
        // elements from the head up to the physical end of the old array ...
        int tail = Math.min(length, oldcap - beg);
        System.arraycopy(buf, beg, newbuf, headroom, tail);
        // ... followed by the part that had wrapped around to index 0
        System.arraycopy(buf, 0, newbuf, headroom + tail, length - tail);
        buf = newbuf;
        beg = headroom;
        end = headroom + length;
        if (end >= newcap) {
            // happens when the capped new capacity is exactly length + 1
            end -= newcap;
        }
        return true;
    }

    /** Tells whether every slot of the current buffer is occupied. */
    private boolean full() {
        return (beg == end && length > 0);
    }

}
431 }