Save This Page
Home » apache-tomcat-6.0.16-src » org.apache » catalina » cluster » session » [javadoc | source]
    1   /*
    2    * Copyright 1999,2004 The Apache Software Foundation.
    3    * 
    4    * Licensed under the Apache License, Version 2.0 (the "License");
    5    * you may not use this file except in compliance with the License.
    6    * You may obtain a copy of the License at
    7    * 
    8    *      http://www.apache.org/licenses/LICENSE-2.0
    9    * 
   10    * Unless required by applicable law or agreed to in writing, software
   11    * distributed under the License is distributed on an "AS IS" BASIS,
   12    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   13    * See the License for the specific language governing permissions and
   14    * limitations under the License.
   15    */
   16   package org.apache.catalina.cluster.session;
   17   
   18   import java.io.IOException;
   19   
   20   import org.apache.catalina.LifecycleException;
   21   import org.apache.catalina.Session;
   22   import org.apache.catalina.cluster.CatalinaCluster;
   23   import org.apache.catalina.cluster.ClusterManager;
   24   import org.apache.catalina.cluster.ClusterMessage;
   25   import org.apache.catalina.cluster.Member;
   26   import org.apache.catalina.realm.GenericPrincipal;
   27   import org.apache.catalina.session.StandardManager;
   28   
   29   /**
   30    * Title:        Tomcat Session Replication for Tomcat 4.0 <BR>
   31    * Description:  A very simple straight forward implementation of
   32    *               session replication of servers in a cluster.<BR>
   33    *               This session replication is implemented "live". By live
   34    *               I mean, when a session attribute is added into a session on Node A
   35    *               a message is broadcasted to other messages and setAttribute is called on the
   36    *               replicated sessions.<BR>
   37    *               A full description of this implementation can be found under
   38    *               <href="http://www.filip.net/tomcat/">Filip's Tomcat Page</a><BR>
   39    *
   40    * Copyright:    See apache license
   41    * Company:      www.filip.net
   42    * @author  <a href="mailto:mail@filip.net">Filip Hanik</a>
   43    * @author Bela Ban (modifications for synchronous replication)
   44    * @version 1.0 for TC 4.0
   45    * Description: The InMemoryReplicationManager is a session manager that replicated
   46    * session information in memory. It uses <a href="www.javagroups.com">JavaGroups</a> as
   47    * a communication protocol to ensure guaranteed and ordered message delivery.
   48    * JavaGroups also provides a very flexible protocol stack to ensure that the replication
   49    * can be used in any environment.
   50    * <BR><BR>
   51    * The InMemoryReplicationManager extends the StandardManager hence it allows for us
   52    * to inherit all the basic session management features like expiration, session listeners etc
   53    * <BR><BR>
   54    * To communicate with other nodes in the cluster, the InMemoryReplicationManager sends out 7 different type of multicast messages
   55    * all defined in the SessionMessage class.<BR>
   56    * When a session is replicated (not an attribute added/removed) the session is serialized into
   57    * a byte array using the StandardSession.readObjectData, StandardSession.writeObjectData methods.
   58    */
   59   public class SimpleTcpReplicationManager extends StandardManager
   60   implements ClusterManager
   61   {
   62       public static org.apache.commons.logging.Log log =
   63           org.apache.commons.logging.LogFactory.getLog( SimpleTcpReplicationManager.class );
   64   
   65       //the channel configuration
   66       protected String mChannelConfig = null;
   67   
   68       //the group name
   69       protected String mGroupName = "TomcatReplication";
   70   
   71       //somehow start() gets called more than once
   72       protected boolean mChannelStarted = false;
   73   
   74       //log to screen
   75       protected boolean mPrintToScreen = true;
   76   
   77       protected boolean defaultMode = false;
   78   
   79       protected boolean mManagerRunning = false;
   80   
   81       /** Use synchronous rather than asynchronous replication. Every session modification (creation, change, removal etc)
   82        * will be sent to all members. The call will then wait for max milliseconds, or forever (if timeout is 0) for
   83        * all responses.
   84        */
   85       protected boolean synchronousReplication=true;
   86   
   87       /** Set to true if we don't want the sessions to expire on shutdown */
   88       protected boolean mExpireSessionsOnShutdown = true;
   89   
   90       protected boolean useDirtyFlag = false;
   91   
   92       protected String name;
   93   
   94       protected boolean distributable = true;
   95   
   96       protected CatalinaCluster cluster;
   97   
   98       protected java.util.HashMap invalidatedSessions = new java.util.HashMap();
   99   
  100       /**
  101        * Flag to keep track if the state has been transferred or not
  102        * Assumes false.
  103        */
  104       protected boolean stateTransferred = false;
  105       private boolean notifyListenersOnReplication;
  106       private boolean sendClusterDomainOnly = true ;
  107   
  108       /**
  109        * Constructor, just calls super()
  110        *
  111        */
  112       public SimpleTcpReplicationManager()
  113       {
  114           super();
  115       }
  116   
  117       public boolean isSendClusterDomainOnly() {
  118           return sendClusterDomainOnly;
  119       }
  120       
  121       /**
  122        * @param sendClusterDomainOnly The sendClusterDomainOnly to set.
  123        */
  124       public void setSendClusterDomainOnly(boolean sendClusterDomainOnly) {
  125           this.sendClusterDomainOnly = sendClusterDomainOnly;
  126       }
  127     
  128       /**
  129        * @return Returns the defaultMode.
  130        */
  131       public boolean isDefaultMode() {
  132           return defaultMode;
  133       }
  134       /**
  135        * @param defaultMode The defaultMode to set.
  136        */
  137       public void setDefaultMode(boolean defaultMode) {
  138           this.defaultMode = defaultMode;
  139       }
  140       
  141       public boolean isManagerRunning()
  142       {
  143           return mManagerRunning;
  144       }
  145   
  146       public void setUseDirtyFlag(boolean usedirtyflag)
  147       {
  148           this.useDirtyFlag = usedirtyflag;
  149       }
  150   
  151       public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown)
  152       {
  153           mExpireSessionsOnShutdown = expireSessionsOnShutdown;
  154       }
  155   
  156       public void setCluster(CatalinaCluster cluster) {
  157           if(log.isDebugEnabled())
  158               log.debug("Cluster associated with SimpleTcpReplicationManager");
  159           this.cluster = cluster;
  160       }
  161   
  162       public boolean getExpireSessionsOnShutdown()
  163       {
  164           return mExpireSessionsOnShutdown;
  165       }
  166   
  167       public void setPrintToScreen(boolean printtoscreen)
  168       {
  169           if(log.isDebugEnabled())
  170               log.debug("Setting screen debug to:"+printtoscreen);
  171           mPrintToScreen = printtoscreen;
  172       }
  173   
  174       public void setSynchronousReplication(boolean flag)
  175       {
  176           synchronousReplication=flag;
  177       }
  178   
  179       /**
  180        * Override persistence since they don't go hand in hand with replication for now.
  181        */
  182       public void unload() throws IOException {
  183           if ( !getDistributable() ) {
  184               super.unload();
  185           }
  186       }
  187   
  188       /**
  189        * Creates a HTTP session.
  190        * Most of the code in here is copied from the StandardManager.
  191        * This is not pretty, yeah I know, but it was necessary since the
  192        * StandardManager had hard coded the session instantiation to the a
  193        * StandardSession, when we actually want to instantiate a ReplicatedSession<BR>
  194        * If the call comes from the Tomcat servlet engine, a SessionMessage goes out to the other
  195        * nodes in the cluster that this session has been created.
  196        * @param notify - if set to true the other nodes in the cluster will be notified.
  197        *                 This flag is needed so that we can create a session before we deserialize
  198        *                 a replicated one
  199        *
  200        * @see ReplicatedSession
  201        */
  202       protected Session createSession(String sessionId, boolean notify, boolean setId)
  203       {
  204   
  205           //inherited from the basic manager
  206           if ((getMaxActiveSessions() >= 0) &&
  207              (sessions.size() >= getMaxActiveSessions()))
  208               throw new IllegalStateException(sm.getString("standardManager.createSession.ise"));
  209   
  210   
  211           Session session = new ReplicatedSession(this);
  212   
  213           // Initialize the properties of the new session and return it
  214           session.setNew(true);
  215           session.setValid(true);
  216           session.setCreationTime(System.currentTimeMillis());
  217           session.setMaxInactiveInterval(this.maxInactiveInterval);
  218           if(sessionId == null)
  219               sessionId = generateSessionId();
  220           if ( setId ) session.setId(sessionId);
  221           if ( notify && (cluster!=null) ) {
  222               ((ReplicatedSession)session).setIsDirty(true);
  223           }
  224           return (session);
  225       }//createSession
  226   
  227       //=========================================================================
  228       // OVERRIDE THESE METHODS TO IMPLEMENT THE REPLICATION
  229       //=========================================================================
  230   
  231       /**
  232        * Construct and return a new session object, based on the default
  233        * settings specified by this Manager's properties.  The session
  234        * id will be assigned by this method, and available via the getId()
  235        * method of the returned session.  If a new session cannot be created
  236        * for any reason, return <code>null</code>.
  237        *
  238        * @exception IllegalStateException if a new session cannot be
  239        *  instantiated for any reason
  240        */
  241       public Session createSession(String sessionId)
  242       {
  243           //create a session and notify the other nodes in the cluster
  244           Session session =  createSession(sessionId,getDistributable(),true);
  245           add(session);
  246           return session;
  247       }
  248   
  249       public void sessionInvalidated(String sessionId) {
  250           synchronized ( invalidatedSessions ) {
  251               invalidatedSessions.put(sessionId, sessionId);
  252           }
  253       }
  254   
  255       public String[] getInvalidatedSessions() {
  256           synchronized ( invalidatedSessions ) {
  257               String[] result = new String[invalidatedSessions.size()];
  258               invalidatedSessions.values().toArray(result);
  259               return result;
  260           }
  261   
  262       }
  263   
  264       public ClusterMessage requestCompleted(String sessionId)
  265       {
  266           if (  !getDistributable() ) {
  267               log.warn("Received requestCompleted message, although this context["+
  268                        getName()+"] is not distributable. Ignoring message");
  269               return null;
  270           }
  271           //notify javagroups
  272           try
  273           {
  274               if ( invalidatedSessions.get(sessionId) != null ) {
  275                   synchronized ( invalidatedSessions ) {
  276                       invalidatedSessions.remove(sessionId);
  277                       SessionMessage msg = new SessionMessageImpl(name,
  278                       SessionMessage.EVT_SESSION_EXPIRED,
  279                       null,
  280                       sessionId,
  281                       sessionId);
  282                   return msg;
  283                   }
  284               } else {
  285                   ReplicatedSession session = (ReplicatedSession) findSession(
  286                       sessionId);
  287                   if (session != null) {
  288                       //return immediately if the session is not dirty
  289                       if (useDirtyFlag && (!session.isDirty())) {
  290                           //but before we return doing nothing,
  291                           //see if we should send
  292                           //an updated last access message so that
  293                           //sessions across cluster dont expire
  294                           long interval = session.getMaxInactiveInterval();
  295                           long lastaccdist = System.currentTimeMillis() -
  296                               session.getLastAccessWasDistributed();
  297                           if ( ((interval*1000) / lastaccdist)< 3 ) {
  298                               SessionMessage accmsg = new SessionMessageImpl(name,
  299                                   SessionMessage.EVT_SESSION_ACCESSED,
  300                                   null,
  301                                   sessionId,
  302                                   sessionId);
  303                               session.setLastAccessWasDistributed(System.currentTimeMillis());
  304                               return accmsg;
  305                           }
  306                           return null;
  307                       }
  308   
  309                       session.setIsDirty(false);
  310                       if (log.isDebugEnabled()) {
  311                           try {
  312                               log.debug("Sending session to cluster=" + session);
  313                           }
  314                           catch (Exception ignore) {}
  315                       }
  316                       SessionMessage msg = new SessionMessageImpl(name,
  317                           SessionMessage.EVT_SESSION_CREATED,
  318                           writeSession(session),
  319                           session.getIdInternal(),
  320                           session.getIdInternal());
  321                       return msg;
  322                   } //end if
  323               }//end if
  324           }
  325           catch (Exception x )
  326           {
  327               log.error("Unable to replicate session",x);
  328           }
  329           return null;
  330       }
  331   
  332       /**
  333        * Serialize a session into a byte array<BR>
  334        * This method simple calls the writeObjectData method on the session
  335        * and returns the byte data from that call
  336        * @param session - the session to be serialized
  337        * @return a byte array containing the session data, null if the serialization failed
  338        */
  339       protected byte[] writeSession( Session session )
  340       {
  341           try
  342           {
  343               java.io.ByteArrayOutputStream session_data = new java.io.ByteArrayOutputStream();
  344               java.io.ObjectOutputStream session_out = new java.io.ObjectOutputStream(session_data);
  345               session_out.flush();
  346               boolean hasPrincipal = session.getPrincipal() != null;
  347               session_out.writeBoolean(hasPrincipal);
  348               if ( hasPrincipal )
  349               {
  350                   session_out.writeObject(SerializablePrincipal.createPrincipal((GenericPrincipal)session.getPrincipal()));
  351               }//end if
  352               ((ReplicatedSession)session).writeObjectData(session_out);
  353               return session_data.toByteArray();
  354   
  355           }
  356           catch ( Exception x )
  357           {
  358               log.error("Failed to serialize the session!",x);
  359           }
  360           return null;
  361       }
  362   
  363       /**
  364        * Reinstantiates a serialized session from the data passed in.
  365        * This will first call createSession() so that we get a fresh instance with all
  366        * the managers set and all the transient fields validated.
  367        * Then it calls Session.readObjectData(byte[]) to deserialize the object
  368        * @param data - a byte array containing session data
  369        * @return a valid Session object, null if an error occurs
  370        *
  371        */
  372       protected Session readSession( byte[] data, String sessionId )
  373       {
  374           try
  375           {
  376               java.io.ByteArrayInputStream session_data = new java.io.ByteArrayInputStream(data);
  377               ReplicationStream session_in = new ReplicationStream(session_data,container.getLoader().getClassLoader());
  378   
  379               Session session = sessionId!=null?this.findSession(sessionId):null;
  380               boolean isNew = (session==null);
  381               //clear the old values from the existing session
  382               if ( session!=null ) {
  383                   ReplicatedSession rs = (ReplicatedSession)session;
  384                   rs.expire(false);  //cleans up the previous values, since we are not doing removes
  385                   session = null;
  386               }//end if
  387   
  388               if (session==null) {
  389                   session = createSession(null,false, false);
  390                   sessions.remove(session.getIdInternal());
  391               }
  392               
  393               
  394               boolean hasPrincipal = session_in.readBoolean();
  395               SerializablePrincipal p = null;
  396               if ( hasPrincipal )
  397                   p = (SerializablePrincipal)session_in.readObject();
  398               ((ReplicatedSession)session).readObjectData(session_in);
  399               if ( hasPrincipal )
  400                   session.setPrincipal(p.getPrincipal(getContainer().getRealm()));
  401               ((ReplicatedSession)session).setId(sessionId,isNew);
  402               ReplicatedSession rsession = (ReplicatedSession)session; 
  403               rsession.setAccessCount(1);
  404               session.setManager(this);
  405               session.setValid(true);
  406               rsession.setLastAccessedTime(System.currentTimeMillis());
  407               rsession.setThisAccessedTime(System.currentTimeMillis());
  408               ((ReplicatedSession)session).setAccessCount(0);
  409               session.setNew(false);
  410               if(log.isTraceEnabled())
  411                    log.trace("Session loaded id="+sessionId +
  412                                  " actualId="+session.getId()+ 
  413                                  " exists="+this.sessions.containsKey(sessionId)+
  414                                  " valid="+rsession.isValid());
  415               return session;
  416   
  417           }
  418           catch ( Exception x )
  419           {
  420               log.error("Failed to deserialize the session!",x);
  421           }
  422           return null;
  423       }
  424   
  425       public String getName() {
  426           return this.name;
  427       }
  428       /**
  429        * Prepare for the beginning of active use of the public methods of this
  430        * component.  This method should be called after <code>configure()</code>,
  431        * and before any of the public methods of the component are utilized.<BR>
  432        * Starts the cluster communication channel, this will connect with the other nodes
  433        * in the cluster, and request the current session state to be transferred to this node.
  434        * @exception IllegalStateException if this component has already been
  435        *  started
  436        * @exception LifecycleException if this component detects a fatal error
  437        *  that prevents this component from being used
  438        */
  439       public void start() throws LifecycleException {
  440           mManagerRunning = true;
  441           super.start();
  442           //start the javagroups channel
  443           try {
  444               //the channel is already running
  445               if ( mChannelStarted ) return;
  446               if(log.isInfoEnabled())
  447                   log.info("Starting clustering manager...:"+getName());
  448               if ( cluster == null ) {
  449                   log.error("Starting... no cluster associated with this context:"+getName());
  450                   return;
  451               }
  452               cluster.addManager(getName(),this);
  453   
  454               if (cluster.getMembers().length > 0) {
  455                   Member mbr = cluster.getMembers()[0];
  456                   SessionMessage msg =
  457                       new SessionMessageImpl(this.getName(),
  458                                          SessionMessage.EVT_GET_ALL_SESSIONS,
  459                                          null,
  460                                          "GET-ALL",
  461                                          "GET-ALL-"+this.getName());
  462                   cluster.send(msg, mbr);
  463                   if(log.isWarnEnabled())
  464                        log.warn("Manager["+getName()+"], requesting session state from "+mbr+
  465                            ". This operation will timeout if no session state has been received within "+
  466                            "60 seconds");
  467                   long reqStart = System.currentTimeMillis();
  468                   long reqNow = 0;
  469                   boolean isTimeout=false;
  470                   do {
  471                       try {
  472                           Thread.sleep(100);
  473                       }catch ( Exception sleep) {}
  474                       reqNow = System.currentTimeMillis();
  475                       isTimeout=((reqNow-reqStart)>(1000*60));
  476                   } while ( (!isStateTransferred()) && (!isTimeout));
  477                   if ( isTimeout || (!isStateTransferred()) ) {
  478                       log.error("Manager["+getName()+"], No session state received, timing out.");
  479                   }else {
  480                       if(log.isInfoEnabled())
  481                           log.info("Manager["+getName()+"], session state received in "+(reqNow-reqStart)+" ms.");
  482                   }
  483               } else {
  484                   if(log.isInfoEnabled())
  485                       log.info("Manager["+getName()+"], skipping state transfer. No members active in cluster group.");
  486               }//end if
  487               mChannelStarted = true;
  488           }  catch ( Exception x ) {
  489               log.error("Unable to start SimpleTcpReplicationManager",x);
  490           }
  491       }
  492   
  493       /**
  494        * Gracefully terminate the active use of the public methods of this
  495        * component.  This method should be the last one called on a given
  496        * instance of this component.<BR>
  497        * This will disconnect the cluster communication channel and stop the listener thread.
  498        * @exception IllegalStateException if this component has not been started
  499        * @exception LifecycleException if this component detects a fatal error
  500        *  that needs to be reported
  501        */
  502       public void stop() throws LifecycleException
  503       {
  504           mManagerRunning = false;
  505           mChannelStarted = false;
  506           super.stop();
  507           //stop the javagroup channel
  508           try
  509           {
  510               this.sessions.clear();
  511               cluster.removeManager(getName(),this);
  512   //            mReplicationListener.stopListening();
  513   //            mReplicationTransmitter.stop();
  514   //            service.stop();
  515   //            service = null;
  516           }
  517           catch ( Exception x )
  518           {
  519               log.error("Unable to stop SimpleTcpReplicationManager",x);
  520           }
  521       }
  522   
  523       public void setDistributable(boolean dist) {
  524           this.distributable = dist;
  525       }
  526   
  527       public boolean getDistributable() {
  528           return distributable;
  529       }
  530   
  531       /**
  532        * This method is called by the received thread when a SessionMessage has
  533        * been received from one of the other nodes in the cluster.
  534        * @param msg - the message received
  535        * @param sender - the sender of the message, this is used if we receive a
  536        *                 EVT_GET_ALL_SESSION message, so that we only reply to
  537        *                 the requesting node
  538        */
  539       protected void messageReceived( SessionMessage msg, Member sender ) {
  540           try  {
  541               if(log.isInfoEnabled()) {
  542                   log.debug("Received SessionMessage of type="+msg.getEventTypeString());
  543                   log.debug("Received SessionMessage sender="+sender);
  544               }
  545               switch ( msg.getEventType() ) {
  546                   case SessionMessage.EVT_GET_ALL_SESSIONS: {
  547                       //get a list of all the session from this manager
  548                       Object[] sessions = findSessions();
  549                       java.io.ByteArrayOutputStream bout = new java.io.ByteArrayOutputStream();
  550                       java.io.ObjectOutputStream oout = new java.io.ObjectOutputStream(bout);
  551                       oout.writeInt(sessions.length);
  552                       for (int i=0; i<sessions.length; i++){
  553                           ReplicatedSession ses = (ReplicatedSession)sessions[i];
  554                           oout.writeUTF(ses.getIdInternal());
  555                           byte[] data = writeSession(ses);
  556                           oout.writeObject(data);
  557                       }//for
  558                       //don't send a message if we don't have to
  559                       oout.flush();
  560                       oout.close();
  561                       byte[] data = bout.toByteArray();
  562                       SessionMessage newmsg = new SessionMessageImpl(name,
  563                           SessionMessage.EVT_ALL_SESSION_DATA,
  564                           data, "SESSION-STATE","SESSION-STATE-"+getName());
  565                       cluster.send(newmsg, sender);
  566                       break;
  567                   }
  568                   case SessionMessage.EVT_ALL_SESSION_DATA: {
  569                       java.io.ByteArrayInputStream bin =
  570                           new java.io.ByteArrayInputStream(msg.getSession());
  571                       java.io.ObjectInputStream oin = new java.io.ObjectInputStream(bin);
  572                       int size = oin.readInt();
  573                       for ( int i=0; i<size; i++) {
  574                           String id = oin.readUTF();
  575                           byte[] data = (byte[])oin.readObject();
  576                           Session session = readSession(data,id);
  577                       }//for
  578                       stateTransferred=true;
  579                       break;
  580                   }
  581                   case SessionMessage.EVT_SESSION_CREATED: {
  582                       Session session = this.readSession(msg.getSession(),msg.getSessionID());
  583                       if ( log.isDebugEnabled() ) {
  584                           log.debug("Received replicated session=" + session +
  585                               " isValid=" + session.isValid());
  586                       }
  587                       break;
  588                   }
  589                   case SessionMessage.EVT_SESSION_EXPIRED: {
  590                       Session session = findSession(msg.getSessionID());
  591                       if ( session != null ) {
  592                           session.expire();
  593                           this.remove(session);
  594                       }//end if
  595                       break;
  596                   }
  597                   case SessionMessage.EVT_SESSION_ACCESSED :{
  598                       Session session = findSession(msg.getSessionID());
  599                       if ( session != null ) {
  600                           session.access();
  601                           session.endAccess();
  602                       }
  603                       break;
  604                   }
  605                   default:  {
  606                       //we didn't recognize the message type, do nothing
  607                       break;
  608                   }
  609               }//switch
  610           }
  611           catch ( Exception x )
  612           {
  613               log.error("Unable to receive message through TCP channel",x);
  614           }
  615       }
  616   
  617       public void messageDataReceived(ClusterMessage cmsg) {
  618           try {
  619               if ( cmsg instanceof SessionMessage ) {
  620                   SessionMessage msg = (SessionMessage)cmsg;
  621                   messageReceived(msg,
  622                                   msg.getAddress() != null ? (Member) msg.getAddress() : null);
  623               }
  624           } catch(Throwable ex){
  625               log.error("InMemoryReplicationManager.messageDataReceived()", ex);
  626           }//catch
  627       }
  628   
  629       public boolean isStateTransferred() {
  630           return stateTransferred;
  631       }
  632   
  633       public void setName(String name) {
  634           this.name = name;
  635       }
  636       public boolean isNotifyListenersOnReplication() {
  637           return notifyListenersOnReplication;
  638       }
  639       public void setNotifyListenersOnReplication(boolean notifyListenersOnReplication) {
  640           this.notifyListenersOnReplication = notifyListenersOnReplication;
  641       }
  642   
  643   
  644       /* 
  645        * @see org.apache.catalina.cluster.ClusterManager#getCluster()
  646        */
  647       public CatalinaCluster getCluster() {
  648           return cluster;
  649       }
  650   
  651   }

Save This Page
Home » apache-tomcat-6.0.16-src » org.apache » catalina » cluster » session » [javadoc | source]