| Method from org.apache.catalina.cluster.tcp.ClusterReceiverBase Detail: |
/**
 * Adds the processing time of one received message to the receiver statistics.
 * <p>
 * The elapsed time is measured from <code>startTime</code> to now and folded
 * into the min / max / total processing-time counters while holding this
 * object's monitor. When debug logging is enabled, a summary line is written
 * at most once every five seconds.
 *
 * @param startTime timestamp (from <code>System.currentTimeMillis()</code>)
 *                  taken when processing of the message began
 */
protected void addReceivedProcessingStats(long startTime) {
    long current = System.currentTimeMillis();
    long time = current - startTime;
    synchronized (this) {
        if (time < minReceivedProcessingTime) {
            minReceivedProcessingTime = time;
        }
        if (time > maxReceivedProcessingTime) {
            maxReceivedProcessingTime = time;
        }
        receivedProcessingTime += time;
    }
    if (log.isDebugEnabled()) {
        if ((current - lastChecked) > 5000) {
            // Snapshot the counter once: it may still be zero (no message
            // deserialized yet, or statistics were just reset) and an integer
            // division by zero would throw ArithmeticException out of the
            // caller's finally block.
            long received = nrOfMsgsReceived;
            long average = (received > 0) ? (receivedTime / received) : 0;
            log.debug("Calc msg send time total=" + receivedTime
                    + "ms num request=" + received
                    + " average per msg="
                    + average + "ms.");
            lastChecked = current;
        }
    }
}
Add receiver processing stats times |
/**
 * Deserializes the received cluster data into a cluster message.
 * <p>
 * The payload is read through a GZIP stream when compression is enabled on
 * this receiver or the data carries <code>ClusterMessage.FLAG_ALLOWED</code>;
 * otherwise it is read as-is. After a successful read the received-bytes and
 * received-message counters are updated.
 * <p>
 * NOTE(review): this turns bytes received from another node into objects via
 * <code>ReplicationStream.readObject()</code>; confirm the cluster transport
 * is only reachable by trusted peers.
 *
 * @param data the received data, may be <code>null</code>
 * @return the deserialized message, or <code>null</code> when
 *         <code>data</code> is <code>null</code> or the payload is not a
 *         <code>ClusterMessage</code>
 * @throws ClassNotFoundException propagated from reading the object
 * @throws IOException if the payload cannot be read or the stream cannot be closed
 */
protected ClusterMessage deserialize(ClusterData data) throws ClassNotFoundException, IOException {
    Object message = null;
    if (data != null) {
        InputStream instream;
        if (isCompress() || data.getCompress() == ClusterMessage.FLAG_ALLOWED) {
            instream = new GZIPInputStream(new ByteArrayInputStream(data.getMessage()));
        } else {
            instream = new ByteArrayInputStream(data.getMessage());
        }
        try {
            ReplicationStream stream = new ReplicationStream(instream,
                    getClass().getClassLoader());
            message = stream.readObject();
            // calc stats really received bytes (only for messages that were read successfully)
            totalReceivedBytes += data.getMessage().length;
            nrOfMsgsReceived++;
        } finally {
            // Always release the stream, also when reading the object fails.
            instream.close();
        }
    }
    if (message instanceof ClusterMessage) {
        return (ClusterMessage) message;
    }
    // message is null when no data was supplied; there is nothing to describe
    // then, and dereferencing it for the log line would throw.
    if (message != null && log.isDebugEnabled()) {
        log.debug("Message " + message.toString() + " from type "
                + message.getClass().getName()
                + " transfered but is not a cluster message");
    }
    return null;
}
Deserialize the received cluster message. |
/**
 * Returns the average processing time per received message: the accumulated
 * processing time divided by the number of received messages.
 * <p>
 * The division is floating-point and unguarded, so with a zero message count
 * the result is NaN or infinite rather than an exception.
 *
 * @return accumulated processing time / number of received messages
 */
public double getAvgReceivedProcessingTime() {
    double totalProcessingTime = (double) receivedProcessingTime;
    return totalProcessingTime / nrOfMsgsReceived;
}
|
/**
 * Returns the average number of bytes per received message.
 *
 * @return total received bytes divided by the number of received messages,
 *         or <code>0</code> when no message has been counted (for example
 *         directly after <code>resetStatistics()</code>)
 */
public long getAvgTotalReceivedBytes() {
    // Read the counter once and guard it: an integer division by zero would
    // throw ArithmeticException at whoever reads this statistic.
    long received = nrOfMsgsReceived;
    if (received == 0) {
        return 0;
    }
    return ((long) totalReceivedBytes) / received;
}
|
/**
 * Returns the address the replication listener binds to, resolving it on
 * first use.
 * <p>
 * If no address is cached yet, a listen address of <code>"auto"</code> is
 * first replaced by the local host's address, then the listen address is
 * resolved and cached. A resolution failure is logged and leaves the cached
 * value unset.
 *
 * @return the bind address, or <code>null</code> if it could not be resolved
 */
public InetAddress getBind() {
    if (bind != null) {
        return bind;
    }
    try {
        if ("auto".equals(tcpListenAddress)) {
            InetAddress localHost = InetAddress.getLocalHost();
            tcpListenAddress = localHost.getHostAddress();
        }
        if (log.isDebugEnabled()) {
            log.debug("Starting replication listener on address:" + tcpListenAddress);
        }
        bind = InetAddress.getByName(tcpListenAddress);
    } catch (IOException ioe) {
        log.error("Failed bind replication listener on address:" + tcpListenAddress, ioe);
    }
    return bind;
}
|
/**
 * Returns the cluster this receiver is attached to, cast to
 * <code>CatalinaCluster</code>.
 *
 * @return the cluster, or <code>null</code> if none has been set
 */
public CatalinaCluster getCatalinaCluster() {
    CatalinaCluster catalinaCluster = (CatalinaCluster) this.cluster;
    return catalinaCluster;
}
|
/**
 * Returns the receiver host; this is the same value as
 * <code>getTcpListenAddress()</code>.
 *
 * @return the TCP listen address
 */
public String getHost() {
    String listenAddress = getTcpListenAddress();
    return listenAddress;
}
|
/**
 * Returns the timestamp stored the last time
 * <code>addReceivedProcessingStats</code> wrote its debug summary.
 *
 * @return the last-checked timestamp in milliseconds
 */
public long getLastChecked() {
    return this.lastChecked;
}
|
/**
 * Returns the longest single-message processing time recorded since the
 * statistics were last reset.
 *
 * @return the maximum processing time in milliseconds
 */
public long getMaxReceivedProcessingTime() {
    return this.maxReceivedProcessingTime;
}
|
/**
 * Returns the shortest single-message processing time recorded since the
 * statistics were last reset (<code>Long.MAX_VALUE</code> directly after a
 * reset).
 *
 * @return the minimum processing time in milliseconds
 */
public long getMinReceivedProcessingTime() {
    return this.minReceivedProcessingTime;
}
|
/**
 * Returns the number of messages counted by <code>deserialize</code> since
 * the statistics were last reset.
 *
 * @return the received-message count
 */
public long getNrOfMsgsReceived() {
    return this.nrOfMsgsReceived;
}
|
/**
 * Returns the JMX object name assigned to this receiver.
 *
 * @return the object name, or <code>null</code> if none has been set
 */
public ObjectName getObjectName() {
    return this.objectName;
}
|
/**
 * Returns the receiver port; this is the same value as
 * <code>getTcpListenPort()</code>.
 *
 * @return the TCP listen port
 */
public int getPort() {
    int listenPort = getTcpListenPort();
    return listenPort;
}
|
/**
 * Returns the processing time accumulated over all received messages since
 * the statistics were last reset.
 *
 * @return the total processing time in milliseconds
 */
public long getReceivedProcessingTime() {
    return this.receivedProcessingTime;
}
|
/**
 * Returns the value of the <code>receivedTime</code> counter.
 *
 * @return the received-time counter
 */
public long getReceivedTime() {
    return this.receivedTime;
}
|
/**
 * Returns the configured TCP listen address.
 *
 * @return the listen address
 */
public String getTcpListenAddress() {
    return this.tcpListenAddress;
}
|
/**
 * Returns the configured TCP listen port.
 *
 * @return the listen port
 */
public int getTcpListenPort() {
    return this.tcpListenPort;
}
|
/**
 * Returns the number of payload bytes counted by <code>deserialize</code>
 * since the statistics were last reset.
 *
 * @return the total received bytes
 */
public long getTotalReceivedBytes() {
    return this.totalReceivedBytes;
}
|
/**
 * Returns whether this receiver treats incoming payloads as compressed.
 *
 * @return the <code>compress</code> flag
 */
public boolean isCompress() {
    return this.compress;
}
|
/**
 * Returns the value of the <code>doListen</code> flag.
 *
 * @return the <code>doListen</code> flag
 */
public boolean isDoListen() {
    return this.doListen;
}
|
/**
 * Returns whether processing-time statistics are collected for received
 * messages.
 *
 * @return the <code>doReceivedProcessingStats</code> flag
 */
public boolean isDoReceivedProcessingStats() {
    return this.doReceivedProcessingStats;
}
|
/**
 * Returns the value of the <code>sendAck</code> flag.
 *
 * @return the <code>sendAck</code> flag
 */
public boolean isSendAck() {
    return this.sendAck;
}
|
/**
 * Implementation-specific listen routine.
 * <p>
 * Called by <code>run()</code>, which logs any exception thrown from here as
 * "Unable to start cluster listener.".
 * NOTE(review): presumably blocks while accepting data from other nodes;
 * confirm in the concrete subclasses.
 *
 * @throws Exception if listening cannot be started or fails
 */
abstract protected void listen() throws Exception
|
/**
 * Handles data received from another node: deserializes it and hands the
 * resulting message to the cluster.
 * <p>
 * Any exception from deserialization or from the cluster's receive call is
 * logged and not rethrown. When processing statistics are enabled, the
 * elapsed time is recorded whether or not processing succeeded.
 *
 * @param data the received cluster data
 */
public void messageDataReceived(ClusterData data) {
    // A start timestamp is only taken when statistics are switched on.
    long processingStart = doReceivedProcessingStats ? System.currentTimeMillis() : 0;
    try {
        ClusterMessage received = deserialize(data);
        cluster.receive(received);
    } catch (Exception failure) {
        log.error(
                "Unable to deserialize session message or unexpected exception from message listener.",
                failure);
    } finally {
        if (doReceivedProcessingStats) {
            addReceivedProcessingStats(processingStart);
        }
    }
}
Receives a message from another node.
All SessionMessages are forwarded to the ClusterManager; other messages are dispatched to every accepting MessageListener. |
/**
 * Registers this receiver as an MBean under the name
 * <code>&lt;domain&gt;:type=ClusterReceiver[,host=&lt;host&gt;]</code>.
 * <p>
 * Nothing happens unless the cluster is a <code>SimpleTcpCluster</code>. The
 * domain comes from the cluster's own object name; the <code>host</code> key
 * is appended only when the cluster's container is a
 * <code>StandardHost</code>. If the name is already registered, a warning is
 * logged and registration is skipped. Any exception during registration is
 * logged as a warning and not rethrown.
 */
protected void registerReceiverMBean() {
    // instanceof is false for null, so this also covers a missing cluster.
    if (!(cluster instanceof SimpleTcpCluster)) {
        return;
    }
    SimpleTcpCluster scluster = (SimpleTcpCluster) cluster;
    ObjectName clusterName = scluster.getObjectName();
    try {
        MBeanServer mserver = scluster.getMBeanServer();
        Container container = cluster.getContainer();
        String baseName = clusterName.getDomain() + ":type=ClusterReceiver";
        String fullName = (container instanceof StandardHost)
                ? baseName + ",host=" + clusterName.getKeyProperty("host")
                : baseName;
        ObjectName receiverName = new ObjectName(fullName);
        if (mserver.isRegistered(receiverName)) {
            if (log.isWarnEnabled()) {
                log.warn(sm.getString(
                        "cluster.mbean.register.allready",
                        receiverName));
            }
        } else {
            setObjectName(receiverName);
            mserver.registerMBean(scluster.getManagedBean(this),
                    getObjectName());
        }
    } catch (Exception e) {
        log.warn(e);
    }
}
Register Receiver MBean
:type=ClusterReceiver,host= |
/**
 * Resets the receiver statistics while holding this object's monitor.
 * <p>
 * Message and byte counters and the accumulated times go back to zero; the
 * minimum processing time is set to <code>Long.MAX_VALUE</code> so that the
 * next measurement becomes the new minimum.
 */
public synchronized void resetStatistics() {
    // message / byte counters
    this.nrOfMsgsReceived = 0;
    this.totalReceivedBytes = 0;
    // processing-time extremes
    this.minReceivedProcessingTime = Long.MAX_VALUE;
    this.maxReceivedProcessingTime = 0;
    // accumulated times
    this.receivedProcessingTime = 0;
    this.receivedTime = 0;
}
|
/**
 * Thread entry point: runs <code>listen()</code> and logs any exception it
 * throws instead of propagating it.
 */
public void run() {
    try {
        listen();
    } catch (Exception listenFailure) {
        log.error("Unable to start cluster listener.", listenFailure);
    }
}
|
/**
 * Acknowledgement hook; this implementation is intentionally empty.
 * NOTE(review): presumably overridden by concrete receivers that send
 * acknowledgements; confirm in subclasses.
 *
 * @throws IOException declared in the signature; never thrown by this empty body
 */
public void sendAck() throws IOException {
// do nothing
}
|
/**
 * Sets the bind address. When it is non-null, <code>getBind()</code> returns
 * it directly without resolving the TCP listen address.
 *
 * @param bind the address to bind the listener to
 */
public void setBind(InetAddress bind) {
this.bind = bind;
}
|
/**
 * Sets the cluster this receiver belongs to. Deserialized messages are
 * passed to this cluster's <code>receive</code> method.
 *
 * @param cluster the owning cluster
 */
public void setCatalinaCluster(CatalinaCluster cluster) {
this.cluster = cluster;
}
|
/**
 * Sets the compression flag. When <code>true</code>,
 * <code>deserialize</code> reads every payload through a GZIP stream.
 *
 * @param compressMessageData whether incoming message data is compressed
 */
public void setCompress(boolean compressMessageData) {
this.compress = compressMessageData;
}
|
/**
 * Enables or disables collection of processing-time statistics in
 * <code>messageDataReceived</code>.
 *
 * @param doReceiverProcessingStats <code>true</code> to record processing times
 */
public void setDoReceivedProcessingStats(boolean doReceiverProcessingStats) {
this.doReceivedProcessingStats = doReceiverProcessingStats;
}
|
/**
 * Sets the JMX object name of this receiver.
 *
 * @param name the object name to store
 */
public void setObjectName(ObjectName name) {
    this.objectName = name;
}
|
/**
 * Sets the <code>sendAck</code> flag returned by <code>isSendAck()</code>.
 *
 * @param sendAck the new flag value
 */
public void setSendAck(boolean sendAck) {
this.sendAck = sendAck;
}
|
/**
 * Sets the TCP listen address. The value <code>"auto"</code> is replaced by
 * the local host's address when <code>getBind()</code> first resolves the
 * bind address.
 *
 * @param tcpListenAddress host name, IP address, or <code>"auto"</code>
 */
public void setTcpListenAddress(String tcpListenAddress) {
this.tcpListenAddress = tcpListenAddress;
}
|
/**
 * Sets the TCP listen port returned by <code>getTcpListenPort()</code> and
 * <code>getPort()</code>.
 *
 * @param tcpListenPort the port number
 */
public void setTcpListenPort(int tcpListenPort) {
this.tcpListenPort = tcpListenPort;
}
|
/**
 * Starts the receiver: resolves the bind address, launches this object on a
 * daemon thread named "ClusterReceiver", and then registers the receiver
 * MBean.
 * <p>
 * A failure while resolving the address or starting the thread is logged as
 * fatal and not rethrown; MBean registration is attempted either way.
 */
public void start() {
    try {
        getBind();
        Thread receiverThread = new Thread(this, "ClusterReceiver");
        receiverThread.setDaemon(true);
        receiverThread.start();
    } catch (Exception startFailure) {
        log.fatal("Unable to start cluster receiver", startFailure);
    }
    registerReceiverMBean();
}
|
/**
 * Stops the receiver: first stops listening, then unregisters the receiver
 * MBean.
 */
public void stop() {
stopListening();
unregisterRecevierMBean();
}
|
/**
 * Implementation-specific shutdown of the listener; called by
 * <code>stop()</code> before the receiver MBean is unregistered.
 */
abstract protected void stopListening()
|
/**
 * Unregisters the receiver MBean from the cluster's MBean server.
 * <p>
 * Nothing happens unless a cluster is set, an object name has been assigned,
 * and the cluster is a <code>SimpleTcpCluster</code>. Any exception during
 * unregistration is logged as an error and not rethrown.
 */
protected void unregisterRecevierMBean() {
    if (cluster == null || getObjectName() == null
            || !(cluster instanceof SimpleTcpCluster)) {
        return;
    }
    SimpleTcpCluster scluster = (SimpleTcpCluster) cluster;
    try {
        MBeanServer server = scluster.getMBeanServer();
        server.unregisterMBean(getObjectName());
    } catch (Exception e) {
        log.error(e);
    }
}
Unregister Receiver MBean
:type=ClusterReceiver,host= |