1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 37 package java.util.concurrent; 38 import java.util.concurrent.locks; 39 import java.util; 40 41 /** 42 * An unbounded {@linkplain BlockingQueue blocking queue} of 43 * <tt>Delayed</tt> elements, in which an element can only be taken 44 * when its delay has expired. The <em>head</em> of the queue is that 45 * <tt>Delayed</tt> element whose delay expired furthest in the 46 * past. If no delay has expired there is no head and <tt>poll</tt> 47 * will return <tt>null</tt>. Expiration occurs when an element's 48 * <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less 49 * than or equal to zero. Even though unexpired elements cannot be 50 * removed using <tt>take</tt> or <tt>poll</tt>, they are otherwise 51 * treated as normal elements. For example, the <tt>size</tt> method 52 * returns the count of both expired and unexpired elements. 53 * This queue does not permit null elements. 54 * 55 * <p>This class and its iterator implement all of the 56 * <em>optional</em> methods of the {@link Collection} and {@link 57 * Iterator} interfaces. 58 * 59 * <p>This class is a member of the 60 * <a href="{@docRoot}/../technotes/guides/collections/index.html"> 61 * Java Collections Framework</a>. 62 * 63 * @since 1.5 64 * @author Doug Lea 65 * @param <E> the type of elements held in this collection 66 */ 67 68 public class DelayQueue<E extends Delayed> extends AbstractQueue<E> 69 implements BlockingQueue<E> { 70 71 private transient final ReentrantLock lock = new ReentrantLock(); 72 private final PriorityQueue<E> q = new PriorityQueue<E>(); 73 74 /** 75 * Thread designated to wait for the element at the head of 76 * the queue. This variant of the Leader-Follower pattern 77 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to 78 * minimize unnecessary timed waiting. When a thread becomes 79 * the leader, it waits only for the next delay to elapse, but 80 * other threads await indefinitely. The leader thread must 81 * signal some other thread before returning from take() or 82 * poll(...), unless some other thread becomes leader in the 83 * interim. Whenever the head of the queue is replaced with 84 * an element with an earlier expiration time, the leader 85 * field is invalidated by being reset to null, and some 86 * waiting thread, but not necessarily the current leader, is 87 * signalled. So waiting threads must be prepared to acquire 88 * and lose leadership while waiting. 89 */ 90 private Thread leader = null; 91 92 /** 93 * Condition signalled when a newer element becomes available 94 * at the head of the queue or a new thread may need to 95 * become leader. 96 */ 97 private final Condition available = lock.newCondition(); 98 99 /** 100 * Creates a new <tt>DelayQueue</tt> that is initially empty. 101 */ 102 public DelayQueue() {} 103 104 /** 105 * Creates a <tt>DelayQueue</tt> initially containing the elements of the 106 * given collection of {@link Delayed} instances. 107 * 108 * @param c the collection of elements to initially contain 109 * @throws NullPointerException if the specified collection or any 110 * of its elements are null 111 */ 112 public DelayQueue(Collection<? extends E> c) { 113 this.addAll(c); 114 } 115 116 /** 117 * Inserts the specified element into this delay queue. 118 * 119 * @param e the element to add 120 * @return <tt>true</tt> (as specified by {@link Collection#add}) 121 * @throws NullPointerException if the specified element is null 122 */ 123 public boolean add(E e) { 124 return offer(e); 125 } 126 127 /** 128 * Inserts the specified element into this delay queue. 129 * 130 * @param e the element to add 131 * @return <tt>true</tt> 132 * @throws NullPointerException if the specified element is null 133 */ 134 public boolean offer(E e) { 135 final ReentrantLock lock = this.lock; 136 lock.lock(); 137 try { 138 q.offer(e); 139 if (q.peek() == e) { 140 leader = null; 141 available.signal(); 142 } 143 return true; 144 } finally { 145 lock.unlock(); 146 } 147 } 148 149 /** 150 * Inserts the specified element into this delay queue. As the queue is 151 * unbounded this method will never block. 152 * 153 * @param e the element to add 154 * @throws NullPointerException {@inheritDoc} 155 */ 156 public void put(E e) { 157 offer(e); 158 } 159 160 /** 161 * Inserts the specified element into this delay queue. As the queue is 162 * unbounded this method will never block. 163 * 164 * @param e the element to add 165 * @param timeout This parameter is ignored as the method never blocks 166 * @param unit This parameter is ignored as the method never blocks 167 * @return <tt>true</tt> 168 * @throws NullPointerException {@inheritDoc} 169 */ 170 public boolean offer(E e, long timeout, TimeUnit unit) { 171 return offer(e); 172 } 173 174 /** 175 * Retrieves and removes the head of this queue, or returns <tt>null</tt> 176 * if this queue has no elements with an expired delay. 177 * 178 * @return the head of this queue, or <tt>null</tt> if this 179 * queue has no elements with an expired delay 180 */ 181 public E poll() { 182 final ReentrantLock lock = this.lock; 183 lock.lock(); 184 try { 185 E first = q.peek(); 186 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) 187 return null; 188 else 189 return q.poll(); 190 } finally { 191 lock.unlock(); 192 } 193 } 194 195 /** 196 * Retrieves and removes the head of this queue, waiting if necessary 197 * until an element with an expired delay is available on this queue. 198 * 199 * @return the head of this queue 200 * @throws InterruptedException {@inheritDoc} 201 */ 202 public E take() throws InterruptedException { 203 final ReentrantLock lock = this.lock; 204 lock.lockInterruptibly(); 205 try { 206 for (;;) { 207 E first = q.peek(); 208 if (first == null) 209 available.await(); 210 else { 211 long delay = first.getDelay(TimeUnit.NANOSECONDS); 212 if (delay <= 0) 213 return q.poll(); 214 else if (leader != null) 215 available.await(); 216 else { 217 Thread thisThread = Thread.currentThread(); 218 leader = thisThread; 219 try { 220 available.awaitNanos(delay); 221 } finally { 222 if (leader == thisThread) 223 leader = null; 224 } 225 } 226 } 227 } 228 } finally { 229 if (leader == null && q.peek() != null) 230 available.signal(); 231 lock.unlock(); 232 } 233 } 234 235 /** 236 * Retrieves and removes the head of this queue, waiting if necessary 237 * until an element with an expired delay is available on this queue, 238 * or the specified wait time expires. 239 * 240 * @return the head of this queue, or <tt>null</tt> if the 241 * specified waiting time elapses before an element with 242 * an expired delay becomes available 243 * @throws InterruptedException {@inheritDoc} 244 */ 245 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 246 long nanos = unit.toNanos(timeout); 247 final ReentrantLock lock = this.lock; 248 lock.lockInterruptibly(); 249 try { 250 for (;;) { 251 E first = q.peek(); 252 if (first == null) { 253 if (nanos <= 0) 254 return null; 255 else 256 nanos = available.awaitNanos(nanos); 257 } else { 258 long delay = first.getDelay(TimeUnit.NANOSECONDS); 259 if (delay <= 0) 260 return q.poll(); 261 if (nanos <= 0) 262 return null; 263 if (nanos < delay || leader != null) 264 nanos = available.awaitNanos(nanos); 265 else { 266 Thread thisThread = Thread.currentThread(); 267 leader = thisThread; 268 try { 269 long timeLeft = available.awaitNanos(delay); 270 nanos -= delay - timeLeft; 271 } finally { 272 if (leader == thisThread) 273 leader = null; 274 } 275 } 276 } 277 } 278 } finally { 279 if (leader == null && q.peek() != null) 280 available.signal(); 281 lock.unlock(); 282 } 283 } 284 285 /** 286 * Retrieves, but does not remove, the head of this queue, or 287 * returns <tt>null</tt> if this queue is empty. Unlike 288 * <tt>poll</tt>, if no expired elements are available in the queue, 289 * this method returns the element that will expire next, 290 * if one exists. 291 * 292 * @return the head of this queue, or <tt>null</tt> if this 293 * queue is empty. 294 */ 295 public E peek() { 296 final ReentrantLock lock = this.lock; 297 lock.lock(); 298 try { 299 return q.peek(); 300 } finally { 301 lock.unlock(); 302 } 303 } 304 305 public int size() { 306 final ReentrantLock lock = this.lock; 307 lock.lock(); 308 try { 309 return q.size(); 310 } finally { 311 lock.unlock(); 312 } 313 } 314 315 /** 316 * @throws UnsupportedOperationException {@inheritDoc} 317 * @throws ClassCastException {@inheritDoc} 318 * @throws NullPointerException {@inheritDoc} 319 * @throws IllegalArgumentException {@inheritDoc} 320 */ 321 public int drainTo(Collection<? super E> c) { 322 if (c == null) 323 throw new NullPointerException(); 324 if (c == this) 325 throw new IllegalArgumentException(); 326 final ReentrantLock lock = this.lock; 327 lock.lock(); 328 try { 329 int n = 0; 330 for (;;) { 331 E first = q.peek(); 332 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) 333 break; 334 c.add(q.poll()); 335 ++n; 336 } 337 return n; 338 } finally { 339 lock.unlock(); 340 } 341 } 342 343 /** 344 * @throws UnsupportedOperationException {@inheritDoc} 345 * @throws ClassCastException {@inheritDoc} 346 * @throws NullPointerException {@inheritDoc} 347 * @throws IllegalArgumentException {@inheritDoc} 348 */ 349 public int drainTo(Collection<? super E> c, int maxElements) { 350 if (c == null) 351 throw new NullPointerException(); 352 if (c == this) 353 throw new IllegalArgumentException(); 354 if (maxElements <= 0) 355 return 0; 356 final ReentrantLock lock = this.lock; 357 lock.lock(); 358 try { 359 int n = 0; 360 while (n < maxElements) { 361 E first = q.peek(); 362 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) 363 break; 364 c.add(q.poll()); 365 ++n; 366 } 367 return n; 368 } finally { 369 lock.unlock(); 370 } 371 } 372 373 /** 374 * Atomically removes all of the elements from this delay queue. 375 * The queue will be empty after this call returns. 376 * Elements with an unexpired delay are not waited for; they are 377 * simply discarded from the queue. 378 */ 379 public void clear() { 380 final ReentrantLock lock = this.lock; 381 lock.lock(); 382 try { 383 q.clear(); 384 } finally { 385 lock.unlock(); 386 } 387 } 388 389 /** 390 * Always returns <tt>Integer.MAX_VALUE</tt> because 391 * a <tt>DelayQueue</tt> is not capacity constrained. 392 * 393 * @return <tt>Integer.MAX_VALUE</tt> 394 */ 395 public int remainingCapacity() { 396 return Integer.MAX_VALUE; 397 } 398 399 /** 400 * Returns an array containing all of the elements in this queue. 401 * The returned array elements are in no particular order. 402 * 403 * <p>The returned array will be "safe" in that no references to it are 404 * maintained by this queue. (In other words, this method must allocate 405 * a new array). The caller is thus free to modify the returned array. 406 * 407 * <p>This method acts as bridge between array-based and collection-based 408 * APIs. 409 * 410 * @return an array containing all of the elements in this queue 411 */ 412 public Object[] toArray() { 413 final ReentrantLock lock = this.lock; 414 lock.lock(); 415 try { 416 return q.toArray(); 417 } finally { 418 lock.unlock(); 419 } 420 } 421 422 /** 423 * Returns an array containing all of the elements in this queue; the 424 * runtime type of the returned array is that of the specified array. 425 * The returned array elements are in no particular order. 426 * If the queue fits in the specified array, it is returned therein. 427 * Otherwise, a new array is allocated with the runtime type of the 428 * specified array and the size of this queue. 429 * 430 * <p>If this queue fits in the specified array with room to spare 431 * (i.e., the array has more elements than this queue), the element in 432 * the array immediately following the end of the queue is set to 433 * <tt>null</tt>. 434 * 435 * <p>Like the {@link #toArray()} method, this method acts as bridge between 436 * array-based and collection-based APIs. Further, this method allows 437 * precise control over the runtime type of the output array, and may, 438 * under certain circumstances, be used to save allocation costs. 439 * 440 * <p>The following code can be used to dump a delay queue into a newly 441 * allocated array of <tt>Delayed</tt>: 442 * 443 * <pre> 444 * Delayed[] a = q.toArray(new Delayed[0]);</pre> 445 * 446 * Note that <tt>toArray(new Object[0])</tt> is identical in function to 447 * <tt>toArray()</tt>. 448 * 449 * @param a the array into which the elements of the queue are to 450 * be stored, if it is big enough; otherwise, a new array of the 451 * same runtime type is allocated for this purpose 452 * @return an array containing all of the elements in this queue 453 * @throws ArrayStoreException if the runtime type of the specified array 454 * is not a supertype of the runtime type of every element in 455 * this queue 456 * @throws NullPointerException if the specified array is null 457 */ 458 public <T> T[] toArray(T[] a) { 459 final ReentrantLock lock = this.lock; 460 lock.lock(); 461 try { 462 return q.toArray(a); 463 } finally { 464 lock.unlock(); 465 } 466 } 467 468 /** 469 * Removes a single instance of the specified element from this 470 * queue, if it is present, whether or not it has expired. 471 */ 472 public boolean remove(Object o) { 473 final ReentrantLock lock = this.lock; 474 lock.lock(); 475 try { 476 return q.remove(o); 477 } finally { 478 lock.unlock(); 479 } 480 } 481 482 /** 483 * Returns an iterator over all the elements (both expired and 484 * unexpired) in this queue. The iterator does not return the 485 * elements in any particular order. 486 * 487 * <p>The returned iterator is a "weakly consistent" iterator that 488 * will never throw {@link java.util.ConcurrentModificationException 489 * ConcurrentModificationException}, and guarantees to traverse 490 * elements as they existed upon construction of the iterator, and 491 * may (but is not guaranteed to) reflect any modifications 492 * subsequent to construction. 493 * 494 * @return an iterator over the elements in this queue 495 */ 496 public Iterator<E> iterator() { 497 return new Itr(toArray()); 498 } 499 500 /** 501 * Snapshot iterator that works off copy of underlying q array. 502 */ 503 private class Itr implements Iterator<E> { 504 final Object[] array; // Array of all elements 505 int cursor; // index of next element to return; 506 int lastRet; // index of last element, or -1 if no such 507 508 Itr(Object[] array) { 509 lastRet = -1; 510 this.array = array; 511 } 512 513 public boolean hasNext() { 514 return cursor < array.length; 515 } 516 517 @SuppressWarnings("unchecked") 518 public E next() { 519 if (cursor >= array.length) 520 throw new NoSuchElementException(); 521 lastRet = cursor; 522 return (E)array[cursor++]; 523 } 524 525 public void remove() { 526 if (lastRet < 0) 527 throw new IllegalStateException(); 528 Object x = array[lastRet]; 529 lastRet = -1; 530 // Traverse underlying queue to find == element, 531 // not just a .equals element. 532 lock.lock(); 533 try { 534 for (Iterator it = q.iterator(); it.hasNext(); ) { 535 if (it.next() == x) { 536 it.remove(); 537 return; 538 } 539 } 540 } finally { 541 lock.unlock(); 542 } 543 } 544 } 545 546 }