Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/io/util/MemoryBoundedQueue.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.activemq.io.util;
20  
21  import java.util.ArrayList;
22  import java.util.Iterator;
23  import java.util.List;
24  
25  import javax.jms.JMSException;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.activemq.service.QueueListEntry;
30  import org.activemq.service.impl.DefaultQueueList;
31  
32  import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
33  
34  /**
35   * MemoryBoundedQueue is a queue bounded by memory usage for MemoryManageables
36   *
37   * @version $Revision: 1.1.1.1 $
38   */
39  public class MemoryBoundedQueue implements MemoryBoundedObject {
40      
41      private static final Log log = LogFactory.getLog(MemoryBoundedQueue.class);
42      
43      private static final int OBJECT_OVERHEAD = 50;
44      protected static final int WAIT_TIMEOUT = 100;
45  
46      private final MemoryBoundedQueueManager manager;
47      private final String name;
48  
49      protected final Object outLock = new Object();
50      protected final Object inLock = new Object();
51      private final DefaultQueueList internalList = new DefaultQueueList();
52      protected boolean stopped = false;
53      protected boolean closed = false;
54      private SynchronizedLong memoryUsedByThisQueue = new SynchronizedLong(0);
55  
56      /**
57       * Constructor
58       *
59       * @param name
60       * @param manager
61       * @param name 
62       */
63      public MemoryBoundedQueue(MemoryBoundedQueueManager manager, String name) {
64          this.manager = manager;
65          this.name = name;
66          this.manager.add(this);
67      }
68      /**
69       * @return a pretty print of this queue
70       */
71      public String toString() {
72          return "MemoryBoundedQueue{ size=" + size() + ", memory usage=" + memoryUsedByThisQueue + " }";
73      }
74  
75      /**
76       * @return the number of items held by this queue
77       */
78      public int size() {
79          return internalList.size();
80      }
81  
82      /**
83       * @return an aproximation the memory used by this queue
84       */
85      public long getLocalMemoryUsedByThisQueue() {
86          return memoryUsedByThisQueue.get();
87      }
88  
89      /**
90       * close and remove this queue from the MemoryBoundedQueueManager
91       */
92      public void close() {
93          try {
94              clear();
95              closed = true;
96              synchronized (outLock) {
97                  outLock.notifyAll();
98              }
99              synchronized (inLock) {
100                 inLock.notifyAll();
101             }
102         }
103         catch (Throwable e) {
104             e.printStackTrace();
105         }
106         finally {
107             manager.remove(this);
108         }
109     }
110 
111     /**
112      * Enqueue a MemoryManageable without checking memory usage limits
113      *
114      * @param packet
115      */
116     public void enqueueNoBlock(MemoryManageable packet) {
117         if (!closed) {
118             internalList.add(packet);
119             incrementMemoryUsed(packet);
120             synchronized (outLock) {
121                 outLock.notify();
122             }
123         }
124     }
125 
126     /**
127      * Enqueue a MemoryManageable to this queue
128      *
129      * @param packet
130      */
131     public void enqueue(MemoryManageable packet) {
132         if (!manager.isMemoryLimitEnforced() || !manager.isFull()) {
133             enqueueNoBlock(packet);
134         }
135         else {
136             synchronized (inLock) {
137                 try {
138                     while (manager.isFull() && !closed) {
139                         log.warn("Queue is full, waiting for it to be dequeued.");
140                         inLock.wait(WAIT_TIMEOUT);
141                     }
142                 }
143                 catch (InterruptedException ie) {
144                 }
145             }
146             enqueueNoBlock(packet);
147         }
148     }
149 
150     /**
151      * Enqueue a packet to the head of the queue with total disregard for memory constraints
152      *
153      * @param packet
154      */
155     public void enqueueFirstNoBlock(MemoryManageable packet) {
156         if (!closed) {
157             internalList.addFirst(packet);
158             incrementMemoryUsed(packet);
159             synchronized (outLock) {
160                 outLock.notify();
161             }
162         }
163     }
164 
165     /**
166      * Enqueue an array of packets to the head of the queue with total disregard for memory constraints
167      *
168      * @param packets
169      */
170     public void enqueueAllFirstNoBlock(List packets) {
171         if (!closed) {
172             internalList.addAllFirst(packets);
173             Iterator iterator = packets.iterator();
174             for (Iterator iter = packets.iterator(); iter.hasNext();) {
175                 MemoryManageable packet = (MemoryManageable) iter.next();
176                 incrementMemoryUsed(packet);
177             }
178             synchronized (outLock) {
179                 outLock.notify();
180             }
181         }
182     }
183 
184     /**
185      * Enqueue a MemoryManageable to the head of the queue
186      *
187      * @param packet
188      * @throws InterruptedException
189      */
190     public void enqueueFirst(MemoryManageable packet) throws InterruptedException {
191         if (!manager.isMemoryLimitEnforced() || !manager.isFull()) {
192             enqueueFirstNoBlock(packet);
193         }
194         else {
195             synchronized (inLock) {
196                 while (manager.isFull() && !closed) {
197                     inLock.wait(WAIT_TIMEOUT);
198                 }
199             }
200             enqueueFirstNoBlock(packet);
201         }
202     }
203 
204     /**
205      * @return the first dequeued MemoryManageable or blocks until one is available
206      * @throws InterruptedException
207      */
208     public MemoryManageable dequeue() throws InterruptedException {
209         MemoryManageable result = null;
210         synchronized (outLock) {
211             while (internalList.isEmpty() && !closed) {
212                 outLock.wait(WAIT_TIMEOUT);
213             }
214             result = dequeueNoWait();
215         }
216         return result;
217     }
218 
219     /**
220      * Dequeues a MemoryManageable from the head of the queue
221      *
222      * @param timeInMillis time to wait for a MemoryManageable to be available
223      * @return the first MemoryManageable or null if none available within <I>timeInMillis </I>
224      * @throws InterruptedException
225      */
226     public MemoryManageable dequeue(long timeInMillis) throws InterruptedException {
227         MemoryManageable result = null;
228         if (timeInMillis == 0) {
229             result = dequeue();
230         }
231         else {
232             synchronized (outLock) {
233                 // if timeInMillis is less than zero assume nowait
234                 long waitTime = timeInMillis;
235                 long start = (timeInMillis <= 0) ? 0 : System.currentTimeMillis();
236                 while (!closed) {
237                     result = dequeueNoWait();
238                     if (result != null || waitTime <= 0) {
239                         break;
240                     }
241                     else {
242                         outLock.wait(waitTime);
243                         waitTime = timeInMillis - (System.currentTimeMillis() - start);
244                     }
245                 }
246             }
247         }
248         return result;
249     }
250 
251     /**
252      * dequeues a MemoryManageable from the head of the queue
253      *
254      * @return the MemoryManageable at the head of the queue or null, if none is available
255      * @throws InterruptedException
256      */
257     public MemoryManageable dequeueNoWait() throws InterruptedException {
258         MemoryManageable packet = null;
259         synchronized (outLock) {
260             while (stopped && !closed) {
261                 outLock.wait(WAIT_TIMEOUT);
262             }
263         }
264         packet = (MemoryManageable) internalList.removeFirst();
265         decrementMemoryUsed(packet);
266         if (packet != null) {
267             synchronized (inLock) {
268                 inLock.notify();
269             }
270         }
271         return packet;
272     }
273 
274     /**
275      * @return true if the queue is enabled for dequeing (default = true)
276      */
277     public boolean isStarted() {
278         synchronized (outLock) {
279             return stopped == false;
280         }
281     }
282 
283     /**
284      * disable dequeueing
285      */
286     public void stop() {
287         synchronized (outLock) {
288             stopped = true;
289         }
290     }
291 
292     /**
293      * enable dequeueing
294      */
295     public void start() {
296         synchronized (outLock) {
297             stopped = false;
298             outLock.notifyAll();
299         }
300         synchronized (inLock) {
301             inLock.notifyAll();
302         }
303     }
304 
305     /**
306      * Remove a packet from the queue
307      *
308      * @param packet
309      * @return true if the packet was found
310      */
311     public boolean remove(MemoryManageable packet) {
312         boolean result = false;
313         if (!internalList.isEmpty()) {
314             result = internalList.remove(packet);
315         }
316         if (result) {
317             decrementMemoryUsed(packet);
318         }
319         synchronized (inLock) {
320             inLock.notify();
321         }
322         return result;
323     }
324 
325     /**
326      * Remove a MemoryManageable by it's id
327      *
328      * @param id
329      * @return
330      */
331     public MemoryManageable remove(Object id) {
332         MemoryManageable result = null;
333         QueueListEntry entry = internalList.getFirstEntry();
334         try {
335             while (entry != null) {
336                 MemoryManageable p = (MemoryManageable) entry.getElement();
337                 if (p.getMemoryId().equals(id)) {
338                     result = p;
339                     remove(p);
340                     break;
341                 }
342                 entry = internalList.getNextEntry(entry);
343             }
344         }
345         catch (JMSException jmsEx) {
346             jmsEx.printStackTrace();
347         }
348         synchronized (inLock) {
349             inLock.notify();
350         }
351         return result;
352     }
353 
354     /**
355      * remove any MemoryManageables in the queue
356      */
357     public void clear() {
358         while (!internalList.isEmpty()) {
359             MemoryManageable packet = (MemoryManageable) internalList.removeFirst();
360             decrementMemoryUsed(packet);
361         }
362         synchronized (inLock) {
363             inLock.notifyAll();
364         }
365     }
366 
367     /**
368      * @return true if the queue is empty
369      */
370     public boolean isEmpty() {
371         return internalList.isEmpty();
372     }
373 
374     /**
375      * retrieve a MemoryManageable at an indexed position in the queue
376      *
377      * @param index
378      * @return
379      */
380     public MemoryManageable get(int index) {
381         return (MemoryManageable) internalList.get(index);
382     }
383 
384     /**
385      * Retrieve a shallow copy of the contents as a list
386      *
387      * @return a list containing the bounded queue contents
388      */
389     public List getContents() {
390         Object[] array = internalList.toArray();
391         List list = new ArrayList();
392         for (int i = 0; i < array.length; i++) {
393             list.add(array[i]);
394         }
395         return list;
396     }
397 
398     protected void incrementMemoryUsed(MemoryManageable packet) {
399         if (packet != null) {
400             int size = OBJECT_OVERHEAD;
401             if (packet != null) {
402                 if (packet.incrementMemoryReferenceCount() == 1) {
403                     size += packet.getMemoryUsage();
404                 }
405             }    
406             memoryUsedByThisQueue.add(size);
407             manager.incrementMemoryUsed(size);
408         }
409     }
410 
411     protected void decrementMemoryUsed(MemoryManageable packet) {
412         if (packet != null) {
413             int size = OBJECT_OVERHEAD;
414             if (packet != null) {
415                 if ( packet.decrementMemoryReferenceCount() == 0) {
416                     size += packet.getMemoryUsage();
417                 }
418             }
419                         
420             memoryUsedByThisQueue.subtract(size);
421             manager.decrementMemoryUsed(size);
422         }
423     }
424     /**
425      * @return Returns the name.
426      */
427     public String getName() {
428         return name;
429     }
430 }