Save This Page
Home » apache-tomcat-6.0.16-src » org.apache » catalina » ha » tcp » [javadoc | source]
    1   /*
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    * 
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    * 
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   
   18   package org.apache.catalina.ha.tcp;
   19   
   20   import java.io.IOException;
   21   import java.util.StringTokenizer;
   22   import java.util.regex.Pattern;
   23   import java.util.ArrayList;
   24   import java.util.List;
   25   import java.util.Iterator;
   26   import javax.servlet.ServletException;
   27   
   28   import org.apache.catalina.Manager;
   29   import org.apache.catalina.Session;
   30   import org.apache.catalina.Context;
   31   import org.apache.catalina.core.StandardContext;
   32   import org.apache.catalina.ha.CatalinaCluster;
   33   import org.apache.catalina.ha.ClusterManager;
   34   import org.apache.catalina.ha.ClusterMessage;
   35   import org.apache.catalina.ha.ClusterSession;
   36   import org.apache.catalina.ha.ClusterValve;
   37   import org.apache.catalina.ha.session.DeltaManager;
   38   import org.apache.catalina.ha.session.DeltaSession;
   39   import org.apache.catalina.connector.Request;
   40   import org.apache.catalina.connector.Response;
   41   import org.apache.catalina.util.StringManager;
   42   import org.apache.catalina.valves.ValveBase;
   43   
   44   /**
   45    * <p>Implementation of a Valve that logs interesting contents from the
   46    * specified Request (before processing) and the corresponding Response
   47    * (after processing).  It is especially useful in debugging problems
   48    * related to headers and cookies.</p>
   49    *
   50    * <p>This Valve may be attached to any Container, depending on the granularity
   51    * of the logging you wish to perform.</p>
   52    *
   53    * <p>primaryIndicator=true, then the request attribute <i>org.apache.catalina.ha.tcp.isPrimarySession.</i>
   54    * is set true, when request processing is at sessions primary node.
   55    * </p>
   56    *
   57    * @author Craig R. McClanahan
   58    * @author Filip Hanik
   59    * @author Peter Rossbach
   60    * @version $Revision: 613805 $ $Date: 2008-01-21 10:19:02 +0100 (lun., 21 janv. 2008) $
   61    */
   62   
   63   public class ReplicationValve
   64       extends ValveBase implements ClusterValve {
   65       
   66       private static org.apache.juli.logging.Log log =
   67           org.apache.juli.logging.LogFactory.getLog( ReplicationValve.class );
   68   
   69       // ----------------------------------------------------- Instance Variables
   70   
   71       /**
   72        * The descriptive information related to this implementation.
   73        */
   74       private static final String info =
   75           "org.apache.catalina.ha.tcp.ReplicationValve/2.0";
   76   
   77   
   78       /**
   79        * The StringManager for this package.
   80        */
   81       protected static StringManager sm =
   82           StringManager.getManager(Constants.Package);
   83   
   84       private CatalinaCluster cluster = null ;
   85   
   86       /**
   87        * holds file endings to not call for like images and others
   88        */
   89       protected java.util.regex.Pattern[] reqFilters = new java.util.regex.Pattern[0];
   90       
   91       /**
   92        * Orginal filter 
   93        */
   94       protected String filter ;
   95       
   96       /**
   97        * crossContext session container 
   98        */
   99       protected ThreadLocal crossContextSessions = new ThreadLocal() ;
  100       
  101       /**
  102        * doProcessingStats (default = off)
  103        */
  104       protected boolean doProcessingStats = false;
  105       
  106       protected long totalRequestTime = 0;
  107       protected long totalSendTime = 0;
  108       protected long nrOfRequests = 0;
  109       protected long lastSendTime = 0;
  110       protected long nrOfFilterRequests = 0;
  111       protected long nrOfSendRequests = 0;
  112       protected long nrOfCrossContextSendRequests = 0;
  113       
  114       /**
  115        * must primary change indicator set 
  116        */
  117       protected boolean primaryIndicator = false ;
  118       
  119       /**
  120        * Name of primary change indicator as request attribute
  121        */
  122       protected String primaryIndicatorName = "org.apache.catalina.ha.tcp.isPrimarySession";
  123      
  124       // ------------------------------------------------------------- Properties
  125   
  126       public ReplicationValve() {
  127       }
  128       
  129       /**
  130        * Return descriptive information about this Valve implementation.
  131        */
  132       public String getInfo() {
  133   
  134           return (info);
  135   
  136       }
  137       
  138       /**
  139        * @return Returns the cluster.
  140        */
  141       public CatalinaCluster getCluster() {
  142           return cluster;
  143       }
  144       
  145       /**
  146        * @param cluster The cluster to set.
  147        */
  148       public void setCluster(CatalinaCluster cluster) {
  149           this.cluster = cluster;
  150       }
  151    
  152       /**
  153        * @return Returns the filter
  154        */
  155       public String getFilter() {
  156          return filter ;
  157       }
  158   
  159       /**
  160        * compile filter string to regular expressions
  161        * @see Pattern#compile(java.lang.String)
  162        * @param filter
  163        *            The filter to set.
  164        */
  165       public void setFilter(String filter) {
  166           if (log.isDebugEnabled())
  167               log.debug(sm.getString("ReplicationValve.filter.loading", filter));
  168           this.filter = filter;
  169           StringTokenizer t = new StringTokenizer(filter, ";");
  170           this.reqFilters = new Pattern[t.countTokens()];
  171           int i = 0;
  172           while (t.hasMoreTokens()) {
  173               String s = t.nextToken();
  174               if (log.isTraceEnabled())
  175                   log.trace(sm.getString("ReplicationValve.filter.token", s));
  176               try {
  177                   reqFilters[i++] = Pattern.compile(s);
  178               } catch (Exception x) {
  179                   log.error(sm.getString("ReplicationValve.filter.token.failure",
  180                           s), x);
  181               }
  182           }
  183       }
  184   
  185       /**
  186        * @return Returns the primaryIndicator.
  187        */
  188       public boolean isPrimaryIndicator() {
  189           return primaryIndicator;
  190       }
  191   
  192       /**
  193        * @param primaryIndicator The primaryIndicator to set.
  194        */
  195       public void setPrimaryIndicator(boolean primaryIndicator) {
  196           this.primaryIndicator = primaryIndicator;
  197       }
  198       
  199       /**
  200        * @return Returns the primaryIndicatorName.
  201        */
  202       public String getPrimaryIndicatorName() {
  203           return primaryIndicatorName;
  204       }
  205       
  206       /**
  207        * @param primaryIndicatorName The primaryIndicatorName to set.
  208        */
  209       public void setPrimaryIndicatorName(String primaryIndicatorName) {
  210           this.primaryIndicatorName = primaryIndicatorName;
  211       }
  212       
  213       /**
  214        * Calc processing stats
  215        */
  216       public boolean doStatistics() {
  217           return doProcessingStats;
  218       }
  219   
  220       /**
  221        * Set Calc processing stats
  222        * @see #resetStatistics()
  223        */
  224       public void setStatistics(boolean doProcessingStats) {
  225           this.doProcessingStats = doProcessingStats;
  226       }
  227   
  228       /**
  229        * @return Returns the lastSendTime.
  230        */
  231       public long getLastSendTime() {
  232           return lastSendTime;
  233       }
  234       
  235       /**
  236        * @return Returns the nrOfRequests.
  237        */
  238       public long getNrOfRequests() {
  239           return nrOfRequests;
  240       }
  241       
  242       /**
  243        * @return Returns the nrOfFilterRequests.
  244        */
  245       public long getNrOfFilterRequests() {
  246           return nrOfFilterRequests;
  247       }
  248   
  249       /**
  250        * @return Returns the nrOfCrossContextSendRequests.
  251        */
  252       public long getNrOfCrossContextSendRequests() {
  253           return nrOfCrossContextSendRequests;
  254       }
  255   
  256       /**
  257        * @return Returns the nrOfSendRequests.
  258        */
  259       public long getNrOfSendRequests() {
  260           return nrOfSendRequests;
  261       }
  262   
  263       /**
  264        * @return Returns the totalRequestTime.
  265        */
  266       public long getTotalRequestTime() {
  267           return totalRequestTime;
  268       }
  269       
  270       /**
  271        * @return Returns the totalSendTime.
  272        */
  273       public long getTotalSendTime() {
  274           return totalSendTime;
  275       }
  276   
  277       /**
  278        * @return Returns the reqFilters.
  279        */
  280       protected java.util.regex.Pattern[] getReqFilters() {
  281           return reqFilters;
  282       }
  283       
  284       /**
  285        * @param reqFilters The reqFilters to set.
  286        */
  287       protected void setReqFilters(java.util.regex.Pattern[] reqFilters) {
  288           this.reqFilters = reqFilters;
  289       }
  290       
  291       
  292       // --------------------------------------------------------- Public Methods
  293       
  294       /**
  295        * Register all cross context sessions inside endAccess.
  296        * Use a list with contains check, that the Portlet API can include a lot of fragments from same or
  297        * different applications with session changes.
  298        *
  299        * @param session cross context session
  300        */
  301       public void registerReplicationSession(DeltaSession session) {
  302           List sessions = (List)crossContextSessions.get();
  303           if(sessions != null) {
  304               if(!sessions.contains(session)) {
  305                   if(log.isDebugEnabled())
  306                       log.debug(sm.getString("ReplicationValve.crossContext.registerSession",
  307                           session.getIdInternal(),
  308                           session.getManager().getContainer().getName()));
  309                   sessions.add(session);
  310               }
  311           }
  312       }
  313   
  314       /**
  315        * Log the interesting request parameters, invoke the next Valve in the
  316        * sequence, and log the interesting response parameters.
  317        *
  318        * @param request The servlet request to be processed
  319        * @param response The servlet response to be created
  320        *
  321        * @exception IOException if an input/output error occurs
  322        * @exception ServletException if a servlet error occurs
  323        */
  324       public void invoke(Request request, Response response)
  325           throws IOException, ServletException
  326       {
  327           long totalstart = 0;
  328   
  329           //this happens before the request
  330           if(doStatistics()) {
  331               totalstart = System.currentTimeMillis();
  332           }
  333           if (primaryIndicator) {
  334               createPrimaryIndicator(request) ;
  335           }
  336           Context context = request.getContext();
  337           boolean isCrossContext = context != null
  338                   && context instanceof StandardContext
  339                   && ((StandardContext) context).getCrossContext();
  340           try {
  341               if(isCrossContext) {
  342                   if(log.isDebugEnabled())
  343                       log.debug(sm.getString("ReplicationValve.crossContext.add"));
  344                   //FIXME add Pool of Arraylists
  345                   crossContextSessions.set(new ArrayList());
  346               }
  347               getNext().invoke(request, response);
  348               Manager manager = request.getContext().getManager();
  349               if (manager != null && manager instanceof ClusterManager) {
  350                   ClusterManager clusterManager = (ClusterManager) manager;
  351                   CatalinaCluster containerCluster = (CatalinaCluster) getContainer().getCluster();
  352                   if (containerCluster == null) {
  353                       if (log.isWarnEnabled())
  354                           log.warn(sm.getString("ReplicationValve.nocluster"));
  355                       return;
  356                   }
  357                   // valve cluster can access manager - other cluster handle replication 
  358                   // at host level - hopefully!
  359                   if(containerCluster.getManager(clusterManager.getName()) == null)
  360                       return ;
  361                   if(containerCluster.hasMembers()) {
  362                       sendReplicationMessage(request, totalstart, isCrossContext, clusterManager, containerCluster);
  363                   } else {
  364                       resetReplicationRequest(request,isCrossContext);
  365                   }        
  366               }
  367           } finally {
  368               // Array must be remove: Current master request send endAccess at recycle. 
  369               // Don't register this request session again!
  370               if(isCrossContext) {
  371                   if(log.isDebugEnabled())
  372                       log.debug(sm.getString("ReplicationValve.crossContext.remove"));
  373                   // crossContextSessions.remove() only exist at Java 5
  374                   // register ArrayList at a pool
  375                   crossContextSessions.set(null);
  376               }
  377           }
  378       }
  379   
  380       
  381       /**
  382        * reset the active statitics 
  383        */
  384       public void resetStatistics() {
  385           totalRequestTime = 0 ;
  386           totalSendTime = 0 ;
  387           lastSendTime = 0 ;
  388           nrOfFilterRequests = 0 ;
  389           nrOfRequests = 0 ;
  390           nrOfSendRequests = 0;
  391           nrOfCrossContextSendRequests = 0;
  392       }
  393       
  394       /**
  395        * Return a String rendering of this object.
  396        */
  397       public String toString() {
  398   
  399           StringBuffer sb = new StringBuffer("ReplicationValve[");
  400           if (container != null)
  401               sb.append(container.getName());
  402           sb.append("]");
  403           return (sb.toString());
  404   
  405       }
  406   
  407       // --------------------------------------------------------- Protected Methods
  408   
  409       /**
  410        * @param request
  411        * @param totalstart
  412        * @param isCrossContext
  413        * @param clusterManager
  414        * @param containerCluster
  415        */
  416       protected void sendReplicationMessage(Request request, long totalstart, boolean isCrossContext, ClusterManager clusterManager, CatalinaCluster containerCluster) {
  417           //this happens after the request
  418           long start = 0;
  419           if(doStatistics()) {
  420               start = System.currentTimeMillis();
  421           }
  422           try {
  423               // send invalid sessions
  424               // DeltaManager returns String[0]
  425               if (!(clusterManager instanceof DeltaManager))
  426                   sendInvalidSessions(clusterManager, containerCluster);
  427               // send replication
  428               sendSessionReplicationMessage(request, clusterManager, containerCluster);
  429               if(isCrossContext)
  430                   sendCrossContextSession(containerCluster);
  431           } catch (Exception x) {
  432               // FIXME we have a lot of sends, but the trouble with one node stops the correct replication to other nodes!
  433               log.error(sm.getString("ReplicationValve.send.failure"), x);
  434           } finally {
  435               // FIXME this stats update are not cheap!!
  436               if(doStatistics()) {
  437                   updateStats(totalstart,start);
  438               }
  439           }
  440       }
  441   
  442       /**
  443        * Send all changed cross context sessions to backups
  444        * @param containerCluster
  445        */
  446       protected void sendCrossContextSession(CatalinaCluster containerCluster) {
  447           Object sessions = crossContextSessions.get();
  448           if(sessions != null && sessions instanceof List
  449                   && ((List)sessions).size() >0) {
  450               for(Iterator iter = ((List)sessions).iterator(); iter.hasNext() ;) {          
  451                   Session session = (Session)iter.next();
  452                   if(log.isDebugEnabled())
  453                       log.debug(sm.getString("ReplicationValve.crossContext.sendDelta",  
  454                               session.getManager().getContainer().getName() ));
  455                   sendMessage(session,(ClusterManager)session.getManager(),containerCluster);
  456                   if(doStatistics()) {
  457                       nrOfCrossContextSendRequests++;
  458                   }
  459               }
  460           }
  461       }
  462     
  463       /**
  464        * Fix memory leak for long sessions with many changes, when no backup member exists!
  465        * @param request current request after responce is generated
  466        * @param isCrossContext check crosscontext threadlocal
  467        */
  468       protected void resetReplicationRequest(Request request, boolean isCrossContext) {
  469           Session contextSession = request.getSessionInternal(false);
  470           if(contextSession != null && contextSession instanceof DeltaSession){
  471               resetDeltaRequest(contextSession);
  472               ((DeltaSession)contextSession).setPrimarySession(true);
  473           }
  474           if(isCrossContext) {
  475               Object sessions = crossContextSessions.get();
  476               if(sessions != null && sessions instanceof List
  477                  && ((List)sessions).size() >0) {
  478                   Iterator iter = ((List)sessions).iterator();
  479                   for(; iter.hasNext() ;) {          
  480                       Session session = (Session)iter.next();
  481                       resetDeltaRequest(session);
  482                       if(session instanceof DeltaSession)
  483                           ((DeltaSession)contextSession).setPrimarySession(true);
  484   
  485                   }
  486               }
  487           }                     
  488       }
  489   
  490       /**
  491        * Reset DeltaRequest from session
  492        * @param session HttpSession from current request or cross context session
  493        */
  494       protected void resetDeltaRequest(Session session) {
  495           if(log.isDebugEnabled()) {
  496               log.debug(sm.getString("ReplicationValve.resetDeltaRequest" , 
  497                   session.getManager().getContainer().getName() ));
  498           }
  499           ((DeltaSession)session).resetDeltaRequest();
  500       }
  501   
  502       /**
  503        * Send Cluster Replication Request
  504        * @param request current request
  505        * @param manager session manager
  506        * @param cluster replication cluster
  507        */
  508       protected void sendSessionReplicationMessage(Request request,
  509               ClusterManager manager, CatalinaCluster cluster) {
  510           Session session = request.getSessionInternal(false);
  511           if (session != null) {
  512               String uri = request.getDecodedRequestURI();
  513               // request without session change
  514               if (!isRequestWithoutSessionChange(uri)) {
  515                   if (log.isDebugEnabled())
  516                       log.debug(sm.getString("ReplicationValve.invoke.uri", uri));
  517                   sendMessage(session,manager,cluster);
  518               } else
  519                   if(doStatistics())
  520                       nrOfFilterRequests++;
  521           }
  522   
  523       }
  524   
  525      /**
  526       * Send message delta message from request session 
  527       * @param request current request
  528       * @param manager session manager
  529       * @param cluster replication cluster
  530       */
  531       protected void sendMessage(Session session,
  532                ClusterManager manager, CatalinaCluster cluster) {
  533           String id = session.getIdInternal();
  534           if (id != null) {
  535               send(manager, cluster, id);
  536           }
  537       }
  538   
  539       /**
  540        * send manager requestCompleted message to cluster
  541        * @param manager SessionManager
  542        * @param cluster replication cluster
  543        * @param sessionId sessionid from the manager
  544        * @see DeltaManager#requestCompleted(String)
  545        * @see SimpleTcpCluster#send(ClusterMessage)
  546        */
  547       protected void send(ClusterManager manager, CatalinaCluster cluster, String sessionId) {
  548           ClusterMessage msg = manager.requestCompleted(sessionId);
  549           if (msg != null) {
  550               if(manager.doDomainReplication()) {
  551                   cluster.sendClusterDomain(msg);
  552               } else {
  553                   cluster.send(msg);
  554               }
  555               if(doStatistics())
  556                   nrOfSendRequests++;
  557           }
  558       }
  559       
  560       /**
  561        * check for session invalidations
  562        * @param manager
  563        * @param cluster
  564        */
  565       protected void sendInvalidSessions(ClusterManager manager, CatalinaCluster cluster) {
  566           String[] invalidIds=manager.getInvalidatedSessions();
  567           if ( invalidIds.length > 0 ) {
  568               for ( int i=0;i<invalidIds.length; i++ ) {
  569                   try {
  570                       send(manager,cluster,invalidIds[i]);
  571                   } catch ( Exception x ) {
  572                       log.error(sm.getString("ReplicationValve.send.invalid.failure",invalidIds[i]),x);
  573                   }
  574               }
  575           }
  576       }
  577       
  578       /**
  579        * is request without possible session change
  580        * @param uri The request uri
  581        * @return True if no session change
  582        */
  583       protected boolean isRequestWithoutSessionChange(String uri) {
  584   
  585           boolean filterfound = false;
  586   
  587           for (int i = 0; (i < reqFilters.length) && (!filterfound); i++) {
  588               java.util.regex.Matcher matcher = reqFilters[i].matcher(uri);
  589               filterfound = matcher.matches();
  590           }
  591           return filterfound;
  592       }
  593   
  594       /**
  595        * protocol cluster replications stats
  596        * @param requestTime
  597        * @param clusterTime
  598        */
  599       protected  void updateStats(long requestTime, long clusterTime) {
  600           synchronized(this) {
  601               lastSendTime=System.currentTimeMillis();
  602               totalSendTime+=lastSendTime - clusterTime;
  603               totalRequestTime+=lastSendTime - requestTime;
  604               nrOfRequests++;
  605           }
  606           if(log.isInfoEnabled()) {
  607               if ( (nrOfRequests % 100) == 0 ) {
  608                    log.info(sm.getString("ReplicationValve.stats",
  609                        new Object[]{
  610                            new Long(totalRequestTime/nrOfRequests),
  611                            new Long(totalSendTime/nrOfRequests),
  612                            new Long(nrOfRequests),
  613                            new Long(nrOfSendRequests),
  614                            new Long(nrOfCrossContextSendRequests),
  615                            new Long(nrOfFilterRequests),
  616                            new Long(totalRequestTime),
  617                            new Long(totalSendTime)}));
  618                }
  619           }
  620       }
  621   
  622   
  623       /**
  624        * Mark Request that processed at primary node with attribute
  625        * primaryIndicatorName
  626        * 
  627        * @param request
  628        * @throws IOException
  629        */
  630       protected void createPrimaryIndicator(Request request) throws IOException {
  631           String id = request.getRequestedSessionId();
  632           if ((id != null) && (id.length() > 0)) {
  633               Manager manager = request.getContext().getManager();
  634               Session session = manager.findSession(id);
  635               if (session instanceof ClusterSession) {
  636                   ClusterSession cses = (ClusterSession) session;
  637                   if (cses != null) {
  638                       if (log.isDebugEnabled())
  639                           log.debug(sm.getString(
  640                                   "ReplicationValve.session.indicator", request.getContext().getName(),id,
  641                                   primaryIndicatorName, cses.isPrimarySession()));
  642                       request.setAttribute(primaryIndicatorName, cses.isPrimarySession()?Boolean.TRUE:Boolean.FALSE);
  643                   }
  644               } else {
  645                   if (log.isDebugEnabled()) {
  646                       if (session != null) {
  647                           log.debug(sm.getString(
  648                                   "ReplicationValve.session.found", request.getContext().getName(),id));
  649                       } else {
  650                           log.debug(sm.getString(
  651                                   "ReplicationValve.session.invalid", request.getContext().getName(),id));
  652                       }
  653                   }
  654               }
  655           }
  656       }
  657   
  658   }

Save This Page
Home » apache-tomcat-6.0.16-src » org.apache » catalina » ha » tcp » [javadoc | source]