1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.catalina.ha.tcp;
19
20 import java.io.IOException;
21 import java.util.StringTokenizer;
22 import java.util.regex.Pattern;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.Iterator;
26 import javax.servlet.ServletException;
27
28 import org.apache.catalina.Manager;
29 import org.apache.catalina.Session;
30 import org.apache.catalina.Context;
31 import org.apache.catalina.core.StandardContext;
32 import org.apache.catalina.ha.CatalinaCluster;
33 import org.apache.catalina.ha.ClusterManager;
34 import org.apache.catalina.ha.ClusterMessage;
35 import org.apache.catalina.ha.ClusterSession;
36 import org.apache.catalina.ha.ClusterValve;
37 import org.apache.catalina.ha.session.DeltaManager;
38 import org.apache.catalina.ha.session.DeltaSession;
39 import org.apache.catalina.connector.Request;
40 import org.apache.catalina.connector.Response;
41 import org.apache.catalina.util.StringManager;
42 import org.apache.catalina.valves.ValveBase;
43
44 /**
45 * <p>Implementation of a Valve that logs interesting contents from the
46 * specified Request (before processing) and the corresponding Response
47 * (after processing). It is especially useful in debugging problems
48 * related to headers and cookies.</p>
49 *
50 * <p>This Valve may be attached to any Container, depending on the granularity
51 * of the logging you wish to perform.</p>
52 *
53 * <p>primaryIndicator=true, then the request attribute <i>org.apache.catalina.ha.tcp.isPrimarySession.</i>
54 * is set true, when request processing is at sessions primary node.
55 * </p>
56 *
57 * @author Craig R. McClanahan
58 * @author Filip Hanik
59 * @author Peter Rossbach
60 * @version $Revision: 613805 $ $Date: 2008-01-21 10:19:02 +0100 (lun., 21 janv. 2008) $
61 */
62
63 public class ReplicationValve
64 extends ValveBase implements ClusterValve {
65
66 private static org.apache.juli.logging.Log log =
67 org.apache.juli.logging.LogFactory.getLog( ReplicationValve.class );
68
69 // ----------------------------------------------------- Instance Variables
70
71 /**
72 * The descriptive information related to this implementation.
73 */
74 private static final String info =
75 "org.apache.catalina.ha.tcp.ReplicationValve/2.0";
76
77
78 /**
79 * The StringManager for this package.
80 */
81 protected static StringManager sm =
82 StringManager.getManager(Constants.Package);
83
84 private CatalinaCluster cluster = null ;
85
86 /**
87 * holds file endings to not call for like images and others
88 */
89 protected java.util.regex.Pattern[] reqFilters = new java.util.regex.Pattern[0];
90
91 /**
92 * Orginal filter
93 */
94 protected String filter ;
95
96 /**
97 * crossContext session container
98 */
99 protected ThreadLocal crossContextSessions = new ThreadLocal() ;
100
101 /**
102 * doProcessingStats (default = off)
103 */
104 protected boolean doProcessingStats = false;
105
106 protected long totalRequestTime = 0;
107 protected long totalSendTime = 0;
108 protected long nrOfRequests = 0;
109 protected long lastSendTime = 0;
110 protected long nrOfFilterRequests = 0;
111 protected long nrOfSendRequests = 0;
112 protected long nrOfCrossContextSendRequests = 0;
113
114 /**
115 * must primary change indicator set
116 */
117 protected boolean primaryIndicator = false ;
118
119 /**
120 * Name of primary change indicator as request attribute
121 */
122 protected String primaryIndicatorName = "org.apache.catalina.ha.tcp.isPrimarySession";
123
124 // ------------------------------------------------------------- Properties
125
126 public ReplicationValve() {
127 }
128
129 /**
130 * Return descriptive information about this Valve implementation.
131 */
132 public String getInfo() {
133
134 return (info);
135
136 }
137
138 /**
139 * @return Returns the cluster.
140 */
141 public CatalinaCluster getCluster() {
142 return cluster;
143 }
144
145 /**
146 * @param cluster The cluster to set.
147 */
148 public void setCluster(CatalinaCluster cluster) {
149 this.cluster = cluster;
150 }
151
152 /**
153 * @return Returns the filter
154 */
155 public String getFilter() {
156 return filter ;
157 }
158
159 /**
160 * compile filter string to regular expressions
161 * @see Pattern#compile(java.lang.String)
162 * @param filter
163 * The filter to set.
164 */
165 public void setFilter(String filter) {
166 if (log.isDebugEnabled())
167 log.debug(sm.getString("ReplicationValve.filter.loading", filter));
168 this.filter = filter;
169 StringTokenizer t = new StringTokenizer(filter, ";");
170 this.reqFilters = new Pattern[t.countTokens()];
171 int i = 0;
172 while (t.hasMoreTokens()) {
173 String s = t.nextToken();
174 if (log.isTraceEnabled())
175 log.trace(sm.getString("ReplicationValve.filter.token", s));
176 try {
177 reqFilters[i++] = Pattern.compile(s);
178 } catch (Exception x) {
179 log.error(sm.getString("ReplicationValve.filter.token.failure",
180 s), x);
181 }
182 }
183 }
184
185 /**
186 * @return Returns the primaryIndicator.
187 */
188 public boolean isPrimaryIndicator() {
189 return primaryIndicator;
190 }
191
192 /**
193 * @param primaryIndicator The primaryIndicator to set.
194 */
195 public void setPrimaryIndicator(boolean primaryIndicator) {
196 this.primaryIndicator = primaryIndicator;
197 }
198
199 /**
200 * @return Returns the primaryIndicatorName.
201 */
202 public String getPrimaryIndicatorName() {
203 return primaryIndicatorName;
204 }
205
206 /**
207 * @param primaryIndicatorName The primaryIndicatorName to set.
208 */
209 public void setPrimaryIndicatorName(String primaryIndicatorName) {
210 this.primaryIndicatorName = primaryIndicatorName;
211 }
212
213 /**
214 * Calc processing stats
215 */
216 public boolean doStatistics() {
217 return doProcessingStats;
218 }
219
220 /**
221 * Set Calc processing stats
222 * @see #resetStatistics()
223 */
224 public void setStatistics(boolean doProcessingStats) {
225 this.doProcessingStats = doProcessingStats;
226 }
227
228 /**
229 * @return Returns the lastSendTime.
230 */
231 public long getLastSendTime() {
232 return lastSendTime;
233 }
234
235 /**
236 * @return Returns the nrOfRequests.
237 */
238 public long getNrOfRequests() {
239 return nrOfRequests;
240 }
241
242 /**
243 * @return Returns the nrOfFilterRequests.
244 */
245 public long getNrOfFilterRequests() {
246 return nrOfFilterRequests;
247 }
248
249 /**
250 * @return Returns the nrOfCrossContextSendRequests.
251 */
252 public long getNrOfCrossContextSendRequests() {
253 return nrOfCrossContextSendRequests;
254 }
255
256 /**
257 * @return Returns the nrOfSendRequests.
258 */
259 public long getNrOfSendRequests() {
260 return nrOfSendRequests;
261 }
262
263 /**
264 * @return Returns the totalRequestTime.
265 */
266 public long getTotalRequestTime() {
267 return totalRequestTime;
268 }
269
270 /**
271 * @return Returns the totalSendTime.
272 */
273 public long getTotalSendTime() {
274 return totalSendTime;
275 }
276
277 /**
278 * @return Returns the reqFilters.
279 */
280 protected java.util.regex.Pattern[] getReqFilters() {
281 return reqFilters;
282 }
283
284 /**
285 * @param reqFilters The reqFilters to set.
286 */
287 protected void setReqFilters(java.util.regex.Pattern[] reqFilters) {
288 this.reqFilters = reqFilters;
289 }
290
291
292 // --------------------------------------------------------- Public Methods
293
294 /**
295 * Register all cross context sessions inside endAccess.
296 * Use a list with contains check, that the Portlet API can include a lot of fragments from same or
297 * different applications with session changes.
298 *
299 * @param session cross context session
300 */
301 public void registerReplicationSession(DeltaSession session) {
302 List sessions = (List)crossContextSessions.get();
303 if(sessions != null) {
304 if(!sessions.contains(session)) {
305 if(log.isDebugEnabled())
306 log.debug(sm.getString("ReplicationValve.crossContext.registerSession",
307 session.getIdInternal(),
308 session.getManager().getContainer().getName()));
309 sessions.add(session);
310 }
311 }
312 }
313
314 /**
315 * Log the interesting request parameters, invoke the next Valve in the
316 * sequence, and log the interesting response parameters.
317 *
318 * @param request The servlet request to be processed
319 * @param response The servlet response to be created
320 *
321 * @exception IOException if an input/output error occurs
322 * @exception ServletException if a servlet error occurs
323 */
324 public void invoke(Request request, Response response)
325 throws IOException, ServletException
326 {
327 long totalstart = 0;
328
329 //this happens before the request
330 if(doStatistics()) {
331 totalstart = System.currentTimeMillis();
332 }
333 if (primaryIndicator) {
334 createPrimaryIndicator(request) ;
335 }
336 Context context = request.getContext();
337 boolean isCrossContext = context != null
338 && context instanceof StandardContext
339 && ((StandardContext) context).getCrossContext();
340 try {
341 if(isCrossContext) {
342 if(log.isDebugEnabled())
343 log.debug(sm.getString("ReplicationValve.crossContext.add"));
344 //FIXME add Pool of Arraylists
345 crossContextSessions.set(new ArrayList());
346 }
347 getNext().invoke(request, response);
348 Manager manager = request.getContext().getManager();
349 if (manager != null && manager instanceof ClusterManager) {
350 ClusterManager clusterManager = (ClusterManager) manager;
351 CatalinaCluster containerCluster = (CatalinaCluster) getContainer().getCluster();
352 if (containerCluster == null) {
353 if (log.isWarnEnabled())
354 log.warn(sm.getString("ReplicationValve.nocluster"));
355 return;
356 }
357 // valve cluster can access manager - other cluster handle replication
358 // at host level - hopefully!
359 if(containerCluster.getManager(clusterManager.getName()) == null)
360 return ;
361 if(containerCluster.hasMembers()) {
362 sendReplicationMessage(request, totalstart, isCrossContext, clusterManager, containerCluster);
363 } else {
364 resetReplicationRequest(request,isCrossContext);
365 }
366 }
367 } finally {
368 // Array must be remove: Current master request send endAccess at recycle.
369 // Don't register this request session again!
370 if(isCrossContext) {
371 if(log.isDebugEnabled())
372 log.debug(sm.getString("ReplicationValve.crossContext.remove"));
373 // crossContextSessions.remove() only exist at Java 5
374 // register ArrayList at a pool
375 crossContextSessions.set(null);
376 }
377 }
378 }
379
380
381 /**
382 * reset the active statitics
383 */
384 public void resetStatistics() {
385 totalRequestTime = 0 ;
386 totalSendTime = 0 ;
387 lastSendTime = 0 ;
388 nrOfFilterRequests = 0 ;
389 nrOfRequests = 0 ;
390 nrOfSendRequests = 0;
391 nrOfCrossContextSendRequests = 0;
392 }
393
394 /**
395 * Return a String rendering of this object.
396 */
397 public String toString() {
398
399 StringBuffer sb = new StringBuffer("ReplicationValve[");
400 if (container != null)
401 sb.append(container.getName());
402 sb.append("]");
403 return (sb.toString());
404
405 }
406
407 // --------------------------------------------------------- Protected Methods
408
409 /**
410 * @param request
411 * @param totalstart
412 * @param isCrossContext
413 * @param clusterManager
414 * @param containerCluster
415 */
416 protected void sendReplicationMessage(Request request, long totalstart, boolean isCrossContext, ClusterManager clusterManager, CatalinaCluster containerCluster) {
417 //this happens after the request
418 long start = 0;
419 if(doStatistics()) {
420 start = System.currentTimeMillis();
421 }
422 try {
423 // send invalid sessions
424 // DeltaManager returns String[0]
425 if (!(clusterManager instanceof DeltaManager))
426 sendInvalidSessions(clusterManager, containerCluster);
427 // send replication
428 sendSessionReplicationMessage(request, clusterManager, containerCluster);
429 if(isCrossContext)
430 sendCrossContextSession(containerCluster);
431 } catch (Exception x) {
432 // FIXME we have a lot of sends, but the trouble with one node stops the correct replication to other nodes!
433 log.error(sm.getString("ReplicationValve.send.failure"), x);
434 } finally {
435 // FIXME this stats update are not cheap!!
436 if(doStatistics()) {
437 updateStats(totalstart,start);
438 }
439 }
440 }
441
442 /**
443 * Send all changed cross context sessions to backups
444 * @param containerCluster
445 */
446 protected void sendCrossContextSession(CatalinaCluster containerCluster) {
447 Object sessions = crossContextSessions.get();
448 if(sessions != null && sessions instanceof List
449 && ((List)sessions).size() >0) {
450 for(Iterator iter = ((List)sessions).iterator(); iter.hasNext() ;) {
451 Session session = (Session)iter.next();
452 if(log.isDebugEnabled())
453 log.debug(sm.getString("ReplicationValve.crossContext.sendDelta",
454 session.getManager().getContainer().getName() ));
455 sendMessage(session,(ClusterManager)session.getManager(),containerCluster);
456 if(doStatistics()) {
457 nrOfCrossContextSendRequests++;
458 }
459 }
460 }
461 }
462
463 /**
464 * Fix memory leak for long sessions with many changes, when no backup member exists!
465 * @param request current request after responce is generated
466 * @param isCrossContext check crosscontext threadlocal
467 */
468 protected void resetReplicationRequest(Request request, boolean isCrossContext) {
469 Session contextSession = request.getSessionInternal(false);
470 if(contextSession != null && contextSession instanceof DeltaSession){
471 resetDeltaRequest(contextSession);
472 ((DeltaSession)contextSession).setPrimarySession(true);
473 }
474 if(isCrossContext) {
475 Object sessions = crossContextSessions.get();
476 if(sessions != null && sessions instanceof List
477 && ((List)sessions).size() >0) {
478 Iterator iter = ((List)sessions).iterator();
479 for(; iter.hasNext() ;) {
480 Session session = (Session)iter.next();
481 resetDeltaRequest(session);
482 if(session instanceof DeltaSession)
483 ((DeltaSession)contextSession).setPrimarySession(true);
484
485 }
486 }
487 }
488 }
489
490 /**
491 * Reset DeltaRequest from session
492 * @param session HttpSession from current request or cross context session
493 */
494 protected void resetDeltaRequest(Session session) {
495 if(log.isDebugEnabled()) {
496 log.debug(sm.getString("ReplicationValve.resetDeltaRequest" ,
497 session.getManager().getContainer().getName() ));
498 }
499 ((DeltaSession)session).resetDeltaRequest();
500 }
501
502 /**
503 * Send Cluster Replication Request
504 * @param request current request
505 * @param manager session manager
506 * @param cluster replication cluster
507 */
508 protected void sendSessionReplicationMessage(Request request,
509 ClusterManager manager, CatalinaCluster cluster) {
510 Session session = request.getSessionInternal(false);
511 if (session != null) {
512 String uri = request.getDecodedRequestURI();
513 // request without session change
514 if (!isRequestWithoutSessionChange(uri)) {
515 if (log.isDebugEnabled())
516 log.debug(sm.getString("ReplicationValve.invoke.uri", uri));
517 sendMessage(session,manager,cluster);
518 } else
519 if(doStatistics())
520 nrOfFilterRequests++;
521 }
522
523 }
524
525 /**
526 * Send message delta message from request session
527 * @param request current request
528 * @param manager session manager
529 * @param cluster replication cluster
530 */
531 protected void sendMessage(Session session,
532 ClusterManager manager, CatalinaCluster cluster) {
533 String id = session.getIdInternal();
534 if (id != null) {
535 send(manager, cluster, id);
536 }
537 }
538
539 /**
540 * send manager requestCompleted message to cluster
541 * @param manager SessionManager
542 * @param cluster replication cluster
543 * @param sessionId sessionid from the manager
544 * @see DeltaManager#requestCompleted(String)
545 * @see SimpleTcpCluster#send(ClusterMessage)
546 */
547 protected void send(ClusterManager manager, CatalinaCluster cluster, String sessionId) {
548 ClusterMessage msg = manager.requestCompleted(sessionId);
549 if (msg != null) {
550 if(manager.doDomainReplication()) {
551 cluster.sendClusterDomain(msg);
552 } else {
553 cluster.send(msg);
554 }
555 if(doStatistics())
556 nrOfSendRequests++;
557 }
558 }
559
560 /**
561 * check for session invalidations
562 * @param manager
563 * @param cluster
564 */
565 protected void sendInvalidSessions(ClusterManager manager, CatalinaCluster cluster) {
566 String[] invalidIds=manager.getInvalidatedSessions();
567 if ( invalidIds.length > 0 ) {
568 for ( int i=0;i<invalidIds.length; i++ ) {
569 try {
570 send(manager,cluster,invalidIds[i]);
571 } catch ( Exception x ) {
572 log.error(sm.getString("ReplicationValve.send.invalid.failure",invalidIds[i]),x);
573 }
574 }
575 }
576 }
577
578 /**
579 * is request without possible session change
580 * @param uri The request uri
581 * @return True if no session change
582 */
583 protected boolean isRequestWithoutSessionChange(String uri) {
584
585 boolean filterfound = false;
586
587 for (int i = 0; (i < reqFilters.length) && (!filterfound); i++) {
588 java.util.regex.Matcher matcher = reqFilters[i].matcher(uri);
589 filterfound = matcher.matches();
590 }
591 return filterfound;
592 }
593
594 /**
595 * protocol cluster replications stats
596 * @param requestTime
597 * @param clusterTime
598 */
599 protected void updateStats(long requestTime, long clusterTime) {
600 synchronized(this) {
601 lastSendTime=System.currentTimeMillis();
602 totalSendTime+=lastSendTime - clusterTime;
603 totalRequestTime+=lastSendTime - requestTime;
604 nrOfRequests++;
605 }
606 if(log.isInfoEnabled()) {
607 if ( (nrOfRequests % 100) == 0 ) {
608 log.info(sm.getString("ReplicationValve.stats",
609 new Object[]{
610 new Long(totalRequestTime/nrOfRequests),
611 new Long(totalSendTime/nrOfRequests),
612 new Long(nrOfRequests),
613 new Long(nrOfSendRequests),
614 new Long(nrOfCrossContextSendRequests),
615 new Long(nrOfFilterRequests),
616 new Long(totalRequestTime),
617 new Long(totalSendTime)}));
618 }
619 }
620 }
621
622
623 /**
624 * Mark Request that processed at primary node with attribute
625 * primaryIndicatorName
626 *
627 * @param request
628 * @throws IOException
629 */
630 protected void createPrimaryIndicator(Request request) throws IOException {
631 String id = request.getRequestedSessionId();
632 if ((id != null) && (id.length() > 0)) {
633 Manager manager = request.getContext().getManager();
634 Session session = manager.findSession(id);
635 if (session instanceof ClusterSession) {
636 ClusterSession cses = (ClusterSession) session;
637 if (cses != null) {
638 if (log.isDebugEnabled())
639 log.debug(sm.getString(
640 "ReplicationValve.session.indicator", request.getContext().getName(),id,
641 primaryIndicatorName, cses.isPrimarySession()));
642 request.setAttribute(primaryIndicatorName, cses.isPrimarySession()?Boolean.TRUE:Boolean.FALSE);
643 }
644 } else {
645 if (log.isDebugEnabled()) {
646 if (session != null) {
647 log.debug(sm.getString(
648 "ReplicationValve.session.found", request.getContext().getName(),id));
649 } else {
650 log.debug(sm.getString(
651 "ReplicationValve.session.invalid", request.getContext().getName(),id));
652 }
653 }
654 }
655 }
656 }
657
658 }