Source code: org/activemq/io/util/MemoryBoundedMessageCache.java
1 /**
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activemq.io.util;
19
20 import java.util.HashMap;
21 import java.util.Iterator;
22
23 import org.activemq.message.ActiveMQMessage;
24 import org.activemq.service.QueueListEntry;
25 import org.activemq.service.impl.DefaultQueueList;
26 import org.activemq.store.cache.MessageCache;
27
28 /**
29 * A simple cache that stores messages in memory. Cache entries are evicted
30 * when the memoryManager starts to run short on memory (A LRU cache is used).
31 *
32 * @version $Revision: 1.1.1.1 $
33 */
34 public class MemoryBoundedMessageCache implements MessageCache, MemoryBoundedObject {
35
36 private static final int OBJECT_OVERHEAD = 50;
37
38 private final MemoryBoundedObjectManager memoryManager;
39
40 /** msgId -> LRUNode */
41 private final HashMap messages = new HashMap();
42 /** Ordered list of messageIds recently used at the front */
43 private final DefaultQueueList lruList = new DefaultQueueList();
44
45 private int memoryUsedByThisCache;
46 private float growthLimit = 0.75f;
47 private boolean closed;
48
49 /** Used associate the a Message to it's QueueListEntry in the lruList */
50 private static class CacheNode {
51 ActiveMQMessage message;
52 QueueListEntry entry;
53 }
54
55 public MemoryBoundedMessageCache(MemoryBoundedObjectManager memoryManager) {
56 this.memoryManager = memoryManager;
57 this.memoryManager.add(this);
58 }
59
60 /**
61 * Gets a message that was previously <code>put</code> into this object.
62 *
63 * @param msgid
64 * @return null if the message was not previously put or if the message has expired out of the cache.
65 */
66 synchronized public ActiveMQMessage get(String msgid) {
67 CacheNode rc = (CacheNode) messages.get(msgid);
68 if( rc != null ) {
69 // Move to front (the recently used part of the list).
70 lruList.remove(rc.entry);
71 rc.entry = lruList.addFirst(msgid);
72 return rc.message;
73 }
74 return null;
75 }
76
77 /**
78 * Puts a message into the cache.
79 *
80 * @param messageID
81 * @param message
82 */
83 synchronized public void put(String messageID, ActiveMQMessage message) {
84
85 // Drop old messages until there is space.
86 while( isFull() && !messages.isEmpty() ) {
87 removeOldest();
88 }
89
90 if( !isFull() ) {
91 incrementMemoryUsed(message);
92 CacheNode newNode = new CacheNode();
93 newNode.message = message;
94 newNode.entry = lruList.addFirst(messageID);
95 CacheNode oldNode = (CacheNode) messages.put(messageID, newNode);
96 if( oldNode !=null ) {
97 lruList.remove(oldNode);
98 decrementMemoryUsed(oldNode.message);
99 }
100 }
101 }
102
103 private void removeOldest() {
104 String messageID = (String) lruList.removeLast();
105 CacheNode node = (CacheNode) messages.remove(messageID);
106 decrementMemoryUsed(node.message);
107 }
108
109 private boolean isFull() {
110 return memoryManager.getPercentFull() > growthLimit;
111 }
112
113 /**
114 * Remvoes a message from the cache.
115 *
116 * @param messageID
117 */
118 synchronized public void remove(String messageID) {
119 CacheNode node = (CacheNode) messages.remove(messageID);
120 if( node !=null ) {
121 lruList.remove(node.entry);
122 decrementMemoryUsed(node.message);
123 }
124 }
125
126 private void incrementMemoryUsed(ActiveMQMessage packet) {
127 if (packet != null) {
128 int size = OBJECT_OVERHEAD;
129 if (packet != null) {
130 if (packet.incrementMemoryReferenceCount() == 1) {
131 size += packet.getMemoryUsage();
132 }
133 }
134 synchronized( this ) {
135 memoryUsedByThisCache += size;
136 }
137 memoryManager.incrementMemoryUsed(size);
138 }
139 }
140
141 private void decrementMemoryUsed(ActiveMQMessage packet) {
142 if (packet != null) {
143 int size = OBJECT_OVERHEAD;
144 if (packet != null) {
145 if (packet.decrementMemoryReferenceCount() == 0) {
146 size += packet.getMemoryUsage();
147 }
148 }
149
150 synchronized( this ) {
151 memoryUsedByThisCache -= size;
152 }
153 memoryManager.decrementMemoryUsed(size);
154 }
155 }
156
157 /**
158 * @return returns the percentage of memory usage at which that cache will stop to grow.
159 */
160 public float getGrowthLimit() {
161 return growthLimit;
162 }
163
164 /**
165 * @param growTillFence the percentage of memory usage at which that cache will stop to grow.
166 */
167 public void setGrowthLimit(float growTillFence) {
168 this.growthLimit = growTillFence;
169 }
170
171 synchronized public void close() {
172 if( closed )
173 return;
174 closed=true;
175
176 for (Iterator iter = messages.values().iterator(); iter.hasNext();) {
177 CacheNode node = (CacheNode) iter.next();
178 decrementMemoryUsed(node.message);
179 }
180 messages.clear();
181 lruList.clear();
182
183 memoryManager.remove(this);
184 }
185 }