Sends cluster messages from a message queue over a single socket. Ack and keep-alive
handling are supported. The fast queue can limit its size and consume all queued messages in one block.
FIXME: refactor code duplications with AsyncSocketSender => configurable or extract super class
| Method from org.apache.catalina.cluster.tcp.FastAsyncSocketSender Detail: |
/**
 * Creates and starts the background queue thread when none exists yet.
 * The new thread is marked as a daemon and runs with the priority
 * returned by {@link #getThreadPriority()}. Does nothing when a queue
 * thread is already present.
 */
protected void checkThread() {
    if (queueThread != null) {
        // A worker already exists; nothing to create.
        return;
    }
    if (log.isInfoEnabled()) {
        log.info(sm.getString("AsyncSocketSender.create.thread",
                getAddress(), new Integer(getPort())));
    }
    // The field is assigned before the thread is configured and started,
    // exactly as in the original sequence.
    queueThread = new FastQueueThread(this, queue);
    queueThread.setDaemon(true);
    queueThread.setPriority(getThreadPriority());
    queueThread.start();
}
Start Queue thread as daemon |
/**
 * Connects via the superclass, makes sure the queue thread exists
 * (see {@link #checkThread()}), and starts the queue if it is not
 * currently enabled.
 *
 * @throws IOException propagated from {@code super.connect()}
 */
public void connect() throws IOException {
    super.connect();
    checkThread();
    boolean queueEnabled = queue.isEnabled();
    if (!queueEnabled) {
        queue.start();
    }
}
Connect to socket and start background thread to push queued messages |
/**
 * Stops the queue thread, restarts the queue, and then disconnects via
 * the superclass. The calls are made in exactly this order:
 * {@code stopThread()}, {@code queue.stop()}, {@code queue.start()},
 * {@code super.disconnect()}.
 */
public void disconnect() {
stopThread();
// stop the queue to release the "remove" lock held at the queue
queue.stop() ;
// start the queue again so that sendMessage can add new messages
queue.start() ;
// close socket
super.disconnect();
}
Disconnect socket and stop queue thread |
/**
 * Returns the current value of the in-queue counter, which
 * {@code sendMessage} increments once per call.
 *
 * @return the value of {@code inQueueCounter}
 */
public long getInQueueCounter() {
    return this.inQueueCounter;
}
|
/**
 * Returns the descriptive information string held in {@code info}.
 *
 * @return the value of {@code info}
 */
public String getInfo() {
    return info;
}
Return descriptive information about this implementation and the
corresponding version number, in the format
<description>/<version>. |
/**
 * Returns the maximum queue length as reported by the underlying queue.
 *
 * @return the result of {@code queue.getMaxQueueLength()}
 */
public int getMaxQueueLength() {
    final int maxLength = queue.getMaxQueueLength();
    return maxLength;
}
|
/**
 * Returns the current value of the out-queue counter.
 *
 * @return the value of {@code outQueueCounter}
 */
public long getOutQueueCounter() {
    return this.outQueueCounter;
}
|
/**
 * Returns the add-wait value reported by the underlying queue.
 *
 * @return the result of {@code queue.getAddWait()}
 */
public long getQueueAddWaitTime() {
    final long addWait = queue.getAddWait();
    return addWait;
}
|
/**
 * Returns the add-wait timeout currently configured on the underlying queue.
 *
 * @return the result of {@code queue.getAddWaitTimeout()}
 */
public long getQueueAddWaitTimeout() {
    final long addWaitTimeout = queue.getAddWaitTimeout();
    return addWaitTimeout;
}
get current add wait timeout |
/**
 * Returns the remove-wait value reported by the underlying queue.
 *
 * @return the result of {@code queue.getRemoveWait()}
 */
public long getQueueRemoveWaitTime() {
    final long removeWait = queue.getRemoveWait();
    return removeWait;
}
|
/**
 * Returns the remove-wait timeout currently configured on the underlying queue.
 *
 * @return the result of {@code queue.getRemoveWaitTimeout()}
 */
public long getQueueRemoveWaitTimeout() {
    final long removeWaitTimeout = queue.getRemoveWaitTimeout();
    return removeWaitTimeout;
}
get current remove wait timeout |
/**
 * Returns the size reported by the underlying queue.
 *
 * @return the result of {@code queue.getSize()}
 */
public int getQueueSize() {
    final int size = queue.getSize();
    return size;
}
|
/**
 * Returns the number of queued bytes tracked by the queue thread.
 *
 * @return the result of {@code getQueuedNrOfBytes()} on the queue thread,
 *         or {@code 0} when no queue thread exists
 */
public long getQueuedNrOfBytes() {
    // Read the field once: stopThread() sets queueThread to null without
    // holding any lock, so checking the field and then dereferencing it
    // again could hit a NullPointerException.
    FastQueueThread thread = queueThread;
    if (thread != null) {
        return thread.getQueuedNrOfBytes();
    }
    return 0L;
}
|
/**
 * Returns the priority configured for the queue thread.
 *
 * @return the value of {@code threadPriority}
 */
public int getThreadPriority() {
    return this.threadPriority;
}
Get the current threadPriority |
/**
 * Returns the check-lock flag of the underlying queue.
 *
 * @return the result of {@code queue.isCheckLock()}
 */
public boolean isQueueCheckLock() {
    final boolean checkLock = queue.isCheckLock();
    return checkLock;
}
|
/**
 * Returns the do-stats flag of the underlying queue.
 *
 * @return the result of {@code queue.isDoStats()}
 */
public boolean isQueueDoStats() {
    final boolean doStats = queue.isDoStats();
    return doStats;
}
|
/**
 * Returns the time-wait flag of the underlying queue.
 *
 * @return the result of {@code queue.isTimeWait()}
 */
public boolean isQueueTimeWait() {
    final boolean timeWait = queue.isTimeWait();
    return timeWait;
}
|
/**
 * Resets the statistics of this sender and of its queue.
 * After the superclass reset, the in-queue counter is set to the
 * queue's current size, the out-queue counter is set to zero, and the
 * queue's own statistics are reset.
 */
public synchronized void resetStatistics() {
    super.resetStatistics();
    // The in-queue counter restarts at the queue's current size, not at zero.
    this.inQueueCounter = queue.getSize();
    this.outQueueCounter = 0;
    queue.resetStatistics();
}
|
/**
 * Adds the message to the queue for later sending and updates the
 * in-queue counter and, when a queue thread exists, its queued-bytes
 * count.
 *
 * @param data the cluster message; it is queued under its unique id
 * @throws IOException declared by the method signature
 */
public void sendMessage(ClusterData data) throws IOException {
    queue.add(data.getUniqueId(), data);
    synchronized (this) {
        inQueueCounter++;
        // Read the field once: stopThread() sets queueThread to null without
        // taking this lock, so a second read after the null check could
        // throw a NullPointerException.
        FastQueueThread thread = queueThread;
        if (thread != null) {
            thread.incQueuedNrOfBytes(data.getMessage().length);
        }
    }
    if (log.isTraceEnabled()) {
        log.trace(sm.getString("AsyncSocketSender.queue.message",
                getAddress().getHostAddress(), new Integer(getPort()), data.getUniqueId(), new Long(
                        data.getMessage().length)));
    }
}
Send message to queue for later sending. |
/**
 * Sets the maximum queue length on the underlying queue.
 *
 * @param length value passed to {@code queue.setMaxQueueLength(int)}
 */
public void setMaxQueueLength(int length) {
    this.queue.setMaxQueueLength(length);
}
|
/**
 * Sets the add-wait timeout on the underlying queue
 * (documented default: 10000 msec).
 *
 * @param timeout value passed to {@code queue.setAddWaitTimeout(long)}
 */
public void setQueueAddWaitTimeout(long timeout) {
    this.queue.setAddWaitTimeout(timeout);
}
Set add wait timeout (default 10000 msec) |
/**
 * Sets the check-lock flag on the underlying queue.
 *
 * @param checkLock value passed to {@code queue.setCheckLock(boolean)}
 */
public void setQueueCheckLock(boolean checkLock) {
    this.queue.setCheckLock(checkLock);
}
|
/**
 * Sets the do-stats flag on the underlying queue.
 *
 * @param doStats value passed to {@code queue.setDoStats(boolean)}
 */
public void setQueueDoStats(boolean doStats) {
    this.queue.setDoStats(doStats);
}
|
/**
 * Sets the time-wait flag on the underlying queue.
 *
 * @param timeWait value passed to {@code queue.setTimeWait(boolean)}
 */
public void setQueueTimeWait(boolean timeWait) {
    this.queue.setTimeWait(timeWait);
}
|
/**
 * Sets the remove-wait timeout on the underlying queue
 * (documented default: 30000 msec).
 *
 * @param timeout value passed to {@code queue.setRemoveWaitTimeout(long)}
 */
public void setRemoveWaitTimeout(long timeout) {
    this.queue.setRemoveWaitTimeout(timeout);
}
set remove wait timeout ( default 30000 msec) |
/**
 * Sets the priority used for the queue thread and, when a queue thread
 * exists, applies it to that thread immediately.
 *
 * @param threadPriority the new priority; must lie between
 *        {@link Thread#MIN_PRIORITY} and {@link Thread#MAX_PRIORITY} inclusive
 * @throws IllegalArgumentException if {@code threadPriority} is outside
 *         that range; the stored priority is left unchanged in that case
 */
public void setThreadPriority(int threadPriority) {
    if (log.isDebugEnabled()) {
        log.debug(sm.getString("FastAsyncSocketSender.setThreadPriority",
                getAddress().getHostAddress(), new Integer(getPort()),
                new Integer(threadPriority)));
    }

    // Select the message key of the violated bound, if any.
    String violationKey = null;
    if (threadPriority < Thread.MIN_PRIORITY) {
        violationKey = "FastAsyncSocketSender.min.exception";
    } else if (threadPriority > Thread.MAX_PRIORITY) {
        violationKey = "FastAsyncSocketSender.max.exception";
    }
    if (violationKey != null) {
        // Message arguments are built only when the exception is thrown.
        throw new IllegalArgumentException(sm.getString(
                violationKey, getAddress().getHostAddress(),
                new Integer(getPort()), new Integer(threadPriority)));
    }

    this.threadPriority = threadPriority;

    // Read the field once: stopThread() sets queueThread to null without
    // holding any lock, so a second read after the null check could
    // throw a NullPointerException.
    FastQueueThread thread = queueThread;
    if (thread != null) {
        thread.setPriority(threadPriority);
    }
}
Change the priority of the active queue thread |
/**
 * Asks the queue thread to stop running and clears the reference to it.
 * Does nothing when no queue thread exists.
 */
protected void stopThread() {
    if (queueThread == null) {
        return;
    }
    queueThread.stopRunning();
    // Clearing the field lets checkThread() create a fresh thread later.
    queueThread = null;
}
|
/**
 * Returns the name of this sender in the form
 * {@code FastAsyncSocketSender[<host address>:<port>]}.
 *
 * @return the formatted sender name
 */
public String toString() {
    return "FastAsyncSocketSender["
            + getAddress().getHostAddress()
            + ":"
            + getPort()
            + "]";
}
Name of this SocketSender |