1 /*
2 * Copyright 1999,2004 The Apache Software Foundation.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package org.apache.catalina.cluster.session;
17
18 import java.io.IOException;
19
20 import org.apache.catalina.LifecycleException;
21 import org.apache.catalina.Session;
22 import org.apache.catalina.cluster.CatalinaCluster;
23 import org.apache.catalina.cluster.ClusterManager;
24 import org.apache.catalina.cluster.ClusterMessage;
25 import org.apache.catalina.cluster.Member;
26 import org.apache.catalina.realm.GenericPrincipal;
27 import org.apache.catalina.session.StandardManager;
28
29 /**
30 * Title: Tomcat Session Replication for Tomcat 4.0 <BR>
31 * Description: A very simple straight forward implementation of
32 * session replication of servers in a cluster.<BR>
 * This session replication is implemented "live": when a session attribute
 * is added to a session on node A, a message is broadcast to the other nodes
 * in the cluster and setAttribute is called on the replicated sessions.<BR>
 * A full description of this implementation can be found on
 * <a href="http://www.filip.net/tomcat/">Filip's Tomcat Page</a><BR>
39 *
40 * Copyright: See apache license
41 * Company: www.filip.net
42 * @author <a href="mailto:mail@filip.net">Filip Hanik</a>
43 * @author Bela Ban (modifications for synchronous replication)
44 * @version 1.0 for TC 4.0
 * Description: The InMemoryReplicationManager is a session manager that replicates
 * session information in memory. It uses <a href="http://www.javagroups.com">JavaGroups</a> as
 * a communication protocol to ensure guaranteed and ordered message delivery.
48 * JavaGroups also provides a very flexible protocol stack to ensure that the replication
49 * can be used in any environment.
50 * <BR><BR>
51 * The InMemoryReplicationManager extends the StandardManager hence it allows for us
52 * to inherit all the basic session management features like expiration, session listeners etc
53 * <BR><BR>
 * To communicate with other nodes in the cluster, the InMemoryReplicationManager sends out 7 different types of multicast messages,
 * all defined in the SessionMessage class.<BR>
56 * When a session is replicated (not an attribute added/removed) the session is serialized into
57 * a byte array using the StandardSession.readObjectData, StandardSession.writeObjectData methods.
58 */
59 public class SimpleTcpReplicationManager extends StandardManager
60 implements ClusterManager
61 {
62 public static org.apache.commons.logging.Log log =
63 org.apache.commons.logging.LogFactory.getLog( SimpleTcpReplicationManager.class );
64
65 //the channel configuration
66 protected String mChannelConfig = null;
67
68 //the group name
69 protected String mGroupName = "TomcatReplication";
70
71 //somehow start() gets called more than once
72 protected boolean mChannelStarted = false;
73
74 //log to screen
75 protected boolean mPrintToScreen = true;
76
77 protected boolean defaultMode = false;
78
79 protected boolean mManagerRunning = false;
80
81 /** Use synchronous rather than asynchronous replication. Every session modification (creation, change, removal etc)
82 * will be sent to all members. The call will then wait for max milliseconds, or forever (if timeout is 0) for
83 * all responses.
84 */
85 protected boolean synchronousReplication=true;
86
87 /** Set to true if we don't want the sessions to expire on shutdown */
88 protected boolean mExpireSessionsOnShutdown = true;
89
90 protected boolean useDirtyFlag = false;
91
92 protected String name;
93
94 protected boolean distributable = true;
95
96 protected CatalinaCluster cluster;
97
98 protected java.util.HashMap invalidatedSessions = new java.util.HashMap();
99
100 /**
101 * Flag to keep track if the state has been transferred or not
102 * Assumes false.
103 */
104 protected boolean stateTransferred = false;
105 private boolean notifyListenersOnReplication;
106 private boolean sendClusterDomainOnly = true ;
107
/**
 * Creates the manager. No additional set-up happens here; construction is
 * delegated entirely to the superclass.
 */
public SimpleTcpReplicationManager() {
    super();
}
116
117 public boolean isSendClusterDomainOnly() {
118 return sendClusterDomainOnly;
119 }
120
121 /**
122 * @param sendClusterDomainOnly The sendClusterDomainOnly to set.
123 */
124 public void setSendClusterDomainOnly(boolean sendClusterDomainOnly) {
125 this.sendClusterDomainOnly = sendClusterDomainOnly;
126 }
127
128 /**
129 * @return Returns the defaultMode.
130 */
131 public boolean isDefaultMode() {
132 return defaultMode;
133 }
134 /**
135 * @param defaultMode The defaultMode to set.
136 */
137 public void setDefaultMode(boolean defaultMode) {
138 this.defaultMode = defaultMode;
139 }
140
141 public boolean isManagerRunning()
142 {
143 return mManagerRunning;
144 }
145
146 public void setUseDirtyFlag(boolean usedirtyflag)
147 {
148 this.useDirtyFlag = usedirtyflag;
149 }
150
151 public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown)
152 {
153 mExpireSessionsOnShutdown = expireSessionsOnShutdown;
154 }
155
156 public void setCluster(CatalinaCluster cluster) {
157 if(log.isDebugEnabled())
158 log.debug("Cluster associated with SimpleTcpReplicationManager");
159 this.cluster = cluster;
160 }
161
162 public boolean getExpireSessionsOnShutdown()
163 {
164 return mExpireSessionsOnShutdown;
165 }
166
167 public void setPrintToScreen(boolean printtoscreen)
168 {
169 if(log.isDebugEnabled())
170 log.debug("Setting screen debug to:"+printtoscreen);
171 mPrintToScreen = printtoscreen;
172 }
173
174 public void setSynchronousReplication(boolean flag)
175 {
176 synchronousReplication=flag;
177 }
178
179 /**
180 * Override persistence since they don't go hand in hand with replication for now.
181 */
182 public void unload() throws IOException {
183 if ( !getDistributable() ) {
184 super.unload();
185 }
186 }
187
188 /**
189 * Creates a HTTP session.
190 * Most of the code in here is copied from the StandardManager.
191 * This is not pretty, yeah I know, but it was necessary since the
192 * StandardManager had hard coded the session instantiation to the a
193 * StandardSession, when we actually want to instantiate a ReplicatedSession<BR>
194 * If the call comes from the Tomcat servlet engine, a SessionMessage goes out to the other
195 * nodes in the cluster that this session has been created.
196 * @param notify - if set to true the other nodes in the cluster will be notified.
197 * This flag is needed so that we can create a session before we deserialize
198 * a replicated one
199 *
200 * @see ReplicatedSession
201 */
202 protected Session createSession(String sessionId, boolean notify, boolean setId)
203 {
204
205 //inherited from the basic manager
206 if ((getMaxActiveSessions() >= 0) &&
207 (sessions.size() >= getMaxActiveSessions()))
208 throw new IllegalStateException(sm.getString("standardManager.createSession.ise"));
209
210
211 Session session = new ReplicatedSession(this);
212
213 // Initialize the properties of the new session and return it
214 session.setNew(true);
215 session.setValid(true);
216 session.setCreationTime(System.currentTimeMillis());
217 session.setMaxInactiveInterval(this.maxInactiveInterval);
218 if(sessionId == null)
219 sessionId = generateSessionId();
220 if ( setId ) session.setId(sessionId);
221 if ( notify && (cluster!=null) ) {
222 ((ReplicatedSession)session).setIsDirty(true);
223 }
224 return (session);
225 }//createSession
226
227 //=========================================================================
228 // OVERRIDE THESE METHODS TO IMPLEMENT THE REPLICATION
229 //=========================================================================
230
231 /**
232 * Construct and return a new session object, based on the default
233 * settings specified by this Manager's properties. The session
234 * id will be assigned by this method, and available via the getId()
235 * method of the returned session. If a new session cannot be created
236 * for any reason, return <code>null</code>.
237 *
238 * @exception IllegalStateException if a new session cannot be
239 * instantiated for any reason
240 */
241 public Session createSession(String sessionId)
242 {
243 //create a session and notify the other nodes in the cluster
244 Session session = createSession(sessionId,getDistributable(),true);
245 add(session);
246 return session;
247 }
248
249 public void sessionInvalidated(String sessionId) {
250 synchronized ( invalidatedSessions ) {
251 invalidatedSessions.put(sessionId, sessionId);
252 }
253 }
254
255 public String[] getInvalidatedSessions() {
256 synchronized ( invalidatedSessions ) {
257 String[] result = new String[invalidatedSessions.size()];
258 invalidatedSessions.values().toArray(result);
259 return result;
260 }
261
262 }
263
264 public ClusterMessage requestCompleted(String sessionId)
265 {
266 if ( !getDistributable() ) {
267 log.warn("Received requestCompleted message, although this context["+
268 getName()+"] is not distributable. Ignoring message");
269 return null;
270 }
271 //notify javagroups
272 try
273 {
274 if ( invalidatedSessions.get(sessionId) != null ) {
275 synchronized ( invalidatedSessions ) {
276 invalidatedSessions.remove(sessionId);
277 SessionMessage msg = new SessionMessageImpl(name,
278 SessionMessage.EVT_SESSION_EXPIRED,
279 null,
280 sessionId,
281 sessionId);
282 return msg;
283 }
284 } else {
285 ReplicatedSession session = (ReplicatedSession) findSession(
286 sessionId);
287 if (session != null) {
288 //return immediately if the session is not dirty
289 if (useDirtyFlag && (!session.isDirty())) {
290 //but before we return doing nothing,
291 //see if we should send
292 //an updated last access message so that
293 //sessions across cluster dont expire
294 long interval = session.getMaxInactiveInterval();
295 long lastaccdist = System.currentTimeMillis() -
296 session.getLastAccessWasDistributed();
297 if ( ((interval*1000) / lastaccdist)< 3 ) {
298 SessionMessage accmsg = new SessionMessageImpl(name,
299 SessionMessage.EVT_SESSION_ACCESSED,
300 null,
301 sessionId,
302 sessionId);
303 session.setLastAccessWasDistributed(System.currentTimeMillis());
304 return accmsg;
305 }
306 return null;
307 }
308
309 session.setIsDirty(false);
310 if (log.isDebugEnabled()) {
311 try {
312 log.debug("Sending session to cluster=" + session);
313 }
314 catch (Exception ignore) {}
315 }
316 SessionMessage msg = new SessionMessageImpl(name,
317 SessionMessage.EVT_SESSION_CREATED,
318 writeSession(session),
319 session.getIdInternal(),
320 session.getIdInternal());
321 return msg;
322 } //end if
323 }//end if
324 }
325 catch (Exception x )
326 {
327 log.error("Unable to replicate session",x);
328 }
329 return null;
330 }
331
332 /**
333 * Serialize a session into a byte array<BR>
334 * This method simple calls the writeObjectData method on the session
335 * and returns the byte data from that call
336 * @param session - the session to be serialized
337 * @return a byte array containing the session data, null if the serialization failed
338 */
339 protected byte[] writeSession( Session session )
340 {
341 try
342 {
343 java.io.ByteArrayOutputStream session_data = new java.io.ByteArrayOutputStream();
344 java.io.ObjectOutputStream session_out = new java.io.ObjectOutputStream(session_data);
345 session_out.flush();
346 boolean hasPrincipal = session.getPrincipal() != null;
347 session_out.writeBoolean(hasPrincipal);
348 if ( hasPrincipal )
349 {
350 session_out.writeObject(SerializablePrincipal.createPrincipal((GenericPrincipal)session.getPrincipal()));
351 }//end if
352 ((ReplicatedSession)session).writeObjectData(session_out);
353 return session_data.toByteArray();
354
355 }
356 catch ( Exception x )
357 {
358 log.error("Failed to serialize the session!",x);
359 }
360 return null;
361 }
362
363 /**
364 * Reinstantiates a serialized session from the data passed in.
365 * This will first call createSession() so that we get a fresh instance with all
366 * the managers set and all the transient fields validated.
367 * Then it calls Session.readObjectData(byte[]) to deserialize the object
368 * @param data - a byte array containing session data
369 * @return a valid Session object, null if an error occurs
370 *
371 */
372 protected Session readSession( byte[] data, String sessionId )
373 {
374 try
375 {
376 java.io.ByteArrayInputStream session_data = new java.io.ByteArrayInputStream(data);
377 ReplicationStream session_in = new ReplicationStream(session_data,container.getLoader().getClassLoader());
378
379 Session session = sessionId!=null?this.findSession(sessionId):null;
380 boolean isNew = (session==null);
381 //clear the old values from the existing session
382 if ( session!=null ) {
383 ReplicatedSession rs = (ReplicatedSession)session;
384 rs.expire(false); //cleans up the previous values, since we are not doing removes
385 session = null;
386 }//end if
387
388 if (session==null) {
389 session = createSession(null,false, false);
390 sessions.remove(session.getIdInternal());
391 }
392
393
394 boolean hasPrincipal = session_in.readBoolean();
395 SerializablePrincipal p = null;
396 if ( hasPrincipal )
397 p = (SerializablePrincipal)session_in.readObject();
398 ((ReplicatedSession)session).readObjectData(session_in);
399 if ( hasPrincipal )
400 session.setPrincipal(p.getPrincipal(getContainer().getRealm()));
401 ((ReplicatedSession)session).setId(sessionId,isNew);
402 ReplicatedSession rsession = (ReplicatedSession)session;
403 rsession.setAccessCount(1);
404 session.setManager(this);
405 session.setValid(true);
406 rsession.setLastAccessedTime(System.currentTimeMillis());
407 rsession.setThisAccessedTime(System.currentTimeMillis());
408 ((ReplicatedSession)session).setAccessCount(0);
409 session.setNew(false);
410 if(log.isTraceEnabled())
411 log.trace("Session loaded id="+sessionId +
412 " actualId="+session.getId()+
413 " exists="+this.sessions.containsKey(sessionId)+
414 " valid="+rsession.isValid());
415 return session;
416
417 }
418 catch ( Exception x )
419 {
420 log.error("Failed to deserialize the session!",x);
421 }
422 return null;
423 }
424
425 public String getName() {
426 return this.name;
427 }
428 /**
429 * Prepare for the beginning of active use of the public methods of this
430 * component. This method should be called after <code>configure()</code>,
431 * and before any of the public methods of the component are utilized.<BR>
432 * Starts the cluster communication channel, this will connect with the other nodes
433 * in the cluster, and request the current session state to be transferred to this node.
434 * @exception IllegalStateException if this component has already been
435 * started
436 * @exception LifecycleException if this component detects a fatal error
437 * that prevents this component from being used
438 */
439 public void start() throws LifecycleException {
440 mManagerRunning = true;
441 super.start();
442 //start the javagroups channel
443 try {
444 //the channel is already running
445 if ( mChannelStarted ) return;
446 if(log.isInfoEnabled())
447 log.info("Starting clustering manager...:"+getName());
448 if ( cluster == null ) {
449 log.error("Starting... no cluster associated with this context:"+getName());
450 return;
451 }
452 cluster.addManager(getName(),this);
453
454 if (cluster.getMembers().length > 0) {
455 Member mbr = cluster.getMembers()[0];
456 SessionMessage msg =
457 new SessionMessageImpl(this.getName(),
458 SessionMessage.EVT_GET_ALL_SESSIONS,
459 null,
460 "GET-ALL",
461 "GET-ALL-"+this.getName());
462 cluster.send(msg, mbr);
463 if(log.isWarnEnabled())
464 log.warn("Manager["+getName()+"], requesting session state from "+mbr+
465 ". This operation will timeout if no session state has been received within "+
466 "60 seconds");
467 long reqStart = System.currentTimeMillis();
468 long reqNow = 0;
469 boolean isTimeout=false;
470 do {
471 try {
472 Thread.sleep(100);
473 }catch ( Exception sleep) {}
474 reqNow = System.currentTimeMillis();
475 isTimeout=((reqNow-reqStart)>(1000*60));
476 } while ( (!isStateTransferred()) && (!isTimeout));
477 if ( isTimeout || (!isStateTransferred()) ) {
478 log.error("Manager["+getName()+"], No session state received, timing out.");
479 }else {
480 if(log.isInfoEnabled())
481 log.info("Manager["+getName()+"], session state received in "+(reqNow-reqStart)+" ms.");
482 }
483 } else {
484 if(log.isInfoEnabled())
485 log.info("Manager["+getName()+"], skipping state transfer. No members active in cluster group.");
486 }//end if
487 mChannelStarted = true;
488 } catch ( Exception x ) {
489 log.error("Unable to start SimpleTcpReplicationManager",x);
490 }
491 }
492
493 /**
494 * Gracefully terminate the active use of the public methods of this
495 * component. This method should be the last one called on a given
496 * instance of this component.<BR>
497 * This will disconnect the cluster communication channel and stop the listener thread.
498 * @exception IllegalStateException if this component has not been started
499 * @exception LifecycleException if this component detects a fatal error
500 * that needs to be reported
501 */
502 public void stop() throws LifecycleException
503 {
504 mManagerRunning = false;
505 mChannelStarted = false;
506 super.stop();
507 //stop the javagroup channel
508 try
509 {
510 this.sessions.clear();
511 cluster.removeManager(getName(),this);
512 // mReplicationListener.stopListening();
513 // mReplicationTransmitter.stop();
514 // service.stop();
515 // service = null;
516 }
517 catch ( Exception x )
518 {
519 log.error("Unable to stop SimpleTcpReplicationManager",x);
520 }
521 }
522
523 public void setDistributable(boolean dist) {
524 this.distributable = dist;
525 }
526
527 public boolean getDistributable() {
528 return distributable;
529 }
530
531 /**
532 * This method is called by the received thread when a SessionMessage has
533 * been received from one of the other nodes in the cluster.
534 * @param msg - the message received
535 * @param sender - the sender of the message, this is used if we receive a
536 * EVT_GET_ALL_SESSION message, so that we only reply to
537 * the requesting node
538 */
539 protected void messageReceived( SessionMessage msg, Member sender ) {
540 try {
541 if(log.isInfoEnabled()) {
542 log.debug("Received SessionMessage of type="+msg.getEventTypeString());
543 log.debug("Received SessionMessage sender="+sender);
544 }
545 switch ( msg.getEventType() ) {
546 case SessionMessage.EVT_GET_ALL_SESSIONS: {
547 //get a list of all the session from this manager
548 Object[] sessions = findSessions();
549 java.io.ByteArrayOutputStream bout = new java.io.ByteArrayOutputStream();
550 java.io.ObjectOutputStream oout = new java.io.ObjectOutputStream(bout);
551 oout.writeInt(sessions.length);
552 for (int i=0; i<sessions.length; i++){
553 ReplicatedSession ses = (ReplicatedSession)sessions[i];
554 oout.writeUTF(ses.getIdInternal());
555 byte[] data = writeSession(ses);
556 oout.writeObject(data);
557 }//for
558 //don't send a message if we don't have to
559 oout.flush();
560 oout.close();
561 byte[] data = bout.toByteArray();
562 SessionMessage newmsg = new SessionMessageImpl(name,
563 SessionMessage.EVT_ALL_SESSION_DATA,
564 data, "SESSION-STATE","SESSION-STATE-"+getName());
565 cluster.send(newmsg, sender);
566 break;
567 }
568 case SessionMessage.EVT_ALL_SESSION_DATA: {
569 java.io.ByteArrayInputStream bin =
570 new java.io.ByteArrayInputStream(msg.getSession());
571 java.io.ObjectInputStream oin = new java.io.ObjectInputStream(bin);
572 int size = oin.readInt();
573 for ( int i=0; i<size; i++) {
574 String id = oin.readUTF();
575 byte[] data = (byte[])oin.readObject();
576 Session session = readSession(data,id);
577 }//for
578 stateTransferred=true;
579 break;
580 }
581 case SessionMessage.EVT_SESSION_CREATED: {
582 Session session = this.readSession(msg.getSession(),msg.getSessionID());
583 if ( log.isDebugEnabled() ) {
584 log.debug("Received replicated session=" + session +
585 " isValid=" + session.isValid());
586 }
587 break;
588 }
589 case SessionMessage.EVT_SESSION_EXPIRED: {
590 Session session = findSession(msg.getSessionID());
591 if ( session != null ) {
592 session.expire();
593 this.remove(session);
594 }//end if
595 break;
596 }
597 case SessionMessage.EVT_SESSION_ACCESSED :{
598 Session session = findSession(msg.getSessionID());
599 if ( session != null ) {
600 session.access();
601 session.endAccess();
602 }
603 break;
604 }
605 default: {
606 //we didn't recognize the message type, do nothing
607 break;
608 }
609 }//switch
610 }
611 catch ( Exception x )
612 {
613 log.error("Unable to receive message through TCP channel",x);
614 }
615 }
616
617 public void messageDataReceived(ClusterMessage cmsg) {
618 try {
619 if ( cmsg instanceof SessionMessage ) {
620 SessionMessage msg = (SessionMessage)cmsg;
621 messageReceived(msg,
622 msg.getAddress() != null ? (Member) msg.getAddress() : null);
623 }
624 } catch(Throwable ex){
625 log.error("InMemoryReplicationManager.messageDataReceived()", ex);
626 }//catch
627 }
628
629 public boolean isStateTransferred() {
630 return stateTransferred;
631 }
632
633 public void setName(String name) {
634 this.name = name;
635 }
636 public boolean isNotifyListenersOnReplication() {
637 return notifyListenersOnReplication;
638 }
639 public void setNotifyListenersOnReplication(boolean notifyListenersOnReplication) {
640 this.notifyListenersOnReplication = notifyListenersOnReplication;
641 }
642
643
644 /*
645 * @see org.apache.catalina.cluster.ClusterManager#getCluster()
646 */
647 public CatalinaCluster getCluster() {
648 return cluster;
649 }
650
651 }