| Method from org.jboss.mq.server.MessageCache Detail: |
public MessageReference add(SpyMessage message,
BasicQueue queue,
int stored) throws JMSException {
DurableSubscriptionID id = message.header.durableSubscriberID;
return addInternal(message, queue, stored, id);
}
Adds a message to the cache. |
public MessageReference add(SpyMessage message,
BasicQueue queue,
int stored,
DurableSubscriptionID id) throws JMSException {
return addInternal(message, queue, stored, id);
}
Adds a message to the cache. |
public MessageReference addInternal(SpyMessage message,
BasicQueue queue,
int stored,
DurableSubscriptionID id) throws JMSException {
// Create the message reference
MessageReference mh = new MessageReference();
mh.init(this, messageCounter.increment(), message, queue, id);
mh.setStored(stored);
// Add it to the cache
synchronized (mh)
{
synchronized (lruCache)
{
lruCache.addMostRecent(mh);
totalCacheSize++;
}
}
validateSoftReferenceDepth();
return mh;
}
Adds a message to the cache. |
public long getCacheHits() {
return cacheHits;
}
|
public long getCacheMisses() {
return cacheMisses;
}
|
public ObjectName getCacheStore() {
return cacheStoreName;
}
|
public long getCurrentMemoryUsage() {
return (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / ONE_MEGABYTE;
}
Gets the CurrentMemoryUsage |
public int getHardRefCacheSize() {
synchronized (lruCache)
{
return lruCache.size();
}
}
Gets the hardRefCacheSize |
public long getHighMemoryMark() {
return highMemoryMark / ONE_MEGABYTE;
}
|
public MessageCache getInstance() {
return this;
}
|
public boolean getMakeSoftReferences() {
return makeSoftReferences;
}
Gets whether to make soft references |
public long getMaxMemoryMark() {
return maxMemoryMark / ONE_MEGABYTE;
}
|
public int getMaximumHard() {
return maximumHard;
}
Gets the maximum number of hard messages |
public int getMinimumHard() {
return minimumHard;
}
Gets the minimum number of hard messages |
public String getName() {
return "MessageCache";
}
|
public int getSoftRefCacheSize() {
return softRefCacheSize;
}
Gets the softRefCacheSize |
public long getSoftenAtLeastEveryMillis() {
return softenAtLeastEveryMillis;
}
Gets the maximum length between softening checks |
public long getSoftenNoMoreOftenThanMillis() {
return softenNoMoreOftenThanMillis;
}
Gets the minimum length between softening checks |
public long getSoftenWaitMillis() {
return softenWaitMillis;
}
Gets the length of time to wait before checking whether we should soften |
public long getSoftenedSize() {
return softenedSize;
}
The getSoftenedSize method |
public int getTotalCacheSize() {
return totalCacheSize;
}
|
SpyMessage loadFromStorage(MessageReference mh) throws JMSException {
return cacheStore.loadFromStorage(mh);
}
|
void messageReferenceUsedEvent(MessageReference mh,
boolean wasHard) throws JMSException {
synchronized (mh)
{
synchronized (lruCache)
{
if (wasHard)
lruCache.makeMostRecent(mh);
else
{
lruCache.addMostRecent(mh);
}
}
}
if (wasHard == false)
checkSoftReferenceDepth = true;
}
This gets called when a MessageReference is de-referenced.
We will pop it to the top of the RLU |
public void remove(MessageReference mr) throws JMSException {
// Remove if not done already
removeInternal(mr, true, true);
}
removes a message from the cache |
public void removeDelayed(MessageReference mr) throws JMSException {
// Remove from the cache
removeInternal(mr, true, false);
}
removes a message from the cache without returning it to the pool
used in two phase removes for joint cache/persistence |
void removeFromStorage(MessageReference mh) throws JMSException {
cacheStore.removeFromStorage(mh);
}
|
protected void removeInternal(MessageReference mr,
boolean clear,
boolean reset) throws JMSException {
synchronized (mr)
{
if (mr.stored != MessageReference.REMOVED)
{
synchronized (lruCache)
{
if (mr.hardReference != null) //If message is not hard, dont do lru stuff
lruCache.remove(mr);
if (clear)
totalCacheSize--;
}
if (clear)
mr.clear();
//Will remove it from storage if stored
}
if (reset)
mr.reset();
//Return to the pool
}
}
removes a message from the cache |
public void run() {
try
{
while (true)
{
// Get the next soft reference that was canned by the GC
Reference r = null;
if (checkSoftReferenceDepth)
r = referenceQueue.poll();
else
r = referenceQueue.remove(softenWaitMillis);
if (r != null)
{
softRefCacheSize--;
// the GC will free a set of messages together, so we poll them
// all before we validate the soft reference depth.
while ((r = referenceQueue.poll()) != null)
{
softRefCacheSize--;
}
if (log.isTraceEnabled())
log.trace("soft reference cache size is now: " + softRefCacheSize);
checkSoftReferenceDepth = true;
}
long now = System.currentTimeMillis();
// Don't try to soften too often
if (softenNoMoreOftenThanMillis > 0 && (now - lastSoften < softenNoMoreOftenThanMillis))
checkSoftReferenceDepth = false;
// Is it a while since we last softened?
else if (softenAtLeastEveryMillis > 0 && (now - lastSoften > softenAtLeastEveryMillis))
checkSoftReferenceDepth = true;
// Should we check for softening
if (checkSoftReferenceDepth)
{
checkSoftReferenceDepth = validateSoftReferenceDepth();
// Did the softening complete?
if (checkSoftReferenceDepth == false)
lastSoften = now;
}
}
}
catch (InterruptedException e)
{
// Signal to exit the thread.
}
catch (Throwable t)
{
log.error("Message Cache Thread Stopped: ", t);
}
log.debug("Thread exiting.");
}
The strategy is that we keep the most recently used messages as
Hard references. Then we make the older ones soft references. Making
something a soft reference stores it to disk so we need to avoid making
soft references if we can avoid it. But once it is made a soft reference does
not mean that it is removed from memory. Depending on how agressive the JVM's
GC is, it may stay around long enough for it to be used by a client doing a read,
saving us read from the file system. If memory gets tight the GC will remove
the soft references. What we want to do is make sure there are at least some
soft references available so that the GC can reclaim memory. |
void saveToStorage(MessageReference mh,
SpyMessage message) throws JMSException {
cacheStore.saveToStorage(mh, message);
}
|
public void setCacheStore(ObjectName cacheStoreName) {
this.cacheStoreName = cacheStoreName;
}
|
public void setHighMemoryMark(long highMemoryMark) {
if (highMemoryMark > 0)
this.highMemoryMark = highMemoryMark * ONE_MEGABYTE;
else
this.highMemoryMark = 0;
}
|
public void setMakeSoftReferences(boolean makeSoftReferences) {
this.makeSoftReferences = makeSoftReferences;
}
Sets whether to make soft references |
public void setMaxMemoryMark(long maxMemoryMark) {
if (maxMemoryMark > 0)
this.maxMemoryMark = maxMemoryMark * ONE_MEGABYTE;
else
this.maxMemoryMark = 0;
}
|
public void setMaximumHard(int maximumHard) {
if (maximumHard < 0)
this.maximumHard = 0;
else
this.maximumHard = maximumHard;
}
Sets the maximum number of hard messages |
public void setMinimumHard(int minimumHard) {
if (minimumHard < 1)
this.minimumHard = 1;
else
this.minimumHard = minimumHard;
}
Sets the minimum number of hard messages |
public void setSoftenAtLeastEveryMillis(long millis) {
if (millis < 0)
softenAtLeastEveryMillis = 0;
else
softenAtLeastEveryMillis = millis;
}
Sets the minimum length between softening checks |
public void setSoftenNoMoreOftenThanMillis(long millis) {
if (millis < 0)
softenNoMoreOftenThanMillis = 0;
else
softenNoMoreOftenThanMillis = millis;
}
Sets the minimum length between softening checks |
public void setSoftenWaitMillis(long millis) {
if (millis < 1000)
softenWaitMillis = 1000;
else
softenWaitMillis = millis;
}
Sets the length of time to wait before checking whether we should soften |
protected void setupCacheStore() throws Exception {
cacheStore = (CacheStore) getServer().getAttribute(cacheStoreName, "Instance");
}
|
void soften(MessageReference mr) throws JMSException {
// Remove from the cache
removeInternal(mr, false, false);
if (makeSoftReferences)
softRefCacheSize++;
}
removes a message from the cache but does not clear it,
used in softening |
protected void startService() throws Exception {
setupCacheStore();
referenceSoftner = new Thread(this, "JBossMQ Cache Reference Softner");
referenceSoftner.setDaemon(true);
referenceSoftner.start();
}
This gets called to start the cache service. Synch. by start |
protected void stopService() {
synchronized (lruCache)
{
referenceSoftner.interrupt();
referenceSoftner = null;
}
cacheStore = null;
}
This gets called to stop the cache service. |
boolean validateSoftReferenceDepth() throws JMSException {
boolean trace = log.isTraceEnabled();
// Loop until softening is not required or we find a message we can soften
while (getState() == ServiceMBeanSupport.STARTED)
{
MessageReference messageToSoften = null;
synchronized (lruCache)
{
// howmany to change over to soft refs
int softenCount = 0;
int hardCount = getHardRefCacheSize();
int softCount = getSoftRefCacheSize();
// Only soften down to a minimum
if (hardCount < = minimumHard)
return false;
long currentMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
if (currentMem > highMemoryMark)
{
// we need to get more aggresive... how much?? lets get
// a mesurment from 0 to 1
float severity = ((float) (currentMem - highMemoryMark)) / (maxMemoryMark - highMemoryMark);
severity = Math.min(severity, 1.0F);
if (trace)
log.trace("Memory usage serverity=" + severity);
int totalMessageInMem = hardCount + softCount;
int howManyShouldBeSoft = (int) ((totalMessageInMem) * severity);
softenCount = howManyShouldBeSoft - softCount;
}
// Are there too many messages in memory?
if (maximumHard > 0)
{
int removeCount = hardCount - maximumHard;
if (removeCount > 0 && removeCount > softenCount)
softenCount = removeCount;
}
// We can only do so much, somebody else is using all the memory?
if (softenCount > hardCount)
{
if (trace)
log.trace("Soften count " + softenCount + " greater than hard references " + hardCount);
softenCount = hardCount;
}
// Ignore soften counts of 1 since this will happen too often even
// if the serverity is low since it will round up.
if (softenCount > 1 || (maximumHard > 0 && hardCount > maximumHard))
{
if (trace)
log.trace("Need to soften " + softenCount + " messages");
Node node = lruCache.getLeastRecent();
messageToSoften = (MessageReference) node.data;
}
}
// No softening required
if (messageToSoften == null)
return false;
synchronized (messageToSoften)
{
// Soften unless it was removed
if (messageToSoften.messageCache != null && messageToSoften.stored != MessageReference.REMOVED)
{
messageToSoften.makeSoft();
if (messageToSoften.stored == MessageReference.STORED)
{
softenedSize++;
return true;
}
else if (messageToSoften.isPersistent())
{
// Avoid going into a cpu loop if there are persistent
// messages just about to be persisted
return false;
}
}
else if (trace)
log.trace("not softening removed message " + messageToSoften);
}
}
return false;
}
This method is in charge of determining if it time to convert some
hard references over to soft references. |