Save This Page
Home » apache-tomcat-6.0.16-src » org.apache » catalina » ha » session » [javadoc | source]
    1   /*
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    * 
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    * 
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   
   18   package org.apache.catalina.ha.session;
   19   
   20   import java.beans.PropertyChangeEvent;
   21   import java.io.BufferedOutputStream;
   22   import java.io.ByteArrayOutputStream;
   23   import java.io.IOException;
   24   import java.io.ObjectInputStream;
   25   import java.io.ObjectOutputStream;
   26   import java.util.ArrayList;
   27   import java.util.Date;
   28   import java.util.Iterator;
   29   
   30   import org.apache.catalina.Cluster;
   31   import org.apache.catalina.Container;
   32   import org.apache.catalina.Context;
   33   import org.apache.catalina.Engine;
   34   import org.apache.catalina.Host;
   35   import org.apache.catalina.LifecycleException;
   36   import org.apache.catalina.LifecycleListener;
   37   import org.apache.catalina.Session;
   38   import org.apache.catalina.Valve;
   39   import org.apache.catalina.core.StandardContext;
   40   import org.apache.catalina.ha.CatalinaCluster;
   41   import org.apache.catalina.ha.ClusterMessage;
   42   import org.apache.catalina.ha.tcp.ReplicationValve;
   43   import org.apache.catalina.tribes.Member;
   44   import org.apache.catalina.tribes.io.ReplicationStream;
   45   import org.apache.catalina.util.LifecycleSupport;
   46   import org.apache.catalina.util.StringManager;
   47   import org.apache.catalina.ha.ClusterManager;
   48   
   49   /**
   50    * The DeltaManager manages replicated sessions by only replicating the deltas
   51    * in data. For applications written to handle this, the DeltaManager is the
   52    * optimal way of replicating data.
   53    * 
   54    * This code is almost identical to StandardManager with a difference in how it
   55    * persists sessions and some modifications to it.
   56    * 
   57    * <b>IMPLEMENTATION NOTE </b>: Correct behavior of session storing and
   58    * reloading depends upon external calls to the <code>start()</code> and
   59    * <code>stop()</code> methods of this class at the correct times.
   60    * 
   61    * @author Filip Hanik
   62    * @author Craig R. McClanahan
   63    * @author Jean-Francois Arcand
   64    * @author Peter Rossbach
   65    * @version $Revision: 561326 $ $Date: 2007-07-31 15:35:35 +0200 (mar., 31 juil. 2007) $
   66    */
   67   
   68   public class DeltaManager extends ClusterManagerBase{
   69   
   70       // ---------------------------------------------------- Security Classes
   71       public static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(DeltaManager.class);
   72   
   73       /**
   74        * The string manager for this package.
   75        */
   76       protected static StringManager sm = StringManager.getManager(Constants.Package);
   77   
   78       // ----------------------------------------------------- Instance Variables
   79   
   80       /**
   81        * The descriptive information about this implementation.
   82        */
   83       private static final String info = "DeltaManager/2.1";
   84   
   85       /**
   86        * Has this component been started yet?
   87        */
   88       private boolean started = false;
   89   
   90       /**
   91        * The descriptive name of this Manager implementation (for logging).
   92        */
   93       protected static String managerName = "DeltaManager";
   94       protected String name = null;
   95       protected boolean defaultMode = false;
   96       private CatalinaCluster cluster = null;
   97   
   98       /**
   99        * cached replication valve cluster container!
  100        */
  101       private ReplicationValve replicationValve = null ;
  102       
  103       /**
  104        * The lifecycle event support for this component.
  105        */
  106       protected LifecycleSupport lifecycle = new LifecycleSupport(this);
  107   
  108       /**
  109        * The maximum number of active Sessions allowed, or -1 for no limit.
  110        */
  111       private int maxActiveSessions = -1;
  112       private boolean expireSessionsOnShutdown = false;
  113       private boolean notifyListenersOnReplication = true;
  114       private boolean notifySessionListenersOnReplication = true;
  115       private boolean stateTransfered = false ;
  116       private int stateTransferTimeout = 60;
  117       private boolean sendAllSessions = true;
  118       private boolean sendClusterDomainOnly = true ;
  119       private int sendAllSessionsSize = 1000 ;
  120       
  121       /**
  122        * wait time between send session block (default 2 sec) 
  123        */
  124       private int sendAllSessionsWaitTime = 2 * 1000 ; 
  125       private ArrayList receivedMessageQueue = new ArrayList() ;
  126       private boolean receiverQueue = false ;
  127       private boolean stateTimestampDrop = true ;
  128       private long stateTransferCreateSendTime; 
  129       
  130       // ------------------------------------------------------------------ stats attributes
  131       
  132       int rejectedSessions = 0;
  133       private long sessionReplaceCounter = 0 ;
  134       long processingTime = 0;
  135       private long counterReceive_EVT_GET_ALL_SESSIONS = 0 ;
  136       private long counterSend_EVT_ALL_SESSION_DATA = 0 ;
  137       private long counterReceive_EVT_ALL_SESSION_DATA = 0 ;
  138       private long counterReceive_EVT_SESSION_CREATED = 0 ;
  139       private long counterReceive_EVT_SESSION_EXPIRED = 0;
  140       private long counterReceive_EVT_SESSION_ACCESSED = 0 ;
  141       private long counterReceive_EVT_SESSION_DELTA = 0;
  142       private long counterSend_EVT_GET_ALL_SESSIONS = 0 ;
  143       private long counterSend_EVT_SESSION_CREATED = 0;
  144       private long counterSend_EVT_SESSION_DELTA = 0 ;
  145       private long counterSend_EVT_SESSION_ACCESSED = 0;
  146       private long counterSend_EVT_SESSION_EXPIRED = 0;
  147       private int counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0 ;
  148       private int counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0 ;
  149       private int counterNoStateTransfered = 0 ;
  150   
  151   
  152       // ------------------------------------------------------------- Constructor
  153       public DeltaManager() {
  154           super();
  155       }
  156   
  157       // ------------------------------------------------------------- Properties
  158       
  159       /**
  160        * Return descriptive information about this Manager implementation and the
  161        * corresponding version number, in the format
  162        * <code>&lt;description&gt;/&lt;version&gt;</code>.
  163        */
  164       public String getInfo() {
  165           return info;
  166       }
  167   
  168       public void setName(String name) {
  169           this.name = name;
  170       }
  171   
  172       /**
  173        * Return the descriptive short name of this Manager implementation.
  174        */
  175       public String getName() {
  176           return name;
  177       }
  178   
  179       /**
  180        * @return Returns the counterSend_EVT_GET_ALL_SESSIONS.
  181        */
  182       public long getCounterSend_EVT_GET_ALL_SESSIONS() {
  183           return counterSend_EVT_GET_ALL_SESSIONS;
  184       }
  185       
  186       /**
  187        * @return Returns the counterSend_EVT_SESSION_ACCESSED.
  188        */
  189       public long getCounterSend_EVT_SESSION_ACCESSED() {
  190           return counterSend_EVT_SESSION_ACCESSED;
  191       }
  192       
  193       /**
  194        * @return Returns the counterSend_EVT_SESSION_CREATED.
  195        */
  196       public long getCounterSend_EVT_SESSION_CREATED() {
  197           return counterSend_EVT_SESSION_CREATED;
  198       }
  199   
  200       /**
  201        * @return Returns the counterSend_EVT_SESSION_DELTA.
  202        */
  203       public long getCounterSend_EVT_SESSION_DELTA() {
  204           return counterSend_EVT_SESSION_DELTA;
  205       }
  206   
  207       /**
  208        * @return Returns the counterSend_EVT_SESSION_EXPIRED.
  209        */
  210       public long getCounterSend_EVT_SESSION_EXPIRED() {
  211           return counterSend_EVT_SESSION_EXPIRED;
  212       }
  213    
  214       /**
  215        * @return Returns the counterSend_EVT_ALL_SESSION_DATA.
  216        */
  217       public long getCounterSend_EVT_ALL_SESSION_DATA() {
  218           return counterSend_EVT_ALL_SESSION_DATA;
  219       }
  220   
  221       /**
  222        * @return Returns the counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE.
  223        */
  224       public int getCounterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE() {
  225           return counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE;
  226       }
  227    
  228       /**
  229        * @return Returns the counterReceive_EVT_ALL_SESSION_DATA.
  230        */
  231       public long getCounterReceive_EVT_ALL_SESSION_DATA() {
  232           return counterReceive_EVT_ALL_SESSION_DATA;
  233       }
  234       
  235       /**
  236        * @return Returns the counterReceive_EVT_GET_ALL_SESSIONS.
  237        */
  238       public long getCounterReceive_EVT_GET_ALL_SESSIONS() {
  239           return counterReceive_EVT_GET_ALL_SESSIONS;
  240       }
  241       
  242       /**
  243        * @return Returns the counterReceive_EVT_SESSION_ACCESSED.
  244        */
  245       public long getCounterReceive_EVT_SESSION_ACCESSED() {
  246           return counterReceive_EVT_SESSION_ACCESSED;
  247       }
  248       
  249       /**
  250        * @return Returns the counterReceive_EVT_SESSION_CREATED.
  251        */
  252       public long getCounterReceive_EVT_SESSION_CREATED() {
  253           return counterReceive_EVT_SESSION_CREATED;
  254       }
  255       
  256       /**
  257        * @return Returns the counterReceive_EVT_SESSION_DELTA.
  258        */
  259       public long getCounterReceive_EVT_SESSION_DELTA() {
  260           return counterReceive_EVT_SESSION_DELTA;
  261       }
  262       
  263       /**
  264        * @return Returns the counterReceive_EVT_SESSION_EXPIRED.
  265        */
  266       public long getCounterReceive_EVT_SESSION_EXPIRED() {
  267           return counterReceive_EVT_SESSION_EXPIRED;
  268       }
  269       
  270       
  271       /**
  272        * @return Returns the counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE.
  273        */
  274       public int getCounterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE() {
  275           return counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE;
  276       }
  277       
  278       /**
  279        * @return Returns the processingTime.
  280        */
  281       public long getProcessingTime() {
  282           return processingTime;
  283       }
  284    
  285       /**
  286        * @return Returns the sessionReplaceCounter.
  287        */
  288       public long getSessionReplaceCounter() {
  289           return sessionReplaceCounter;
  290       }
  291       
  292       /**
  293        * Number of session creations that failed due to maxActiveSessions
  294        * 
  295        * @return The count
  296        */
  297       public int getRejectedSessions() {
  298           return rejectedSessions;
  299       }
  300   
  301       public void setRejectedSessions(int rejectedSessions) {
  302           this.rejectedSessions = rejectedSessions;
  303       }
  304   
  305       /**
  306        * @return Returns the counterNoStateTransfered.
  307        */
  308       public int getCounterNoStateTransfered() {
  309           return counterNoStateTransfered;
  310       }
  311       
  312       public int getReceivedQueueSize() {
  313           return receivedMessageQueue.size() ;
  314       }
  315       
  316       /**
  317        * @return Returns the stateTransferTimeout.
  318        */
  319       public int getStateTransferTimeout() {
  320           return stateTransferTimeout;
  321       }
  322       /**
  323        * @param timeoutAllSession The timeout
  324        */
  325       public void setStateTransferTimeout(int timeoutAllSession) {
  326           this.stateTransferTimeout = timeoutAllSession;
  327       }
  328   
  329       /**
  330        * is session state transfered complete?
  331        * 
  332        */
  333       public boolean getStateTransfered() {
  334           return stateTransfered;
  335       }
  336   
  337       /**
  338        * set that state ist complete transfered  
  339        * @param stateTransfered
  340        */
  341       public void setStateTransfered(boolean stateTransfered) {
  342           this.stateTransfered = stateTransfered;
  343       }
  344       
  345       /**
  346        * @return Returns the sendAllSessionsWaitTime in msec
  347        */
  348       public int getSendAllSessionsWaitTime() {
  349           return sendAllSessionsWaitTime;
  350       }
  351       
  352       /**
  353        * @param sendAllSessionsWaitTime The sendAllSessionsWaitTime to set at msec.
  354        */
  355       public void setSendAllSessionsWaitTime(int sendAllSessionsWaitTime) {
  356           this.sendAllSessionsWaitTime = sendAllSessionsWaitTime;
  357       }
  358       
  359       /**
  360        * @return Returns the sendClusterDomainOnly.
  361        */
  362       public boolean doDomainReplication() {
  363           return sendClusterDomainOnly;
  364       }
  365       
  366       /**
  367        * @param sendClusterDomainOnly The sendClusterDomainOnly to set.
  368        */
  369       public void setDomainReplication(boolean sendClusterDomainOnly) {
  370           this.sendClusterDomainOnly = sendClusterDomainOnly;
  371       }
  372   
  373       /**
  374        * @return Returns the stateTimestampDrop.
  375        */
  376       public boolean isStateTimestampDrop() {
  377           return stateTimestampDrop;
  378       }
  379       
  380       /**
  381        * @param isTimestampDrop The new flag value
  382        */
  383       public void setStateTimestampDrop(boolean isTimestampDrop) {
  384           this.stateTimestampDrop = isTimestampDrop;
  385       }
  386       
  387       /**
  388        * Return the maximum number of active Sessions allowed, or -1 for no limit.
  389        */
  390       public int getMaxActiveSessions() {
  391           return (this.maxActiveSessions);
  392       }
  393   
  394       /**
  395        * Set the maximum number of actives Sessions allowed, or -1 for no limit.
  396        * 
  397        * @param max
  398        *            The new maximum number of sessions
  399        */
  400       public void setMaxActiveSessions(int max) {
  401           int oldMaxActiveSessions = this.maxActiveSessions;
  402           this.maxActiveSessions = max;
  403           support.firePropertyChange("maxActiveSessions", new Integer(oldMaxActiveSessions), new Integer(this.maxActiveSessions));
  404       }
  405       
  406       /**
  407        * 
  408        * @return Returns the sendAllSessions.
  409        */
  410       public boolean isSendAllSessions() {
  411           return sendAllSessions;
  412       }
  413       
  414       /**
  415        * @param sendAllSessions The sendAllSessions to set.
  416        */
  417       public void setSendAllSessions(boolean sendAllSessions) {
  418           this.sendAllSessions = sendAllSessions;
  419       }
  420       
  421       /**
  422        * @return Returns the sendAllSessionsSize.
  423        */
  424       public int getSendAllSessionsSize() {
  425           return sendAllSessionsSize;
  426       }
  427       
  428       /**
  429        * @param sendAllSessionsSize The sendAllSessionsSize to set.
  430        */
  431       public void setSendAllSessionsSize(int sendAllSessionsSize) {
  432           this.sendAllSessionsSize = sendAllSessionsSize;
  433       }
  434       
  435       /**
  436        * @return Returns the notifySessionListenersOnReplication.
  437        */
  438       public boolean isNotifySessionListenersOnReplication() {
  439           return notifySessionListenersOnReplication;
  440       }
  441       
  442       /**
  443        * @param notifyListenersCreateSessionOnReplication The notifySessionListenersOnReplication to set.
  444        */
  445       public void setNotifySessionListenersOnReplication(boolean notifyListenersCreateSessionOnReplication) {
  446           this.notifySessionListenersOnReplication = notifyListenersCreateSessionOnReplication;
  447       }
  448       
  449       
  450       public boolean isExpireSessionsOnShutdown() {
  451           return expireSessionsOnShutdown;
  452       }
  453   
  454       public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown) {
  455           this.expireSessionsOnShutdown = expireSessionsOnShutdown;
  456       }
  457       
  458       public boolean isNotifyListenersOnReplication() {
  459           return notifyListenersOnReplication;
  460       }
  461   
  462       public void setNotifyListenersOnReplication(boolean notifyListenersOnReplication) {
  463           this.notifyListenersOnReplication = notifyListenersOnReplication;
  464       }
  465   
  466       
  467       /**
  468        * @return Returns the defaultMode.
  469        */
  470       public boolean isDefaultMode() {
  471           return defaultMode;
  472       }
  473       /**
  474        * @param defaultMode The defaultMode to set.
  475        */
  476       public void setDefaultMode(boolean defaultMode) {
  477           this.defaultMode = defaultMode;
  478       }
  479       
  480       public CatalinaCluster getCluster() {
  481           return cluster;
  482       }
  483   
  484       public void setCluster(CatalinaCluster cluster) {
  485           this.cluster = cluster;
  486       }
  487   
  488       /**
  489        * Set the Container with which this Manager has been associated. If it is a
  490        * Context (the usual case), listen for changes to the session timeout
  491        * property.
  492        * 
  493        * @param container
  494        *            The associated Container
  495        */
  496       public void setContainer(Container container) {
  497           // De-register from the old Container (if any)
  498           if ((this.container != null) && (this.container instanceof Context))
  499               ((Context) this.container).removePropertyChangeListener(this);
  500   
  501           // Default processing provided by our superclass
  502           super.setContainer(container);
  503   
  504           // Register with the new Container (if any)
  505           if ((this.container != null) && (this.container instanceof Context)) {
  506               setMaxInactiveInterval(((Context) this.container).getSessionTimeout() * 60);
  507               ((Context) this.container).addPropertyChangeListener(this);
  508           }
  509   
  510       }
  511       
  512       // --------------------------------------------------------- Public Methods
  513   
  514       /**
  515        * Construct and return a new session object, based on the default settings
  516        * specified by this Manager's properties. The session id will be assigned
  517        * by this method, and available via the getId() method of the returned
  518        * session. If a new session cannot be created for any reason, return
  519        * <code>null</code>.
  520        * 
  521        * @exception IllegalStateException
  522        *                if a new session cannot be instantiated for any reason
  523        * 
  524        * Construct and return a new session object, based on the default settings
  525        * specified by this Manager's properties. The session id will be assigned
  526        * by this method, and available via the getId() method of the returned
  527        * session. If a new session cannot be created for any reason, return
  528        * <code>null</code>.
  529        * 
  530        * @exception IllegalStateException
  531        *                if a new session cannot be instantiated for any reason
  532        */
  533       public Session createSession(String sessionId) {
  534           return createSession(sessionId, true);
  535       }
  536   
  537       /**
  538        * create new session with check maxActiveSessions and send session creation
  539        * to other cluster nodes.
  540        * 
  541        * @param distribute
  542        * @return The session
  543        */
  544       public Session createSession(String sessionId, boolean distribute) {
  545           if ((maxActiveSessions >= 0) && (sessions.size() >= maxActiveSessions)) {
  546               rejectedSessions++;
  547               throw new IllegalStateException(sm.getString("deltaManager.createSession.ise"));
  548           }
  549           DeltaSession session = (DeltaSession) super.createSession(sessionId) ;
  550           if (distribute) {
  551               sendCreateSession(session.getId(), session);
  552           }
  553           if (log.isDebugEnabled())
  554               log.debug(sm.getString("deltaManager.createSession.newSession",session.getId(), new Integer(sessions.size())));
  555           return (session);
  556   
  557       }
  558   
  559       /**
  560        * Send create session evt to all backup node
  561        * @param sessionId
  562        * @param session
  563        */
  564       protected void sendCreateSession(String sessionId, DeltaSession session) {
  565           if(cluster.getMembers().length > 0 ) {
  566               SessionMessage msg = 
  567                   new SessionMessageImpl(getName(),
  568                                          SessionMessage.EVT_SESSION_CREATED, 
  569                                          null, 
  570                                          sessionId,
  571                                          sessionId + "-" + System.currentTimeMillis());
  572               if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.sendMessage.newSession",name, sessionId));
  573               msg.setTimestamp(session.getCreationTime());
  574               counterSend_EVT_SESSION_CREATED++;
  575               send(msg);
  576           }
  577       }
  578       
  579       /**
  580        * Send messages to other backup member (domain or all)
  581        * @param msg Session message
  582        */
  583       protected void send(SessionMessage msg) {
  584           if(cluster != null) {
  585               if(doDomainReplication())
  586                   cluster.sendClusterDomain(msg);
  587               else
  588                   cluster.send(msg);
  589           }
  590       }
  591   
  592       /**
  593        * Create DeltaSession
  594        * @see org.apache.catalina.Manager#createEmptySession()
  595        */
  596       public Session createEmptySession() {
  597           return getNewDeltaSession() ;
  598       }
  599       
  600       /**
  601        * Get new session class to be used in the doLoad() method.
  602        */
  603       protected DeltaSession getNewDeltaSession() {
  604           return new DeltaSession(this);
  605       }
  606   
  607       /**
  608        * Load Deltarequest from external node
  609        * Load the Class at container classloader
  610        * @see DeltaRequest#readExternal(java.io.ObjectInput)
  611        * @param session
  612        * @param data message data
  613        * @return The request
  614        * @throws ClassNotFoundException
  615        * @throws IOException
  616        */
  617       protected DeltaRequest deserializeDeltaRequest(DeltaSession session, byte[] data) throws ClassNotFoundException, IOException {
  618           ReplicationStream ois = getReplicationStream(data);
  619           session.getDeltaRequest().readExternal(ois);
  620           ois.close();
  621           return session.getDeltaRequest();
  622       }
  623   
  624       /**
  625        * serialize DeltaRequest
  626        * @see DeltaRequest#writeExternal(java.io.ObjectOutput)
  627        * 
  628        * @param deltaRequest
  629        * @return serialized delta request
  630        * @throws IOException
  631        */
  632       protected byte[] serializeDeltaRequest(DeltaRequest deltaRequest) throws IOException {
  633           return deltaRequest.serialize();
  634       }
  635   
  636       /**
  637        * Load sessions from other cluster node.
  638        * FIXME replace currently sessions with same id without notifcation.
  639        * FIXME SSO handling is not really correct with the session replacement!
  640        * @exception ClassNotFoundException
  641        *                if a serialized class cannot be found during the reload
  642        * @exception IOException
  643        *                if an input/output error occurs
  644        */
  645       protected void deserializeSessions(byte[] data) throws ClassNotFoundException,IOException {
  646   
  647           // Initialize our internal data structures
  648           //sessions.clear(); //should not do this
  649           // Open an input stream to the specified pathname, if any
  650           ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();
  651           ObjectInputStream ois = null;
  652           // Load the previously unloaded active sessions
  653           try {
  654               ois = getReplicationStream(data);
  655               Integer count = (Integer) ois.readObject();
  656               int n = count.intValue();
  657               for (int i = 0; i < n; i++) {
  658                   DeltaSession session = (DeltaSession) createEmptySession();
  659                   session.readObjectData(ois);
  660                   session.setManager(this);
  661                   session.setValid(true);
  662                   session.setPrimarySession(false);
  663                   //in case the nodes in the cluster are out of
  664                   //time synch, this will make sure that we have the
  665                   //correct timestamp, isValid returns true, cause
  666                   // accessCount=1
  667                   session.access();
  668                   //make sure that the session gets ready to expire if
  669                   // needed
  670                   session.setAccessCount(0);
  671                   session.resetDeltaRequest();
  672                   // FIXME How inform other session id cache like SingleSignOn
  673                   // increment sessionCounter to correct stats report
  674                   if (findSession(session.getIdInternal()) == null ) {
  675                       sessionCounter++;
  676                   } else {
  677                       sessionReplaceCounter++;
  678                       // FIXME better is to grap this sessions again !
  679                       if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.loading.existing.session",session.getIdInternal()));
  680                   }
  681                   add(session);
  682               }
  683           } catch (ClassNotFoundException e) {
  684               log.error(sm.getString("deltaManager.loading.cnfe", e), e);
  685               throw e;
  686           } catch (IOException e) {
  687               log.error(sm.getString("deltaManager.loading.ioe", e), e);
  688               throw e;
  689           } finally {
  690               // Close the input stream
  691               try {
  692                   if (ois != null) ois.close();
  693               } catch (IOException f) {
  694                   // ignored
  695               }
  696               ois = null;
  697               if (originalLoader != null) Thread.currentThread().setContextClassLoader(originalLoader);
  698           }
  699   
  700       }
  701   
  702       
  703   
  704       /**
  705        * Save any currently active sessions in the appropriate persistence
  706        * mechanism, if any. If persistence is not supported, this method returns
  707        * without doing anything.
  708        * 
  709        * @exception IOException
  710        *                if an input/output error occurs
  711        */
  712       protected byte[] serializeSessions(Session[] currentSessions) throws IOException {
  713   
  714           // Open an output stream to the specified pathname, if any
  715           ByteArrayOutputStream fos = null;
  716           ObjectOutputStream oos = null;
  717   
  718           try {
  719               fos = new ByteArrayOutputStream();
  720               oos = new ObjectOutputStream(new BufferedOutputStream(fos));
  721               oos.writeObject(new Integer(currentSessions.length));
  722               for(int i=0 ; i < currentSessions.length;i++) {
  723                   ((DeltaSession)currentSessions[i]).writeObjectData(oos);                
  724               }
  725               // Flush and close the output stream
  726               oos.flush();
  727           } catch (IOException e) {
  728               log.error(sm.getString("deltaManager.unloading.ioe", e), e);
  729               throw e;
  730           } finally {
  731               if (oos != null) {
  732                   try {
  733                       oos.close();
  734                   } catch (IOException f) {
  735                       ;
  736                   }
  737                   oos = null;
  738               }
  739           }
  740           // send object data as byte[]
  741           return fos.toByteArray();
  742       }
  743   
  744       // ------------------------------------------------------ Lifecycle Methods
  745   
  746       /**
  747        * Add a lifecycle event listener to this component.
  748        * 
  749        * @param listener
  750        *            The listener to add
  751        */
  752       public void addLifecycleListener(LifecycleListener listener) {
  753           lifecycle.addLifecycleListener(listener);
  754       }
  755   
  756       /**
  757        * Get the lifecycle listeners associated with this lifecycle. If this
  758        * Lifecycle has no listeners registered, a zero-length array is returned.
  759        */
  760       public LifecycleListener[] findLifecycleListeners() {
  761           return lifecycle.findLifecycleListeners();
  762       }
  763   
  764       /**
  765        * Remove a lifecycle event listener from this component.
  766        * 
  767        * @param listener
  768        *            The listener to remove
  769        */
  770       public void removeLifecycleListener(LifecycleListener listener) {
  771           lifecycle.removeLifecycleListener(listener);
  772       }
  773   
  774       /**
  775        * Prepare for the beginning of active use of the public methods of this
  776        * component. This method should be called after <code>configure()</code>,
  777        * and before any of the public methods of the component are utilized.
  778        * 
  779        * @exception LifecycleException
  780        *                if this component detects a fatal error that prevents this
  781        *                component from being used
  782        */
  783       public void start() throws LifecycleException {
  784           if (!initialized) init();
  785   
  786           // Validate and update our current component state
  787           if (started) {
  788               return;
  789           }
  790           started = true;
  791           lifecycle.fireLifecycleEvent(START_EVENT, null);
  792   
  793           // Force initialization of the random number generator
  794           generateSessionId();
  795   
  796           // Load unloaded sessions, if any
  797           try {
  798               //the channel is already running
  799               Cluster cluster = getCluster() ;
  800               // stop remove cluster binding
  801               //wow, how many nested levels of if statements can we have ;)
  802               if(cluster == null) {
  803                   Container context = getContainer() ;
  804                   if(context != null && context instanceof Context) {
  805                        Container host = context.getParent() ;
  806                        if(host != null && host instanceof Host) {
  807                            cluster = host.getCluster();
  808                            if(cluster != null && cluster instanceof CatalinaCluster) {
  809                                setCluster((CatalinaCluster) cluster) ;
  810                            } else {
  811                                Container engine = host.getParent() ;
  812                                if(engine != null && engine instanceof Engine) {
  813                                    cluster = engine.getCluster();
  814                                    if(cluster != null && cluster instanceof CatalinaCluster) {
  815                                        setCluster((CatalinaCluster) cluster) ;
  816                                    }
  817                                } else {
  818                                        cluster = null ;
  819                                }
  820                            }
  821                        }
  822                   }
  823               }
  824               if (cluster == null) {
  825                   log.error(sm.getString("deltaManager.noCluster", getName()));
  826                   return;
  827               } else {
  828                   if (log.isInfoEnabled()) {
  829                       String type = "unknown" ;
  830                       if( cluster.getContainer() instanceof Host){
  831                           type = "Host" ;
  832                       } else if( cluster.getContainer() instanceof Engine){
  833                           type = "Engine" ;
  834                       }
  835                       log.info(sm.getString("deltaManager.registerCluster", getName(), type, cluster.getClusterName()));
  836                   }
  837               }
  838               if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.startClustering", getName()));
  839               //to survice context reloads, as only a stop/start is called, not
  840               // createManager
  841               cluster.registerManager(this);
  842   
  843               getAllClusterSessions();
  844   
  845           } catch (Throwable t) {
  846               log.error(sm.getString("deltaManager.managerLoad"), t);
  847           }
  848       }
  849   
  850       /**
  851        * get from first session master the backup from all clustered sessions
  852        * @see #findSessionMasterMember()
  853        */
  854       public synchronized void getAllClusterSessions() {
  855           if (cluster != null && cluster.getMembers().length > 0) {
  856               long beforeSendTime = System.currentTimeMillis();
  857               Member mbr = findSessionMasterMember();
  858               if(mbr == null) { // No domain member found
  859                    return;
  860               }
  861               SessionMessage msg = new SessionMessageImpl(this.getName(),SessionMessage.EVT_GET_ALL_SESSIONS, null, "GET-ALL","GET-ALL-" + getName());
  862               // set reference time
  863               stateTransferCreateSendTime = beforeSendTime ;
  864               // request session state
  865               counterSend_EVT_GET_ALL_SESSIONS++;
  866               stateTransfered = false ;
  867               // FIXME This send call block the deploy thread, when sender waitForAck is enabled
  868               try {
  869                   synchronized(receivedMessageQueue) {
  870                        receiverQueue = true ;
  871                   }
  872                   cluster.send(msg, mbr);
  873                   if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.waitForSessionState",getName(), mbr));
  874                   // FIXME At sender ack mode this method check only the state transfer and resend is a problem!
  875                   waitForSendAllSessions(beforeSendTime);
  876               } finally {
  877                   synchronized(receivedMessageQueue) {
  878                       for (Iterator iter = receivedMessageQueue.iterator(); iter.hasNext();) {
  879                           SessionMessage smsg = (SessionMessage) iter.next();
  880                           if (!stateTimestampDrop) {
  881                               messageReceived(smsg, smsg.getAddress() != null ? (Member) smsg.getAddress() : null);
  882                           } else {
  883                               if (smsg.getEventType() != SessionMessage.EVT_GET_ALL_SESSIONS && smsg.getTimestamp() >= stateTransferCreateSendTime) {
  884                                   // FIXME handle EVT_GET_ALL_SESSIONS later
  885                                   messageReceived(smsg,smsg.getAddress() != null ? (Member) smsg.getAddress() : null);
  886                               } else {
  887                                   if (log.isWarnEnabled()) {
  888                                       log.warn(sm.getString("deltaManager.dropMessage",getName(), smsg.getEventTypeString(),new Date(stateTransferCreateSendTime), new Date(smsg.getTimestamp())));
  889                                   }
  890                               }
  891                           }
  892                       }        
  893                       receivedMessageQueue.clear();
  894                       receiverQueue = false ;
  895                   }
  896              }
  897           } else {
  898               if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.noMembers", getName()));
  899           }
  900       }
  901   
  902       /**
  903        * Register cross context session at replication valve thread local
  904        * @param session cross context session
  905        */
  906       protected void registerSessionAtReplicationValve(DeltaSession session) {
  907           if(replicationValve == null) {
  908               if(container instanceof StandardContext && ((StandardContext)container).getCrossContext()) {
  909                   Cluster cluster = getCluster() ;
  910                   if(cluster != null && cluster instanceof CatalinaCluster) {
  911                       Valve[] valves = ((CatalinaCluster)cluster).getValves();
  912                       if(valves != null && valves.length > 0) {
  913                           for(int i=0; replicationValve == null && i < valves.length ; i++ ){
  914                               if(valves[i] instanceof ReplicationValve) replicationValve = (ReplicationValve)valves[i] ;
  915                           }//for
  916   
  917                           if(replicationValve == null && log.isDebugEnabled()) {
  918                               log.debug("no ReplicationValve found for CrossContext Support");
  919                           }//endif 
  920                       }//end if
  921                   }//endif
  922               }//end if
  923           }//end if
  924           if(replicationValve != null) {
  925               replicationValve.registerReplicationSession(session);
  926           }
  927       }
  928       
  929       /**
  930        * Find the master of the session state
  931        * @return master member of sessions 
  932        */
  933       protected Member findSessionMasterMember() {
  934           Member mbr = null;
  935           Member mbrs[] = cluster.getMembers();
  936           if(mbrs.length != 0 ) mbr = mbrs[0];
  937           if(mbr == null && log.isWarnEnabled()) log.warn(sm.getString("deltaManager.noMasterMember",getName(), ""));
  938           if(mbr != null && log.isDebugEnabled()) log.warn(sm.getString("deltaManager.foundMasterMember",getName(), mbr));
  939           return mbr;
  940       }
  941   
  942       /**
  943        * Wait that cluster session state is transfer or timeout after 60 Sec
  944        * With stateTransferTimeout == -1 wait that backup is transfered (forever mode)
  945        */
  946       protected void waitForSendAllSessions(long beforeSendTime) {
  947           long reqStart = System.currentTimeMillis();
  948           long reqNow = reqStart ;
  949           boolean isTimeout = false;
  950           if(getStateTransferTimeout() > 0) {
  951               // wait that state is transfered with timeout check
  952               do {
  953                   try {
  954                       Thread.sleep(100);
  955                   } catch (Exception sleep) {
  956                       //
  957                   }
  958                   reqNow = System.currentTimeMillis();
  959                   isTimeout = ((reqNow - reqStart) > (1000 * getStateTransferTimeout()));
  960               } while ((!getStateTransfered()) && (!isTimeout));
  961           } else {
  962               if(getStateTransferTimeout() == -1) {
  963                   // wait that state is transfered
  964                   do {
  965                       try {
  966                           Thread.sleep(100);
  967                       } catch (Exception sleep) {
  968                       }
  969                   } while ((!getStateTransfered()));
  970                   reqNow = System.currentTimeMillis();
  971               }
  972           }
  973           if (isTimeout || (!getStateTransfered())) {
  974               counterNoStateTransfered++ ;
  975               log.error(sm.getString("deltaManager.noSessionState",getName(),new Date(beforeSendTime),new Long(reqNow - beforeSendTime)));
  976           } else {
  977               if (log.isInfoEnabled())
  978                   log.info(sm.getString("deltaManager.sessionReceived",getName(), new Date(beforeSendTime), new Long(reqNow - beforeSendTime)));
  979           }
  980       }
  981   
  982       /**
  983        * Gracefully terminate the active use of the public methods of this
  984        * component. This method should be the last one called on a given instance
  985        * of this component.
  986        * 
  987        * @exception LifecycleException
  988        *                if this component detects a fatal error that needs to be
  989        *                reported
  990        */
  991       public void stop() throws LifecycleException {
  992   
  993           if (log.isDebugEnabled())
  994               log.debug(sm.getString("deltaManager.stopped", getName()));
  995   
  996   
  997           // Validate and update our current component state
  998           if (!started)
  999               throw new LifecycleException(sm.getString("deltaManager.notStarted"));
 1000           lifecycle.fireLifecycleEvent(STOP_EVENT, null);
 1001           started = false;
 1002   
 1003           // Expire all active sessions
 1004           if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.expireSessions", getName()));
 1005           Session sessions[] = findSessions();
 1006           for (int i = 0; i < sessions.length; i++) {
 1007               DeltaSession session = (DeltaSession) sessions[i];
 1008               if (!session.isValid())
 1009                   continue;
 1010               try {
 1011                   session.expire(true, isExpireSessionsOnShutdown());
 1012               } catch (Throwable ignore) {
 1013                   ;
 1014               } 
 1015           }
 1016   
 1017           // Require a new random number generator if we are restarted
 1018           this.random = null;
 1019           getCluster().removeManager(this);
 1020           replicationValve = null;
 1021           if (initialized) {
 1022               destroy();
 1023           }
 1024       }
 1025   
 1026       // ----------------------------------------- PropertyChangeListener Methods
 1027   
 1028       /**
 1029        * Process property change events from our associated Context.
 1030        * 
 1031        * @param event
 1032        *            The property change event that has occurred
 1033        */
 1034       public void propertyChange(PropertyChangeEvent event) {
 1035   
 1036           // Validate the source of this event
 1037           if (!(event.getSource() instanceof Context))
 1038               return;
 1039           // Process a relevant property change
 1040           if (event.getPropertyName().equals("sessionTimeout")) {
 1041               try {
 1042                   setMaxInactiveInterval(((Integer) event.getNewValue()).intValue() * 60);
 1043               } catch (NumberFormatException e) {
 1044                   log.error(sm.getString("deltaManager.sessionTimeout", event.getNewValue()));
 1045               }
 1046           }
 1047   
 1048       }
 1049   
 1050       // -------------------------------------------------------- Replication
 1051       // Methods
 1052   
 1053       /**
 1054        * A message was received from another node, this is the callback method to
 1055        * implement if you are interested in receiving replication messages.
 1056        * 
 1057        * @param cmsg -
 1058        *            the message received.
 1059        */
 1060       public void messageDataReceived(ClusterMessage cmsg) {
 1061           if (cmsg != null && cmsg instanceof SessionMessage) {
 1062               SessionMessage msg = (SessionMessage) cmsg;
 1063               switch (msg.getEventType()) {
 1064                   case SessionMessage.EVT_GET_ALL_SESSIONS:
 1065                   case SessionMessage.EVT_SESSION_CREATED: 
 1066                   case SessionMessage.EVT_SESSION_EXPIRED: 
 1067                   case SessionMessage.EVT_SESSION_ACCESSED:
 1068                   case SessionMessage.EVT_SESSION_DELTA: {
 1069                       synchronized(receivedMessageQueue) {
 1070                           if(receiverQueue) {
 1071                               receivedMessageQueue.add(msg);
 1072                               return ;
 1073                           }
 1074                       }
 1075                      break;
 1076                   }
 1077                   default: {
 1078                       //we didn't queue, do nothing
 1079                       break;
 1080                   }
 1081               } //switch
 1082               
 1083               messageReceived(msg, msg.getAddress() != null ? (Member) msg.getAddress() : null);
 1084           }
 1085       }
 1086   
 1087       /**
 1088        * When the request has been completed, the replication valve will notify
 1089        * the manager, and the manager will decide whether any replication is
 1090        * needed or not. If there is a need for replication, the manager will
 1091        * create a session message and that will be replicated. The cluster
 1092        * determines where it gets sent.
 1093        * 
 1094        * @param sessionId -
 1095        *            the sessionId that just completed.
 1096        * @return a SessionMessage to be sent,
 1097        */
 1098       public ClusterMessage requestCompleted(String sessionId) {
 1099           try {
 1100               DeltaSession session = (DeltaSession) findSession(sessionId);
 1101               DeltaRequest deltaRequest = session.getDeltaRequest();
 1102               SessionMessage msg = null;
 1103               boolean isDeltaRequest = false ;
 1104               synchronized(deltaRequest) {
 1105                   isDeltaRequest = deltaRequest.getSize() > 0 ;
 1106                   if (isDeltaRequest) {    
 1107                       counterSend_EVT_SESSION_DELTA++;
 1108                       byte[] data = serializeDeltaRequest(deltaRequest);
 1109                       msg = new SessionMessageImpl(getName(),
 1110                                                    SessionMessage.EVT_SESSION_DELTA, 
 1111                                                    data, 
 1112                                                    sessionId,
 1113                                                    sessionId + "-" + System.currentTimeMillis());
 1114                       session.resetDeltaRequest();
 1115                   }  
 1116               }
 1117               if(!isDeltaRequest) {
 1118                   if(!session.isPrimarySession()) {               
 1119                       counterSend_EVT_SESSION_ACCESSED++;
 1120                       msg = new SessionMessageImpl(getName(),
 1121                                                    SessionMessage.EVT_SESSION_ACCESSED, 
 1122                                                    null, 
 1123                                                    sessionId,
 1124                                                    sessionId + "-" + System.currentTimeMillis());
 1125                       if (log.isDebugEnabled()) {
 1126                           log.debug(sm.getString("deltaManager.createMessage.accessChangePrimary",getName(), sessionId));
 1127                       }
 1128                   }    
 1129               } else { // log only outside synch block!
 1130                   if (log.isDebugEnabled()) {
 1131                       log.debug(sm.getString("deltaManager.createMessage.delta",getName(), sessionId));
 1132                   }
 1133               }
 1134               session.setPrimarySession(true);
 1135               //check to see if we need to send out an access message
 1136               if ((msg == null)) {
 1137                   long replDelta = System.currentTimeMillis() - session.getLastTimeReplicated();
 1138                   if (replDelta > (getMaxInactiveInterval() * 1000)) {
 1139                       counterSend_EVT_SESSION_ACCESSED++;
 1140                       msg = new SessionMessageImpl(getName(),
 1141                                                    SessionMessage.EVT_SESSION_ACCESSED, 
 1142                                                    null,
 1143                                                    sessionId, 
 1144                                                    sessionId + "-" + System.currentTimeMillis());
 1145                       if (log.isDebugEnabled()) {
 1146                           log.debug(sm.getString("deltaManager.createMessage.access", getName(),sessionId));
 1147                       }
 1148                   }
 1149   
 1150               }
 1151   
 1152               //update last replicated time
 1153               if (msg != null) session.setLastTimeReplicated(System.currentTimeMillis());
 1154               return msg;
 1155           } catch (IOException x) {
 1156               log.error(sm.getString("deltaManager.createMessage.unableCreateDeltaRequest",sessionId), x);
 1157               return null;
 1158           }
 1159   
 1160       }
 1161       /**
 1162        * Reset manager statistics
 1163        */
 1164       public synchronized void resetStatistics() {
 1165           processingTime = 0 ;
 1166           expiredSessions = 0 ;
 1167           rejectedSessions = 0 ;
 1168           sessionReplaceCounter = 0 ;
 1169           counterNoStateTransfered = 0 ;
 1170           maxActive = getActiveSessions() ;
 1171           sessionCounter = getActiveSessions() ;
 1172           counterReceive_EVT_ALL_SESSION_DATA = 0;
 1173           counterReceive_EVT_GET_ALL_SESSIONS = 0;
 1174           counterReceive_EVT_SESSION_ACCESSED = 0 ;
 1175           counterReceive_EVT_SESSION_CREATED = 0 ;
 1176           counterReceive_EVT_SESSION_DELTA = 0 ;
 1177           counterReceive_EVT_SESSION_EXPIRED = 0 ;
 1178           counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
 1179           counterSend_EVT_ALL_SESSION_DATA = 0;
 1180           counterSend_EVT_GET_ALL_SESSIONS = 0;
 1181           counterSend_EVT_SESSION_ACCESSED = 0 ;
 1182           counterSend_EVT_SESSION_CREATED = 0 ;
 1183           counterSend_EVT_SESSION_DELTA = 0 ;
 1184           counterSend_EVT_SESSION_EXPIRED = 0 ;
 1185           counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
 1186           
 1187       }
 1188      
 1189       //  -------------------------------------------------------- persistence handler
 1190   
 1191       public void load() {
 1192   
 1193       }
 1194   
 1195       public void unload() {
 1196   
 1197       }
 1198   
 1199       //  -------------------------------------------------------- expire
 1200   
 1201       /**
 1202        * send session expired to other cluster nodes
 1203        * 
 1204        * @param id
 1205        *            session id
 1206        */
 1207       protected void sessionExpired(String id) {
 1208           counterSend_EVT_SESSION_EXPIRED++ ;
 1209           SessionMessage msg = new SessionMessageImpl(getName(),SessionMessage.EVT_SESSION_EXPIRED, null, id, id+ "-EXPIRED-MSG");
 1210           if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.expire",getName(), id));
 1211           send(msg);
 1212       }
 1213   
 1214       /**
 1215        * Exipre all find sessions.
 1216        */
 1217       public void expireAllLocalSessions()
 1218       {
 1219           long timeNow = System.currentTimeMillis();
 1220           Session sessions[] = findSessions();
 1221           int expireDirect  = 0 ;
 1222           int expireIndirect = 0 ;
 1223           
 1224           if(log.isDebugEnabled()) log.debug("Start expire all sessions " + getName() + " at " + timeNow + " sessioncount " + sessions.length);
 1225           for (int i = 0; i < sessions.length; i++) {
 1226               if (sessions[i] instanceof DeltaSession) {
 1227                   DeltaSession session = (DeltaSession) sessions[i];
 1228                   if (session.isPrimarySession()) {
 1229                       if (session.isValid()) {
 1230                           session.expire();
 1231                           expireDirect++;
 1232                       } else {
 1233                           expireIndirect++;
 1234                       }//end if
 1235                   }//end if
 1236               }//end if
 1237           }//for
 1238           long timeEnd = System.currentTimeMillis();
 1239           if(log.isDebugEnabled()) log.debug("End expire sessions " + getName() + " exipre processingTime " + (timeEnd - timeNow) + " expired direct sessions: " + expireDirect + " expired direct sessions: " + expireIndirect);
 1240         
 1241       }
 1242       
 1243       /**
 1244        * When the manager expires session not tied to a request. The cluster will
 1245        * periodically ask for a list of sessions that should expire and that
 1246        * should be sent across the wire.
 1247        * 
 1248        * @return The invalidated sessions array
 1249        */
 1250       public String[] getInvalidatedSessions() {
 1251           return new String[0];
 1252       }
 1253   
 1254       //  -------------------------------------------------------- message receive
 1255   
 1256       /**
 1257        * Test that sender and local domain is the same
 1258        */
 1259       protected boolean checkSenderDomain(SessionMessage msg,Member sender) {
 1260           boolean sameDomain= true;
 1261           if (!sameDomain && log.isWarnEnabled()) {
 1262                   log.warn(sm.getString("deltaManager.receiveMessage.fromWrongDomain",
 1263                            new Object[] {getName(), 
 1264                            msg.getEventTypeString(), 
 1265                            sender,
 1266                            "",
 1267                            "" }));
 1268           }
 1269           return sameDomain ;
 1270       }
 1271   
 1272       /**
 1273        * This method is called by the received thread when a SessionMessage has
 1274        * been received from one of the other nodes in the cluster.
 1275        * 
 1276        * @param msg -
 1277        *            the message received
 1278        * @param sender -
 1279        *            the sender of the message, this is used if we receive a
 1280        *            EVT_GET_ALL_SESSION message, so that we only reply to the
 1281        *            requesting node
 1282        */
 1283       protected void messageReceived(SessionMessage msg, Member sender) {
 1284           if(doDomainReplication() && !checkSenderDomain(msg,sender)) {
 1285               return;
 1286           }
 1287           ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
 1288           try {
 1289               
 1290               ClassLoader[] loaders = getClassLoaders();
 1291               if ( loaders != null && loaders.length > 0) Thread.currentThread().setContextClassLoader(loaders[0]);
 1292               if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.eventType",getName(), msg.getEventTypeString(), sender));
 1293    
 1294               switch (msg.getEventType()) {
 1295                   case SessionMessage.EVT_GET_ALL_SESSIONS: {
 1296                       handleGET_ALL_SESSIONS(msg,sender);
 1297                       break;
 1298                   }
 1299                   case SessionMessage.EVT_ALL_SESSION_DATA: {
 1300                       handleALL_SESSION_DATA(msg,sender);
 1301                       break;
 1302                   }
 1303                   case SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE: {
 1304                       handleALL_SESSION_TRANSFERCOMPLETE(msg,sender);
 1305                       break;
 1306                   }
 1307                   case SessionMessage.EVT_SESSION_CREATED: {
 1308                       handleSESSION_CREATED(msg,sender);
 1309                       break;
 1310                   }
 1311                   case SessionMessage.EVT_SESSION_EXPIRED: {
 1312                       handleSESSION_EXPIRED(msg,sender);
 1313                       break;
 1314                   }
 1315                   case SessionMessage.EVT_SESSION_ACCESSED: {
 1316                       handleSESSION_ACCESSED(msg,sender);
 1317                       break;
 1318                   }
 1319                   case SessionMessage.EVT_SESSION_DELTA: {
 1320                      handleSESSION_DELTA(msg,sender);
 1321                      break;
 1322                   }
 1323                   default: {
 1324                       //we didn't recognize the message type, do nothing
 1325                       break;
 1326                   }
 1327               } //switch
 1328           } catch (Exception x) {
 1329               log.error(sm.getString("deltaManager.receiveMessage.error",getName()), x);
 1330           } finally {
 1331               Thread.currentThread().setContextClassLoader(contextLoader);
 1332           }
 1333       }
 1334   
 1335       // -------------------------------------------------------- message receiver handler
 1336   
 1337   
 1338       /**
 1339        * handle receive session state is complete transfered
 1340        * @param msg
 1341        * @param sender
 1342        */
 1343       protected void handleALL_SESSION_TRANSFERCOMPLETE(SessionMessage msg, Member sender) {
 1344           counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE++ ;
 1345           if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.transfercomplete",getName(), sender.getHost(), new Integer(sender.getPort())));
 1346           stateTransferCreateSendTime = msg.getTimestamp() ;
 1347           stateTransfered = true ;
 1348       }
 1349   
 1350       /**
 1351        * handle receive session delta
 1352        * @param msg
 1353        * @param sender
 1354        * @throws IOException
 1355        * @throws ClassNotFoundException
 1356        */
 1357       protected void handleSESSION_DELTA(SessionMessage msg, Member sender) throws IOException, ClassNotFoundException {
 1358           counterReceive_EVT_SESSION_DELTA++;
 1359           byte[] delta = msg.getSession();
 1360           DeltaSession session = (DeltaSession) findSession(msg.getSessionID());
 1361           if (session != null) {
 1362               if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.delta",getName(), msg.getSessionID()));
 1363               DeltaRequest dreq = deserializeDeltaRequest(session, delta);
 1364               dreq.execute(session, notifyListenersOnReplication);
 1365               session.setPrimarySession(false);
 1366           }
 1367       }
 1368   
 1369       /**
 1370        * handle receive session is access at other node ( primary session is now false)
 1371        * @param msg
 1372        * @param sender
 1373        * @throws IOException
 1374        */
 1375       protected void handleSESSION_ACCESSED(SessionMessage msg,Member sender) throws IOException {
 1376           counterReceive_EVT_SESSION_ACCESSED++;
 1377           DeltaSession session = (DeltaSession) findSession(msg.getSessionID());
 1378           if (session != null) {
 1379               if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.accessed",getName(), msg.getSessionID()));
 1380               session.access();
 1381               session.setPrimarySession(false);
 1382               session.endAccess();
 1383           }
 1384       }
 1385   
 1386       /**
 1387        * handle receive session is expire at other node ( expire session also here)
 1388        * @param msg
 1389        * @param sender
 1390        * @throws IOException
 1391        */
 1392       protected void handleSESSION_EXPIRED(SessionMessage msg,Member sender) throws IOException {
 1393           counterReceive_EVT_SESSION_EXPIRED++;
 1394           DeltaSession session = (DeltaSession) findSession(msg.getSessionID());
 1395           if (session != null) {
 1396               if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.expired",getName(), msg.getSessionID()));
 1397               session.expire(notifySessionListenersOnReplication, false);
 1398           }
 1399       }
 1400   
 1401       /**
 1402        * handle receive new session is created at other node (create backup - primary false)
 1403        * @param msg
 1404        * @param sender
 1405        */
 1406       protected void handleSESSION_CREATED(SessionMessage msg,Member sender) {
 1407           counterReceive_EVT_SESSION_CREATED++;
 1408           if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.createNewSession",getName(), msg.getSessionID()));
 1409           DeltaSession session = (DeltaSession) createEmptySession();
 1410           session.setManager(this);
 1411           session.setValid(true);
 1412           session.setPrimarySession(false);
 1413           session.setCreationTime(msg.getTimestamp());
 1414           // use container maxInactiveInterval so that session will expire correctly in case of primary transfer
 1415           session.setMaxInactiveInterval(getMaxInactiveInterval());
 1416           session.access();
 1417           if(notifySessionListenersOnReplication)
 1418               session.setId(msg.getSessionID());
 1419           else
 1420               session.setIdInternal(msg.getSessionID());
 1421           session.resetDeltaRequest();
 1422           session.endAccess();
 1423   
 1424       }
 1425   
 1426       /**
 1427        * handle receive sessions from other not ( restart )
 1428        * @param msg
 1429        * @param sender
 1430        * @throws ClassNotFoundException
 1431        * @throws IOException
 1432        */
 1433       protected void handleALL_SESSION_DATA(SessionMessage msg,Member sender) throws ClassNotFoundException, IOException {
 1434           counterReceive_EVT_ALL_SESSION_DATA++;
 1435           if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataBegin",getName()));
 1436           byte[] data = msg.getSession();
 1437           deserializeSessions(data);
 1438           if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataAfter",getName()));
 1439           //stateTransferred = true;
 1440       }
 1441   
 1442       /**
 1443        * handle receive that other node want all sessions ( restart )
 1444        * a) send all sessions with one message
 1445        * b) send session at blocks
 1446        * After sending send state is complete transfered
 1447        * @param msg
 1448        * @param sender
 1449        * @throws IOException
 1450        */
 1451       protected void handleGET_ALL_SESSIONS(SessionMessage msg, Member sender) throws IOException {
 1452           counterReceive_EVT_GET_ALL_SESSIONS++;
 1453           //get a list of all the session from this manager
 1454           if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingBegin", getName()));
 1455           // Write the number of active sessions, followed by the details
 1456           // get all sessions and serialize without sync
 1457           Session[] currentSessions = findSessions();
 1458           long findSessionTimestamp = System.currentTimeMillis() ;
 1459           if (isSendAllSessions()) {
 1460               sendSessions(sender, currentSessions, findSessionTimestamp);
 1461           } else {
 1462               // send session at blocks
 1463               int len = currentSessions.length < getSendAllSessionsSize() ? currentSessions.length : getSendAllSessionsSize();
 1464               Session[] sendSessions = new Session[len];
 1465               for (int i = 0; i < currentSessions.length; i += getSendAllSessionsSize()) {
 1466                   len = i + getSendAllSessionsSize() > currentSessions.length ? currentSessions.length - i : getSendAllSessionsSize();
 1467                   System.arraycopy(currentSessions, i, sendSessions, 0, len);
 1468                   sendSessions(sender, sendSessions,findSessionTimestamp);
 1469                   if (getSendAllSessionsWaitTime() > 0) {
 1470                       try {
 1471                           Thread.sleep(getSendAllSessionsWaitTime());
 1472                       } catch (Exception sleep) {
 1473                       }
 1474                   }//end if
 1475               }//for
 1476           }//end if
 1477           
 1478           SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE, null,"SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED"+ getName());
 1479           newmsg.setTimestamp(findSessionTimestamp);
 1480           if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionTransfered",getName()));
 1481           counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE++;
 1482           cluster.send(newmsg, sender);
 1483       }
 1484   
 1485   
 1486       /**
 1487        * send a block of session to sender
 1488        * @param sender
 1489        * @param currentSessions
 1490        * @param sendTimestamp
 1491        * @throws IOException
 1492        */
 1493       protected void sendSessions(Member sender, Session[] currentSessions,long sendTimestamp) throws IOException {
 1494           byte[] data = serializeSessions(currentSessions);
 1495           if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingAfter",getName()));
 1496           SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_DATA, data,"SESSION-STATE", "SESSION-STATE-" + getName());
 1497           newmsg.setTimestamp(sendTimestamp);
 1498           if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionData",getName()));
 1499           counterSend_EVT_ALL_SESSION_DATA++;
 1500           cluster.send(newmsg, sender);
 1501       }
 1502   
 1503       public ClusterManager cloneFromTemplate() {
 1504           DeltaManager result = new DeltaManager();
 1505           result.name = "Clone-from-"+name;
 1506           result.cluster = cluster;
 1507           result.replicationValve = replicationValve;
 1508           result.maxActiveSessions = maxActiveSessions;
 1509           result.expireSessionsOnShutdown = expireSessionsOnShutdown;
 1510           result.notifyListenersOnReplication = notifyListenersOnReplication;
 1511           result.notifySessionListenersOnReplication = notifySessionListenersOnReplication;
 1512           result.stateTransferTimeout = stateTransferTimeout;
 1513           result.sendAllSessions = sendAllSessions;
 1514           result.sendClusterDomainOnly = sendClusterDomainOnly ;
 1515           result.sendAllSessionsSize = sendAllSessionsSize;
 1516           result.sendAllSessionsWaitTime = sendAllSessionsWaitTime ; 
 1517           result.receiverQueue = receiverQueue ;
 1518           result.stateTimestampDrop = stateTimestampDrop ;
 1519           result.stateTransferCreateSendTime = stateTransferCreateSendTime; 
 1520           return result;
 1521       }
 1522   }

Save This Page
Home » apache-tomcat-6.0.16-src » org.apache » catalina » ha » session » [javadoc | source]