| Method from org.apache.catalina.cluster.session.SimpleTcpReplicationManager Detail: |
public Session createSession(String sessionId) {
//create a session and notify the other nodes in the cluster
Session session = createSession(sessionId,getDistributable(),true);
add(session);
return session;
}
Construct and return a new session object, based on the default
settings specified by this Manager's properties. The session
id will be assigned by this method, and available via the getId()
method of the returned session. If a new session cannot be created
for any reason, return null. |
protected Session createSession(String sessionId,
boolean notify,
boolean setId) {
//inherited from the basic manager
if ((getMaxActiveSessions() >= 0) &&
(sessions.size() >= getMaxActiveSessions()))
throw new IllegalStateException(sm.getString("standardManager.createSession.ise"));
Session session = new ReplicatedSession(this);
// Initialize the properties of the new session and return it
session.setNew(true);
session.setValid(true);
session.setCreationTime(System.currentTimeMillis());
session.setMaxInactiveInterval(this.maxInactiveInterval);
if(sessionId == null)
sessionId = generateSessionId();
if ( setId ) session.setId(sessionId);
if ( notify && (cluster!=null) ) {
((ReplicatedSession)session).setIsDirty(true);
}
return (session);
}
Creates a HTTP session.
Most of the code in here is copied from the StandardManager.
This is not pretty, yeah I know, but it was necessary since the
StandardManager had hard coded the session instantiation to the a
StandardSession, when we actually want to instantiate a ReplicatedSession
If the call comes from the Tomcat servlet engine, a SessionMessage goes out to the other
nodes in the cluster that this session has been created. |
public CatalinaCluster getCluster() {
return cluster;
}
|
public boolean getDistributable() {
return distributable;
}
|
public boolean getExpireSessionsOnShutdown() {
return mExpireSessionsOnShutdown;
}
|
public String[] getInvalidatedSessions() {
synchronized ( invalidatedSessions ) {
String[] result = new String[invalidatedSessions.size()];
invalidatedSessions.values().toArray(result);
return result;
}
}
|
public String getName() {
return this.name;
}
|
public boolean isDefaultMode() {
return defaultMode;
}
|
public boolean isManagerRunning() {
return mManagerRunning;
}
|
public boolean isNotifyListenersOnReplication() {
return notifyListenersOnReplication;
}
|
public boolean isSendClusterDomainOnly() {
return sendClusterDomainOnly;
}
|
public boolean isStateTransferred() {
return stateTransferred;
}
|
public void messageDataReceived(ClusterMessage cmsg) {
try {
if ( cmsg instanceof SessionMessage ) {
SessionMessage msg = (SessionMessage)cmsg;
messageReceived(msg,
msg.getAddress() != null ? (Member) msg.getAddress() : null);
}
} catch(Throwable ex){
log.error("InMemoryReplicationManager.messageDataReceived()", ex);
}//catch
}
|
protected void messageReceived(SessionMessage msg,
Member sender) {
try {
if(log.isInfoEnabled()) {
log.debug("Received SessionMessage of type="+msg.getEventTypeString());
log.debug("Received SessionMessage sender="+sender);
}
switch ( msg.getEventType() ) {
case SessionMessage.EVT_GET_ALL_SESSIONS: {
//get a list of all the session from this manager
Object[] sessions = findSessions();
java.io.ByteArrayOutputStream bout = new java.io.ByteArrayOutputStream();
java.io.ObjectOutputStream oout = new java.io.ObjectOutputStream(bout);
oout.writeInt(sessions.length);
for (int i=0; i< sessions.length; i++){
ReplicatedSession ses = (ReplicatedSession)sessions[i];
oout.writeUTF(ses.getIdInternal());
byte[] data = writeSession(ses);
oout.writeObject(data);
}//for
//don't send a message if we don't have to
oout.flush();
oout.close();
byte[] data = bout.toByteArray();
SessionMessage newmsg = new SessionMessageImpl(name,
SessionMessage.EVT_ALL_SESSION_DATA,
data, "SESSION-STATE","SESSION-STATE-"+getName());
cluster.send(newmsg, sender);
break;
}
case SessionMessage.EVT_ALL_SESSION_DATA: {
java.io.ByteArrayInputStream bin =
new java.io.ByteArrayInputStream(msg.getSession());
java.io.ObjectInputStream oin = new java.io.ObjectInputStream(bin);
int size = oin.readInt();
for ( int i=0; i< size; i++) {
String id = oin.readUTF();
byte[] data = (byte[])oin.readObject();
Session session = readSession(data,id);
}//for
stateTransferred=true;
break;
}
case SessionMessage.EVT_SESSION_CREATED: {
Session session = this.readSession(msg.getSession(),msg.getSessionID());
if ( log.isDebugEnabled() ) {
log.debug("Received replicated session=" + session +
" isValid=" + session.isValid());
}
break;
}
case SessionMessage.EVT_SESSION_EXPIRED: {
Session session = findSession(msg.getSessionID());
if ( session != null ) {
session.expire();
this.remove(session);
}//end if
break;
}
case SessionMessage.EVT_SESSION_ACCESSED :{
Session session = findSession(msg.getSessionID());
if ( session != null ) {
session.access();
session.endAccess();
}
break;
}
default: {
//we didn't recognize the message type, do nothing
break;
}
}//switch
}
catch ( Exception x )
{
log.error("Unable to receive message through TCP channel",x);
}
}
This method is called by the received thread when a SessionMessage has
been received from one of the other nodes in the cluster. |
protected Session readSession(byte[] data,
String sessionId) {
try
{
java.io.ByteArrayInputStream session_data = new java.io.ByteArrayInputStream(data);
ReplicationStream session_in = new ReplicationStream(session_data,container.getLoader().getClassLoader());
Session session = sessionId!=null?this.findSession(sessionId):null;
boolean isNew = (session==null);
//clear the old values from the existing session
if ( session!=null ) {
ReplicatedSession rs = (ReplicatedSession)session;
rs.expire(false); //cleans up the previous values, since we are not doing removes
session = null;
}//end if
if (session==null) {
session = createSession(null,false, false);
sessions.remove(session.getIdInternal());
}
boolean hasPrincipal = session_in.readBoolean();
SerializablePrincipal p = null;
if ( hasPrincipal )
p = (SerializablePrincipal)session_in.readObject();
((ReplicatedSession)session).readObjectData(session_in);
if ( hasPrincipal )
session.setPrincipal(p.getPrincipal(getContainer().getRealm()));
((ReplicatedSession)session).setId(sessionId,isNew);
ReplicatedSession rsession = (ReplicatedSession)session;
rsession.setAccessCount(1);
session.setManager(this);
session.setValid(true);
rsession.setLastAccessedTime(System.currentTimeMillis());
rsession.setThisAccessedTime(System.currentTimeMillis());
((ReplicatedSession)session).setAccessCount(0);
session.setNew(false);
if(log.isTraceEnabled())
log.trace("Session loaded id="+sessionId +
" actualId="+session.getId()+
" exists="+this.sessions.containsKey(sessionId)+
" valid="+rsession.isValid());
return session;
}
catch ( Exception x )
{
log.error("Failed to deserialize the session!",x);
}
return null;
}
Reinstantiates a serialized session from the data passed in.
This will first call createSession() so that we get a fresh instance with all
the managers set and all the transient fields validated.
Then it calls Session.readObjectData(byte[]) to deserialize the object |
public ClusterMessage requestCompleted(String sessionId) {
if ( !getDistributable() ) {
log.warn("Received requestCompleted message, although this context["+
getName()+"] is not distributable. Ignoring message");
return null;
}
//notify javagroups
try
{
if ( invalidatedSessions.get(sessionId) != null ) {
synchronized ( invalidatedSessions ) {
invalidatedSessions.remove(sessionId);
SessionMessage msg = new SessionMessageImpl(name,
SessionMessage.EVT_SESSION_EXPIRED,
null,
sessionId,
sessionId);
return msg;
}
} else {
ReplicatedSession session = (ReplicatedSession) findSession(
sessionId);
if (session != null) {
//return immediately if the session is not dirty
if (useDirtyFlag && (!session.isDirty())) {
//but before we return doing nothing,
//see if we should send
//an updated last access message so that
//sessions across cluster dont expire
long interval = session.getMaxInactiveInterval();
long lastaccdist = System.currentTimeMillis() -
session.getLastAccessWasDistributed();
if ( ((interval*1000) / lastaccdist)< 3 ) {
SessionMessage accmsg = new SessionMessageImpl(name,
SessionMessage.EVT_SESSION_ACCESSED,
null,
sessionId,
sessionId);
session.setLastAccessWasDistributed(System.currentTimeMillis());
return accmsg;
}
return null;
}
session.setIsDirty(false);
if (log.isDebugEnabled()) {
try {
log.debug("Sending session to cluster=" + session);
}
catch (Exception ignore) {}
}
SessionMessage msg = new SessionMessageImpl(name,
SessionMessage.EVT_SESSION_CREATED,
writeSession(session),
session.getIdInternal(),
session.getIdInternal());
return msg;
} //end if
}//end if
}
catch (Exception x )
{
log.error("Unable to replicate session",x);
}
return null;
}
|
public void sessionInvalidated(String sessionId) {
synchronized ( invalidatedSessions ) {
invalidatedSessions.put(sessionId, sessionId);
}
}
|
public void setCluster(CatalinaCluster cluster) {
if(log.isDebugEnabled())
log.debug("Cluster associated with SimpleTcpReplicationManager");
this.cluster = cluster;
}
|
public void setDefaultMode(boolean defaultMode) {
this.defaultMode = defaultMode;
}
|
public void setDistributable(boolean dist) {
this.distributable = dist;
}
|
public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown) {
mExpireSessionsOnShutdown = expireSessionsOnShutdown;
}
|
public void setName(String name) {
this.name = name;
}
|
public void setNotifyListenersOnReplication(boolean notifyListenersOnReplication) {
this.notifyListenersOnReplication = notifyListenersOnReplication;
}
|
public void setPrintToScreen(boolean printtoscreen) {
if(log.isDebugEnabled())
log.debug("Setting screen debug to:"+printtoscreen);
mPrintToScreen = printtoscreen;
}
|
public void setSendClusterDomainOnly(boolean sendClusterDomainOnly) {
this.sendClusterDomainOnly = sendClusterDomainOnly;
}
|
public void setSynchronousReplication(boolean flag) {
synchronousReplication=flag;
}
|
public void setUseDirtyFlag(boolean usedirtyflag) {
this.useDirtyFlag = usedirtyflag;
}
|
public void start() throws LifecycleException {
mManagerRunning = true;
super.start();
//start the javagroups channel
try {
//the channel is already running
if ( mChannelStarted ) return;
if(log.isInfoEnabled())
log.info("Starting clustering manager...:"+getName());
if ( cluster == null ) {
log.error("Starting... no cluster associated with this context:"+getName());
return;
}
cluster.addManager(getName(),this);
if (cluster.getMembers().length > 0) {
Member mbr = cluster.getMembers()[0];
SessionMessage msg =
new SessionMessageImpl(this.getName(),
SessionMessage.EVT_GET_ALL_SESSIONS,
null,
"GET-ALL",
"GET-ALL-"+this.getName());
cluster.send(msg, mbr);
if(log.isWarnEnabled())
log.warn("Manager["+getName()+"], requesting session state from "+mbr+
". This operation will timeout if no session state has been received within "+
"60 seconds");
long reqStart = System.currentTimeMillis();
long reqNow = 0;
boolean isTimeout=false;
do {
try {
Thread.sleep(100);
}catch ( Exception sleep) {}
reqNow = System.currentTimeMillis();
isTimeout=((reqNow-reqStart) >(1000*60));
} while ( (!isStateTransferred()) && (!isTimeout));
if ( isTimeout || (!isStateTransferred()) ) {
log.error("Manager["+getName()+"], No session state received, timing out.");
}else {
if(log.isInfoEnabled())
log.info("Manager["+getName()+"], session state received in "+(reqNow-reqStart)+" ms.");
}
} else {
if(log.isInfoEnabled())
log.info("Manager["+getName()+"], skipping state transfer. No members active in cluster group.");
}//end if
mChannelStarted = true;
} catch ( Exception x ) {
log.error("Unable to start SimpleTcpReplicationManager",x);
}
}
Prepare for the beginning of active use of the public methods of this
component. This method should be called after configure(),
and before any of the public methods of the component are utilized.
Starts the cluster communication channel, this will connect with the other nodes
in the cluster, and request the current session state to be transferred to this node. |
public void stop() throws LifecycleException {
mManagerRunning = false;
mChannelStarted = false;
super.stop();
//stop the javagroup channel
try
{
this.sessions.clear();
cluster.removeManager(getName(),this);
// mReplicationListener.stopListening();
// mReplicationTransmitter.stop();
// service.stop();
// service = null;
}
catch ( Exception x )
{
log.error("Unable to stop SimpleTcpReplicationManager",x);
}
}
Gracefully terminate the active use of the public methods of this
component. This method should be the last one called on a given
instance of this component.
This will disconnect the cluster communication channel and stop the listener thread. |
public void unload() throws IOException {
if ( !getDistributable() ) {
super.unload();
}
}
Override persistence since they don't go hand in hand with replication for now. |
protected byte[] writeSession(Session session) {
try
{
java.io.ByteArrayOutputStream session_data = new java.io.ByteArrayOutputStream();
java.io.ObjectOutputStream session_out = new java.io.ObjectOutputStream(session_data);
session_out.flush();
boolean hasPrincipal = session.getPrincipal() != null;
session_out.writeBoolean(hasPrincipal);
if ( hasPrincipal )
{
session_out.writeObject(SerializablePrincipal.createPrincipal((GenericPrincipal)session.getPrincipal()));
}//end if
((ReplicatedSession)session).writeObjectData(session_out);
return session_data.toByteArray();
}
catch ( Exception x )
{
log.error("Failed to serialize the session!",x);
}
return null;
}
Serialize a session into a byte array
This method simple calls the writeObjectData method on the session
and returns the byte data from that call |