Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/ra/CircularQueue.java


1   package org.activemq.ra;
2   
3   import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
4   
5   /**
6    */
7   public class CircularQueue {
8   
9       private final int size;
10  
11      private final SynchronizedBoolean stopping;
12  
13  
14      // For pooling objects
15      private final Object[] contents;
16      final private Object mutex = new Object();
17      //where the next worker to be supplied currently is.
18      private int start=0;
19      //where the next worker to be inserted will go
20      private int end=0;
21  
22      public CircularQueue(int size, SynchronizedBoolean stopping) {
23          this.size = size;
24          contents = new Object[size];
25          this.stopping = stopping;
26      }
27  
28      public Object get() {
29        synchronized(mutex) {
30          while( true ) {
31              Object ew = contents[start];
32                  if (ew != null) {
33                      start++;
34              if(start == contents.length) {
35                start=0;
36              }
37              return ew;
38            } else {
39              try {
40              mutex.wait();
41              if(stopping.get()) {
42                return null;
43              }
44            } catch (InterruptedException e) {
45              return null;
46            }
47            }
48          }
49        }
50      }
51  
52      public void returnObject(Object worker) {
53        synchronized(mutex) {
54          contents[end++] = worker;
55              if( end == contents.length) {
56                  end=0;
57              }
58          mutex.notify();
59        }
60      }
61  
62      public int size() {
63          return contents.length;
64      }
65  
66      public void drain() {
67          int i = 0;
68          while (i < size) {
69              if (get() != null) {
70                  i++;
71              }
72          }
73      }
74  
75  
76      public void notifyWaiting() {
77          synchronized(mutex) {
78          mutex.notifyAll();
79        }
80      }
81  
82  }