Source code: org/activemq/io/util/MemoryBoundedQueueManager.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 * Copyright 2004 Hiram Chirino
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 **/
19
20 package org.activemq.io.util;
21 import java.util.ArrayList;
22 import java.util.Iterator;
23 import java.util.List;
24
25 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
26 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
27
28 /**
29 * A factory manager for MemoryBoundedQueue and also ensures that the maximum memory used by all active
30 * MemoryBoundedQueues created by this instance stays within the memory usage bounds set.
31 *
32 * @version $Revision: 1.1.1.1 $
33 */
34 public class MemoryBoundedQueueManager {
35
36 private final ConcurrentHashMap activeQueues = new ConcurrentHashMap();
37 private final MemoryBoundedObjectManager memoryManager;
38 private final SynchronizedBoolean memoryLimitEnforced = new SynchronizedBoolean(true);
39
40 /**
41 * @param name
42 * @param maxSize
43 */
44 public MemoryBoundedQueueManager(MemoryBoundedObjectManager memoryManager) {
45 this.memoryManager = memoryManager;
46 }
47
48 /**
49 * retrieve a named MemoryBoundedQueue or creates one if not found
50 *
51 * @param name
52 * @return an named instance of a MemoryBoundedQueue
53 */
54 public MemoryBoundedQueue getMemoryBoundedQueue(String name) {
55 MemoryBoundedQueue result = (MemoryBoundedQueue) activeQueues.get(name);
56 if (result == null) {
57 if (memoryManager.isSupportJMSPriority())
58 result = new MemoryBoundedPrioritizedQueue(this, name);
59 else
60 result = new MemoryBoundedQueue(this, name);
61 activeQueues.put(name, result);
62 }
63 return result;
64 }
65
66 /**
67 * close this queue manager and all associated MemoryBoundedQueues
68 */
69 public void close() {
70 memoryManager.close();
71 }
72
73 /**
74 * @return Returns the memoryManager.
75 */
76 public MemoryBoundedObjectManager getMemoryManager() {
77 return memoryManager;
78 }
79
80 public int getCurrentCapacity() {
81 return memoryManager.getCurrentCapacity();
82 }
83
84 public void add(MemoryBoundedQueue queue) {
85 memoryManager.add(queue);
86 }
87
88 public void remove(MemoryBoundedQueue queue) {
89 memoryManager.remove(queue);
90 activeQueues.remove(queue.getName());
91 }
92
93 public boolean isFull() {
94 return memoryManager.isFull();
95 }
96
97 public void incrementMemoryUsed(int size) {
98 memoryManager.incrementMemoryUsed(size);
99 }
100
101 public void decrementMemoryUsed(int size) {
102 memoryManager.decrementMemoryUsed(size);
103 }
104
105 public List getMemoryBoundedQueues(){
106 return new ArrayList(activeQueues.values());
107 }
108
109 public String dumpContents(){
110 String result = "Memory = " + memoryManager.getTotalMemoryUsedSize() + " , capacity = " + memoryManager.getCurrentCapacity() + "\n";
111 for (Iterator i = activeQueues.values().iterator(); i.hasNext(); ){
112 MemoryBoundedQueue q = (MemoryBoundedQueue)i.next();
113 result += "\t" + q.getName() + " enqueued = " + q.getContents().size() + " memory = " + q.getLocalMemoryUsedByThisQueue() + "\n";
114 }
115 result += "\n\n";
116 return result;
117 }
118
119 public void setMemoryLimitEnforced(boolean enable) {
120 memoryLimitEnforced.set(enable);
121 }
122
123 public boolean isMemoryLimitEnforced() {
124 return memoryLimitEnforced.get();
125 }
126 }