Save This Page
Home » apache-tomcat-6.0.16-src » org.apache » catalina » cluster » session » [javadoc | source]
    1   /*
    2    * Copyright 1999,2004-2005 The Apache Software Foundation.
    3    * 
    4    * Licensed under the Apache License, Version 2.0 (the "License");
    5    * you may not use this file except in compliance with the License.
    6    * You may obtain a copy of the License at
    7    * 
    8    *      http://www.apache.org/licenses/LICENSE-2.0
    9    * 
   10    * Unless required by applicable law or agreed to in writing, software
   11    * distributed under the License is distributed on an "AS IS" BASIS,
   12    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   13    * See the License for the specific language governing permissions and
   14    * limitations under the License.
   15    */
   16   
   17   package org.apache.catalina.cluster.session;
   18   
   19   import java.beans.PropertyChangeEvent;
   20   import java.beans.PropertyChangeListener;
   21   import java.io.BufferedInputStream;
   22   import java.io.BufferedOutputStream;
   23   import java.io.ByteArrayInputStream;
   24   import java.io.ByteArrayOutputStream;
   25   import java.io.IOException;
   26   import java.io.ObjectInputStream;
   27   import java.io.ObjectOutputStream;
   28   import java.util.ArrayList;
   29   import java.util.Date;
   30   import java.util.Iterator;
   31   
   32   import org.apache.catalina.Cluster;
   33   import org.apache.catalina.Container;
   34   import org.apache.catalina.Context;
   35   import org.apache.catalina.Engine;
   36   import org.apache.catalina.Host;
   37   import org.apache.catalina.Lifecycle;
   38   import org.apache.catalina.LifecycleException;
   39   import org.apache.catalina.LifecycleListener;
   40   import org.apache.catalina.Loader;
   41   import org.apache.catalina.Session;
   42   import org.apache.catalina.cluster.CatalinaCluster;
   43   import org.apache.catalina.cluster.ClusterManager;
   44   import org.apache.catalina.cluster.ClusterMessage;
   45   import org.apache.catalina.cluster.Member;
   46   import org.apache.catalina.session.ManagerBase;
   47   import org.apache.catalina.util.CustomObjectInputStream;
   48   import org.apache.catalina.util.LifecycleSupport;
   49   import org.apache.catalina.util.StringManager;
   50   
   51   /**
   52    * The DeltaManager manages replicated sessions by only replicating the deltas
   53    * in data. For applications written to handle this, the DeltaManager is the
   54    * optimal way of replicating data.
   55    * 
   56    * This code is almost identical to StandardManager with a difference in how it
   57    * persists sessions and some modifications to it.
   58    * 
   59    * <b>IMPLEMENTATION NOTE </b>: Correct behavior of session storing and
   60    * reloading depends upon external calls to the <code>start()</code> and
   61    * <code>stop()</code> methods of this class at the correct times.
   62    * 
   63    * @author Filip Hanik
   64    * @author Craig R. McClanahan
   65    * @author Jean-Francois Arcand
   66    * @author Peter Rossbach
   67    * @version $Revision: 348972 $ $Date: 2005-11-25 10:56:09 -0500 (Fri, 25 Nov 2005) $
   68    */
   69   
   70   public class DeltaManager extends ManagerBase implements Lifecycle,
   71           PropertyChangeListener, ClusterManager {
   72   
   73       // ------------------------------------------------ Logging and Messages
   74   
   75       public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
   76               .getLog(DeltaManager.class); // logger shared by all DeltaManager instances
   77   
   78       /**
   79        * The string manager for this package.
   80        */
   81       protected static StringManager sm = StringManager
   82               .getManager(Constants.Package);
   83   
   84       // ----------------------------------------------------- Instance Variables
   85   
   86       /**
   87        * The descriptive information about this implementation.
   88        */
   89       private static final String info = "DeltaManager/2.0";
   90   
   91       /**
   92        * Has this component been started yet?
   93        */
   94       private boolean started = false;
   95   
   96       /**
   97        * The descriptive name of this Manager implementation (for logging).
   98        */
   99       protected static String managerName = "DeltaManager";
  100   
  101       protected String name = null; // instance name; see setName()/getName()
  102       
  103       protected boolean defaultMode = false; // see isDefaultMode()/setDefaultMode(); meaning not visible here - TODO document
  104   
  105       private CatalinaCluster cluster = null; // cluster that send() replicates through; may be null
  106   
  107       /**
  108        * The lifecycle event support for this component.
  109        */
  110       protected LifecycleSupport lifecycle = new LifecycleSupport(this);
  111   
  112       /**
  113        * The maximum number of active Sessions allowed, or -1 for no limit.
  114        */
  115       private int maxActiveSessions = -1;
  116       
  117       private boolean expireSessionsOnShutdown = false;
  118   
  119       private boolean notifyListenersOnReplication = true;
  120   
  121       private boolean notifySessionListenersOnReplication = true;
  122   
  123       private boolean stateTransferred = false ;
  124   
  125       private int stateTransferTimeout = 60; // unit presumably seconds - TODO confirm
  126   
  127       private boolean sendAllSessions = true;
  128   
  129       private boolean sendClusterDomainOnly = true ; // true: send() uses cluster.sendClusterDomain(); false: cluster.send()
  130       
  131       private int sendAllSessionsSize = 1000 ;
  132       
  133       /**
  134        * Wait time in milliseconds between sending blocks of sessions (default 2 seconds).
  135        */
  136       private int sendAllSessionsWaitTime = 2 * 1000 ; 
  137   
  138       private ArrayList receivedMessageQueue = new ArrayList() ; // its size is reported by getReceivedQueueSize()
  139       
  140       private boolean receiverQueue = false ;
  141   
  142       private boolean stateTimestampDrop = true ;
  143   
  144       private long stateTransferCreateSendTime; 
  145       
  146       // ------------------------------------------------------------------ stats attributes
  147       
  148       int rejectedSessions = 0; // session creations refused because maxActiveSessions was reached
  149   
  150       private long sessionReplaceCounter = 0 ; // deserialized sessions whose id already existed locally
  151   
  152       long processingTime = 0;
  153   
  154       private long counterReceive_EVT_GET_ALL_SESSIONS = 0 ;
  155   
  156       private long counterSend_EVT_ALL_SESSION_DATA = 0 ;
  157   
  158       private long counterReceive_EVT_ALL_SESSION_DATA = 0 ;
  159   
  160       private long counterReceive_EVT_SESSION_CREATED = 0 ;
  161   
  162       private long counterReceive_EVT_SESSION_EXPIRED = 0;
  163   
  164       private long counterReceive_EVT_SESSION_ACCESSED = 0 ;
  165   
  166       private long counterReceive_EVT_SESSION_DELTA = 0;
  167   
  168       private long counterSend_EVT_GET_ALL_SESSIONS = 0 ;
  169   
  170       private long counterSend_EVT_SESSION_CREATED = 0; // incremented by sendCreateSession()
  171   
  172       private long counterSend_EVT_SESSION_DELTA = 0 ;
  173   
  174       private long counterSend_EVT_SESSION_ACCESSED = 0;
  175   
  176       private long counterSend_EVT_SESSION_EXPIRED = 0;
  177   
  178       private int counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0 ;
  179   
  180       private int counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0 ;
  181   
  182       private int counterNoStateTransfered = 0 ;
  183   
  184   
  185       // ------------------------------------------------------------- Constructor
  186     
  187       public DeltaManager() {
  188           super();
  189       }
  190   
  191       // ------------------------------------------------------------- Properties
  192       
  193       /**
  194        * Return descriptive information about this Manager implementation and the
  195        * corresponding version number, in the format
  196        * <code>&lt;description&gt;/&lt;version&gt;</code>.
  197        */
  198       public String getInfo() {
  199   
  200           return (info);
  201   
  202       }
  203   
  204       public void setName(String name) {
  205           this.name = name;
  206       }
  207   
  208       /**
  209        * Return the descriptive short name of this Manager implementation.
  210        */
  211       public String getName() {
  212   
  213           return (name);
  214   
  215       }
  216   
  217       /**
  218        * @return Returns the counterSend_EVT_GET_ALL_SESSIONS.
  219        */
  220       public long getCounterSend_EVT_GET_ALL_SESSIONS() {
  221           return counterSend_EVT_GET_ALL_SESSIONS;
  222       }
  223       
  224       /**
  225        * @return Returns the counterSend_EVT_SESSION_ACCESSED.
  226        */
  227       public long getCounterSend_EVT_SESSION_ACCESSED() {
  228           return counterSend_EVT_SESSION_ACCESSED;
  229       }
  230       
  231       /**
  232        * @return Returns the counterSend_EVT_SESSION_CREATED.
  233        */
  234       public long getCounterSend_EVT_SESSION_CREATED() {
  235           return counterSend_EVT_SESSION_CREATED;
  236       }
  237   
  238       /**
  239        * @return Returns the counterSend_EVT_SESSION_DELTA.
  240        */
  241       public long getCounterSend_EVT_SESSION_DELTA() {
  242           return counterSend_EVT_SESSION_DELTA;
  243       }
  244   
  245       /**
  246        * @return Returns the counterSend_EVT_SESSION_EXPIRED.
  247        */
  248       public long getCounterSend_EVT_SESSION_EXPIRED() {
  249           return counterSend_EVT_SESSION_EXPIRED;
  250       }
  251    
  252       /**
  253        * @return Returns the counterSend_EVT_ALL_SESSION_DATA.
  254        */
  255       public long getCounterSend_EVT_ALL_SESSION_DATA() {
  256           return counterSend_EVT_ALL_SESSION_DATA;
  257       }
  258   
  259       /**
  260        * @return Returns the counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE.
  261        */
  262       public int getCounterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE() {
  263           return counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE;
  264       }
  265    
  266       /**
  267        * @return Returns the counterReceive_EVT_ALL_SESSION_DATA.
  268        */
  269       public long getCounterReceive_EVT_ALL_SESSION_DATA() {
  270           return counterReceive_EVT_ALL_SESSION_DATA;
  271       }
  272       
  273       /**
  274        * @return Returns the counterReceive_EVT_GET_ALL_SESSIONS.
  275        */
  276       public long getCounterReceive_EVT_GET_ALL_SESSIONS() {
  277           return counterReceive_EVT_GET_ALL_SESSIONS;
  278       }
  279       
  280       /**
  281        * @return Returns the counterReceive_EVT_SESSION_ACCESSED.
  282        */
  283       public long getCounterReceive_EVT_SESSION_ACCESSED() {
  284           return counterReceive_EVT_SESSION_ACCESSED;
  285       }
  286       
  287       /**
  288        * @return Returns the counterReceive_EVT_SESSION_CREATED.
  289        */
  290       public long getCounterReceive_EVT_SESSION_CREATED() {
  291           return counterReceive_EVT_SESSION_CREATED;
  292       }
  293       
  294       /**
  295        * @return Returns the counterReceive_EVT_SESSION_DELTA.
  296        */
  297       public long getCounterReceive_EVT_SESSION_DELTA() {
  298           return counterReceive_EVT_SESSION_DELTA;
  299       }
  300       
  301       /**
  302        * @return Returns the counterReceive_EVT_SESSION_EXPIRED.
  303        */
  304       public long getCounterReceive_EVT_SESSION_EXPIRED() {
  305           return counterReceive_EVT_SESSION_EXPIRED;
  306       }
  307       
  308       
  309       /**
  310        * @return Returns the counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE.
  311        */
  312       public int getCounterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE() {
  313           return counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE;
  314       }
  315       
  316       /**
  317        * @return Returns the processingTime.
  318        */
  319       public long getProcessingTime() {
  320           return processingTime;
  321       }
  322    
  323       /**
  324        * @return Returns the sessionReplaceCounter.
  325        */
  326       public long getSessionReplaceCounter() {
  327           return sessionReplaceCounter;
  328       }
  329       
  330       /**
  331        * Number of session creations that failed due to maxActiveSessions
  332        * 
  333        * @return The count
  334        */
  335       public int getRejectedSessions() {
  336           return rejectedSessions;
  337       }
  338   
  339       public void setRejectedSessions(int rejectedSessions) {
  340           this.rejectedSessions = rejectedSessions;
  341       }
  342   
  343       /**
  344        * @return Returns the counterNoStateTransfered.
  345        */
  346       public int getCounterNoStateTransfered() {
  347           return counterNoStateTransfered;
  348       }
  349       
  350       public int getReceivedQueueSize() {
  351           return receivedMessageQueue.size() ;
  352       }
  353       
  354       /**
  355        * @return Returns the stateTransferTimeout.
  356        */
  357       public int getStateTransferTimeout() {
  358           return stateTransferTimeout;
  359       }
  360       /**
  361        * @param timeoutAllSession The timeout
  362        */
  363       public void setStateTransferTimeout(int timeoutAllSession) {
  364           this.stateTransferTimeout = timeoutAllSession;
  365       }
  366   
  367       public boolean getStateTransferred() {
  368           return stateTransferred;
  369       }
  370   
  371       public void setStateTransferred(boolean stateTransferred) {
  372           this.stateTransferred = stateTransferred;
  373       }
  374       
  375       /**
  376        * @return Returns the sendAllSessionsWaitTime in msec
  377        */
  378       public int getSendAllSessionsWaitTime() {
  379           return sendAllSessionsWaitTime;
  380       }
  381       
  382       /**
  383        * @param sendAllSessionsWaitTime The sendAllSessionsWaitTime to set at msec.
  384        */
  385       public void setSendAllSessionsWaitTime(int sendAllSessionsWaitTime) {
  386           this.sendAllSessionsWaitTime = sendAllSessionsWaitTime;
  387       }
  388       
  389       /**
  390        * @return Returns the sendClusterDomainOnly.
  391        */
  392       public boolean isSendClusterDomainOnly() {
  393           return sendClusterDomainOnly;
  394       }
  395       
  396       /**
  397        * @param sendClusterDomainOnly The sendClusterDomainOnly to set.
  398        */
  399       public void setSendClusterDomainOnly(boolean sendClusterDomainOnly) {
  400           this.sendClusterDomainOnly = sendClusterDomainOnly;
  401       }
  402   
  403       /**
  404        * @return Returns the stateTimestampDrop.
  405        */
  406       public boolean isStateTimestampDrop() {
  407           return stateTimestampDrop;
  408       }
  409       
  410       /**
  411        * @param isTimestampDrop The new flag value
  412        */
  413       public void setStateTimestampDrop(boolean isTimestampDrop) {
  414           this.stateTimestampDrop = isTimestampDrop;
  415       }
  416       
  417       /**
  418        * Return the maximum number of active Sessions allowed, or -1 for no limit.
  419        */
  420       public int getMaxActiveSessions() {
  421   
  422           return (this.maxActiveSessions);
  423   
  424       }
  425   
  426       /**
  427        * Set the maximum number of actives Sessions allowed, or -1 for no limit.
  428        * 
  429        * @param max
  430        *            The new maximum number of sessions
  431        */
  432       public void setMaxActiveSessions(int max) {
  433   
  434           int oldMaxActiveSessions = this.maxActiveSessions;
  435           this.maxActiveSessions = max;
  436           support.firePropertyChange("maxActiveSessions", new Integer(
  437                   oldMaxActiveSessions), new Integer(this.maxActiveSessions));
  438   
  439       }
  440       
  441       /**
  442        * @return Returns the sendAllSessions.
  443        */
  444       public boolean isSendAllSessions() {
  445           return sendAllSessions;
  446       }
  447       
  448       /**
  449        * @param sendAllSessions The sendAllSessions to set.
  450        */
  451       public void setSendAllSessions(boolean sendAllSessions) {
  452           this.sendAllSessions = sendAllSessions;
  453       }
  454       
  455       /**
  456        * @return Returns the sendAllSessionsSize.
  457        */
  458       public int getSendAllSessionsSize() {
  459           return sendAllSessionsSize;
  460       }
  461       
  462       /**
  463        * @param sendAllSessionsSize The sendAllSessionsSize to set.
  464        */
  465       public void setSendAllSessionsSize(int sendAllSessionsSize) {
  466           this.sendAllSessionsSize = sendAllSessionsSize;
  467       }
  468       
  469       /**
  470        * @return Returns the notifySessionListenersOnReplication.
  471        */
  472       public boolean isNotifySessionListenersOnReplication() {
  473           return notifySessionListenersOnReplication;
  474       }
  475       
  476       /**
  477        * @param notifyListenersCreateSessionOnReplication The notifySessionListenersOnReplication to set.
  478        */
  479       public void setNotifySessionListenersOnReplication(
  480               boolean notifyListenersCreateSessionOnReplication) {
  481           this.notifySessionListenersOnReplication = notifyListenersCreateSessionOnReplication;
  482       }
  483       
  484       
  485       public boolean isExpireSessionsOnShutdown() {
  486           return expireSessionsOnShutdown;
  487       }
  488   
  489       public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown) {
  490           this.expireSessionsOnShutdown = expireSessionsOnShutdown;
  491       }
  492       
  493       public boolean isNotifyListenersOnReplication() {
  494           return notifyListenersOnReplication;
  495       }
  496   
  497       public void setNotifyListenersOnReplication(
  498               boolean notifyListenersOnReplication) {
  499           this.notifyListenersOnReplication = notifyListenersOnReplication;
  500       }
  501   
  502       
  503       /**
  504        * @return Returns the defaultMode.
  505        */
  506       public boolean isDefaultMode() {
  507           return defaultMode;
  508       }
  509       /**
  510        * @param defaultMode The defaultMode to set.
  511        */
  512       public void setDefaultMode(boolean defaultMode) {
  513           this.defaultMode = defaultMode;
  514       }
  515       
  516       public CatalinaCluster getCluster() {
  517           return cluster;
  518       }
  519   
  520       public void setCluster(CatalinaCluster cluster) {
  521           this.cluster = cluster;
  522       }
  523   
  524       /**
  525        * Set the Container with which this Manager has been associated. If it is a
  526        * Context (the usual case), listen for changes to the session timeout
  527        * property.
  528        * 
  529        * @param container
  530        *            The associated Container
  531        */
  532       public void setContainer(Container container) {
  533   
  534           // De-register from the old Container (if any)
  535           if ((this.container != null) && (this.container instanceof Context))
  536               ((Context) this.container).removePropertyChangeListener(this);
  537   
  538           // Default processing provided by our superclass
  539           super.setContainer(container);
  540   
  541           // Register with the new Container (if any)
  542           if ((this.container != null) && (this.container instanceof Context)) {
  543               setMaxInactiveInterval(((Context) this.container)
  544                       .getSessionTimeout() * 60);
  545               ((Context) this.container).addPropertyChangeListener(this);
  546           }
  547   
  548       }
  549       
  550       // --------------------------------------------------------- Public Methods
  551   
  552       /**
  553        * Construct and return a new session object, based on the default settings
  554        * specified by this Manager's properties. The session id will be assigned
  555        * by this method, and available via the getId() method of the returned
  556        * session. If a new session cannot be created for any reason, return
  557        * <code>null</code>.
  558        * 
  559        * @exception IllegalStateException
  560        *                if a new session cannot be instantiated for any reason
  561        * 
  562        * Construct and return a new session object, based on the default settings
  563        * specified by this Manager's properties. The session id will be assigned
  564        * by this method, and available via the getId() method of the returned
  565        * session. If a new session cannot be created for any reason, return
  566        * <code>null</code>.
  567        * 
  568        * @exception IllegalStateException
  569        *                if a new session cannot be instantiated for any reason
  570        */
  571       public Session createSession(String sessionId) {
  572           return createSession(sessionId, true);
  573       }
  574   
  575       /**
  576        * create new session with check maxActiveSessions and send session creation
  577        * to other cluster nodes.
  578        * 
  579        * @param distribute
  580        * @return The session
  581        */
  582       public Session createSession(String sessionId, boolean distribute) {
  583   
  584           if ((maxActiveSessions >= 0) && (sessions.size() >= maxActiveSessions)) {
  585               rejectedSessions++;
  586               throw new IllegalStateException(sm
  587                       .getString("deltaManager.createSession.ise"));
  588           }
  589   
  590           DeltaSession session = (DeltaSession) super.createSession(sessionId) ;
  591           session.resetDeltaRequest();
  592           if (distribute) {
  593               sendCreateSession(session.getId(), session);
  594           }
  595           if (log.isDebugEnabled())
  596               log.debug(sm.getString("deltaManager.createSession.newSession",
  597                       session.getId(), new Integer(sessions.size())));
  598   
  599           return (session);
  600   
  601       }
  602   
  603       /**
  604        * Send create session evt to all backup node
  605        * @param sessionId
  606        * @param session
  607        */
  608       protected void sendCreateSession(String sessionId, DeltaSession session) {
  609           if(cluster.getMembers().length > 0 ) {
  610               SessionMessage msg = new SessionMessageImpl(getName(),
  611                       SessionMessage.EVT_SESSION_CREATED, null, sessionId,
  612                       sessionId + "-" + System.currentTimeMillis());
  613               if (log.isDebugEnabled())
  614                   log.debug(sm.getString("deltaManager.sendMessage.newSession",
  615                           name, sessionId));
  616               counterSend_EVT_SESSION_CREATED++;
  617               send(msg);
  618           }
  619           session.resetDeltaRequest();
  620       }
  621       
  622       /**
  623        * Send messages to other backup member (domain or all)
  624        * @param msg Session message
  625        */
  626       protected void send(SessionMessage msg) {
  627           if(cluster != null) {
  628               if(isSendClusterDomainOnly())
  629                   cluster.sendClusterDomain(msg);
  630               else
  631                   cluster.send(msg);
  632           }
  633       }
  634   
  635       /**
  636        * Create DeltaSession
  637        * @see org.apache.catalina.Manager#createEmptySession()
  638        */
  639       public Session createEmptySession() {
  640           return getNewDeltaSession() ;
  641       }
  642       
  643       /**
  644        * Get new session class to be used in the doLoad() method.
  645        */
  646       protected DeltaSession getNewDeltaSession() {
  647           return new DeltaSession(this);
  648       }
  649   
  650       /**
  651        * Load Deltarequest from external node
  652        * Load the Class at container classloader
  653        * @see DeltaRequest#readExternal(java.io.ObjectInput)
  654        * @param session
  655        * @param data message data
  656        * @return The request
  657        * @throws ClassNotFoundException
  658        * @throws IOException
  659        */
  660       protected DeltaRequest loadDeltaRequest(DeltaSession session, byte[] data)
  661               throws ClassNotFoundException, IOException {
  662           ByteArrayInputStream fis = null;
  663           ReplicationStream ois = null;
  664           Loader loader = null;
  665           ClassLoader classLoader = null;
  666           //fix to be able to run the DeltaManager
  667           //stand alone without a container.
  668           //use the Threads context class loader
  669           if (container != null)
  670               loader = container.getLoader();
  671           if (loader != null)
  672               classLoader = loader.getClassLoader();
  673           else
  674               classLoader = Thread.currentThread().getContextClassLoader();
  675           //end fix
  676           fis = new ByteArrayInputStream(data);
  677           ois = new ReplicationStream(fis, classLoader);
  678           session.getDeltaRequest().readExternal(ois);
  679           ois.close();
  680           return session.getDeltaRequest();
  681       }
  682   
  683       /**
  684        * serialize DeltaRequest
  685        * @see DeltaRequest#writeExternal(java.io.ObjectOutput)
  686        * 
  687        * @param deltaRequest
  688        * @return serialized delta request
  689        * @throws IOException
  690        */
  691       protected byte[] unloadDeltaRequest(DeltaRequest deltaRequest)
  692               throws IOException {
  693           ByteArrayOutputStream bos = new ByteArrayOutputStream();
  694           ObjectOutputStream oos = new ObjectOutputStream(bos);
  695           deltaRequest.writeExternal(oos);
  696           oos.flush();
  697           oos.close();
  698           return bos.toByteArray();
  699       }
  700   
  701       /**
  702        * Load sessions from other cluster node.
  703        * FIXME replace currently sessions with same id without notifcation.
  704        * FIXME SSO handling is not really correct with the session replacement!
  705        * @exception ClassNotFoundException
  706        *                if a serialized class cannot be found during the reload
  707        * @exception IOException
  708        *                if an input/output error occurs
  709        */
  710       protected void deserializeSessions(byte[] data) throws ClassNotFoundException,
  711               IOException {
  712   
  713           // Initialize our internal data structures
  714           //sessions.clear(); //should not do this
  715           // Open an input stream to the specified pathname, if any
  716           ClassLoader originalLoader = Thread.currentThread()
  717                   .getContextClassLoader();
  718           ObjectInputStream ois = null;
  719           // Load the previously unloaded active sessions
  720           try {
  721               ois = openDeserializeObjectStream(data);
  722               Integer count = (Integer) ois.readObject();
  723               int n = count.intValue();
  724               for (int i = 0; i < n; i++) {
  725                   DeltaSession session = (DeltaSession) createEmptySession();
  726                   session.readObjectData(ois);
  727                   session.setManager(this);
  728                   session.setValid(true);
  729                   session.setPrimarySession(false);
  730                   //in case the nodes in the cluster are out of
  731                   //time synch, this will make sure that we have the
  732                   //correct timestamp, isValid returns true, cause
  733                   // accessCount=1
  734                   session.access();
  735                   //make sure that the session gets ready to expire if
  736                   // needed
  737                   session.setAccessCount(0);
  738                   session.resetDeltaRequest();
  739                   // FIXME How inform other session id cache like SingleSignOn
  740                   // increment sessionCounter to correct stats report
  741                   if (findSession(session.getIdInternal()) == null ) {
  742                       sessionCounter++;
  743                   } else {
  744                       sessionReplaceCounter++;
  745                       // FIXME better is to grap this sessions again !
  746                       if (log.isWarnEnabled())
  747                           log.warn(sm.getString(
  748                                   "deltaManager.loading.existing.session",
  749                                   session.getIdInternal()));
  750                   }
  751                   add(session);
  752               }
  753           } catch (ClassNotFoundException e) {
  754               log.error(sm.getString("deltaManager.loading.cnfe", e), e);
  755               throw e;
  756           } catch (IOException e) {
  757               log.error(sm.getString("deltaManager.loading.ioe", e), e);
  758               throw e;
  759           } finally {
  760               // Close the input stream
  761               try {
  762                   if (ois != null)
  763                       ois.close();
  764               } catch (IOException f) {
  765                   // ignored
  766               }
  767               ois = null;
  768               if (originalLoader != null)
  769                   Thread.currentThread().setContextClassLoader(originalLoader);
  770           }
  771   
  772       }
  773   
  774       /**
  775        * Open Stream and use correct ClassLoader (Container) Switch
  776        * ThreadClassLoader
  777        * 
  778        * @param data
  779        * @return The object input stream
  780        * @throws IOException
  781        */
  782       protected ObjectInputStream openDeserializeObjectStream(byte[] data) throws IOException {
  783           ObjectInputStream ois = null;
  784           ByteArrayInputStream fis = null;
  785           try {
  786               Loader loader = null;
  787               ClassLoader classLoader = null;
  788               fis = new ByteArrayInputStream(data);
  789               BufferedInputStream bis = new BufferedInputStream(fis);
  790               if (container != null)
  791                   loader = container.getLoader();
  792               if (loader != null)
  793                   classLoader = loader.getClassLoader();
  794               if (classLoader != null) {
  795                   if (log.isTraceEnabled())
  796                       log.trace(sm.getString(
  797                               "deltaManager.loading.withContextClassLoader",
  798                               getName()));
  799                   ois = new CustomObjectInputStream(bis, classLoader);
  800                   Thread.currentThread().setContextClassLoader(classLoader);
  801               } else {
  802                   if (log.isTraceEnabled())
  803                       log.trace(sm.getString(
  804                               "deltaManager.loading.withoutClassLoader",
  805                               getName()));
  806                   ois = new ObjectInputStream(bis);
  807               }
  808           } catch (IOException e) {
  809               log.error(sm.getString("deltaManager.loading.ioe", e), e);
  810               if (ois != null) {
  811                   try {
  812                       ois.close();
  813                   } catch (IOException f) {
  814                       ;
  815                   }
  816                   ois = null;
  817               }
  818               throw e;
  819           }
  820           return ois;
  821       }
  822   
  823       /**
  824        * Save any currently active sessions in the appropriate persistence
  825        * mechanism, if any. If persistence is not supported, this method returns
  826        * without doing anything.
  827        * 
  828        * @exception IOException
  829        *                if an input/output error occurs
  830        */
  831       protected byte[] serializeSessions(Session[] currentSessions) throws IOException {
  832   
  833           // Open an output stream to the specified pathname, if any
  834           ByteArrayOutputStream fos = null;
  835           ObjectOutputStream oos = null;
  836   
  837           try {
  838               fos = new ByteArrayOutputStream();
  839               oos = new ObjectOutputStream(new BufferedOutputStream(fos));
  840               oos.writeObject(new Integer(currentSessions.length));
  841               for(int i=0 ; i < currentSessions.length;i++) {
  842                   ((DeltaSession)currentSessions[i]).writeObjectData(oos);                
  843               }
  844               // Flush and close the output stream
  845               oos.flush();
  846           } catch (IOException e) {
  847               log.error(sm.getString("deltaManager.unloading.ioe", e), e);
  848               throw e;
  849           } finally {
  850               if (oos != null) {
  851                   try {
  852                       oos.close();
  853                   } catch (IOException f) {
  854                       ;
  855                   }
  856                   oos = null;
  857               }
  858           }
  859           // send object data as byte[]
  860           return fos.toByteArray();
  861       }
  862   
  863       // ------------------------------------------------------ Lifecycle Methods
  864   
  865       /**
  866        * Add a lifecycle event listener to this component.
  867        * 
  868        * @param listener
  869        *            The listener to add
  870        */
  871       public void addLifecycleListener(LifecycleListener listener) {
  872   
  873           lifecycle.addLifecycleListener(listener);
  874   
  875       }
  876   
  877       /**
  878        * Get the lifecycle listeners associated with this lifecycle. If this
  879        * Lifecycle has no listeners registered, a zero-length array is returned.
  880        */
  881       public LifecycleListener[] findLifecycleListeners() {
  882   
  883           return lifecycle.findLifecycleListeners();
  884   
  885       }
  886   
  887       /**
  888        * Remove a lifecycle event listener from this component.
  889        * 
  890        * @param listener
  891        *            The listener to remove
  892        */
  893       public void removeLifecycleListener(LifecycleListener listener) {
  894   
  895           lifecycle.removeLifecycleListener(listener);
  896   
  897       }
  898   
  899       /**
  900        * Prepare for the beginning of active use of the public methods of this
  901        * component. This method should be called after <code>configure()</code>,
  902        * and before any of the public methods of the component are utilized.
  903        * 
  904        * @exception LifecycleException
  905        *                if this component detects a fatal error that prevents this
  906        *                component from being used
  907        */
  908       public void start() throws LifecycleException {
  909           if (!initialized)
  910               init();
  911   
  912           // Validate and update our current component state
  913           if (started) {
  914               return;
  915           }
  916           started = true;
  917           lifecycle.fireLifecycleEvent(START_EVENT, null);
  918   
  919           // Force initialization of the random number generator
  920           String dummy = generateSessionId();
  921   
  922           // Load unloaded sessions, if any
  923           try {
  924               //the channel is already running
  925               Cluster cluster = getCluster() ;
  926               // stop remove cluster binding
  927               if(cluster == null) {
  928                   Container context = getContainer() ;
  929                   if(context != null && context instanceof Context) {
  930                        Container host = context.getParent() ;
  931                        if(host != null && host instanceof Host) {
  932                            cluster = host.getCluster();
  933                            if(cluster != null && cluster instanceof CatalinaCluster) {
  934                                setCluster((CatalinaCluster) cluster) ;
  935                            } else {
  936                                Container engine = host.getParent() ;
  937                                if(engine != null && engine instanceof Engine) {
  938                                    cluster = engine.getCluster();
  939                                    if(cluster != null && cluster instanceof CatalinaCluster) {
  940                                            setCluster((CatalinaCluster) cluster) ;
  941                                    }
  942                                } else {
  943                                        cluster = null ;
  944                                }
  945                            }
  946                        }
  947                   }
  948               }
  949               if (cluster == null) {
  950                   log.error(sm.getString("deltaManager.noCluster", getName()));
  951                   return;
  952               } else {
  953                   if (log.isInfoEnabled()) {
  954                       String type = "unknown" ;
  955                       if( cluster.getContainer() instanceof Host){
  956                           type = "Host" ;
  957                       } else if( cluster.getContainer() instanceof Engine){
  958                           type = "Engine" ;
  959                       }
  960                       log.info(sm
  961                               .getString("deltaManager.registerCluster", getName(), type, cluster.getClusterName()));
  962                   }
  963               }
  964               if (log.isInfoEnabled())
  965                   log.info(sm
  966                           .getString("deltaManager.startClustering", getName()));
  967               //to survice context reloads, as only a stop/start is called, not
  968               // createManager
  969               ((CatalinaCluster)cluster).addManager(getName(), this);
  970   
  971               getAllClusterSessions();
  972   
  973           } catch (Throwable t) {
  974               log.error(sm.getString("deltaManager.managerLoad"), t);
  975           }
  976       }
  977   
  978       /**
  979        * get from first session master the backup from all clustered sessions
  980        * @see #findSessionMasterMember()
  981        */
  982       public synchronized void getAllClusterSessions() {
  983           if (cluster != null && cluster.getMembers().length > 0) {
  984               long beforeSendTime = System.currentTimeMillis();
  985               Member mbr = findSessionMasterMember();
  986               if(mbr == null) { // No domain member found
  987                    return;
  988               }
  989               SessionMessage msg = new SessionMessageImpl(this.getName(),
  990                       SessionMessage.EVT_GET_ALL_SESSIONS, null, "GET-ALL",
  991                       "GET-ALL-" + getName());
  992               msg.setResend(ClusterMessage.FLAG_FORBIDDEN);
  993               // set reference time
  994               msg.setTimestamp(beforeSendTime);
  995               stateTransferCreateSendTime = beforeSendTime ;
  996               // request session state
  997               counterSend_EVT_GET_ALL_SESSIONS++;
  998               stateTransferred = false ;
  999               // FIXME This send call block the deploy thread, when sender waitForAck is enabled
 1000               try {
 1001                   synchronized(receivedMessageQueue) {
 1002                        receiverQueue = true ;
 1003                   }
 1004                   cluster.send(msg, mbr);
 1005                   if (log.isWarnEnabled())
 1006                       log.warn(sm.getString("deltaManager.waitForSessionState",
 1007                               getName(), mbr));
 1008                   // FIXME At sender ack mode this method check only the state transfer and resend is a problem!
 1009                   waitForSendAllSessions(beforeSendTime);
 1010               } finally {
 1011                   synchronized(receivedMessageQueue) {
 1012                       for (Iterator iter = receivedMessageQueue.iterator(); iter
 1013                               .hasNext();) {
 1014                           SessionMessage smsg = (SessionMessage) iter.next();
 1015                           if (!stateTimestampDrop) {
 1016                               messageReceived(smsg,
 1017                                       smsg.getAddress() != null ? (Member) smsg
 1018                                               .getAddress() : null);
 1019                           } else {
 1020                               if (smsg.getEventType() != SessionMessage.EVT_GET_ALL_SESSIONS
 1021                                       && smsg.getTimestamp() >= stateTransferCreateSendTime) {
 1022                                   // FIXME handle EVT_GET_ALL_SESSIONS later
 1023                                   messageReceived(
 1024                                           smsg,
 1025                                           smsg.getAddress() != null ? (Member) smsg
 1026                                                   .getAddress()
 1027                                                   : null);
 1028                               } else {
 1029                                   if (log.isWarnEnabled()) {
 1030                                       log.warn(sm.getString(
 1031                                               "deltaManager.dropMessage",
 1032                                               getName(), smsg
 1033                                                       .getEventTypeString(),
 1034                                               new Date(stateTransferCreateSendTime), new Date(
 1035                                                       smsg.getTimestamp())));
 1036                                   }
 1037                               }
 1038                           }
 1039                       }        
 1040                       receivedMessageQueue.clear();
 1041                       receiverQueue = false ;
 1042                   }
 1043              }
 1044           } else {
 1045               if (log.isInfoEnabled())
 1046                   log.info(sm.getString("deltaManager.noMembers", getName()));
 1047           }
 1048       }
 1049   
 1050       /**
 1051        * Find the master of the session state
 1052        * @return master member of sessions 
 1053        */
 1054       protected Member findSessionMasterMember() {
 1055           Member mbr = null;
 1056           Member mbrs[] = cluster.getMembers();
 1057           String localMemberDomain = cluster.getMembershipService().getLocalMember().getDomain();
 1058           if(isSendClusterDomainOnly()) {
 1059               for (int i = 0; mbr == null && i < mbrs.length; i++) {
 1060                   Member member = mbrs[i];
 1061                   if(localMemberDomain.equals(member.getDomain()))
 1062                       mbr = member ;
 1063               }
 1064           } else {
 1065               // FIXME Why only the first Member?
 1066               if(mbrs.length != 0 )
 1067                   mbr = mbrs[0];
 1068           }
 1069           if(mbr == null && log.isWarnEnabled())
 1070              log.warn(sm.getString("deltaManager.noMasterMember",
 1071                       getName(), localMemberDomain));
 1072           if(mbr != null && log.isDebugEnabled())
 1073               log.warn(sm.getString("deltaManager.foundMasterMember",
 1074                        getName(), mbr));
 1075           return mbr;
 1076       }
 1077   
 1078       /**
 1079        * Wait that cluster session state is transfer or timeout after 60 Sec
 1080        * With stateTransferTimeout == -1 wait that backup is transfered (forever mode)
 1081        */
 1082       protected void waitForSendAllSessions(long beforeSendTime) {
 1083           long reqStart = System.currentTimeMillis();
 1084           long reqNow = reqStart ;
 1085           boolean isTimeout = false;
 1086           if(getStateTransferTimeout() > 0) {
 1087               // wait that state is transfered with timeout check
 1088               do {
 1089                   try {
 1090                       Thread.sleep(100);
 1091                   } catch (Exception sleep) {
 1092                   }
 1093                   reqNow = System.currentTimeMillis();
 1094                   isTimeout = ((reqNow - reqStart) > (1000 * getStateTransferTimeout()));
 1095               } while ((!getStateTransferred()) && (!isTimeout));
 1096           } else {
 1097               if(getStateTransferTimeout() == -1) {
 1098                   // wait that state is transfered
 1099                   do {
 1100                       try {
 1101                           Thread.sleep(100);
 1102                       } catch (Exception sleep) {
 1103                       }
 1104                   } while ((!getStateTransferred()));
 1105                   reqNow = System.currentTimeMillis();
 1106               }
 1107           }
 1108           if (isTimeout || (!getStateTransferred())) {
 1109               counterNoStateTransfered++ ;
 1110               log.error(sm.getString("deltaManager.noSessionState",
 1111                       getName(),new Date(beforeSendTime),new Long(reqNow - beforeSendTime)));
 1112           } else {
 1113               if (log.isInfoEnabled())
 1114                   log.info(sm.getString("deltaManager.sessionReceived",
 1115                           getName(), new Date(beforeSendTime), new Long(reqNow - beforeSendTime)));
 1116           }
 1117       }
 1118   
 1119       /**
 1120        * Gracefully terminate the active use of the public methods of this
 1121        * component. This method should be the last one called on a given instance
 1122        * of this component.
 1123        * 
 1124        * @exception LifecycleException
 1125        *                if this component detects a fatal error that needs to be
 1126        *                reported
 1127        */
 1128       public void stop() throws LifecycleException {
 1129   
 1130           if (log.isDebugEnabled())
 1131               log.debug(sm.getString("deltaManager.stopped", getName()));
 1132   
 1133   
 1134           // Validate and update our current component state
 1135           if (!started)
 1136               throw new LifecycleException(sm
 1137                       .getString("deltaManager.notStarted"));
 1138           lifecycle.fireLifecycleEvent(STOP_EVENT, null);
 1139           started = false;
 1140   
 1141           // Expire all active sessions
 1142           if (log.isInfoEnabled())
 1143               log.info(sm.getString("deltaManager.expireSessions", getName()));
 1144           Session sessions[] = findSessions();
 1145           for (int i = 0; i < sessions.length; i++) {
 1146               DeltaSession session = (DeltaSession) sessions[i];
 1147               if (!session.isValid())
 1148                   continue;
 1149               try {
 1150                   session.expire(true, isExpireSessionsOnShutdown());
 1151               } catch (Throwable ignore) {
 1152                   ;
 1153               } 
 1154           }
 1155   
 1156           // Require a new random number generator if we are restarted
 1157           this.random = null;
 1158           getCluster().removeManager(getName(),this);
 1159           if (initialized) {
 1160               destroy();
 1161           }
 1162       }
 1163   
 1164       // ----------------------------------------- PropertyChangeListener Methods
 1165   
 1166       /**
 1167        * Process property change events from our associated Context.
 1168        * 
 1169        * @param event
 1170        *            The property change event that has occurred
 1171        */
 1172       public void propertyChange(PropertyChangeEvent event) {
 1173   
 1174           // Validate the source of this event
 1175           if (!(event.getSource() instanceof Context))
 1176               return;
 1177           Context context = (Context) event.getSource();
 1178   
 1179           // Process a relevant property change
 1180           if (event.getPropertyName().equals("sessionTimeout")) {
 1181               try {
 1182                   setMaxInactiveInterval(((Integer) event.getNewValue())
 1183                           .intValue() * 60);
 1184               } catch (NumberFormatException e) {
 1185                   log.error(sm.getString("deltaManager.sessionTimeout", event
 1186                           .getNewValue()));
 1187               }
 1188           }
 1189   
 1190       }
 1191   
 1192       // -------------------------------------------------------- Replication
 1193       // Methods
 1194   
 1195       /**
 1196        * A message was received from another node, this is the callback method to
 1197        * implement if you are interested in receiving replication messages.
 1198        * 
 1199        * @param cmsg -
 1200        *            the message received.
 1201        */
 1202       public void messageDataReceived(ClusterMessage cmsg) {
 1203           if (cmsg != null && cmsg instanceof SessionMessage) {
 1204               SessionMessage msg = (SessionMessage) cmsg;
 1205               switch (msg.getEventType()) {
 1206               case SessionMessage.EVT_GET_ALL_SESSIONS:
 1207               case SessionMessage.EVT_SESSION_CREATED: 
 1208               case SessionMessage.EVT_SESSION_EXPIRED: 
 1209               case SessionMessage.EVT_SESSION_ACCESSED:
 1210               case SessionMessage.EVT_SESSION_DELTA: {
 1211                   synchronized(receivedMessageQueue) {
 1212                       if(receiverQueue) {
 1213                           receivedMessageQueue.add(msg);
 1214                           return ;
 1215                       }
 1216                   }
 1217                  break;
 1218               }
 1219               default: {
 1220                   //we didn't queue, do nothing
 1221                   break;
 1222               }
 1223               } //switch
 1224               
 1225               messageReceived(msg, msg.getAddress() != null ? (Member) msg
 1226                       .getAddress() : null);
 1227           }
 1228       }
 1229   
 1230       /**
 1231        * When the request has been completed, the replication valve will notify
 1232        * the manager, and the manager will decide whether any replication is
 1233        * needed or not. If there is a need for replication, the manager will
 1234        * create a session message and that will be replicated. The cluster
 1235        * determines where it gets sent.
 1236        * 
 1237        * @param sessionId -
 1238        *            the sessionId that just completed.
 1239        * @return a SessionMessage to be sent,
 1240        */
 1241       public ClusterMessage requestCompleted(String sessionId) {
 1242           try {
 1243               DeltaSession session = (DeltaSession) findSession(sessionId);
 1244               DeltaRequest deltaRequest = session.getDeltaRequest();
 1245               SessionMessage msg = null;
 1246               if (deltaRequest.getSize() > 0) {
 1247   
 1248                   counterSend_EVT_SESSION_DELTA++;
 1249                   byte[] data = unloadDeltaRequest(deltaRequest);
 1250                   msg = new SessionMessageImpl(name,
 1251                           SessionMessage.EVT_SESSION_DELTA, data, sessionId,
 1252                           sessionId + "-" + System.currentTimeMillis());
 1253                   session.resetDeltaRequest();
 1254                   if (log.isDebugEnabled()) {
 1255                       log.debug(sm.getString(
 1256                               "deltaManager.createMessage.delta",
 1257                               getName(), sessionId));
 1258                   }
 1259                   
 1260               } else if (!session.isPrimarySession()) {
 1261                   counterSend_EVT_SESSION_ACCESSED++;
 1262                   msg = new SessionMessageImpl(getName(),
 1263                           SessionMessage.EVT_SESSION_ACCESSED, null, sessionId,
 1264                           sessionId + "-" + System.currentTimeMillis());
 1265                   if (log.isDebugEnabled()) {
 1266                       log.debug(sm.getString(
 1267                               "deltaManager.createMessage.accessChangePrimary",
 1268                               getName(), sessionId));
 1269                   }
 1270               }
 1271               session.setPrimarySession(true);
 1272               //check to see if we need to send out an access message
 1273               if ((msg == null)) {
 1274                   long replDelta = System.currentTimeMillis()
 1275                           - session.getLastTimeReplicated();
 1276                   if (replDelta > (getMaxInactiveInterval() * 1000)) {
 1277                       counterSend_EVT_SESSION_ACCESSED++;
 1278                       msg = new SessionMessageImpl(getName(),
 1279                               SessionMessage.EVT_SESSION_ACCESSED, null,
 1280                               sessionId, sessionId + "-" + System.currentTimeMillis());
 1281                       if (log.isDebugEnabled()) {
 1282                           log.debug(sm.getString(
 1283                                   "deltaManager.createMessage.access", getName(),
 1284                                   sessionId));
 1285                       }
 1286                   }
 1287   
 1288               }
 1289   
 1290               //update last replicated time
 1291               if (msg != null)
 1292                   session.setLastTimeReplicated(System.currentTimeMillis());
 1293               return msg;
 1294           } catch (IOException x) {
 1295               log.error(sm.getString(
 1296                       "deltaManager.createMessage.unableCreateDeltaRequest",
 1297                       sessionId), x);
 1298               return null;
 1299           }
 1300   
 1301       }
 1302       /**
 1303        * Reset manager statistics
 1304        */
 1305       public synchronized void resetStatistics() {
 1306           processingTime = 0 ;
 1307           expiredSessions = 0 ;
 1308           rejectedSessions = 0 ;
 1309           sessionReplaceCounter = 0 ;
 1310           counterNoStateTransfered = 0 ;
 1311           maxActive = getActiveSessions() ;
 1312           sessionCounter = getActiveSessions() ;
 1313           counterReceive_EVT_ALL_SESSION_DATA = 0;
 1314           counterReceive_EVT_GET_ALL_SESSIONS = 0;
 1315           counterReceive_EVT_SESSION_ACCESSED = 0 ;
 1316           counterReceive_EVT_SESSION_CREATED = 0 ;
 1317           counterReceive_EVT_SESSION_DELTA = 0 ;
 1318           counterReceive_EVT_SESSION_EXPIRED = 0 ;
 1319           counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
 1320           counterSend_EVT_ALL_SESSION_DATA = 0;
 1321           counterSend_EVT_GET_ALL_SESSIONS = 0;
 1322           counterSend_EVT_SESSION_ACCESSED = 0 ;
 1323           counterSend_EVT_SESSION_CREATED = 0 ;
 1324           counterSend_EVT_SESSION_DELTA = 0 ;
 1325           counterSend_EVT_SESSION_EXPIRED = 0 ;
 1326           counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
 1327           
 1328       }
 1329      
 1330       //  -------------------------------------------------------- persistence handler
 1331   
 1332       public void load() {
 1333   
 1334       }
 1335   
 1336       public void unload() {
 1337   
 1338       }
 1339   
 1340       //  -------------------------------------------------------- expire
 1341   
 1342       /**
 1343        * send session expired to other cluster nodes
 1344        * 
 1345        * @param id
 1346        *            session id
 1347        */
 1348       protected void sessionExpired(String id) {
 1349           counterSend_EVT_SESSION_EXPIRED++ ;
 1350           SessionMessage msg = new SessionMessageImpl(getName(),
 1351                   SessionMessage.EVT_SESSION_EXPIRED, null, id, id
 1352                           + "-EXPIRED-MSG");
 1353           if (log.isDebugEnabled())
 1354               log.debug(sm.getString("deltaManager.createMessage.expire",
 1355                       getName(), id));
 1356           send(msg);
 1357       }
 1358   
 1359       /**
 1360        * Exipre all find sessions.
 1361        */
 1362       public void expireAllLocalSessions()
 1363       {
 1364           long timeNow = System.currentTimeMillis();
 1365           Session sessions[] = findSessions();
 1366           int expireDirect  = 0 ;
 1367           int expireIndirect = 0 ;
 1368           
 1369           if(log.isDebugEnabled())
 1370               log.debug("Start expire all sessions " + getName() + " at " + timeNow + " sessioncount " + sessions.length);
 1371           for (int i = 0; i < sessions.length; i++) {
 1372               if (sessions[i] instanceof DeltaSession) {
 1373                   DeltaSession session = (DeltaSession) sessions[i];
 1374                   if (session.isPrimarySession()) {
 1375                       if (session.isValid()) {
 1376                           session.expire();
 1377                           expireDirect++;
 1378                       } else {
 1379                           expireIndirect++;
 1380                       }
 1381                   }
 1382               }
 1383           }
 1384           long timeEnd = System.currentTimeMillis();
 1385           if(log.isDebugEnabled())
 1386                log.debug("End expire sessions " + getName() + " exipre processingTime " + (timeEnd - timeNow) + " expired direct sessions: " + expireDirect + " expired direct sessions: " + expireIndirect);
 1387         
 1388       }
 1389       
 1390       /**
 1391        * When the manager expires session not tied to a request. The cluster will
 1392        * periodically ask for a list of sessions that should expire and that
 1393        * should be sent across the wire.
 1394        * 
 1395        * @return The invalidated sessions array
 1396        */
 1397       public String[] getInvalidatedSessions() {
 1398           return new String[0];
 1399       }
 1400   
 1401       //  -------------------------------------------------------- message receive
 1402   
 1403       /**
 1404        * Test that sender and local domain is the same
 1405        */
 1406       protected boolean checkSenderDomain(SessionMessage msg,Member sender) {
 1407           String localMemberDomain = cluster.getMembershipService().getLocalMember().getDomain();
 1408           boolean sameDomain= localMemberDomain.equals(sender.getDomain());
 1409           if (!sameDomain && log.isWarnEnabled()) {
 1410                   log.warn(sm.getString("deltaManager.receiveMessage.fromWrongDomain",
 1411                           new Object[] {getName(), 
 1412                           msg.getEventTypeString(), 
 1413                           sender,
 1414                           sender.getDomain(),
 1415                           localMemberDomain }
 1416                   ));
 1417           }
 1418           return sameDomain ;
 1419       }
 1420   
 1421       /**
 1422        * This method is called by the received thread when a SessionMessage has
 1423        * been received from one of the other nodes in the cluster.
 1424        * 
 1425        * @param msg -
 1426        *            the message received
 1427        * @param sender -
 1428        *            the sender of the message, this is used if we receive a
 1429        *            EVT_GET_ALL_SESSION message, so that we only reply to the
 1430        *            requesting node
 1431        */
 1432       protected void messageReceived(SessionMessage msg, Member sender) {
 1433           if(isSendClusterDomainOnly() && !checkSenderDomain(msg,sender)) {
 1434               return;
 1435           }
 1436           try {
 1437               if (log.isDebugEnabled())
 1438                   log.debug(sm.getString("deltaManager.receiveMessage.eventType",
 1439                           getName(), msg.getEventTypeString(), sender));
 1440    
 1441               switch (msg.getEventType()) {
 1442               case SessionMessage.EVT_GET_ALL_SESSIONS: {
 1443                   handleGET_ALL_SESSIONS(msg,sender);
 1444                   break;
 1445               }
 1446               case SessionMessage.EVT_ALL_SESSION_DATA: {
 1447                   handleALL_SESSION_DATA(msg,sender);
 1448                   break;
 1449               }
 1450               case SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE: {
 1451                   handleALL_SESSION_TRANSFERCOMPLETE(msg,sender);
 1452                   break;
 1453               }
 1454               case SessionMessage.EVT_SESSION_CREATED: {
 1455                   handleSESSION_CREATED(msg,sender);
 1456                   break;
 1457               }
 1458               case SessionMessage.EVT_SESSION_EXPIRED: {
 1459                   handleSESSION_EXPIRRED(msg,sender);
 1460                   break;
 1461               }
 1462               case SessionMessage.EVT_SESSION_ACCESSED: {
 1463                   handleSESSION_ACCESSED(msg,sender);
 1464                   break;
 1465               }
 1466               case SessionMessage.EVT_SESSION_DELTA: {
 1467                  handleSESSION_DELTA(msg,sender);
 1468                  break;
 1469               }
 1470               default: {
 1471                   //we didn't recognize the message type, do nothing
 1472                   break;
 1473               }
 1474               } //switch
 1475           } catch (Exception x) {
 1476               log.error(sm.getString("deltaManager.receiveMessage.error",
 1477                       getName()), x);
 1478           }
 1479       }
 1480   
 1481       // -------------------------------------------------------- message receiver handler
 1482   
 1483   
 1484       /**
 1485        * handle receive session state is complete transfered
 1486        * @param msg
 1487        * @param sender
 1488        */
 1489       protected void handleALL_SESSION_TRANSFERCOMPLETE(SessionMessage msg, Member sender) {
 1490           counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE++ ;
 1491           if (log.isDebugEnabled())
 1492               log.debug(sm.getString(
 1493                       "deltaManager.receiveMessage.transfercomplete",
 1494                       getName(), sender.getHost(), new Integer(sender.getPort())));
 1495           stateTransferCreateSendTime = msg.getTimestamp() ;
 1496           stateTransferred = true ;
 1497       }
 1498   
 1499       /**
 1500        * handle receive session delta
 1501        * @param msg
 1502        * @param sender
 1503        * @throws IOException
 1504        * @throws ClassNotFoundException
 1505        */
 1506       protected void handleSESSION_DELTA(SessionMessage msg, Member sender)
 1507               throws IOException, ClassNotFoundException {
 1508           counterReceive_EVT_SESSION_DELTA++;
 1509           byte[] delta = msg.getSession();
 1510           DeltaSession session = (DeltaSession) findSession(msg.getSessionID());
 1511           if (session != null) {
 1512               if (log.isDebugEnabled())
 1513                   log.debug(sm.getString("deltaManager.receiveMessage.delta",
 1514                       getName(), msg.getSessionID()));
 1515               DeltaRequest dreq = loadDeltaRequest(session, delta);
 1516               dreq.execute(session, notifyListenersOnReplication);
 1517               session.setPrimarySession(false);
 1518           }
 1519       }
 1520   
 1521       /**
 1522        * handle receive session is access at other node ( primary session is now false)
 1523        * @param msg
 1524        * @param sender
 1525        * @throws IOException
 1526        */
 1527       protected void handleSESSION_ACCESSED(SessionMessage msg,Member sender) throws IOException {
 1528           counterReceive_EVT_SESSION_ACCESSED++;
 1529           DeltaSession session = (DeltaSession) findSession(msg
 1530                   .getSessionID());
 1531           if (session != null) {
 1532               if (log.isDebugEnabled())
 1533                   log.debug(sm.getString(
 1534                           "deltaManager.receiveMessage.accessed",
 1535                           getName(), msg.getSessionID()));
 1536               session.access();
 1537               session.setPrimarySession(false);
 1538               session.endAccess();
 1539           }
 1540       }
 1541   
 1542       /**
 1543        * handle receive session is expire at other node ( expire session also here)
 1544        * @param msg
 1545        * @param sender
 1546        * @throws IOException
 1547        */
 1548       protected void handleSESSION_EXPIRRED(SessionMessage msg,Member sender) throws IOException {
 1549           counterReceive_EVT_SESSION_EXPIRED++;
 1550           DeltaSession session = (DeltaSession) findSession(msg
 1551                   .getSessionID());
 1552           if (session != null) {
 1553               if (log.isDebugEnabled())
 1554                   log.debug(sm.getString(
 1555                           "deltaManager.receiveMessage.expired",
 1556                           getName(), msg.getSessionID()));
 1557               session.expire(notifySessionListenersOnReplication, false);
 1558           }
 1559       }
 1560   
 1561       /**
 1562        * handle receive new session is created at other node (create backup - primary false)
 1563        * @param msg
 1564        * @param sender
 1565        */
 1566       protected void handleSESSION_CREATED(SessionMessage msg,Member sender) {
 1567           counterReceive_EVT_SESSION_CREATED++;
 1568           if (log.isDebugEnabled())
 1569               log.debug(sm.getString(
 1570                       "deltaManager.receiveMessage.createNewSession",
 1571                       getName(), msg.getSessionID()));
 1572           DeltaSession session = (DeltaSession) createEmptySession();
 1573           session.setManager(this);
 1574           session.setValid(true);
 1575           session.setPrimarySession(false);
 1576           session.access();
 1577           if(notifySessionListenersOnReplication)
 1578               session.setId(msg.getSessionID());
 1579           else
 1580               session.setIdInternal(msg.getSessionID());
 1581           session.resetDeltaRequest();
 1582           session.endAccess();
 1583   
 1584       }
 1585   
 1586       /**
 1587        * handle receive sessions from other not ( restart )
 1588        * @param msg
 1589        * @param sender
 1590        * @throws ClassNotFoundException
 1591        * @throws IOException
 1592        */
 1593       protected void handleALL_SESSION_DATA(SessionMessage msg,Member sender) throws ClassNotFoundException, IOException {
 1594           counterReceive_EVT_ALL_SESSION_DATA++;
 1595           if (log.isDebugEnabled())
 1596               log.debug(sm.getString(
 1597                       "deltaManager.receiveMessage.allSessionDataBegin",
 1598                       getName()));
 1599           byte[] data = msg.getSession();
 1600           deserializeSessions(data);
 1601           if (log.isDebugEnabled())
 1602               log.debug(sm.getString(
 1603                       "deltaManager.receiveMessage.allSessionDataAfter",
 1604                       getName()));
 1605           //stateTransferred = true;
 1606       }
 1607   
 1608       /**
 1609        * handle receive that other node want all sessions ( restart )
 1610        * a) send all sessions with one message
 1611        * b) send session at blocks
 1612        * After sending send state is complete transfered
 1613        * @param msg
 1614        * @param sender
 1615        * @throws IOException
 1616        */
 1617       protected void handleGET_ALL_SESSIONS(SessionMessage msg, Member sender)
 1618               throws IOException {
 1619           counterReceive_EVT_GET_ALL_SESSIONS++;
 1620           //get a list of all the session from this manager
 1621           if (log.isDebugEnabled())
 1622               log.debug(sm.getString(
 1623                       "deltaManager.receiveMessage.unloadingBegin", getName()));
 1624           // Write the number of active sessions, followed by the details
 1625           // get all sessions and serialize without sync
 1626           Session[] currentSessions = findSessions();
 1627           long findSessionTimestamp = System.currentTimeMillis() ;
 1628           if (isSendAllSessions()) {
 1629               sendSessions(sender, currentSessions, findSessionTimestamp);
 1630           } else {
 1631               // send session at blocks
 1632               int len = currentSessions.length < getSendAllSessionsSize() ? currentSessions.length
 1633                       : getSendAllSessionsSize();
 1634               Session[] sendSessions = new Session[len];
 1635               for (int i = 0; i < currentSessions.length; i += getSendAllSessionsSize()) {
 1636                   len = i + getSendAllSessionsSize() > currentSessions.length ? currentSessions.length
 1637                           - i 
 1638                           : getSendAllSessionsSize();
 1639                   System.arraycopy(currentSessions, i, sendSessions, 0, len);
 1640                   sendSessions(sender, sendSessions,findSessionTimestamp);
 1641                   if (getSendAllSessionsWaitTime() > 0) {
 1642                       try {
 1643                           Thread.sleep(getSendAllSessionsWaitTime());
 1644                       } catch (Exception sleep) {
 1645                       }
 1646                   }
 1647               }
 1648           }
 1649           
 1650           SessionMessage newmsg = new SessionMessageImpl(name,
 1651                   SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE, null,
 1652                   "SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED"
 1653                           + getName());
 1654           newmsg.setTimestamp(findSessionTimestamp);
 1655           if (log.isDebugEnabled())
 1656               log.debug(sm.getString(
 1657                       "deltaManager.createMessage.allSessionTransfered",
 1658                       getName()));
 1659           counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE++;
 1660           cluster.send(newmsg, sender);
 1661       }
 1662   
 1663   
 1664       /**
 1665        * send a block of session to sender
 1666        * @param sender
 1667        * @param currentSessions
 1668        * @param sendTimestamp
 1669        * @throws IOException
 1670        */
 1671       protected void sendSessions(Member sender, Session[] currentSessions,
 1672               long sendTimestamp) throws IOException {
 1673           byte[] data = serializeSessions(currentSessions);
 1674           if (log.isDebugEnabled())
 1675               log.debug(sm.getString(
 1676                       "deltaManager.receiveMessage.unloadingAfter",
 1677                       getName()));
 1678           SessionMessage newmsg = new SessionMessageImpl(name,
 1679                   SessionMessage.EVT_ALL_SESSION_DATA, data,
 1680                   "SESSION-STATE", "SESSION-STATE-" + getName());
 1681           newmsg.setTimestamp(sendTimestamp);
 1682           //if(isSendSESSIONSTATEcompressed()) {
 1683           //    newmsg.setCompress(ClusterMessage.FLAG_ALLOWED);
 1684           //}
 1685           if (log.isDebugEnabled())
 1686               log.debug(sm.getString(
 1687                       "deltaManager.createMessage.allSessionData",
 1688                       getName()));
 1689           counterSend_EVT_ALL_SESSION_DATA++;
 1690           cluster.send(newmsg, sender);
 1691       }
 1692   
 1693   }

Save This Page
Home » apache-tomcat-6.0.16-src » org.apache » catalina » cluster » session » [javadoc | source]