A fast queue that remover thread lock the adder thread.
Limit the queue
length when you have strange producer thread problemes.
FIXME add i18n support to log messages
| Method from org.apache.catalina.cluster.util.FastQueue Detail: |
public boolean add(String key,
Object data) {
boolean ok = true;
long time = 0;
if (!enabled) {
if (log.isInfoEnabled())
log.info("FastQueue.add: queue disabled, add aborted");
return false;
}
if (timeWait) {
time = System.currentTimeMillis();
}
lock.lockAdd();
try {
if (timeWait) {
addWait += (System.currentTimeMillis() - time);
}
if (log.isTraceEnabled()) {
log.trace("FastQueue.add: starting with size " + size);
}
if (checkLock) {
if (inAdd)
log.warn("FastQueue.add: Detected other add");
inAdd = true;
if (inMutex)
log.warn("FastQueue.add: Detected other mutex in add");
inMutex = true;
}
if ((maxQueueLength > 0) && (size >= maxQueueLength)) {
ok = false;
if (log.isTraceEnabled()) {
log.trace("FastQueue.add: Could not add, since queue is full ("
+ size + " >=" + maxQueueLength + ")");
}
} else {
LinkObject element = new LinkObject(key, data);
if (size == 0) {
first = last = element;
size = 1;
} else {
if (last == null) {
ok = false;
log
.error("FastQueue.add: Could not add, since last is null although size is "
+ size + " ( >0)");
} else {
last.append(element);
last = element;
size++;
}
}
}
if (doStats) {
if (ok) {
if (addCounter % sampleInterval == 0) {
maxSizeSample = 0;
avgSizeSample = 0;
}
addCounter++;
if (size > maxSize) {
maxSize = size;
}
if (size > maxSizeSample) {
maxSizeSample = size;
}
avgSize += size;
avgSizeSample += size;
} else {
addErrorCounter++;
}
}
if (first == null) {
log.error("FastQueue.add: first is null, size is " + size
+ " at end of add");
}
if (last == null) {
log.error("FastQueue.add: last is null, size is " + size
+ " at end of add");
}
if (checkLock) {
if (!inMutex)
log.warn("FastQueue.add: Cancelled by other mutex in add");
inMutex = false;
if (!inAdd)
log.warn("FastQueue.add: Cancelled by other add");
inAdd = false;
}
if (log.isTraceEnabled()) {
log.trace("FastQueue.add: add ending with size " + size);
}
if (timeWait) {
time = System.currentTimeMillis();
}
} finally {
lock.unlockAdd(true);
}
if (timeWait) {
addWait += (System.currentTimeMillis() - time);
}
return ok;
}
Add new data to the queue |
public long getAddCounter() {
return addCounter;
}
|
public long getAddErrorCounter() {
return addErrorCounter;
}
|
public long getAddWait() {
return addWait;
}
|
public long getAddWaitTimeout() {
addWaitTimeout = lock.getAddWaitTimeout();
return addWaitTimeout;
}
get current add wait timeout |
public long getAvgSize() {
if (addCounter > 0) {
return avgSize / addCounter;
} else {
return 0;
}
}
|
public long getAvgSizeSample() {
long sample = addCounter % sampleInterval;
if (sample > 0) {
return avgSizeSample / sample;
} else if (addCounter > 0) {
return avgSizeSample / sampleInterval;
} else {
return 0;
}
}
|
public int getMaxQueueLength() {
return maxQueueLength;
}
|
public int getMaxSize() {
return maxSize;
}
|
public int getMaxSizeSample() {
return maxSizeSample;
}
|
public long getRemoveCounter() {
return removeCounter;
}
|
public long getRemoveErrorCounter() {
return removeErrorCounter;
}
|
public long getRemoveWait() {
return removeWait;
}
|
public long getRemoveWaitTimeout() {
removeWaitTimeout = lock.getRemoveWaitTimeout();
return removeWaitTimeout;
}
get current remove wait timeout |
public long getSample() {
return addCounter % sampleInterval;
}
|
public int getSampleInterval() {
return sampleInterval;
}
|
public int getSize() {
int sz;
sz = size;
return sz;
}
|
public boolean isCheckLock() {
return checkLock;
}
|
public boolean isDoStats() {
return doStats;
}
|
public boolean isEnabled() {
return enabled;
}
|
public boolean isTimeWait() {
return timeWait;
}
|
public LinkObject remove() {
LinkObject element;
boolean gotLock;
long time = 0;
if (!enabled) {
if (log.isInfoEnabled())
log.info("FastQueue.remove: queue disabled, remove aborted");
return null;
}
if (timeWait) {
time = System.currentTimeMillis();
}
gotLock = lock.lockRemove();
try {
if (!gotLock) {
if (enabled) {
if (timeWait) {
removeWait += (System.currentTimeMillis() - time);
}
if (doStats) {
removeErrorCounter++;
}
if (log.isInfoEnabled())
log.info("FastQueue.remove: Remove aborted although queue enabled");
} else {
if (log.isInfoEnabled())
log.info("FastQueue.remove: queue disabled, remove aborted");
}
return null;
}
if (timeWait) {
removeWait += (System.currentTimeMillis() - time);
}
if (log.isTraceEnabled()) {
log.trace("FastQueue.remove: remove starting with size " + size);
}
if (checkLock) {
if (inRemove)
log.warn("FastQueue.remove: Detected other remove");
inRemove = true;
if (inMutex)
log.warn("FastQueue.remove: Detected other mutex in remove");
inMutex = true;
}
element = first;
if (doStats) {
if (element != null) {
removeCounter++;
} else {
removeErrorCounter++;
log
.error("FastQueue.remove: Could not remove, since first is null although size is "
+ size + " ( >0)");
}
}
first = last = null;
size = 0;
if (checkLock) {
if (!inMutex)
log.warn("FastQueue.remove: Cancelled by other mutex in remove");
inMutex = false;
if (!inRemove)
log.warn("FastQueue.remove: Cancelled by other remove");
inRemove = false;
}
if (log.isTraceEnabled()) {
log.trace("FastQueue.remove: remove ending with size " + size);
}
if (timeWait) {
time = System.currentTimeMillis();
}
} finally {
lock.unlockRemove();
}
if (timeWait) {
removeWait += (System.currentTimeMillis() - time);
}
return element;
}
remove the complete queued object list |
public void resetStatistics() {
addCounter = 0;
addErrorCounter = 0;
removeCounter = 0;
removeErrorCounter = 0;
avgSize = 0;
maxSize = 0;
addWait = 0;
removeWait = 0;
}
|
public void setAddCounter(long counter) {
addCounter = counter;
}
|
public void setAddErrorCounter(long counter) {
addErrorCounter = counter;
}
|
public void setAddWait(long wait) {
addWait = wait;
}
|
public void setAddWaitTimeout(long timeout) {
addWaitTimeout = timeout;
lock.setAddWaitTimeout(addWaitTimeout);
}
Set add wait timeout (default 10000 msec) |
public void setCheckLock(boolean checkLock) {
this.checkLock = checkLock;
}
|
public void setDoStats(boolean doStats) {
this.doStats = doStats;
}
|
public void setEnabled(boolean enable) {
enabled = enable;
if (!enabled) {
lock.abortRemove();
}
}
|
public void setMaxQueueLength(int length) {
maxQueueLength = length;
}
|
public void setMaxSize(int size) {
maxSize = size;
}
|
public void setMaxSizeSample(int size) {
maxSizeSample = size;
}
|
public void setRemoveCounter(long counter) {
removeCounter = counter;
}
|
public void setRemoveErrorCounter(long counter) {
removeErrorCounter = counter;
}
|
public void setRemoveWait(long wait) {
removeWait = wait;
}
|
public void setRemoveWaitTimeout(long timeout) {
removeWaitTimeout = timeout;
lock.setRemoveWaitTimeout(removeWaitTimeout);
}
set remove wait timeout ( default 30000 msec) |
public void setSampleInterval(int interval) {
sampleInterval = interval;
}
|
public void setTimeWait(boolean timeWait) {
this.timeWait = timeWait;
}
|
public void start() {
setEnabled(true);
}
|
public void stop() {
setEnabled(false);
}
|
public void unlockAdd() {
lock.unlockAdd(size > 0 ? true : false);
}
unlock queue for next add |
public void unlockRemove() {
lock.unlockRemove();
}
unlock queue for next remove |