| Method from org.apache.catalina.tribes.membership.McastServiceImpl Detail: |
protected void checkExpired() {
synchronized (expiredMutex) {
MemberImpl[] expired = membership.expire(timeToExpiration);
for (int i = 0; i < expired.length; i++) {
final MemberImpl member = expired[i];
if (log.isDebugEnabled())
log.debug("Mcast exipre member " + expired[i]);
try {
Thread t = new Thread() {
public void run() {
service.memberDisappeared(member);
}
};
t.start();
} catch (Exception x) {
log.error("Unable to process member disappeared message.", x);
}
}
}
}
|
public int getRecoveryCounter() {
return recoveryCounter;
}
|
public long getRecoverySleepTime() {
return recoverySleepTime;
}
|
public long getServiceStartTime() {
return this.serviceStartTime;
}
|
public void init() throws IOException {
setupSocket();
sendPacket = new DatagramPacket(new byte[MAX_PACKET_SIZE],MAX_PACKET_SIZE);
sendPacket.setAddress(address);
sendPacket.setPort(port);
receivePacket = new DatagramPacket(new byte[MAX_PACKET_SIZE],MAX_PACKET_SIZE);
receivePacket.setAddress(address);
receivePacket.setPort(port);
member.setCommand(new byte[0]);
member.getData(true, true);
if ( membership == null ) membership = new Membership(member);
}
|
public boolean isRecoveryEnabled() {
return recoveryEnabled;
}
|
public void receive() throws IOException {
try {
socket.receive(receivePacket);
if(receivePacket.getLength() > MAX_PACKET_SIZE) {
log.error("Multicast packet received was too long, dropping package:"+receivePacket.getLength());
} else {
byte[] data = new byte[receivePacket.getLength()];
System.arraycopy(receivePacket.getData(), receivePacket.getOffset(), data, 0, data.length);
final MemberImpl m = MemberImpl.getMember(data);
if (log.isTraceEnabled()) log.trace("Mcast receive ping from member " + m);
Thread t = null;
if (Arrays.equals(m.getCommand(), Member.SHUTDOWN_PAYLOAD)) {
if (log.isDebugEnabled()) log.debug("Member has shutdown:" + m);
membership.removeMember(m);
t = new Thread() {
public void run() {
service.memberDisappeared(m);
}
};
} else if (membership.memberAlive(m)) {
if (log.isDebugEnabled()) log.debug("Mcast add member " + m);
t = new Thread() {
public void run() {
service.memberAdded(m);
}
};
} //end if
if ( t != null ) t.start();
}
} catch (SocketTimeoutException x ) {
//do nothing, this is normal, we don't want to block forever
//since the receive thread is the same thread
//that does membership expiration
}
checkExpired();
}
Receive a datagram packet, locking wait |
public void send(boolean checkexpired) throws IOException {
//ignore if we haven't started the sender
//if ( (startLevel&Channel.MBR_TX_SEQ) != Channel.MBR_TX_SEQ ) return;
member.inc();
if(log.isTraceEnabled())
log.trace("Mcast send ping from member " + member);
byte[] data = member.getData();
DatagramPacket p = new DatagramPacket(data,data.length);
p.setAddress(address);
p.setPort(port);
socket.send(p);
if ( checkexpired ) checkExpired();
}
|
public void setRecoveryCounter(int recoveryCounter) {
this.recoveryCounter = recoveryCounter;
}
|
public void setRecoveryEnabled(boolean recoveryEnabled) {
this.recoveryEnabled = recoveryEnabled;
}
|
public void setRecoverySleepTime(long recoverySleepTime) {
this.recoverySleepTime = recoverySleepTime;
}
|
protected void setupSocket() throws IOException {
if (mcastBindAddress != null) {
try {
log.info("Attempting to bind the multicast socket to "+address+":"+port);
socket = new MulticastSocket(new InetSocketAddress(address,port));
} catch (BindException e) {
/*
* On some plattforms (e.g. Linux) it is not possible to bind
* to the multicast address. In this case only bind to the
* port.
*/
log.info("Binding to multicast address, failed. Binding to port only.");
socket = new MulticastSocket(port);
}
} else {
socket = new MulticastSocket(port);
}
socket.setLoopbackMode(false); //hint that we don't need loop back messages
if (mcastBindAddress != null) {
if(log.isInfoEnabled())
log.info("Setting multihome multicast interface to:" +mcastBindAddress);
socket.setInterface(mcastBindAddress);
} //end if
//force a so timeout so that we don't block forever
if ( mcastSoTimeout < = 0 ) mcastSoTimeout = (int)sendFrequency;
if(log.isInfoEnabled())
log.info("Setting cluster mcast soTimeout to "+mcastSoTimeout);
socket.setSoTimeout(mcastSoTimeout);
if ( mcastTTL >= 0 ) {
if(log.isInfoEnabled())
log.info("Setting cluster mcast TTL to " + mcastTTL);
socket.setTimeToLive(mcastTTL);
}
}
|
public synchronized void start(int level) throws IOException {
boolean valid = false;
if ( (level & Channel.MBR_RX_SEQ)==Channel.MBR_RX_SEQ ) {
if ( receiver != null ) throw new IllegalStateException("McastService.receive already running.");
if ( sender == null ) socket.joinGroup(address);
doRunReceiver = true;
receiver = new ReceiverThread();
receiver.setDaemon(true);
receiver.start();
valid = true;
}
if ( (level & Channel.MBR_TX_SEQ)==Channel.MBR_TX_SEQ ) {
if ( sender != null ) throw new IllegalStateException("McastService.send already running.");
if ( receiver == null ) socket.joinGroup(address);
//make sure at least one packet gets out there
send(false);
doRunSender = true;
serviceStartTime = System.currentTimeMillis();
sender = new SenderThread(sendFrequency);
sender.setDaemon(true);
sender.start();
//we have started the receiver, but not yet waited for membership to establish
valid = true;
}
if (!valid) {
throw new IllegalArgumentException("Invalid start level. Only acceptable levels are Channel.MBR_RX_SEQ and Channel.MBR_TX_SEQ");
}
//pause, once or twice
waitForMembers(level);
startLevel = (startLevel | level);
}
|
public synchronized boolean stop(int level) throws IOException {
boolean valid = false;
if ( (level & Channel.MBR_RX_SEQ)==Channel.MBR_RX_SEQ ) {
valid = true;
doRunReceiver = false;
if ( receiver !=null ) receiver.interrupt();
receiver = null;
}
if ( (level & Channel.MBR_TX_SEQ)==Channel.MBR_TX_SEQ ) {
valid = true;
doRunSender = false;
if ( sender != null )sender.interrupt();
sender = null;
}
if (!valid) {
throw new IllegalArgumentException("Invalid stop level. Only acceptable levels are Channel.MBR_RX_SEQ and Channel.MBR_TX_SEQ");
}
startLevel = (startLevel & (~level));
//we're shutting down, send a shutdown message and close the socket
if ( startLevel == 0 ) {
//send a stop message
member.setCommand(Member.SHUTDOWN_PAYLOAD);
member.getData(true, true);
send(false);
//leave mcast group
try {socket.leaveGroup(address);}catch ( Exception ignore){}
serviceStartTime = Long.MAX_VALUE;
}
return (startLevel == 0);
}
|