1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.catalina.ha.session;
19
20 import java.beans.PropertyChangeEvent;
21 import java.io.BufferedOutputStream;
22 import java.io.ByteArrayOutputStream;
23 import java.io.IOException;
24 import java.io.ObjectInputStream;
25 import java.io.ObjectOutputStream;
26 import java.util.ArrayList;
27 import java.util.Date;
28 import java.util.Iterator;
29
30 import org.apache.catalina.Cluster;
31 import org.apache.catalina.Container;
32 import org.apache.catalina.Context;
33 import org.apache.catalina.Engine;
34 import org.apache.catalina.Host;
35 import org.apache.catalina.LifecycleException;
36 import org.apache.catalina.LifecycleListener;
37 import org.apache.catalina.Session;
38 import org.apache.catalina.Valve;
39 import org.apache.catalina.core.StandardContext;
40 import org.apache.catalina.ha.CatalinaCluster;
41 import org.apache.catalina.ha.ClusterMessage;
42 import org.apache.catalina.ha.tcp.ReplicationValve;
43 import org.apache.catalina.tribes.Member;
44 import org.apache.catalina.tribes.io.ReplicationStream;
45 import org.apache.catalina.util.LifecycleSupport;
46 import org.apache.catalina.util.StringManager;
47 import org.apache.catalina.ha.ClusterManager;
48
49 /**
50 * The DeltaManager manages replicated sessions by only replicating the deltas
51 * in data. For applications written to handle this, the DeltaManager is the
52 * optimal way of replicating data.
53 *
54 * This code is almost identical to StandardManager with a difference in how it
55 * persists sessions and some modifications to it.
56 *
57 * <b>IMPLEMENTATION NOTE </b>: Correct behavior of session storing and
58 * reloading depends upon external calls to the <code>start()</code> and
59 * <code>stop()</code> methods of this class at the correct times.
60 *
61 * @author Filip Hanik
62 * @author Craig R. McClanahan
63 * @author Jean-Francois Arcand
64 * @author Peter Rossbach
65 * @version $Revision: 561326 $ $Date: 2007-07-31 15:35:35 +0200 (mar., 31 juil. 2007) $
66 */
67
68 public class DeltaManager extends ClusterManagerBase{
69
70 // ---------------------------------------------------- Security Classes
71 public static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(DeltaManager.class);
72
73 /**
74 * The string manager for this package.
75 */
76 protected static StringManager sm = StringManager.getManager(Constants.Package);
77
78 // ----------------------------------------------------- Instance Variables
79
80 /**
81 * The descriptive information about this implementation.
82 */
83 private static final String info = "DeltaManager/2.1";
84
85 /**
86 * Has this component been started yet?
87 */
88 private boolean started = false;
89
90 /**
91 * The descriptive name of this Manager implementation (for logging).
92 */
93 protected static String managerName = "DeltaManager";
94 protected String name = null;
95 protected boolean defaultMode = false;
96 private CatalinaCluster cluster = null;
97
98 /**
99 * cached replication valve cluster container!
100 */
101 private ReplicationValve replicationValve = null ;
102
103 /**
104 * The lifecycle event support for this component.
105 */
106 protected LifecycleSupport lifecycle = new LifecycleSupport(this);
107
108 /**
109 * The maximum number of active Sessions allowed, or -1 for no limit.
110 */
111 private int maxActiveSessions = -1;
112 private boolean expireSessionsOnShutdown = false;
113 private boolean notifyListenersOnReplication = true;
114 private boolean notifySessionListenersOnReplication = true;
115 private boolean stateTransfered = false ;
116 private int stateTransferTimeout = 60;
117 private boolean sendAllSessions = true;
118 private boolean sendClusterDomainOnly = true ;
119 private int sendAllSessionsSize = 1000 ;
120
121 /**
122 * wait time between send session block (default 2 sec)
123 */
124 private int sendAllSessionsWaitTime = 2 * 1000 ;
125 private ArrayList receivedMessageQueue = new ArrayList() ;
126 private boolean receiverQueue = false ;
127 private boolean stateTimestampDrop = true ;
128 private long stateTransferCreateSendTime;
129
130 // ------------------------------------------------------------------ stats attributes
131
132 int rejectedSessions = 0;
133 private long sessionReplaceCounter = 0 ;
134 long processingTime = 0;
135 private long counterReceive_EVT_GET_ALL_SESSIONS = 0 ;
136 private long counterSend_EVT_ALL_SESSION_DATA = 0 ;
137 private long counterReceive_EVT_ALL_SESSION_DATA = 0 ;
138 private long counterReceive_EVT_SESSION_CREATED = 0 ;
139 private long counterReceive_EVT_SESSION_EXPIRED = 0;
140 private long counterReceive_EVT_SESSION_ACCESSED = 0 ;
141 private long counterReceive_EVT_SESSION_DELTA = 0;
142 private long counterSend_EVT_GET_ALL_SESSIONS = 0 ;
143 private long counterSend_EVT_SESSION_CREATED = 0;
144 private long counterSend_EVT_SESSION_DELTA = 0 ;
145 private long counterSend_EVT_SESSION_ACCESSED = 0;
146 private long counterSend_EVT_SESSION_EXPIRED = 0;
147 private int counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0 ;
148 private int counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0 ;
149 private int counterNoStateTransfered = 0 ;
150
151
152 // ------------------------------------------------------------- Constructor
153 public DeltaManager() {
154 super();
155 }
156
157 // ------------------------------------------------------------- Properties
158
159 /**
160 * Return descriptive information about this Manager implementation and the
161 * corresponding version number, in the format
162 * <code><description>/<version></code>.
163 */
164 public String getInfo() {
165 return info;
166 }
167
168 public void setName(String name) {
169 this.name = name;
170 }
171
172 /**
173 * Return the descriptive short name of this Manager implementation.
174 */
175 public String getName() {
176 return name;
177 }
178
179 /**
180 * @return Returns the counterSend_EVT_GET_ALL_SESSIONS.
181 */
182 public long getCounterSend_EVT_GET_ALL_SESSIONS() {
183 return counterSend_EVT_GET_ALL_SESSIONS;
184 }
185
186 /**
187 * @return Returns the counterSend_EVT_SESSION_ACCESSED.
188 */
189 public long getCounterSend_EVT_SESSION_ACCESSED() {
190 return counterSend_EVT_SESSION_ACCESSED;
191 }
192
193 /**
194 * @return Returns the counterSend_EVT_SESSION_CREATED.
195 */
196 public long getCounterSend_EVT_SESSION_CREATED() {
197 return counterSend_EVT_SESSION_CREATED;
198 }
199
200 /**
201 * @return Returns the counterSend_EVT_SESSION_DELTA.
202 */
203 public long getCounterSend_EVT_SESSION_DELTA() {
204 return counterSend_EVT_SESSION_DELTA;
205 }
206
207 /**
208 * @return Returns the counterSend_EVT_SESSION_EXPIRED.
209 */
210 public long getCounterSend_EVT_SESSION_EXPIRED() {
211 return counterSend_EVT_SESSION_EXPIRED;
212 }
213
214 /**
215 * @return Returns the counterSend_EVT_ALL_SESSION_DATA.
216 */
217 public long getCounterSend_EVT_ALL_SESSION_DATA() {
218 return counterSend_EVT_ALL_SESSION_DATA;
219 }
220
221 /**
222 * @return Returns the counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE.
223 */
224 public int getCounterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE() {
225 return counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE;
226 }
227
228 /**
229 * @return Returns the counterReceive_EVT_ALL_SESSION_DATA.
230 */
231 public long getCounterReceive_EVT_ALL_SESSION_DATA() {
232 return counterReceive_EVT_ALL_SESSION_DATA;
233 }
234
235 /**
236 * @return Returns the counterReceive_EVT_GET_ALL_SESSIONS.
237 */
238 public long getCounterReceive_EVT_GET_ALL_SESSIONS() {
239 return counterReceive_EVT_GET_ALL_SESSIONS;
240 }
241
242 /**
243 * @return Returns the counterReceive_EVT_SESSION_ACCESSED.
244 */
245 public long getCounterReceive_EVT_SESSION_ACCESSED() {
246 return counterReceive_EVT_SESSION_ACCESSED;
247 }
248
249 /**
250 * @return Returns the counterReceive_EVT_SESSION_CREATED.
251 */
252 public long getCounterReceive_EVT_SESSION_CREATED() {
253 return counterReceive_EVT_SESSION_CREATED;
254 }
255
256 /**
257 * @return Returns the counterReceive_EVT_SESSION_DELTA.
258 */
259 public long getCounterReceive_EVT_SESSION_DELTA() {
260 return counterReceive_EVT_SESSION_DELTA;
261 }
262
263 /**
264 * @return Returns the counterReceive_EVT_SESSION_EXPIRED.
265 */
266 public long getCounterReceive_EVT_SESSION_EXPIRED() {
267 return counterReceive_EVT_SESSION_EXPIRED;
268 }
269
270
271 /**
272 * @return Returns the counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE.
273 */
274 public int getCounterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE() {
275 return counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE;
276 }
277
278 /**
279 * @return Returns the processingTime.
280 */
281 public long getProcessingTime() {
282 return processingTime;
283 }
284
285 /**
286 * @return Returns the sessionReplaceCounter.
287 */
288 public long getSessionReplaceCounter() {
289 return sessionReplaceCounter;
290 }
291
292 /**
293 * Number of session creations that failed due to maxActiveSessions
294 *
295 * @return The count
296 */
297 public int getRejectedSessions() {
298 return rejectedSessions;
299 }
300
301 public void setRejectedSessions(int rejectedSessions) {
302 this.rejectedSessions = rejectedSessions;
303 }
304
305 /**
306 * @return Returns the counterNoStateTransfered.
307 */
308 public int getCounterNoStateTransfered() {
309 return counterNoStateTransfered;
310 }
311
312 public int getReceivedQueueSize() {
313 return receivedMessageQueue.size() ;
314 }
315
316 /**
317 * @return Returns the stateTransferTimeout.
318 */
319 public int getStateTransferTimeout() {
320 return stateTransferTimeout;
321 }
322 /**
323 * @param timeoutAllSession The timeout
324 */
325 public void setStateTransferTimeout(int timeoutAllSession) {
326 this.stateTransferTimeout = timeoutAllSession;
327 }
328
329 /**
330 * is session state transfered complete?
331 *
332 */
333 public boolean getStateTransfered() {
334 return stateTransfered;
335 }
336
337 /**
338 * set that state ist complete transfered
339 * @param stateTransfered
340 */
341 public void setStateTransfered(boolean stateTransfered) {
342 this.stateTransfered = stateTransfered;
343 }
344
345 /**
346 * @return Returns the sendAllSessionsWaitTime in msec
347 */
348 public int getSendAllSessionsWaitTime() {
349 return sendAllSessionsWaitTime;
350 }
351
352 /**
353 * @param sendAllSessionsWaitTime The sendAllSessionsWaitTime to set at msec.
354 */
355 public void setSendAllSessionsWaitTime(int sendAllSessionsWaitTime) {
356 this.sendAllSessionsWaitTime = sendAllSessionsWaitTime;
357 }
358
359 /**
360 * @return Returns the sendClusterDomainOnly.
361 */
362 public boolean doDomainReplication() {
363 return sendClusterDomainOnly;
364 }
365
366 /**
367 * @param sendClusterDomainOnly The sendClusterDomainOnly to set.
368 */
369 public void setDomainReplication(boolean sendClusterDomainOnly) {
370 this.sendClusterDomainOnly = sendClusterDomainOnly;
371 }
372
373 /**
374 * @return Returns the stateTimestampDrop.
375 */
376 public boolean isStateTimestampDrop() {
377 return stateTimestampDrop;
378 }
379
380 /**
381 * @param isTimestampDrop The new flag value
382 */
383 public void setStateTimestampDrop(boolean isTimestampDrop) {
384 this.stateTimestampDrop = isTimestampDrop;
385 }
386
387 /**
388 * Return the maximum number of active Sessions allowed, or -1 for no limit.
389 */
390 public int getMaxActiveSessions() {
391 return (this.maxActiveSessions);
392 }
393
394 /**
395 * Set the maximum number of actives Sessions allowed, or -1 for no limit.
396 *
397 * @param max
398 * The new maximum number of sessions
399 */
400 public void setMaxActiveSessions(int max) {
401 int oldMaxActiveSessions = this.maxActiveSessions;
402 this.maxActiveSessions = max;
403 support.firePropertyChange("maxActiveSessions", new Integer(oldMaxActiveSessions), new Integer(this.maxActiveSessions));
404 }
405
406 /**
407 *
408 * @return Returns the sendAllSessions.
409 */
410 public boolean isSendAllSessions() {
411 return sendAllSessions;
412 }
413
414 /**
415 * @param sendAllSessions The sendAllSessions to set.
416 */
417 public void setSendAllSessions(boolean sendAllSessions) {
418 this.sendAllSessions = sendAllSessions;
419 }
420
421 /**
422 * @return Returns the sendAllSessionsSize.
423 */
424 public int getSendAllSessionsSize() {
425 return sendAllSessionsSize;
426 }
427
428 /**
429 * @param sendAllSessionsSize The sendAllSessionsSize to set.
430 */
431 public void setSendAllSessionsSize(int sendAllSessionsSize) {
432 this.sendAllSessionsSize = sendAllSessionsSize;
433 }
434
435 /**
436 * @return Returns the notifySessionListenersOnReplication.
437 */
438 public boolean isNotifySessionListenersOnReplication() {
439 return notifySessionListenersOnReplication;
440 }
441
442 /**
443 * @param notifyListenersCreateSessionOnReplication The notifySessionListenersOnReplication to set.
444 */
445 public void setNotifySessionListenersOnReplication(boolean notifyListenersCreateSessionOnReplication) {
446 this.notifySessionListenersOnReplication = notifyListenersCreateSessionOnReplication;
447 }
448
449
450 public boolean isExpireSessionsOnShutdown() {
451 return expireSessionsOnShutdown;
452 }
453
454 public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown) {
455 this.expireSessionsOnShutdown = expireSessionsOnShutdown;
456 }
457
458 public boolean isNotifyListenersOnReplication() {
459 return notifyListenersOnReplication;
460 }
461
462 public void setNotifyListenersOnReplication(boolean notifyListenersOnReplication) {
463 this.notifyListenersOnReplication = notifyListenersOnReplication;
464 }
465
466
467 /**
468 * @return Returns the defaultMode.
469 */
470 public boolean isDefaultMode() {
471 return defaultMode;
472 }
473 /**
474 * @param defaultMode The defaultMode to set.
475 */
476 public void setDefaultMode(boolean defaultMode) {
477 this.defaultMode = defaultMode;
478 }
479
480 public CatalinaCluster getCluster() {
481 return cluster;
482 }
483
484 public void setCluster(CatalinaCluster cluster) {
485 this.cluster = cluster;
486 }
487
488 /**
489 * Set the Container with which this Manager has been associated. If it is a
490 * Context (the usual case), listen for changes to the session timeout
491 * property.
492 *
493 * @param container
494 * The associated Container
495 */
496 public void setContainer(Container container) {
497 // De-register from the old Container (if any)
498 if ((this.container != null) && (this.container instanceof Context))
499 ((Context) this.container).removePropertyChangeListener(this);
500
501 // Default processing provided by our superclass
502 super.setContainer(container);
503
504 // Register with the new Container (if any)
505 if ((this.container != null) && (this.container instanceof Context)) {
506 setMaxInactiveInterval(((Context) this.container).getSessionTimeout() * 60);
507 ((Context) this.container).addPropertyChangeListener(this);
508 }
509
510 }
511
512 // --------------------------------------------------------- Public Methods
513
514 /**
515 * Construct and return a new session object, based on the default settings
516 * specified by this Manager's properties. The session id will be assigned
517 * by this method, and available via the getId() method of the returned
518 * session. If a new session cannot be created for any reason, return
519 * <code>null</code>.
520 *
521 * @exception IllegalStateException
522 * if a new session cannot be instantiated for any reason
523 *
524 * Construct and return a new session object, based on the default settings
525 * specified by this Manager's properties. The session id will be assigned
526 * by this method, and available via the getId() method of the returned
527 * session. If a new session cannot be created for any reason, return
528 * <code>null</code>.
529 *
530 * @exception IllegalStateException
531 * if a new session cannot be instantiated for any reason
532 */
533 public Session createSession(String sessionId) {
534 return createSession(sessionId, true);
535 }
536
537 /**
538 * create new session with check maxActiveSessions and send session creation
539 * to other cluster nodes.
540 *
541 * @param distribute
542 * @return The session
543 */
544 public Session createSession(String sessionId, boolean distribute) {
545 if ((maxActiveSessions >= 0) && (sessions.size() >= maxActiveSessions)) {
546 rejectedSessions++;
547 throw new IllegalStateException(sm.getString("deltaManager.createSession.ise"));
548 }
549 DeltaSession session = (DeltaSession) super.createSession(sessionId) ;
550 if (distribute) {
551 sendCreateSession(session.getId(), session);
552 }
553 if (log.isDebugEnabled())
554 log.debug(sm.getString("deltaManager.createSession.newSession",session.getId(), new Integer(sessions.size())));
555 return (session);
556
557 }
558
559 /**
560 * Send create session evt to all backup node
561 * @param sessionId
562 * @param session
563 */
564 protected void sendCreateSession(String sessionId, DeltaSession session) {
565 if(cluster.getMembers().length > 0 ) {
566 SessionMessage msg =
567 new SessionMessageImpl(getName(),
568 SessionMessage.EVT_SESSION_CREATED,
569 null,
570 sessionId,
571 sessionId + "-" + System.currentTimeMillis());
572 if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.sendMessage.newSession",name, sessionId));
573 msg.setTimestamp(session.getCreationTime());
574 counterSend_EVT_SESSION_CREATED++;
575 send(msg);
576 }
577 }
578
579 /**
580 * Send messages to other backup member (domain or all)
581 * @param msg Session message
582 */
583 protected void send(SessionMessage msg) {
584 if(cluster != null) {
585 if(doDomainReplication())
586 cluster.sendClusterDomain(msg);
587 else
588 cluster.send(msg);
589 }
590 }
591
592 /**
593 * Create DeltaSession
594 * @see org.apache.catalina.Manager#createEmptySession()
595 */
596 public Session createEmptySession() {
597 return getNewDeltaSession() ;
598 }
599
600 /**
601 * Get new session class to be used in the doLoad() method.
602 */
603 protected DeltaSession getNewDeltaSession() {
604 return new DeltaSession(this);
605 }
606
607 /**
608 * Load Deltarequest from external node
609 * Load the Class at container classloader
610 * @see DeltaRequest#readExternal(java.io.ObjectInput)
611 * @param session
612 * @param data message data
613 * @return The request
614 * @throws ClassNotFoundException
615 * @throws IOException
616 */
617 protected DeltaRequest deserializeDeltaRequest(DeltaSession session, byte[] data) throws ClassNotFoundException, IOException {
618 ReplicationStream ois = getReplicationStream(data);
619 session.getDeltaRequest().readExternal(ois);
620 ois.close();
621 return session.getDeltaRequest();
622 }
623
624 /**
625 * serialize DeltaRequest
626 * @see DeltaRequest#writeExternal(java.io.ObjectOutput)
627 *
628 * @param deltaRequest
629 * @return serialized delta request
630 * @throws IOException
631 */
632 protected byte[] serializeDeltaRequest(DeltaRequest deltaRequest) throws IOException {
633 return deltaRequest.serialize();
634 }
635
636 /**
637 * Load sessions from other cluster node.
638 * FIXME replace currently sessions with same id without notifcation.
639 * FIXME SSO handling is not really correct with the session replacement!
640 * @exception ClassNotFoundException
641 * if a serialized class cannot be found during the reload
642 * @exception IOException
643 * if an input/output error occurs
644 */
645 protected void deserializeSessions(byte[] data) throws ClassNotFoundException,IOException {
646
647 // Initialize our internal data structures
648 //sessions.clear(); //should not do this
649 // Open an input stream to the specified pathname, if any
650 ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();
651 ObjectInputStream ois = null;
652 // Load the previously unloaded active sessions
653 try {
654 ois = getReplicationStream(data);
655 Integer count = (Integer) ois.readObject();
656 int n = count.intValue();
657 for (int i = 0; i < n; i++) {
658 DeltaSession session = (DeltaSession) createEmptySession();
659 session.readObjectData(ois);
660 session.setManager(this);
661 session.setValid(true);
662 session.setPrimarySession(false);
663 //in case the nodes in the cluster are out of
664 //time synch, this will make sure that we have the
665 //correct timestamp, isValid returns true, cause
666 // accessCount=1
667 session.access();
668 //make sure that the session gets ready to expire if
669 // needed
670 session.setAccessCount(0);
671 session.resetDeltaRequest();
672 // FIXME How inform other session id cache like SingleSignOn
673 // increment sessionCounter to correct stats report
674 if (findSession(session.getIdInternal()) == null ) {
675 sessionCounter++;
676 } else {
677 sessionReplaceCounter++;
678 // FIXME better is to grap this sessions again !
679 if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.loading.existing.session",session.getIdInternal()));
680 }
681 add(session);
682 }
683 } catch (ClassNotFoundException e) {
684 log.error(sm.getString("deltaManager.loading.cnfe", e), e);
685 throw e;
686 } catch (IOException e) {
687 log.error(sm.getString("deltaManager.loading.ioe", e), e);
688 throw e;
689 } finally {
690 // Close the input stream
691 try {
692 if (ois != null) ois.close();
693 } catch (IOException f) {
694 // ignored
695 }
696 ois = null;
697 if (originalLoader != null) Thread.currentThread().setContextClassLoader(originalLoader);
698 }
699
700 }
701
702
703
704 /**
705 * Save any currently active sessions in the appropriate persistence
706 * mechanism, if any. If persistence is not supported, this method returns
707 * without doing anything.
708 *
709 * @exception IOException
710 * if an input/output error occurs
711 */
712 protected byte[] serializeSessions(Session[] currentSessions) throws IOException {
713
714 // Open an output stream to the specified pathname, if any
715 ByteArrayOutputStream fos = null;
716 ObjectOutputStream oos = null;
717
718 try {
719 fos = new ByteArrayOutputStream();
720 oos = new ObjectOutputStream(new BufferedOutputStream(fos));
721 oos.writeObject(new Integer(currentSessions.length));
722 for(int i=0 ; i < currentSessions.length;i++) {
723 ((DeltaSession)currentSessions[i]).writeObjectData(oos);
724 }
725 // Flush and close the output stream
726 oos.flush();
727 } catch (IOException e) {
728 log.error(sm.getString("deltaManager.unloading.ioe", e), e);
729 throw e;
730 } finally {
731 if (oos != null) {
732 try {
733 oos.close();
734 } catch (IOException f) {
735 ;
736 }
737 oos = null;
738 }
739 }
740 // send object data as byte[]
741 return fos.toByteArray();
742 }
743
744 // ------------------------------------------------------ Lifecycle Methods
745
746 /**
747 * Add a lifecycle event listener to this component.
748 *
749 * @param listener
750 * The listener to add
751 */
752 public void addLifecycleListener(LifecycleListener listener) {
753 lifecycle.addLifecycleListener(listener);
754 }
755
756 /**
757 * Get the lifecycle listeners associated with this lifecycle. If this
758 * Lifecycle has no listeners registered, a zero-length array is returned.
759 */
760 public LifecycleListener[] findLifecycleListeners() {
761 return lifecycle.findLifecycleListeners();
762 }
763
764 /**
765 * Remove a lifecycle event listener from this component.
766 *
767 * @param listener
768 * The listener to remove
769 */
770 public void removeLifecycleListener(LifecycleListener listener) {
771 lifecycle.removeLifecycleListener(listener);
772 }
773
774 /**
775 * Prepare for the beginning of active use of the public methods of this
776 * component. This method should be called after <code>configure()</code>,
777 * and before any of the public methods of the component are utilized.
778 *
779 * @exception LifecycleException
780 * if this component detects a fatal error that prevents this
781 * component from being used
782 */
783 public void start() throws LifecycleException {
784 if (!initialized) init();
785
786 // Validate and update our current component state
787 if (started) {
788 return;
789 }
790 started = true;
791 lifecycle.fireLifecycleEvent(START_EVENT, null);
792
793 // Force initialization of the random number generator
794 generateSessionId();
795
796 // Load unloaded sessions, if any
797 try {
798 //the channel is already running
799 Cluster cluster = getCluster() ;
800 // stop remove cluster binding
801 //wow, how many nested levels of if statements can we have ;)
802 if(cluster == null) {
803 Container context = getContainer() ;
804 if(context != null && context instanceof Context) {
805 Container host = context.getParent() ;
806 if(host != null && host instanceof Host) {
807 cluster = host.getCluster();
808 if(cluster != null && cluster instanceof CatalinaCluster) {
809 setCluster((CatalinaCluster) cluster) ;
810 } else {
811 Container engine = host.getParent() ;
812 if(engine != null && engine instanceof Engine) {
813 cluster = engine.getCluster();
814 if(cluster != null && cluster instanceof CatalinaCluster) {
815 setCluster((CatalinaCluster) cluster) ;
816 }
817 } else {
818 cluster = null ;
819 }
820 }
821 }
822 }
823 }
824 if (cluster == null) {
825 log.error(sm.getString("deltaManager.noCluster", getName()));
826 return;
827 } else {
828 if (log.isInfoEnabled()) {
829 String type = "unknown" ;
830 if( cluster.getContainer() instanceof Host){
831 type = "Host" ;
832 } else if( cluster.getContainer() instanceof Engine){
833 type = "Engine" ;
834 }
835 log.info(sm.getString("deltaManager.registerCluster", getName(), type, cluster.getClusterName()));
836 }
837 }
838 if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.startClustering", getName()));
839 //to survice context reloads, as only a stop/start is called, not
840 // createManager
841 cluster.registerManager(this);
842
843 getAllClusterSessions();
844
845 } catch (Throwable t) {
846 log.error(sm.getString("deltaManager.managerLoad"), t);
847 }
848 }
849
850 /**
851 * get from first session master the backup from all clustered sessions
852 * @see #findSessionMasterMember()
853 */
854 public synchronized void getAllClusterSessions() {
855 if (cluster != null && cluster.getMembers().length > 0) {
856 long beforeSendTime = System.currentTimeMillis();
857 Member mbr = findSessionMasterMember();
858 if(mbr == null) { // No domain member found
859 return;
860 }
861 SessionMessage msg = new SessionMessageImpl(this.getName(),SessionMessage.EVT_GET_ALL_SESSIONS, null, "GET-ALL","GET-ALL-" + getName());
862 // set reference time
863 stateTransferCreateSendTime = beforeSendTime ;
864 // request session state
865 counterSend_EVT_GET_ALL_SESSIONS++;
866 stateTransfered = false ;
867 // FIXME This send call block the deploy thread, when sender waitForAck is enabled
868 try {
869 synchronized(receivedMessageQueue) {
870 receiverQueue = true ;
871 }
872 cluster.send(msg, mbr);
873 if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.waitForSessionState",getName(), mbr));
874 // FIXME At sender ack mode this method check only the state transfer and resend is a problem!
875 waitForSendAllSessions(beforeSendTime);
876 } finally {
877 synchronized(receivedMessageQueue) {
878 for (Iterator iter = receivedMessageQueue.iterator(); iter.hasNext();) {
879 SessionMessage smsg = (SessionMessage) iter.next();
880 if (!stateTimestampDrop) {
881 messageReceived(smsg, smsg.getAddress() != null ? (Member) smsg.getAddress() : null);
882 } else {
883 if (smsg.getEventType() != SessionMessage.EVT_GET_ALL_SESSIONS && smsg.getTimestamp() >= stateTransferCreateSendTime) {
884 // FIXME handle EVT_GET_ALL_SESSIONS later
885 messageReceived(smsg,smsg.getAddress() != null ? (Member) smsg.getAddress() : null);
886 } else {
887 if (log.isWarnEnabled()) {
888 log.warn(sm.getString("deltaManager.dropMessage",getName(), smsg.getEventTypeString(),new Date(stateTransferCreateSendTime), new Date(smsg.getTimestamp())));
889 }
890 }
891 }
892 }
893 receivedMessageQueue.clear();
894 receiverQueue = false ;
895 }
896 }
897 } else {
898 if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.noMembers", getName()));
899 }
900 }
901
902 /**
903 * Register cross context session at replication valve thread local
904 * @param session cross context session
905 */
906 protected void registerSessionAtReplicationValve(DeltaSession session) {
907 if(replicationValve == null) {
908 if(container instanceof StandardContext && ((StandardContext)container).getCrossContext()) {
909 Cluster cluster = getCluster() ;
910 if(cluster != null && cluster instanceof CatalinaCluster) {
911 Valve[] valves = ((CatalinaCluster)cluster).getValves();
912 if(valves != null && valves.length > 0) {
913 for(int i=0; replicationValve == null && i < valves.length ; i++ ){
914 if(valves[i] instanceof ReplicationValve) replicationValve = (ReplicationValve)valves[i] ;
915 }//for
916
917 if(replicationValve == null && log.isDebugEnabled()) {
918 log.debug("no ReplicationValve found for CrossContext Support");
919 }//endif
920 }//end if
921 }//endif
922 }//end if
923 }//end if
924 if(replicationValve != null) {
925 replicationValve.registerReplicationSession(session);
926 }
927 }
928
929 /**
930 * Find the master of the session state
931 * @return master member of sessions
932 */
933 protected Member findSessionMasterMember() {
934 Member mbr = null;
935 Member mbrs[] = cluster.getMembers();
936 if(mbrs.length != 0 ) mbr = mbrs[0];
937 if(mbr == null && log.isWarnEnabled()) log.warn(sm.getString("deltaManager.noMasterMember",getName(), ""));
938 if(mbr != null && log.isDebugEnabled()) log.warn(sm.getString("deltaManager.foundMasterMember",getName(), mbr));
939 return mbr;
940 }
941
942 /**
943 * Wait that cluster session state is transfer or timeout after 60 Sec
944 * With stateTransferTimeout == -1 wait that backup is transfered (forever mode)
945 */
946 protected void waitForSendAllSessions(long beforeSendTime) {
947 long reqStart = System.currentTimeMillis();
948 long reqNow = reqStart ;
949 boolean isTimeout = false;
950 if(getStateTransferTimeout() > 0) {
951 // wait that state is transfered with timeout check
952 do {
953 try {
954 Thread.sleep(100);
955 } catch (Exception sleep) {
956 //
957 }
958 reqNow = System.currentTimeMillis();
959 isTimeout = ((reqNow - reqStart) > (1000 * getStateTransferTimeout()));
960 } while ((!getStateTransfered()) && (!isTimeout));
961 } else {
962 if(getStateTransferTimeout() == -1) {
963 // wait that state is transfered
964 do {
965 try {
966 Thread.sleep(100);
967 } catch (Exception sleep) {
968 }
969 } while ((!getStateTransfered()));
970 reqNow = System.currentTimeMillis();
971 }
972 }
973 if (isTimeout || (!getStateTransfered())) {
974 counterNoStateTransfered++ ;
975 log.error(sm.getString("deltaManager.noSessionState",getName(),new Date(beforeSendTime),new Long(reqNow - beforeSendTime)));
976 } else {
977 if (log.isInfoEnabled())
978 log.info(sm.getString("deltaManager.sessionReceived",getName(), new Date(beforeSendTime), new Long(reqNow - beforeSendTime)));
979 }
980 }
981
982 /**
983 * Gracefully terminate the active use of the public methods of this
984 * component. This method should be the last one called on a given instance
985 * of this component.
986 *
987 * @exception LifecycleException
988 * if this component detects a fatal error that needs to be
989 * reported
990 */
991 public void stop() throws LifecycleException {
992
993 if (log.isDebugEnabled())
994 log.debug(sm.getString("deltaManager.stopped", getName()));
995
996
997 // Validate and update our current component state
998 if (!started)
999 throw new LifecycleException(sm.getString("deltaManager.notStarted"));
1000 lifecycle.fireLifecycleEvent(STOP_EVENT, null);
1001 started = false;
1002
1003 // Expire all active sessions
1004 if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.expireSessions", getName()));
1005 Session sessions[] = findSessions();
1006 for (int i = 0; i < sessions.length; i++) {
1007 DeltaSession session = (DeltaSession) sessions[i];
1008 if (!session.isValid())
1009 continue;
1010 try {
1011 session.expire(true, isExpireSessionsOnShutdown());
1012 } catch (Throwable ignore) {
1013 ;
1014 }
1015 }
1016
1017 // Require a new random number generator if we are restarted
1018 this.random = null;
1019 getCluster().removeManager(this);
1020 replicationValve = null;
1021 if (initialized) {
1022 destroy();
1023 }
1024 }
1025
1026 // ----------------------------------------- PropertyChangeListener Methods
1027
1028 /**
1029 * Process property change events from our associated Context.
1030 *
1031 * @param event
1032 * The property change event that has occurred
1033 */
1034 public void propertyChange(PropertyChangeEvent event) {
1035
1036 // Validate the source of this event
1037 if (!(event.getSource() instanceof Context))
1038 return;
1039 // Process a relevant property change
1040 if (event.getPropertyName().equals("sessionTimeout")) {
1041 try {
1042 setMaxInactiveInterval(((Integer) event.getNewValue()).intValue() * 60);
1043 } catch (NumberFormatException e) {
1044 log.error(sm.getString("deltaManager.sessionTimeout", event.getNewValue()));
1045 }
1046 }
1047
1048 }
1049
1050 // -------------------------------------------------------- Replication
1051 // Methods
1052
1053 /**
1054 * A message was received from another node, this is the callback method to
1055 * implement if you are interested in receiving replication messages.
1056 *
1057 * @param cmsg -
1058 * the message received.
1059 */
1060 public void messageDataReceived(ClusterMessage cmsg) {
1061 if (cmsg != null && cmsg instanceof SessionMessage) {
1062 SessionMessage msg = (SessionMessage) cmsg;
1063 switch (msg.getEventType()) {
1064 case SessionMessage.EVT_GET_ALL_SESSIONS:
1065 case SessionMessage.EVT_SESSION_CREATED:
1066 case SessionMessage.EVT_SESSION_EXPIRED:
1067 case SessionMessage.EVT_SESSION_ACCESSED:
1068 case SessionMessage.EVT_SESSION_DELTA: {
1069 synchronized(receivedMessageQueue) {
1070 if(receiverQueue) {
1071 receivedMessageQueue.add(msg);
1072 return ;
1073 }
1074 }
1075 break;
1076 }
1077 default: {
1078 //we didn't queue, do nothing
1079 break;
1080 }
1081 } //switch
1082
1083 messageReceived(msg, msg.getAddress() != null ? (Member) msg.getAddress() : null);
1084 }
1085 }
1086
1087 /**
1088 * When the request has been completed, the replication valve will notify
1089 * the manager, and the manager will decide whether any replication is
1090 * needed or not. If there is a need for replication, the manager will
1091 * create a session message and that will be replicated. The cluster
1092 * determines where it gets sent.
1093 *
1094 * @param sessionId -
1095 * the sessionId that just completed.
1096 * @return a SessionMessage to be sent,
1097 */
1098 public ClusterMessage requestCompleted(String sessionId) {
1099 try {
1100 DeltaSession session = (DeltaSession) findSession(sessionId);
1101 DeltaRequest deltaRequest = session.getDeltaRequest();
1102 SessionMessage msg = null;
1103 boolean isDeltaRequest = false ;
1104 synchronized(deltaRequest) {
1105 isDeltaRequest = deltaRequest.getSize() > 0 ;
1106 if (isDeltaRequest) {
1107 counterSend_EVT_SESSION_DELTA++;
1108 byte[] data = serializeDeltaRequest(deltaRequest);
1109 msg = new SessionMessageImpl(getName(),
1110 SessionMessage.EVT_SESSION_DELTA,
1111 data,
1112 sessionId,
1113 sessionId + "-" + System.currentTimeMillis());
1114 session.resetDeltaRequest();
1115 }
1116 }
1117 if(!isDeltaRequest) {
1118 if(!session.isPrimarySession()) {
1119 counterSend_EVT_SESSION_ACCESSED++;
1120 msg = new SessionMessageImpl(getName(),
1121 SessionMessage.EVT_SESSION_ACCESSED,
1122 null,
1123 sessionId,
1124 sessionId + "-" + System.currentTimeMillis());
1125 if (log.isDebugEnabled()) {
1126 log.debug(sm.getString("deltaManager.createMessage.accessChangePrimary",getName(), sessionId));
1127 }
1128 }
1129 } else { // log only outside synch block!
1130 if (log.isDebugEnabled()) {
1131 log.debug(sm.getString("deltaManager.createMessage.delta",getName(), sessionId));
1132 }
1133 }
1134 session.setPrimarySession(true);
1135 //check to see if we need to send out an access message
1136 if ((msg == null)) {
1137 long replDelta = System.currentTimeMillis() - session.getLastTimeReplicated();
1138 if (replDelta > (getMaxInactiveInterval() * 1000)) {
1139 counterSend_EVT_SESSION_ACCESSED++;
1140 msg = new SessionMessageImpl(getName(),
1141 SessionMessage.EVT_SESSION_ACCESSED,
1142 null,
1143 sessionId,
1144 sessionId + "-" + System.currentTimeMillis());
1145 if (log.isDebugEnabled()) {
1146 log.debug(sm.getString("deltaManager.createMessage.access", getName(),sessionId));
1147 }
1148 }
1149
1150 }
1151
1152 //update last replicated time
1153 if (msg != null) session.setLastTimeReplicated(System.currentTimeMillis());
1154 return msg;
1155 } catch (IOException x) {
1156 log.error(sm.getString("deltaManager.createMessage.unableCreateDeltaRequest",sessionId), x);
1157 return null;
1158 }
1159
1160 }
1161 /**
1162 * Reset manager statistics
1163 */
1164 public synchronized void resetStatistics() {
1165 processingTime = 0 ;
1166 expiredSessions = 0 ;
1167 rejectedSessions = 0 ;
1168 sessionReplaceCounter = 0 ;
1169 counterNoStateTransfered = 0 ;
1170 maxActive = getActiveSessions() ;
1171 sessionCounter = getActiveSessions() ;
1172 counterReceive_EVT_ALL_SESSION_DATA = 0;
1173 counterReceive_EVT_GET_ALL_SESSIONS = 0;
1174 counterReceive_EVT_SESSION_ACCESSED = 0 ;
1175 counterReceive_EVT_SESSION_CREATED = 0 ;
1176 counterReceive_EVT_SESSION_DELTA = 0 ;
1177 counterReceive_EVT_SESSION_EXPIRED = 0 ;
1178 counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
1179 counterSend_EVT_ALL_SESSION_DATA = 0;
1180 counterSend_EVT_GET_ALL_SESSIONS = 0;
1181 counterSend_EVT_SESSION_ACCESSED = 0 ;
1182 counterSend_EVT_SESSION_CREATED = 0 ;
1183 counterSend_EVT_SESSION_DELTA = 0 ;
1184 counterSend_EVT_SESSION_EXPIRED = 0 ;
1185 counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
1186
1187 }
1188
1189 // -------------------------------------------------------- persistence handler
1190
1191 public void load() {
1192
1193 }
1194
1195 public void unload() {
1196
1197 }
1198
1199 // -------------------------------------------------------- expire
1200
1201 /**
1202 * send session expired to other cluster nodes
1203 *
1204 * @param id
1205 * session id
1206 */
1207 protected void sessionExpired(String id) {
1208 counterSend_EVT_SESSION_EXPIRED++ ;
1209 SessionMessage msg = new SessionMessageImpl(getName(),SessionMessage.EVT_SESSION_EXPIRED, null, id, id+ "-EXPIRED-MSG");
1210 if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.expire",getName(), id));
1211 send(msg);
1212 }
1213
1214 /**
1215 * Exipre all find sessions.
1216 */
1217 public void expireAllLocalSessions()
1218 {
1219 long timeNow = System.currentTimeMillis();
1220 Session sessions[] = findSessions();
1221 int expireDirect = 0 ;
1222 int expireIndirect = 0 ;
1223
1224 if(log.isDebugEnabled()) log.debug("Start expire all sessions " + getName() + " at " + timeNow + " sessioncount " + sessions.length);
1225 for (int i = 0; i < sessions.length; i++) {
1226 if (sessions[i] instanceof DeltaSession) {
1227 DeltaSession session = (DeltaSession) sessions[i];
1228 if (session.isPrimarySession()) {
1229 if (session.isValid()) {
1230 session.expire();
1231 expireDirect++;
1232 } else {
1233 expireIndirect++;
1234 }//end if
1235 }//end if
1236 }//end if
1237 }//for
1238 long timeEnd = System.currentTimeMillis();
1239 if(log.isDebugEnabled()) log.debug("End expire sessions " + getName() + " exipre processingTime " + (timeEnd - timeNow) + " expired direct sessions: " + expireDirect + " expired direct sessions: " + expireIndirect);
1240
1241 }
1242
1243 /**
1244 * When the manager expires session not tied to a request. The cluster will
1245 * periodically ask for a list of sessions that should expire and that
1246 * should be sent across the wire.
1247 *
1248 * @return The invalidated sessions array
1249 */
1250 public String[] getInvalidatedSessions() {
1251 return new String[0];
1252 }
1253
1254 // -------------------------------------------------------- message receive
1255
1256 /**
1257 * Test that sender and local domain is the same
1258 */
1259 protected boolean checkSenderDomain(SessionMessage msg,Member sender) {
1260 boolean sameDomain= true;
1261 if (!sameDomain && log.isWarnEnabled()) {
1262 log.warn(sm.getString("deltaManager.receiveMessage.fromWrongDomain",
1263 new Object[] {getName(),
1264 msg.getEventTypeString(),
1265 sender,
1266 "",
1267 "" }));
1268 }
1269 return sameDomain ;
1270 }
1271
1272 /**
1273 * This method is called by the received thread when a SessionMessage has
1274 * been received from one of the other nodes in the cluster.
1275 *
1276 * @param msg -
1277 * the message received
1278 * @param sender -
1279 * the sender of the message, this is used if we receive a
1280 * EVT_GET_ALL_SESSION message, so that we only reply to the
1281 * requesting node
1282 */
1283 protected void messageReceived(SessionMessage msg, Member sender) {
1284 if(doDomainReplication() && !checkSenderDomain(msg,sender)) {
1285 return;
1286 }
1287 ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
1288 try {
1289
1290 ClassLoader[] loaders = getClassLoaders();
1291 if ( loaders != null && loaders.length > 0) Thread.currentThread().setContextClassLoader(loaders[0]);
1292 if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.eventType",getName(), msg.getEventTypeString(), sender));
1293
1294 switch (msg.getEventType()) {
1295 case SessionMessage.EVT_GET_ALL_SESSIONS: {
1296 handleGET_ALL_SESSIONS(msg,sender);
1297 break;
1298 }
1299 case SessionMessage.EVT_ALL_SESSION_DATA: {
1300 handleALL_SESSION_DATA(msg,sender);
1301 break;
1302 }
1303 case SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE: {
1304 handleALL_SESSION_TRANSFERCOMPLETE(msg,sender);
1305 break;
1306 }
1307 case SessionMessage.EVT_SESSION_CREATED: {
1308 handleSESSION_CREATED(msg,sender);
1309 break;
1310 }
1311 case SessionMessage.EVT_SESSION_EXPIRED: {
1312 handleSESSION_EXPIRED(msg,sender);
1313 break;
1314 }
1315 case SessionMessage.EVT_SESSION_ACCESSED: {
1316 handleSESSION_ACCESSED(msg,sender);
1317 break;
1318 }
1319 case SessionMessage.EVT_SESSION_DELTA: {
1320 handleSESSION_DELTA(msg,sender);
1321 break;
1322 }
1323 default: {
1324 //we didn't recognize the message type, do nothing
1325 break;
1326 }
1327 } //switch
1328 } catch (Exception x) {
1329 log.error(sm.getString("deltaManager.receiveMessage.error",getName()), x);
1330 } finally {
1331 Thread.currentThread().setContextClassLoader(contextLoader);
1332 }
1333 }
1334
1335 // -------------------------------------------------------- message receiver handler
1336
1337
1338 /**
1339 * handle receive session state is complete transfered
1340 * @param msg
1341 * @param sender
1342 */
1343 protected void handleALL_SESSION_TRANSFERCOMPLETE(SessionMessage msg, Member sender) {
1344 counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE++ ;
1345 if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.transfercomplete",getName(), sender.getHost(), new Integer(sender.getPort())));
1346 stateTransferCreateSendTime = msg.getTimestamp() ;
1347 stateTransfered = true ;
1348 }
1349
1350 /**
1351 * handle receive session delta
1352 * @param msg
1353 * @param sender
1354 * @throws IOException
1355 * @throws ClassNotFoundException
1356 */
1357 protected void handleSESSION_DELTA(SessionMessage msg, Member sender) throws IOException, ClassNotFoundException {
1358 counterReceive_EVT_SESSION_DELTA++;
1359 byte[] delta = msg.getSession();
1360 DeltaSession session = (DeltaSession) findSession(msg.getSessionID());
1361 if (session != null) {
1362 if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.delta",getName(), msg.getSessionID()));
1363 DeltaRequest dreq = deserializeDeltaRequest(session, delta);
1364 dreq.execute(session, notifyListenersOnReplication);
1365 session.setPrimarySession(false);
1366 }
1367 }
1368
1369 /**
1370 * handle receive session is access at other node ( primary session is now false)
1371 * @param msg
1372 * @param sender
1373 * @throws IOException
1374 */
1375 protected void handleSESSION_ACCESSED(SessionMessage msg,Member sender) throws IOException {
1376 counterReceive_EVT_SESSION_ACCESSED++;
1377 DeltaSession session = (DeltaSession) findSession(msg.getSessionID());
1378 if (session != null) {
1379 if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.accessed",getName(), msg.getSessionID()));
1380 session.access();
1381 session.setPrimarySession(false);
1382 session.endAccess();
1383 }
1384 }
1385
1386 /**
1387 * handle receive session is expire at other node ( expire session also here)
1388 * @param msg
1389 * @param sender
1390 * @throws IOException
1391 */
1392 protected void handleSESSION_EXPIRED(SessionMessage msg,Member sender) throws IOException {
1393 counterReceive_EVT_SESSION_EXPIRED++;
1394 DeltaSession session = (DeltaSession) findSession(msg.getSessionID());
1395 if (session != null) {
1396 if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.expired",getName(), msg.getSessionID()));
1397 session.expire(notifySessionListenersOnReplication, false);
1398 }
1399 }
1400
1401 /**
1402 * handle receive new session is created at other node (create backup - primary false)
1403 * @param msg
1404 * @param sender
1405 */
1406 protected void handleSESSION_CREATED(SessionMessage msg,Member sender) {
1407 counterReceive_EVT_SESSION_CREATED++;
1408 if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.createNewSession",getName(), msg.getSessionID()));
1409 DeltaSession session = (DeltaSession) createEmptySession();
1410 session.setManager(this);
1411 session.setValid(true);
1412 session.setPrimarySession(false);
1413 session.setCreationTime(msg.getTimestamp());
1414 // use container maxInactiveInterval so that session will expire correctly in case of primary transfer
1415 session.setMaxInactiveInterval(getMaxInactiveInterval());
1416 session.access();
1417 if(notifySessionListenersOnReplication)
1418 session.setId(msg.getSessionID());
1419 else
1420 session.setIdInternal(msg.getSessionID());
1421 session.resetDeltaRequest();
1422 session.endAccess();
1423
1424 }
1425
1426 /**
1427 * handle receive sessions from other not ( restart )
1428 * @param msg
1429 * @param sender
1430 * @throws ClassNotFoundException
1431 * @throws IOException
1432 */
1433 protected void handleALL_SESSION_DATA(SessionMessage msg,Member sender) throws ClassNotFoundException, IOException {
1434 counterReceive_EVT_ALL_SESSION_DATA++;
1435 if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataBegin",getName()));
1436 byte[] data = msg.getSession();
1437 deserializeSessions(data);
1438 if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataAfter",getName()));
1439 //stateTransferred = true;
1440 }
1441
1442 /**
1443 * handle receive that other node want all sessions ( restart )
1444 * a) send all sessions with one message
1445 * b) send session at blocks
1446 * After sending send state is complete transfered
1447 * @param msg
1448 * @param sender
1449 * @throws IOException
1450 */
1451 protected void handleGET_ALL_SESSIONS(SessionMessage msg, Member sender) throws IOException {
1452 counterReceive_EVT_GET_ALL_SESSIONS++;
1453 //get a list of all the session from this manager
1454 if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingBegin", getName()));
1455 // Write the number of active sessions, followed by the details
1456 // get all sessions and serialize without sync
1457 Session[] currentSessions = findSessions();
1458 long findSessionTimestamp = System.currentTimeMillis() ;
1459 if (isSendAllSessions()) {
1460 sendSessions(sender, currentSessions, findSessionTimestamp);
1461 } else {
1462 // send session at blocks
1463 int len = currentSessions.length < getSendAllSessionsSize() ? currentSessions.length : getSendAllSessionsSize();
1464 Session[] sendSessions = new Session[len];
1465 for (int i = 0; i < currentSessions.length; i += getSendAllSessionsSize()) {
1466 len = i + getSendAllSessionsSize() > currentSessions.length ? currentSessions.length - i : getSendAllSessionsSize();
1467 System.arraycopy(currentSessions, i, sendSessions, 0, len);
1468 sendSessions(sender, sendSessions,findSessionTimestamp);
1469 if (getSendAllSessionsWaitTime() > 0) {
1470 try {
1471 Thread.sleep(getSendAllSessionsWaitTime());
1472 } catch (Exception sleep) {
1473 }
1474 }//end if
1475 }//for
1476 }//end if
1477
1478 SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE, null,"SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED"+ getName());
1479 newmsg.setTimestamp(findSessionTimestamp);
1480 if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionTransfered",getName()));
1481 counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE++;
1482 cluster.send(newmsg, sender);
1483 }
1484
1485
1486 /**
1487 * send a block of session to sender
1488 * @param sender
1489 * @param currentSessions
1490 * @param sendTimestamp
1491 * @throws IOException
1492 */
1493 protected void sendSessions(Member sender, Session[] currentSessions,long sendTimestamp) throws IOException {
1494 byte[] data = serializeSessions(currentSessions);
1495 if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingAfter",getName()));
1496 SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_DATA, data,"SESSION-STATE", "SESSION-STATE-" + getName());
1497 newmsg.setTimestamp(sendTimestamp);
1498 if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionData",getName()));
1499 counterSend_EVT_ALL_SESSION_DATA++;
1500 cluster.send(newmsg, sender);
1501 }
1502
1503 public ClusterManager cloneFromTemplate() {
1504 DeltaManager result = new DeltaManager();
1505 result.name = "Clone-from-"+name;
1506 result.cluster = cluster;
1507 result.replicationValve = replicationValve;
1508 result.maxActiveSessions = maxActiveSessions;
1509 result.expireSessionsOnShutdown = expireSessionsOnShutdown;
1510 result.notifyListenersOnReplication = notifyListenersOnReplication;
1511 result.notifySessionListenersOnReplication = notifySessionListenersOnReplication;
1512 result.stateTransferTimeout = stateTransferTimeout;
1513 result.sendAllSessions = sendAllSessions;
1514 result.sendClusterDomainOnly = sendClusterDomainOnly ;
1515 result.sendAllSessionsSize = sendAllSessionsSize;
1516 result.sendAllSessionsWaitTime = sendAllSessionsWaitTime ;
1517 result.receiverQueue = receiverQueue ;
1518 result.stateTimestampDrop = stateTimestampDrop ;
1519 result.stateTransferCreateSendTime = stateTransferCreateSendTime;
1520 return result;
1521 }
1522 }