Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: edu/emory/mathcs/util/concurrent/DynamicArrayBlockingQueue.java


1   /* ***** BEGIN LICENSE BLOCK *****
2    * Version: MPL 1.1/GPL 2.0/LGPL 2.1
3    *
4    * The contents of this file are subject to the Mozilla Public License Version
5    * 1.1 (the "License"); you may not use this file except in compliance with
6    * the License. You may obtain a copy of the License at
7    * http://www.mozilla.org/MPL/
8    *
9    * Software distributed under the License is distributed on an "AS IS" basis,
10   * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
11   * for the specific language governing rights and limitations under the
12   * License.
13   *
14   * The Original Code is the Emory Utilities.
15   *
16   * The Initial Developer of the Original Code is
17   * The Distributed Computing Laboratory, Emory University.
18   * Portions created by the Initial Developer are Copyright (C) 2002
19   * the Initial Developer. All Rights Reserved.
20   *
21   * Alternatively, the contents of this file may be used under the terms of
22   * either the GNU General Public License Version 2 or later (the "GPL"), or
23   * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
24   * in which case the provisions of the GPL or the LGPL are applicable instead
25   * of those above. If you wish to allow use of your version of this file only
26   * under the terms of either the GPL or the LGPL, and not to allow others to
27   * use your version of this file under the terms of the MPL, indicate your
28   * decision by deleting the provisions above and replace them with the notice
29   * and other provisions required by the GPL or the LGPL. If you do not delete
30   * the provisions above, a recipient may use your version of this file under
31   * the terms of any one of the MPL, the GPL or the LGPL.
32   *
33   * ***** END LICENSE BLOCK ***** */
34  
35  package edu.emory.mathcs.util.concurrent;
36  
37  import edu.emory.mathcs.util.*;
38  import java.util.*;
39  
40  /**
41   * This class represents queue of objects. Data is generally stored at the
42   * bottom of the queue by <code>put</code> and related operations,
43   * and read from the top of the queue by <code>take</code> and related
44   * operations. The <code>putAtHead</code> family of operations
45   * is also provided to allow storing data at the head of the queue.
46   * The underlying implementation uses an array, so no memory allocation occur
47   * on queue operations apart from situations where array needs to be resized.
48   * The initial and maximum capacity of the queue can be specified at the
49   * construction time. The maximum capacity may be also set to "infinity",
50   * in which case <code>put</code> operations will never block.
51   *
52   * @author Dawid Kurzyniec
53   * @version 1.0
54   *
55   * @see BlockingQueue
56   *
57   */
58  public class DynamicArrayBlockingQueue extends AbstractQueue implements BlockingQueue {
59      Object[] buf;
60      int beg, end, length;
61      int maxcap;
62  
63      /**
64       * Creates new queue object with initial capacity of 16 and unlimited
65       * maximum capacity.
66       */
67      public DynamicArrayBlockingQueue() {
68          this(16);
69      }
70  
71      /**
72       * Creates new queue object with specified initial capacity and unlimited
73       * maximum capacity. Initial capacity must be greater than 0, otherwise
74       * <code>IllegalArgumentException</code> will be thrown.
75       *
76       * @param initcap initial capacity of the queue.
77       *
78       * @throws IllegalArgumentException if initial capacity is not greater
79       *         than 0.
80       */
81      public DynamicArrayBlockingQueue(int initcap) {
82          this(initcap, 0);
83      }
84  
85      /**
86       * Creates new queue object with specified initial capacity and specified
87       * maximum capacity. Initial capacity must be greater than 0, otherwise
88       * <code>IllegalArgumentException</code> will be thrown. Maximum
89       * capacity of less than or equal to 0 indicates no limit. Otherwise if
90       * the maximum capacity is positive number, it has to be greater than
91       * or equal to initial capacity, or <code>IllegalArgumentException</code>
92       * will be thrown.
93       *
94       * @param initcap initial capacity of the queue.
95       * @param maxcap maximum capacity of the queue.
96       *
97       * @throws IllegalArgumentException if initial capacity is not greater
98       *         than 0, or maximum capacity is positive number but less than
99       *         initial capacity.
100      */
101     public DynamicArrayBlockingQueue(int initcap, int maxcap) {
102         if (initcap <=0) {
103             throw new IllegalArgumentException("Invalid initial capacity: " + initcap);
104         }
105         if (maxcap > 0 && maxcap < initcap) {
106             throw new IllegalArgumentException("Invalid max capacity: " + maxcap);
107         }
108         buf = new Object[initcap];
109         beg = end = length = 0;
110         this.maxcap = maxcap;
111     }
112 
113     /**
114      * Returns the number of items in this queue.
115      */
116     public synchronized int size() {
117         return length;
118     }
119 
120     public int remainingCapacity() {
121         if (maxcap <= 0) return Integer.MAX_VALUE;
122         synchronized (this) {
123             return maxcap - length;
124         }
125     }
126 
127     public Iterator iterator() {
128         throw new UnsupportedOperationException();
129     }
130 
131     /**
132      * Puts specified object at the bottom of the queue, if possible.
133      *
134      * @param o object to be put on the bottom of the queue.
135      */
136     public synchronized boolean offer(Object o) {
137         if (beg == end && length > 0) {
138             // queue is full
139             if (!enlargeForPut()) return false;
140         }
141         put0(o);
142         return true;
143     }
144 
145     /**
146      * Puts specified object at the bottom of the queue with specified timeout
147      * and returns true if operation succeeded, false otherwise.
148      * If the length of the queue is equal to its maximum capacity, this method
149      * will block until either the length decreases (in which case it proceeds
150      * with put and returns true) or timeout expires (in which case it returns
151      * false), whichever comes first. If the timeout is a positive
152      * number, it indicates a number of milliseconds to wait for available
153      * space. If the timeout is equal to 0 and the queue is full, the method
154      * will immediately return false. If the timeout is negative, the method
155      * will never timeout.
156      *
157      * @param o object to be put on the bottom of the queue.
158      * @param timeout number of milliseconds to wait for available space.
159      * @return true if operation succeeded, false if it was abandoned due
160      *         to the timeout.
161      *
162      * @throws InterruptedException if operation is interrupted by other thread
163      */
164     public synchronized boolean offer(Object o, long timeout, TimeUnit granularity)
165         throws InterruptedException
166     {
167         if (full()) {
168             // queue is full
169             if (!enlargeForPut()) {
170                 if (timeout == 0) return false;
171                 // must wait
172                 long msecs = granularity.convert(timeout, TimeUnit.MILLISECONDS);
173                 long startTime = msecs >= 0 ? System.currentTimeMillis(): 0;
174                 long waitTime = msecs;
175                 while (length >= maxcap) {
176                     if (startTime > 0) {
177                         // wait until timeout, or return false if expired
178                         if (waitTime <= 0) return false;
179                         wait(waitTime);
180                         waitTime = msecs - (System.currentTimeMillis() - startTime);
181                     }
182                     else {
183                         wait();
184                     }
185                 }
186             }
187 
188         }
189         put0(o);
190         return true;
191     }
192 
193     public synchronized void put(Object o) throws InterruptedException {
194         if (full()) {
195             if (!enlargeForPut()) {
196                 while (length >= maxcap) {
197                     wait();
198                 }
199             }
200 
201         }
202         put0(o);
203     }
204 
205     /**
206      * Inserts specified object at the top of the queue, so the next call
207      * to <code>get()<code> would return this object.
208      *
209      * @param o object to be inserted on the top of the queue.
210      * @throws InterruptedException if operation is interrupted by other thread.
211      */
212     public synchronized boolean offerAtHead(Object o) {
213         if (full()) {
214             if (!enlargeForPutAtHead()) return false;
215         }
216         putAtHead0(o);
217         return true;
218     }
219 
220     /**
221      * Inserts specified object at the top of the queue, so the next call
222      * to <code>poll()<code> or <code>take()</code> would return this object.
223      * If the length of the queue is equal to its maximum capacity, this method
224      * will block until the length decreases so that the operation can proceed.
225      *
226      * @param o object to be put at the head of the queue.
227      * @throws InterruptedException if operation is interrupted by other thread.
228      */
229     public synchronized void putAtHead(Object o) throws InterruptedException {
230         if (full()) {
231             if (!enlargeForPutAtHead()) {
232                 // must wait
233                 while (length >= maxcap) {
234                     wait();
235                 }
236             }
237         }
238         putAtHead0(o);
239     }
240 
241     /**
242      * Inserts specified object at the top of the queue, so the next call
243      * to <code>get()<code> would return this object, with specified timeout
244      * and returns true if operation succeeded, false otherwise.
245      * If the length of the queue is equal to its maximum capacity, this method
246      * will block until either the length decreases (in which case it proceeds
247      * with insert and returns true) or timeout expires (in which case it returns
248      * false), whichever comes first. If the timeout is a positive
249      * number, it indicates a number of milliseconds to wait for available
250      * space. If the timeout is equal to 0 and the queue is full, the method
251      * will immediately return false. If the timeout is negative, the method
252      * will never timeout.
253      *
254      * @param o object to be inserted on the top of the queue.
255      * @param timeout number of milliseconds to wait for available space.
256      * @return true if operation succeeded, false if it was abandoned due
257      *         to the timeout.
258      * @throws InterruptedException if operation is interrupted by other thread.
259      */
260     public synchronized boolean offerAtHead(Object o, int timeout, TimeUnit granularity)
261         throws InterruptedException
262     {
263         if (full()) {
264             if (!enlargeForPutAtHead()) {
265                 // must wait
266                 if (timeout == 0) return false;
267                 long msecs = granularity.convert(timeout, TimeUnit.MILLISECONDS);
268                 long startTime = msecs >= 0 ? System.currentTimeMillis(): 0;
269                 long waitTime = msecs;
270                 while (length >= maxcap) {
271                     if (startTime > 0) {
272                         // wait until timeout, or return false if expired
273                         if (waitTime <= 0) return false;
274                         wait(waitTime);
275                         waitTime = msecs - (System.currentTimeMillis() - startTime);
276                     }
277                     else {
278                         wait();
279                     }
280                 }
281             }
282         }
283         putAtHead0(o);
284         return true;
285     }
286 
287     /**
288      * Retrieve and remove the first element from the queue, waiting
289      * if no objects are present on the queue.
290      * @return the object
291      * @throws InterruptedException if interrupted while waiting.
292      */
293     public synchronized Object take() throws InterruptedException {
294         while (length <= 0) {
295             wait();
296         }
297         return take0();
298     }
299 
300     /**
301      * Gets the object from the top of the queue.
302      * If the queue is empty, this method
303      * will block until either the length increases (in which case it proceeds
304      * with get) or timeout expires (in which case it returns null),
305      * whichever comes first. If the timeout is a positive
306      * number, it indicates a number of milliseconds to wait.
307      * If the timeout is equal to 0 and the queue is empty, the method
308      * will immediately return null. If the timeout is negative, the method
309      * will never timeout.
310      *
311      * @param timeout number of milliseconds to wait for available space.
312      * @return object from the top of the queue or null on timeout.
313      * @throws InterruptedException if operation is interrupted by other thread.
314      */
315     public synchronized Object poll(long timeout, TimeUnit granularity)
316         throws InterruptedException
317     {
318         if (length <= 0) {
319             if (timeout == 0) return null;
320             long msecs = granularity.convert(timeout, TimeUnit.MILLISECONDS);
321             long startTime = msecs >= 0 ? System.currentTimeMillis(): 0;
322             long waitTime = msecs;
323 
324             do {
325                 if (startTime > 0) {
326                     // wait until timeout, or return null if expired
327                     if (waitTime <= 0) return null;
328                     wait(waitTime);
329                     waitTime = msecs - (System.currentTimeMillis() - startTime);
330                 }
331                 else {
332                     wait();
333                 }
334             }
335             while (length <= 0);
336         }
337         return take0();
338     }
339 
340     public Object peek() {
341         if (length <= 0) return null;
342         return peek0();
343     }
344 
345     /**
346      * Remove and return an element from the queue if one is available.
347      *
348      * @return an element previously on the queue, or <tt>null</tt> if the
349      *         queue is empty.
350      */
351     public synchronized Object poll() {
352         if (length <= 0) return null;
353         return take0();
354     }
355 
356     private Object take0() {
357         Object ret = buf[beg];
358         buf[beg] = null;
359         beg++;
360         if (beg >= buf.length)
361             beg = 0;
362         length--;
363         return ret;
364     }
365 
366     private Object peek0() {
367         return buf[beg];
368     }
369 
370     private void put0(Object o) {
371         buf[end++] = o;
372         if (end >= buf.length) end = 0;
373         length++;
374         notifyAll();
375     }
376 
377     private void putAtHead0(Object o) {
378         beg--;
379         if (beg < 0) beg = buf.length - 1;
380         buf[beg] = o;
381         length++;
382         notifyAll();
383     }
384 
385     private boolean enlargeForPut() {
386         if (maxcap <= 0 || length < maxcap) {
387             int newcap = maxcap <= 0 ? length * 2 : Math.min(maxcap, length * 2);
388             Object[] newbuf = new Object[newcap];
389             System.arraycopy(buf, beg, newbuf, 0, length - beg);
390             System.arraycopy(buf, 0, newbuf, length - beg, end);
391             buf = newbuf;
392             beg = 0;
393             end = length;
394             return true;
395         }
396         return false;
397     }
398 
399     private boolean enlargeForPutAtHead() {
400         if (maxcap <= 0 || length < maxcap) {
401             // can resize
402             int newcap = maxcap <= 0 ? length * 2 : Math.min(maxcap, length * 2);
403             Object[] newbuf = new Object[newcap];
404             System.arraycopy(buf, beg, newbuf, 1, length - beg);
405             System.arraycopy(buf, 0, newbuf, beg + 1, end);
406             buf = newbuf;
407             beg = 1;
408             end = length + 1;
409             return true;
410         }
411         return false;
412     }
413 
414     private boolean full() {
415        return (beg == end && length > 0);
416     }
417 
418 //    /**
419 //     * Returns the string representation of this queue.
420 //     */
421 //    public String toString() {
422 //        String s = "(cap=" + buf.length + ", len=" + length +
423 //            ", beg=" + beg + ", end=" + end + ": |";
424 //        for (int i=0; i<buf.length; i++) {
425 //            if (buf[i] == null) s += "."; else s += "*";
426 //        }
427 //        s+="|)";
428 //        return s;
429 //    }
430 
431 }