| Method from org.apache.catalina.cluster.tcp.ReplicationTransmitter Detail: |
/**
 * Adds a cluster member. When no sender is stored under the member's key
 * yet, a sender is created for the current replication mode, the
 * transmitter properties are copied onto it, it is stored in the sender map
 * and its MBean is registered.
 *
 * @param member the member to create a sender for
 */
public synchronized void add(Member member) {
    try {
        final String senderKey = getKey(member);
        if (map.containsKey(senderKey)) {
            // a sender is already stored under this key: nothing to do
            return;
        }
        IDataSender created = IDataSenderFactory.getIDataSender(
                replicationMode, member);
        transferSenderProperty(created);
        map.put(senderKey, created);
        registerSenderMBean(member, created);
    } catch (java.io.IOException x) {
        log.error("Unable to create and add a IDataSender object.", x);
    }
}
Add a new cluster member and create its sender (see replicationMode); transfer
the current properties to the sender |
/**
 * Adds the elapsed time since <code>startTime</code> to the processing
 * statistics (minimum, maximum and accumulated processing time).
 * <p>
 * The counters are updated while holding this object's monitor, the same
 * lock the synchronized {@link #resetStatistics()} uses, so a concurrent
 * reset or a second sending thread cannot interleave with the
 * read-modify-write of the statistic fields.
 *
 * @param startTime the value of <code>System.currentTimeMillis()</code>
 *                  taken when processing started
 */
protected void addProcessingStats(long startTime) {
    // take the timestamp outside the lock to keep the critical section short
    long time = System.currentTimeMillis() - startTime;
    synchronized (this) {
        if (time < minProcessingTime) {
            minProcessingTime = time;
        }
        if (time > maxProcessingTime) {
            maxProcessingTime = time;
        }
        processingTime += time;
    }
}
Add processing stats times |
/**
 * Counts one more request and adds <code>length</code> to the byte total.
 * When debug logging is enabled, the totals are logged on every request
 * whose running count is a multiple of 100.
 *
 * @param length number of bytes that were sent
 */
protected synchronized void addStats(int length) {
    nrOfRequests++;
    totalBytes += length;
    if (!log.isDebugEnabled()) {
        return;
    }
    if ((nrOfRequests % 100) != 0) {
        return;
    }
    log.debug("Nr of bytes sent=" + totalBytes + " over "
            + nrOfRequests + "; avg=" + (totalBytes / nrOfRequests)
            + " bytes/request; failures=" + failureCounter);
}
Count the number of requests and transferred bytes. Log the stats every 100 requests |
/**
 * Periodic hook: advances an internal counter modulo
 * <code>processSenderFrequency</code> and runs {@link #checkKeepAlive()}
 * each time the counter wraps around to zero.
 */
public void backgroundProcess() {
    count = (count + 1) % processSenderFrequency;
    if (count != 0) {
        // not this round
        return;
    }
    checkKeepAlive();
}
Call transmitter to check for sender socket status |
/**
 * Calls <code>checkKeepAlive()</code> on every non-null sender currently
 * stored in the sender map.
 */
public void checkKeepAlive() {
    if (map.size() <= 0) {
        return;
    }
    for (java.util.Iterator entries = map.entrySet().iterator(); entries
            .hasNext();) {
        java.util.Map.Entry entry = (java.util.Map.Entry) entries.next();
        IDataSender current = (IDataSender) entry.getValue();
        if (current == null) {
            continue;
        }
        current.checkKeepAlive();
    }
}
Check every DataSender socket so that it can be closed in keepAlive mode |
/**
 * @return the configured acknowledge timeout
 */
public long getAckTimeout() {
    return this.ackTimeout;
}
|
/**
 * @return the accumulated processing time divided by the number of
 *         requests, computed in floating point
 */
public double getAvgProcessingTime() {
    double total = processingTime;
    return total / nrOfRequests;
}
|
/**
 * @return the number of failed send attempts counted so far
 */
public long getFailureCounter() {
    return this.failureCounter;
}
|
/**
 * Returns descriptive information about this implementation and the
 * corresponding version number, in the format
 * <code>&lt;description&gt;/&lt;version&gt;</code>.
 *
 * @return the info string
 */
public String getInfo() {
    return info;
}
Return descriptive information about this implementation and the
corresponding version number, in the format
<description>/<version>. |
/**
 * Tells whether the configured replication mode is one of the synchronous
 * modes (<code>IDataSenderFactory.SYNC_MODE</code> or
 * <code>IDataSenderFactory.POOLED_SYNC_MODE</code>).
 *
 * @return <code>true</code> when the replication mode equals one of the
 *         two synchronous mode constants
 */
public boolean getIsSenderSynchronized() {
return IDataSenderFactory.SYNC_MODE.equals(replicationMode)
|| IDataSenderFactory.POOLED_SYNC_MODE.equals(replicationMode);
} Deprecated! since - version 5.5.7
|
/**
 * Builds the key under which a member's sender is stored: the member's
 * host and port joined by a colon.
 *
 * @param member the member to build the key for
 * @return the key in the form <code>host:port</code>
 */
protected String getKey(Member member) {
    final String separator = ":";
    return member.getHost() + separator + member.getPort();
}
Build the unique key used to find a sender |
/**
 * @return the longest processing time recorded so far
 */
public long getMaxProcessingTime() {
    return this.maxProcessingTime;
}
|
/**
 * @return the shortest processing time recorded so far
 */
public long getMinProcessingTime() {
    return this.minProcessingTime;
}
|
/**
 * @return the number of requests counted so far
 */
public long getNrOfRequests() {
    return this.nrOfRequests;
}
|
/**
 * @return the JMX object name stored for this transmitter, as set through
 *         <code>setObjectName</code>
 */
public ObjectName getObjectName() {
    return this.objectName;
}
|
/**
 * @return the frequency value used by <code>backgroundProcess()</code> to
 *         decide how often the senders are checked
 */
public int getProcessSenderFrequency() {
    return this.processSenderFrequency;
}
|
/**
 * @return the accumulated processing time
 */
public long getProcessingTime() {
    return this.processingTime;
}
|
/**
 * Looks up a configured property, tracing the access when trace logging
 * is enabled.
 *
 * @param key the property name
 * @return the value stored under <code>key</code> in the property map
 */
public Object getProperty(String key) {
    if (log.isTraceEnabled()) {
        log.trace(sm.getString("ReplicationTransmitter.getProperty", key));
    }
    return properties.get(key);
}
|
/**
 * @return an iterator over the key set of the property map
 */
public Iterator getPropertyNames() {
    final Iterator names = properties.keySet().iterator();
    return names;
}
|
/**
 * @return the currently configured replication mode
 */
public String getReplicationMode() {
    return this.replicationMode;
}
|
/**
 * Builds the JMX object name for a sender:
 * <code>domain:type=IDataSender[,host=...],senderAddress=...,senderPort=...</code>.
 * The domain is taken from the cluster's object name; the <code>host</code>
 * key is only added when the cluster's container is a
 * <code>StandardHost</code>.
 *
 * @param sender the sender to name
 * @return the object name, or <code>null</code> when building it failed
 *         (the exception is logged as a warning)
 */
protected ObjectName getSenderObjectName(IDataSender sender) {
    try {
        ObjectName clusterName = cluster.getObjectName();
        Container container = cluster.getContainer();
        String name = clusterName.getDomain() + ":type=IDataSender";
        if (container instanceof StandardHost) {
            name += ",host=" + clusterName.getKeyProperty("host");
        }
        name += ",senderAddress=" + sender.getAddress().getHostAddress();
        name += ",senderPort=" + sender.getPort();
        return new ObjectName(name);
    } catch (Exception e) {
        log.warn(e);
        return null;
    }
}
build sender ObjectName (
engine.domain:type=IDataSender,host="host",senderAddress="receiver.address",senderPort="port" ) |
/**
 * Collects the JMX object names of all senders in the sender map.
 *
 * @return an array sized to the map; a slot stays <code>null</code> when
 *         the corresponding map value is <code>null</code> or its name
 *         could not be built
 */
public ObjectName[] getSenderObjectNames() {
    java.util.Iterator entries = map.entrySet().iterator();
    ObjectName[] names = new ObjectName[map.size()];
    for (int index = 0; entries.hasNext(); index++) {
        java.util.Map.Entry entry = (java.util.Map.Entry) entries.next();
        IDataSender current = (IDataSender) entry.getValue();
        if (current != null) {
            names[index] = getSenderObjectName(current);
        }
    }
    return names;
}
|
/**
 * Copies the senders from the sender map into an array.
 *
 * @return an array sized to the map; a slot stays <code>null</code> when
 *         the corresponding map value is <code>null</code>
 */
public IDataSender[] getSenders() {
    java.util.Iterator entries = map.entrySet().iterator();
    IDataSender[] snapshot = new IDataSender[map.size()];
    for (int index = 0; entries.hasNext(); index++) {
        java.util.Map.Entry entry = (java.util.Map.Entry) entries.next();
        IDataSender current = (IDataSender) entry.getValue();
        if (current != null) {
            snapshot[index] = current;
        }
    }
    return snapshot;
}
|
/**
 * @return the number of bytes counted as sent so far
 */
public long getTotalBytes() {
    return this.totalBytes;
}
|
/**
 * @return the value of the <code>autoConnect</code> flag
 */
public boolean isAutoConnect() {
    return this.autoConnect;
}
|
/**
 * @return the value of the <code>compress</code> flag
 */
public boolean isCompress() {
    return this.compress;
}
|
/**
 * @return <code>true</code> when processing-time statistics are collected
 *         for each send
 */
public boolean isDoTransmitterProcessingStats() {
    return this.doTransmitterProcessingStats;
}
|
/**
 * @return the value of the <code>waitForAck</code> flag
 */
public boolean isWaitForAck() {
    return this.waitForAck;
}
|
/**
 * Registers the sender's managed bean with the cluster's MBean server
 * unless a bean with the same object name is already registered, in which
 * case only a warning is logged. Does nothing when <code>member</code> or
 * the cluster is <code>null</code>. Any exception is logged as a warning.
 *
 * @param member the member the sender belongs to
 * @param sender the sender to register
 */
protected void registerSenderMBean(Member member,
        IDataSender sender) {
    if (member == null || cluster == null) {
        return;
    }
    try {
        MBeanServer mserver = cluster.getMBeanServer();
        ObjectName senderName = getSenderObjectName(sender);
        if (!mserver.isRegistered(senderName)) {
            mserver.registerMBean(cluster.getManagedBean(sender),
                    senderName);
            return;
        }
        // name already taken: warn and leave the existing bean alone
        if (log.isWarnEnabled()) {
            log.warn(sm.getString(
                    "cluster.mbean.register.allready", senderName));
        }
    } catch (Exception e) {
        log.warn(e);
    }
}
Register the sender MBean after checking whether it already exists (big problem!) |
/**
 * Removes the sender stored for a member: its MBean is unregistered, the
 * sender is disconnected and the map entry is dropped. Does nothing when
 * no sender is stored under the member's key.
 *
 * @param member the member whose sender is removed
 */
public synchronized void remove(Member member) {
    final String senderKey = getKey(member);
    final IDataSender stored = (IDataSender) map.get(senderKey);
    if (stored != null) {
        unregisterSenderMBean(stored);
        stored.disconnect();
        map.remove(senderKey);
    }
}
Remove a sender from the transmitter (unregister its MBean and disconnect the sender) |
/**
 * Removes a configured property from the property map.
 *
 * @param key the property name to remove
 */
public void removeProperty(String key) {
    this.properties.remove(key);
}
remove a configured property. |
/**
 * Resets all statistic counters: request count, byte total, failure
 * count and accumulated processing time go back to zero, the maximum
 * processing time to zero and the minimum to <code>Long.MAX_VALUE</code>.
 */
public synchronized void resetStatistics() {
    // traffic counters
    this.nrOfRequests = 0;
    this.totalBytes = 0;
    this.failureCounter = 0;
    // timing counters
    this.processingTime = 0;
    this.maxProcessingTime = 0;
    this.minProcessingTime = Long.MAX_VALUE;
}
|
/**
 * Sends a message to all senders (broadcast).
 * <p>
 * The message is serialized once and handed to every sender returned by
 * {@link #getSenders()}. That array can contain <code>null</code> slots;
 * those are skipped, because otherwise the failure handler below would
 * dereference the null sender and abort the broadcast for all remaining
 * senders. A failure for one sender marks it as suspect (logging a warning
 * the first time) and does not stop delivery to the others.
 * <p>
 * When processing statistics are enabled, the elapsed time is recorded
 * even if sending fails.
 *
 * @param message the message to replicate
 * @throws IOException if the message cannot be serialized
 */
public void sendMessage(ClusterMessage message) throws IOException {
    long time = 0;
    if (doTransmitterProcessingStats) {
        time = System.currentTimeMillis();
    }
    try {
        ClusterData data = serialize(message);
        IDataSender[] senders = getSenders();
        for (int i = 0; i < senders.length; i++) {
            IDataSender sender = senders[i];
            if (sender == null) {
                // empty slot left by getSenders(): nothing to send to
                continue;
            }
            try {
                sendMessageData(data, sender);
            } catch (Exception x) {
                if (!sender.getSuspect()) {
                    log.warn("Unable to send replicated message to "
                            + sender + ", is server down?", x);
                    sender.setSuspect(true);
                }
            }
        }
    } finally {
        if (doTransmitterProcessingStats) {
            addProcessingStats(time);
        }
    }
}
send message to all senders (broadcast) |
/**
 * Sends a message to a single member. The message is serialized, the
 * member's sender is looked up in the sender map by the member's key and
 * the data is passed to <code>sendMessageData</code>. When processing
 * statistics are enabled, the elapsed time is recorded even if sending
 * fails.
 *
 * @param message the message to send
 * @param member  the member to send it to
 * @throws IOException if serialization fails or no sender is stored for
 *                     the member
 */
public void sendMessage(ClusterMessage message,
        Member member) throws IOException {
    long time = doTransmitterProcessingStats ? System.currentTimeMillis() : 0;
    try {
        ClusterData data = serialize(message);
        IDataSender target = (IDataSender) map.get(getKey(member));
        sendMessageData(data, target);
    } finally {
        if (doTransmitterProcessingStats) {
            addProcessingStats(time);
        }
    }
}
Send data to one member
FIXME set filtering messages |
/**
 * Sends a message to every sender whose domain equals the domain of the
 * message's address.
 * <p>
 * The array returned by {@link #getSenders()} can contain
 * <code>null</code> slots; those are skipped instead of being dereferenced
 * for the domain comparison. A failure for one sender marks it as suspect
 * (logging a warning the first time) and does not stop delivery to the
 * others. When processing statistics are enabled, the elapsed time is
 * recorded even if sending fails.
 *
 * @param message the message to replicate within its cluster domain
 * @throws IOException      if the message cannot be serialized
 * @throws RuntimeException if the message's address has no domain set
 */
public void sendMessageClusterDomain(ClusterMessage message) throws IOException {
    long time = 0;
    if (doTransmitterProcessingStats) {
        time = System.currentTimeMillis();
    }
    try {
        String domain = message.getAddress().getDomain();
        if (domain == null) {
            throw new RuntimeException("Domain at member not set");
        }
        ClusterData data = serialize(message);
        IDataSender[] senders = getSenders();
        for (int i = 0; i < senders.length; i++) {
            IDataSender sender = senders[i];
            if (sender == null) {
                // empty slot left by getSenders(): nothing to compare or send
                continue;
            }
            if (domain.equals(sender.getDomain())) {
                try {
                    sendMessageData(data, sender);
                } catch (Exception x) {
                    if (!sender.getSuspect()) {
                        log.warn("Unable to send replicated message to "
                                + sender + ", is server down?", x);
                        sender.setSuspect(true);
                    }
                }
            }
        }
    } finally {
        if (doTransmitterProcessingStats) {
            addProcessingStats(time);
        }
    }
}
Send to all senders at same cluster domain as message from address |
/**
 * Sends serialized data through one concrete sender.
 * <p>
 * With <code>autoConnect</code> enabled the sender is connected first if it
 * reports not being connected (checked while holding the sender's
 * monitor). After a successful send the suspect flag is cleared and the
 * traffic statistics are updated. Any exception during connect or send is
 * handled here: it is logged as a warning unless the sender was already
 * suspect, the sender is marked suspect and the failure counter is
 * incremented.
 *
 * @param data   the serialized message
 * @param sender the sender to use
 * @throws IOException if <code>sender</code> is <code>null</code>
 */
protected void sendMessageData(ClusterData data,
        IDataSender sender) throws IOException {
    if (sender == null) {
        throw new java.io.IOException(
                "Sender not available. Make sure sender information is available to the ReplicationTransmitter.");
    }
    try {
        // deprecated not needed DataSender#pushMessage can handle connection
        if (autoConnect) {
            synchronized (sender) {
                if (!sender.isConnected()) {
                    sender.connect();
                }
            }
        }
        sender.sendMessage(data);
        sender.setSuspect(false);
        addStats(data.getMessage().length);
    } catch (Exception x) {
        // only warn on the transition to suspect, not on every failure
        if (log.isWarnEnabled() && !sender.getSuspect()) {
            log.warn("Unable to send replicated message, is server down?", x);
        }
        sender.setSuspect(true);
        failureCounter++;
    }
}
Send a message to a concrete sender. If autoConnect is true, check whether the
connection is broken and, if so, reconnect the sender.
- On failure the suspect flag is set to true. After a successful send the
suspect flag is set to false.
- Stats are only updated after a successful send
|
/**
 * Stamps the message with the current time and serializes it into a
 * {@link ClusterData} envelope carrying the message's class name, unique
 * id, timestamp, compress flag and resend flag.
 * <p>
 * The payload is gzip-compressed when compression is enabled on this
 * transmitter and the message does not forbid it
 * (<code>FLAG_FORBIDDEN</code>), or when the message explicitly allows it
 * (<code>FLAG_ALLOWED</code>).
 * <p>
 * The object stream is flushed explicitly before the bytes are read, and
 * the gzip stream is closed in a <code>finally</code> block so that its
 * compressor is released even when serialization throws (for example for
 * a non-serializable message).
 *
 * @param msg the message to serialize
 * @return the envelope holding the serialized (optionally compressed) bytes
 * @throws IOException if the message cannot be written
 */
protected ClusterData serialize(ClusterMessage msg) throws IOException {
    msg.setTimestamp(System.currentTimeMillis());
    ClusterData data = new ClusterData();
    data.setType(msg.getClass().getName());
    data.setUniqueId(msg.getUniqueId());
    data.setTimestamp(msg.getTimestamp());
    data.setCompress(msg.getCompress());
    data.setResend(msg.getResend());
    // FIXME add Stats how much comress and uncompress messages and bytes are transfered
    ByteArrayOutputStream outs = new ByteArrayOutputStream();
    GZIPOutputStream gout = null;
    try {
        ObjectOutputStream out;
        if ((isCompress() && msg.getCompress() != ClusterMessage.FLAG_FORBIDDEN)
                || msg.getCompress() == ClusterMessage.FLAG_ALLOWED) {
            gout = new GZIPOutputStream(outs);
            out = new ObjectOutputStream(gout);
        } else {
            out = new ObjectOutputStream(outs);
        }
        out.writeObject(msg);
        // push everything buffered by the object stream down to the byte buffer
        out.flush();
        if (gout != null) {
            // write the remaining compressed data and the gzip trailer
            gout.finish();
        }
    } finally {
        if (gout != null) {
            try {
                gout.close();
            } catch (IOException ignored) {
                // on success the data is already complete (finish() above);
                // on failure the original exception must not be masked
            }
        }
    }
    data.setMessage(outs.toByteArray());
    return data;
}
serialize message and add timestamp |
/**
 * Sets the acknowledge timeout and records it (as a string) in the
 * property map under <code>ackTimeout</code>.
 *
 * @param ackTimeout the new timeout value
 */
public void setAckTimeout(long ackTimeout) {
    this.ackTimeout = ackTimeout;
    final String asText = String.valueOf(ackTimeout);
    setProperty("ackTimeout", asText);
}
|
/**
 * Sets the auto-connect flag and records it (as a string) in the property
 * map under <code>autoConnect</code>.
 *
 * @param autoConnect the new flag value
 */
public void setAutoConnect(boolean autoConnect) {
    this.autoConnect = autoConnect;
    final String asText = String.valueOf(autoConnect);
    setProperty("autoConnect", asText);
}
|
/**
 * Stores the cluster this transmitter works with.
 *
 * @param cluster the owning cluster
 */
public void setCatalinaCluster(SimpleTcpCluster cluster) {
    // keep the reference; the MBean methods read it later
    this.cluster = cluster;
}
|
/**
 * Sets the compress flag that <code>isCompress()</code> reports.
 *
 * @param compressMessageData the new flag value
 */
public void setCompress(boolean compressMessageData) {
    compress = compressMessageData;
}
|
/**
 * Switches the collection of processing-time statistics on or off.
 *
 * @param doProcessingStats <code>true</code> to record processing times
 *                          for each send
 */
public void setDoTransmitterProcessingStats(boolean doProcessingStats) {
    doTransmitterProcessingStats = doProcessingStats;
}
|
/**
 * Stores the JMX object name of this transmitter.
 *
 * @param name the object name
 */
public void setObjectName(ObjectName name) {
    this.objectName = name;
}
|
/**
 * Sets the frequency value used by <code>backgroundProcess()</code> as the
 * modulus of its call counter.
 *
 * @param processSenderFrequency the new frequency
 */
public void setProcessSenderFrequency(int processSenderFrequency) {
    // stored as given; no range check is applied here
    this.processSenderFrequency = processSenderFrequency;
}
|
/**
 * Stores a property in the property map, tracing the new and the previous
 * value when trace logging is enabled.
 *
 * @param name  the property name
 * @param value the property value
 */
public void setProperty(String name,
        Object value) {
    if (log.isTraceEnabled()) {
        log.trace(sm.getString("ReplicationTransmitter.setProperty", name,
                value, properties.get(name)));
    }
    properties.put(name, value);
}
set config attributes with reflect |
/**
 * Sets the replication mode after validating it with
 * <code>IDataSenderFactory.validateMode</code>.
 *
 * @param mode the new replication mode
 * @throws IllegalArgumentException if validation returns an error message;
 *                                  the message becomes the exception text
 */
public void setReplicationMode(String mode) {
    String msg = IDataSenderFactory.validateMode(mode);
    if (msg != null) {
        throw new IllegalArgumentException(msg);
    }
    if (log.isDebugEnabled()) {
        log.debug("Setting replcation mode to " + mode);
    }
    this.replicationMode = mode;
}
Set the replication mode (pooled, synchronous, asynchronous, fastasyncqueue) |
/**
 * Sets the wait-for-acknowledge flag and records it (as a string) in the
 * property map under <code>waitForAck</code>.
 *
 * @param waitForAck the new flag value
 */
public void setWaitForAck(boolean waitForAck) {
    this.waitForAck = waitForAck;
    final String asText = String.valueOf(waitForAck);
    setProperty("waitForAck", asText);
}
|
/**
 * Starts the transmitter by registering its MBean, named
 * <code>domain:type=ClusterSender[,host=...]</code>, with the cluster's
 * MBean server. Does nothing without a cluster. When the name is already
 * registered only a warning is logged and the stored object name is left
 * unchanged. Any exception during registration is logged as a warning.
 *
 * @throws IOException declared by the signature; the visible body does not
 *                     throw it itself
 */
public void start() throws IOException {
    if (cluster == null) {
        return;
    }
    ObjectName clusterName = cluster.getObjectName();
    try {
        MBeanServer mserver = cluster.getMBeanServer();
        Container container = cluster.getContainer();
        String name = clusterName.getDomain() + ":type=ClusterSender";
        if (container instanceof StandardHost) {
            name += ",host=" + clusterName.getKeyProperty("host");
        }
        ObjectName transmitterName = new ObjectName(name);
        if (mserver.isRegistered(transmitterName)) {
            if (log.isWarnEnabled()) {
                log.warn(sm.getString(
                        "cluster.mbean.register.allready",
                        transmitterName));
            }
            return;
        }
        setObjectName(transmitterName);
        mserver.registerMBean(cluster.getManagedBean(this),
                getObjectName());
        if (log.isInfoEnabled()) {
            log.info(sm.getString("ReplicationTransmitter.started",
                    clusterName, transmitterName));
        }
    } catch (Exception e) {
        log.warn(e);
    }
}
start the sender and register transmitter mbean |
/**
 * Stops the transmitter: every sender in the sender map has its MBean
 * unregistered, is disconnected and is removed from the map; afterwards
 * the transmitter's own MBean is unregistered when a cluster and an object
 * name are set.
 * <p>
 * Shutdown is best effort: a sender that fails to disconnect is still
 * removed and the remaining senders are still processed, but the failure
 * is logged instead of being silently discarded. Null map values are
 * simply removed.
 */
public synchronized void stop() {
    Iterator i = map.entrySet().iterator();
    while (i.hasNext()) {
        IDataSender sender = (IDataSender) ((java.util.Map.Entry) i.next())
                .getValue();
        if (sender != null) {
            try {
                unregisterSenderMBean(sender);
                sender.disconnect();
            } catch (Exception x) {
                // keep going so the other senders are still shut down
                log.warn("Unable to disconnect sender " + sender
                        + " while stopping the transmitter.", x);
            }
        }
        i.remove();
    }
    if (cluster != null && getObjectName() != null) {
        try {
            MBeanServer mserver = cluster.getMBeanServer();
            mserver.unregisterMBean(getObjectName());
        } catch (Exception e) {
            log.error(e);
        }
        if (log.isInfoEnabled()) {
            log.info(sm.getString("ReplicationTransmitter.stopped",
                    cluster.getObjectName(), getObjectName()));
        }
    }
}
|
/**
 * Transfers all properties configured on the transmitter to a concrete
 * sender by setting each one, converted to its string form, through
 * <code>IntrospectionUtils.setProperty</code>.
 * <p>
 * A property stored with a <code>null</code> value is skipped: it has no
 * string form to pass on, and calling <code>toString()</code> on it would
 * abort the transfer with a <code>NullPointerException</code>.
 *
 * @param sender the sender that receives the properties
 */
protected void transferSenderProperty(IDataSender sender) {
    for (Iterator iter = getPropertyNames(); iter.hasNext();) {
        String pkey = (String) iter.next();
        Object value = getProperty(pkey);
        if (value == null) {
            continue;
        }
        IntrospectionUtils.setProperty(sender, pkey, value.toString());
    }
}
Transfer all properties from transmitter to concrete sender |
/**
 * Unregisters the sender's MBean from the cluster's MBean server.
 * <p>
 * Mirrors <code>registerSenderMBean</code>: without a cluster nothing was
 * registered, so there is nothing to unregister. The call is also skipped
 * when no MBean server is available or when the sender's object name could
 * not be built (<code>getSenderObjectName</code> returns <code>null</code>
 * in that case). Any exception is logged as a warning.
 *
 * @param sender the sender whose MBean is unregistered
 */
protected void unregisterSenderMBean(IDataSender sender) {
    if (cluster == null) {
        return;
    }
    try {
        MBeanServer mserver = cluster.getMBeanServer();
        if (mserver != null) {
            ObjectName senderName = getSenderObjectName(sender);
            if (senderName != null) {
                mserver.unregisterMBean(senderName);
            }
        }
    } catch (Exception e) {
        log.warn(e);
    }
}
Unregister the sender MBean |