Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/io/util/MemoryBoundedMessageCache.java


1   /** 
2    * 
3    * Copyright 2004 Hiram Chirino
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activemq.io.util;
19  
20  import java.util.HashMap;
21  import java.util.Iterator;
22  
23  import org.activemq.message.ActiveMQMessage;
24  import org.activemq.service.QueueListEntry;
25  import org.activemq.service.impl.DefaultQueueList;
26  import org.activemq.store.cache.MessageCache;
27  
28  /**
29   * A simple cache that stores messages in memory.  Cache entries are evicted 
30   * when the memoryManager starts to run short on memory (A LRU cache is used).
31   *
32   * @version $Revision: 1.1.1.1 $
33   */
34  public class MemoryBoundedMessageCache implements MessageCache, MemoryBoundedObject {
35  
36      private static final int OBJECT_OVERHEAD = 50;
37  
38      private final MemoryBoundedObjectManager memoryManager;
39      
40      /** msgId -> LRUNode */
41      private final HashMap messages = new HashMap();
42      /** Ordered list of messageIds recently used at the front */
43      private final DefaultQueueList lruList = new DefaultQueueList();
44  
45      private int memoryUsedByThisCache;
46      private float growthLimit = 0.75f;
47      private boolean closed;
48      
49      /** Used associate the a Message to it's QueueListEntry in the lruList */
50      private static class CacheNode {        
51          ActiveMQMessage message;
52          QueueListEntry entry;
53      }
54    
55    public MemoryBoundedMessageCache(MemoryBoundedObjectManager memoryManager) {
56          this.memoryManager = memoryManager;
57          this.memoryManager.add(this);
58    }
59    
60    /**
61     * Gets a message that was previously <code>put</code> into this object.   
62     * 
63     * @param msgid
64     * @return null if the message was not previously put or if the message has expired out of the cache.
65     */
66    synchronized public ActiveMQMessage get(String msgid) {
67          CacheNode rc  = (CacheNode) messages.get(msgid);
68          if( rc != null ) {
69              // Move to front (the recently used part of the list).
70              lruList.remove(rc.entry);
71              rc.entry = lruList.addFirst(msgid);
72              return rc.message;
73          }
74          return null;
75    }
76  
77    /**
78     * Puts a message into the cache.
79     * 
80     * @param messageID
81     * @param message
82     */
83    synchronized public void put(String messageID, ActiveMQMessage message) {
84          
85          // Drop old messages until there is space.
86          while( isFull() && !messages.isEmpty() ) {
87              removeOldest();
88          }
89          
90          if( !isFull() ) {
91              incrementMemoryUsed(message);
92              CacheNode newNode = new CacheNode();
93              newNode.message = message;
94              newNode.entry = lruList.addFirst(messageID);            
95              CacheNode oldNode = (CacheNode) messages.put(messageID, newNode);
96              if( oldNode !=null ) {
97                  lruList.remove(oldNode);
98                  decrementMemoryUsed(oldNode.message);
99              }        
100         }
101   }
102 
103     private void removeOldest() {
104         String messageID = (String) lruList.removeLast();
105         CacheNode node = (CacheNode) messages.remove(messageID);
106         decrementMemoryUsed(node.message);
107     }
108 
109     private boolean isFull() {
110         return memoryManager.getPercentFull() > growthLimit;
111     }
112 
113     /**
114    * Remvoes a message from the cache.
115    * 
116    * @param messageID
117    */
118   synchronized public void remove(String messageID) {
119         CacheNode node = (CacheNode) messages.remove(messageID);
120         if( node !=null ) {
121             lruList.remove(node.entry);
122             decrementMemoryUsed(node.message);
123         }        
124   }
125     
126     private void incrementMemoryUsed(ActiveMQMessage packet) {
127         if (packet != null) {
128             int size = OBJECT_OVERHEAD;
129             if (packet != null) {
130                 if (packet.incrementMemoryReferenceCount() == 1) {
131                     size += packet.getMemoryUsage();
132                 }
133             }
134             synchronized( this ) {
135                 memoryUsedByThisCache += size;
136             }
137             memoryManager.incrementMemoryUsed(size);
138         }
139     }
140 
141     private void decrementMemoryUsed(ActiveMQMessage packet) {
142         if (packet != null) {
143             int size = OBJECT_OVERHEAD;
144             if (packet != null) {
145                 if (packet.decrementMemoryReferenceCount() == 0) {
146                     size += packet.getMemoryUsage();
147                 }
148             }
149                         
150             synchronized( this ) {
151                 memoryUsedByThisCache -= size;
152             }
153             memoryManager.decrementMemoryUsed(size);
154         }
155     }
156 
157     /**
158      * @return returns the percentage of memory usage at which that cache will stop to grow.
159      */
160     public float getGrowthLimit() {
161         return growthLimit;
162     }
163     
164     /**
165      * @param growTillFence the percentage of memory usage at which that cache will stop to grow.
166      */
167     public void setGrowthLimit(float growTillFence) {
168         this.growthLimit = growTillFence;
169     }
170 
171     synchronized public void close() {
172         if( closed ) 
173             return;
174         closed=true;
175         
176         for (Iterator iter = messages.values().iterator(); iter.hasNext();) {
177             CacheNode node = (CacheNode) iter.next();
178             decrementMemoryUsed(node.message);
179         }
180         messages.clear();
181         lruList.clear();
182         
183         memoryManager.remove(this);
184     }
185 }