1 /*
2 * Copyright 1999,2004-2005 The Apache Software Foundation.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.apache.catalina.cluster.session;
18
19 import java.beans.PropertyChangeEvent;
20 import java.beans.PropertyChangeListener;
21 import java.io.BufferedInputStream;
22 import java.io.BufferedOutputStream;
23 import java.io.ByteArrayInputStream;
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.ObjectInputStream;
27 import java.io.ObjectOutputStream;
28 import java.util.ArrayList;
29 import java.util.Date;
30 import java.util.Iterator;
31
32 import org.apache.catalina.Cluster;
33 import org.apache.catalina.Container;
34 import org.apache.catalina.Context;
35 import org.apache.catalina.Engine;
36 import org.apache.catalina.Host;
37 import org.apache.catalina.Lifecycle;
38 import org.apache.catalina.LifecycleException;
39 import org.apache.catalina.LifecycleListener;
40 import org.apache.catalina.Loader;
41 import org.apache.catalina.Session;
42 import org.apache.catalina.cluster.CatalinaCluster;
43 import org.apache.catalina.cluster.ClusterManager;
44 import org.apache.catalina.cluster.ClusterMessage;
45 import org.apache.catalina.cluster.Member;
46 import org.apache.catalina.session.ManagerBase;
47 import org.apache.catalina.util.CustomObjectInputStream;
48 import org.apache.catalina.util.LifecycleSupport;
49 import org.apache.catalina.util.StringManager;
50
51 /**
52 * The DeltaManager manages replicated sessions by only replicating the deltas
53 * in data. For applications written to handle this, the DeltaManager is the
54 * optimal way of replicating data.
55 *
56 * This code is almost identical to StandardManager with a difference in how it
57 * persists sessions and some modifications to it.
58 *
59 * <b>IMPLEMENTATION NOTE </b>: Correct behavior of session storing and
60 * reloading depends upon external calls to the <code>start()</code> and
61 * <code>stop()</code> methods of this class at the correct times.
62 *
63 * @author Filip Hanik
64 * @author Craig R. McClanahan
65 * @author Jean-Francois Arcand
66 * @author Peter Rossbach
67 * @version $Revision: 348972 $ $Date: 2005-11-25 10:56:09 -0500 (Fri, 25 Nov 2005) $
68 */
69
70 public class DeltaManager extends ManagerBase implements Lifecycle,
71 PropertyChangeListener, ClusterManager {
72
73 // ---------------------------------------------------- Security Classes
74
/**
 * Commons-logging logger used by this class.
 */
public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
    .getLog(DeltaManager.class);

/**
 * The string manager for this package; used to look up the localized
 * log and exception messages referenced by key throughout this class.
 */
protected static StringManager sm = StringManager
    .getManager(Constants.Package);
83
84 // ----------------------------------------------------- Instance Variables
85
86 /**
87 * The descriptive information about this implementation.
88 */
89 private static final String info = "DeltaManager/2.0";
90
91 /**
92 * Has this component been started yet?
93 */
94 private boolean started = false;
95
96 /**
97 * The descriptive name of this Manager implementation (for logging).
98 */
99 protected static String managerName = "DeltaManager";
100
101 protected String name = null;
102
103 protected boolean defaultMode = false;
104
105 private CatalinaCluster cluster = null;
106
107 /**
108 * The lifecycle event support for this component.
109 */
110 protected LifecycleSupport lifecycle = new LifecycleSupport(this);
111
112 /**
113 * The maximum number of active Sessions allowed, or -1 for no limit.
114 */
115 private int maxActiveSessions = -1;
116
117 private boolean expireSessionsOnShutdown = false;
118
119 private boolean notifyListenersOnReplication = true;
120
121 private boolean notifySessionListenersOnReplication = true;
122
123 private boolean stateTransferred = false ;
124
125 private int stateTransferTimeout = 60;
126
127 private boolean sendAllSessions = true;
128
129 private boolean sendClusterDomainOnly = true ;
130
131 private int sendAllSessionsSize = 1000 ;
132
133 /**
134 * wait time between send session block (default 2 sec)
135 */
136 private int sendAllSessionsWaitTime = 2 * 1000 ;
137
138 private ArrayList receivedMessageQueue = new ArrayList() ;
139
140 private boolean receiverQueue = false ;
141
142 private boolean stateTimestampDrop = true ;
143
144 private long stateTransferCreateSendTime;
145
146 // ------------------------------------------------------------------ stats attributes
147
// Number of session creations refused because maxActiveSessions was reached.
int rejectedSessions = 0;

// Number of received sessions that replaced a local session with the same id.
private long sessionReplaceCounter = 0 ;

long processingTime = 0;

// Per event type message counters (Receive = incoming, Send = outgoing).
private long counterReceive_EVT_GET_ALL_SESSIONS = 0 ;

private long counterSend_EVT_ALL_SESSION_DATA = 0 ;

private long counterReceive_EVT_ALL_SESSION_DATA = 0 ;

private long counterReceive_EVT_SESSION_CREATED = 0 ;

private long counterReceive_EVT_SESSION_EXPIRED = 0;

private long counterReceive_EVT_SESSION_ACCESSED = 0 ;

private long counterReceive_EVT_SESSION_DELTA = 0;

private long counterSend_EVT_GET_ALL_SESSIONS = 0 ;

private long counterSend_EVT_SESSION_CREATED = 0;

private long counterSend_EVT_SESSION_DELTA = 0 ;

private long counterSend_EVT_SESSION_ACCESSED = 0;

private long counterSend_EVT_SESSION_EXPIRED = 0;

private int counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0 ;

private int counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0 ;

// Number of state transfer requests that ended without receiving the state.
private int counterNoStateTransfered = 0 ;
183
184
185 // ------------------------------------------------------------- Constructor
186
187 public DeltaManager() {
188 super();
189 }
190
191 // ------------------------------------------------------------- Properties
192
193 /**
194 * Return descriptive information about this Manager implementation and the
195 * corresponding version number, in the format
196 * <code><description>/<version></code>.
197 */
198 public String getInfo() {
199
200 return (info);
201
202 }
203
204 public void setName(String name) {
205 this.name = name;
206 }
207
208 /**
209 * Return the descriptive short name of this Manager implementation.
210 */
211 public String getName() {
212
213 return (name);
214
215 }
216
217 /**
218 * @return Returns the counterSend_EVT_GET_ALL_SESSIONS.
219 */
220 public long getCounterSend_EVT_GET_ALL_SESSIONS() {
221 return counterSend_EVT_GET_ALL_SESSIONS;
222 }
223
224 /**
225 * @return Returns the counterSend_EVT_SESSION_ACCESSED.
226 */
227 public long getCounterSend_EVT_SESSION_ACCESSED() {
228 return counterSend_EVT_SESSION_ACCESSED;
229 }
230
231 /**
232 * @return Returns the counterSend_EVT_SESSION_CREATED.
233 */
234 public long getCounterSend_EVT_SESSION_CREATED() {
235 return counterSend_EVT_SESSION_CREATED;
236 }
237
238 /**
239 * @return Returns the counterSend_EVT_SESSION_DELTA.
240 */
241 public long getCounterSend_EVT_SESSION_DELTA() {
242 return counterSend_EVT_SESSION_DELTA;
243 }
244
245 /**
246 * @return Returns the counterSend_EVT_SESSION_EXPIRED.
247 */
248 public long getCounterSend_EVT_SESSION_EXPIRED() {
249 return counterSend_EVT_SESSION_EXPIRED;
250 }
251
252 /**
253 * @return Returns the counterSend_EVT_ALL_SESSION_DATA.
254 */
255 public long getCounterSend_EVT_ALL_SESSION_DATA() {
256 return counterSend_EVT_ALL_SESSION_DATA;
257 }
258
259 /**
260 * @return Returns the counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE.
261 */
262 public int getCounterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE() {
263 return counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE;
264 }
265
266 /**
267 * @return Returns the counterReceive_EVT_ALL_SESSION_DATA.
268 */
269 public long getCounterReceive_EVT_ALL_SESSION_DATA() {
270 return counterReceive_EVT_ALL_SESSION_DATA;
271 }
272
273 /**
274 * @return Returns the counterReceive_EVT_GET_ALL_SESSIONS.
275 */
276 public long getCounterReceive_EVT_GET_ALL_SESSIONS() {
277 return counterReceive_EVT_GET_ALL_SESSIONS;
278 }
279
280 /**
281 * @return Returns the counterReceive_EVT_SESSION_ACCESSED.
282 */
283 public long getCounterReceive_EVT_SESSION_ACCESSED() {
284 return counterReceive_EVT_SESSION_ACCESSED;
285 }
286
287 /**
288 * @return Returns the counterReceive_EVT_SESSION_CREATED.
289 */
290 public long getCounterReceive_EVT_SESSION_CREATED() {
291 return counterReceive_EVT_SESSION_CREATED;
292 }
293
294 /**
295 * @return Returns the counterReceive_EVT_SESSION_DELTA.
296 */
297 public long getCounterReceive_EVT_SESSION_DELTA() {
298 return counterReceive_EVT_SESSION_DELTA;
299 }
300
301 /**
302 * @return Returns the counterReceive_EVT_SESSION_EXPIRED.
303 */
304 public long getCounterReceive_EVT_SESSION_EXPIRED() {
305 return counterReceive_EVT_SESSION_EXPIRED;
306 }
307
308
309 /**
310 * @return Returns the counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE.
311 */
312 public int getCounterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE() {
313 return counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE;
314 }
315
316 /**
317 * @return Returns the processingTime.
318 */
319 public long getProcessingTime() {
320 return processingTime;
321 }
322
323 /**
324 * @return Returns the sessionReplaceCounter.
325 */
326 public long getSessionReplaceCounter() {
327 return sessionReplaceCounter;
328 }
329
330 /**
331 * Number of session creations that failed due to maxActiveSessions
332 *
333 * @return The count
334 */
335 public int getRejectedSessions() {
336 return rejectedSessions;
337 }
338
339 public void setRejectedSessions(int rejectedSessions) {
340 this.rejectedSessions = rejectedSessions;
341 }
342
343 /**
344 * @return Returns the counterNoStateTransfered.
345 */
346 public int getCounterNoStateTransfered() {
347 return counterNoStateTransfered;
348 }
349
350 public int getReceivedQueueSize() {
351 return receivedMessageQueue.size() ;
352 }
353
354 /**
355 * @return Returns the stateTransferTimeout.
356 */
357 public int getStateTransferTimeout() {
358 return stateTransferTimeout;
359 }
360 /**
361 * @param timeoutAllSession The timeout
362 */
363 public void setStateTransferTimeout(int timeoutAllSession) {
364 this.stateTransferTimeout = timeoutAllSession;
365 }
366
367 public boolean getStateTransferred() {
368 return stateTransferred;
369 }
370
371 public void setStateTransferred(boolean stateTransferred) {
372 this.stateTransferred = stateTransferred;
373 }
374
375 /**
376 * @return Returns the sendAllSessionsWaitTime in msec
377 */
378 public int getSendAllSessionsWaitTime() {
379 return sendAllSessionsWaitTime;
380 }
381
382 /**
383 * @param sendAllSessionsWaitTime The sendAllSessionsWaitTime to set at msec.
384 */
385 public void setSendAllSessionsWaitTime(int sendAllSessionsWaitTime) {
386 this.sendAllSessionsWaitTime = sendAllSessionsWaitTime;
387 }
388
389 /**
390 * @return Returns the sendClusterDomainOnly.
391 */
392 public boolean isSendClusterDomainOnly() {
393 return sendClusterDomainOnly;
394 }
395
396 /**
397 * @param sendClusterDomainOnly The sendClusterDomainOnly to set.
398 */
399 public void setSendClusterDomainOnly(boolean sendClusterDomainOnly) {
400 this.sendClusterDomainOnly = sendClusterDomainOnly;
401 }
402
403 /**
404 * @return Returns the stateTimestampDrop.
405 */
406 public boolean isStateTimestampDrop() {
407 return stateTimestampDrop;
408 }
409
410 /**
411 * @param isTimestampDrop The new flag value
412 */
413 public void setStateTimestampDrop(boolean isTimestampDrop) {
414 this.stateTimestampDrop = isTimestampDrop;
415 }
416
417 /**
418 * Return the maximum number of active Sessions allowed, or -1 for no limit.
419 */
420 public int getMaxActiveSessions() {
421
422 return (this.maxActiveSessions);
423
424 }
425
426 /**
427 * Set the maximum number of actives Sessions allowed, or -1 for no limit.
428 *
429 * @param max
430 * The new maximum number of sessions
431 */
432 public void setMaxActiveSessions(int max) {
433
434 int oldMaxActiveSessions = this.maxActiveSessions;
435 this.maxActiveSessions = max;
436 support.firePropertyChange("maxActiveSessions", new Integer(
437 oldMaxActiveSessions), new Integer(this.maxActiveSessions));
438
439 }
440
441 /**
442 * @return Returns the sendAllSessions.
443 */
444 public boolean isSendAllSessions() {
445 return sendAllSessions;
446 }
447
448 /**
449 * @param sendAllSessions The sendAllSessions to set.
450 */
451 public void setSendAllSessions(boolean sendAllSessions) {
452 this.sendAllSessions = sendAllSessions;
453 }
454
455 /**
456 * @return Returns the sendAllSessionsSize.
457 */
458 public int getSendAllSessionsSize() {
459 return sendAllSessionsSize;
460 }
461
462 /**
463 * @param sendAllSessionsSize The sendAllSessionsSize to set.
464 */
465 public void setSendAllSessionsSize(int sendAllSessionsSize) {
466 this.sendAllSessionsSize = sendAllSessionsSize;
467 }
468
469 /**
470 * @return Returns the notifySessionListenersOnReplication.
471 */
472 public boolean isNotifySessionListenersOnReplication() {
473 return notifySessionListenersOnReplication;
474 }
475
476 /**
477 * @param notifyListenersCreateSessionOnReplication The notifySessionListenersOnReplication to set.
478 */
479 public void setNotifySessionListenersOnReplication(
480 boolean notifyListenersCreateSessionOnReplication) {
481 this.notifySessionListenersOnReplication = notifyListenersCreateSessionOnReplication;
482 }
483
484
485 public boolean isExpireSessionsOnShutdown() {
486 return expireSessionsOnShutdown;
487 }
488
489 public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown) {
490 this.expireSessionsOnShutdown = expireSessionsOnShutdown;
491 }
492
493 public boolean isNotifyListenersOnReplication() {
494 return notifyListenersOnReplication;
495 }
496
497 public void setNotifyListenersOnReplication(
498 boolean notifyListenersOnReplication) {
499 this.notifyListenersOnReplication = notifyListenersOnReplication;
500 }
501
502
503 /**
504 * @return Returns the defaultMode.
505 */
506 public boolean isDefaultMode() {
507 return defaultMode;
508 }
509 /**
510 * @param defaultMode The defaultMode to set.
511 */
512 public void setDefaultMode(boolean defaultMode) {
513 this.defaultMode = defaultMode;
514 }
515
516 public CatalinaCluster getCluster() {
517 return cluster;
518 }
519
520 public void setCluster(CatalinaCluster cluster) {
521 this.cluster = cluster;
522 }
523
524 /**
525 * Set the Container with which this Manager has been associated. If it is a
526 * Context (the usual case), listen for changes to the session timeout
527 * property.
528 *
529 * @param container
530 * The associated Container
531 */
532 public void setContainer(Container container) {
533
534 // De-register from the old Container (if any)
535 if ((this.container != null) && (this.container instanceof Context))
536 ((Context) this.container).removePropertyChangeListener(this);
537
538 // Default processing provided by our superclass
539 super.setContainer(container);
540
541 // Register with the new Container (if any)
542 if ((this.container != null) && (this.container instanceof Context)) {
543 setMaxInactiveInterval(((Context) this.container)
544 .getSessionTimeout() * 60);
545 ((Context) this.container).addPropertyChangeListener(this);
546 }
547
548 }
549
550 // --------------------------------------------------------- Public Methods
551
552 /**
553 * Construct and return a new session object, based on the default settings
554 * specified by this Manager's properties. The session id will be assigned
555 * by this method, and available via the getId() method of the returned
556 * session. If a new session cannot be created for any reason, return
557 * <code>null</code>.
558 *
559 * @exception IllegalStateException
560 * if a new session cannot be instantiated for any reason
561 *
562 * Construct and return a new session object, based on the default settings
563 * specified by this Manager's properties. The session id will be assigned
564 * by this method, and available via the getId() method of the returned
565 * session. If a new session cannot be created for any reason, return
566 * <code>null</code>.
567 *
568 * @exception IllegalStateException
569 * if a new session cannot be instantiated for any reason
570 */
571 public Session createSession(String sessionId) {
572 return createSession(sessionId, true);
573 }
574
575 /**
576 * create new session with check maxActiveSessions and send session creation
577 * to other cluster nodes.
578 *
579 * @param distribute
580 * @return The session
581 */
582 public Session createSession(String sessionId, boolean distribute) {
583
584 if ((maxActiveSessions >= 0) && (sessions.size() >= maxActiveSessions)) {
585 rejectedSessions++;
586 throw new IllegalStateException(sm
587 .getString("deltaManager.createSession.ise"));
588 }
589
590 DeltaSession session = (DeltaSession) super.createSession(sessionId) ;
591 session.resetDeltaRequest();
592 if (distribute) {
593 sendCreateSession(session.getId(), session);
594 }
595 if (log.isDebugEnabled())
596 log.debug(sm.getString("deltaManager.createSession.newSession",
597 session.getId(), new Integer(sessions.size())));
598
599 return (session);
600
601 }
602
603 /**
604 * Send create session evt to all backup node
605 * @param sessionId
606 * @param session
607 */
608 protected void sendCreateSession(String sessionId, DeltaSession session) {
609 if(cluster.getMembers().length > 0 ) {
610 SessionMessage msg = new SessionMessageImpl(getName(),
611 SessionMessage.EVT_SESSION_CREATED, null, sessionId,
612 sessionId + "-" + System.currentTimeMillis());
613 if (log.isDebugEnabled())
614 log.debug(sm.getString("deltaManager.sendMessage.newSession",
615 name, sessionId));
616 counterSend_EVT_SESSION_CREATED++;
617 send(msg);
618 }
619 session.resetDeltaRequest();
620 }
621
622 /**
623 * Send messages to other backup member (domain or all)
624 * @param msg Session message
625 */
626 protected void send(SessionMessage msg) {
627 if(cluster != null) {
628 if(isSendClusterDomainOnly())
629 cluster.sendClusterDomain(msg);
630 else
631 cluster.send(msg);
632 }
633 }
634
635 /**
636 * Create DeltaSession
637 * @see org.apache.catalina.Manager#createEmptySession()
638 */
639 public Session createEmptySession() {
640 return getNewDeltaSession() ;
641 }
642
643 /**
644 * Get new session class to be used in the doLoad() method.
645 */
646 protected DeltaSession getNewDeltaSession() {
647 return new DeltaSession(this);
648 }
649
650 /**
651 * Load Deltarequest from external node
652 * Load the Class at container classloader
653 * @see DeltaRequest#readExternal(java.io.ObjectInput)
654 * @param session
655 * @param data message data
656 * @return The request
657 * @throws ClassNotFoundException
658 * @throws IOException
659 */
660 protected DeltaRequest loadDeltaRequest(DeltaSession session, byte[] data)
661 throws ClassNotFoundException, IOException {
662 ByteArrayInputStream fis = null;
663 ReplicationStream ois = null;
664 Loader loader = null;
665 ClassLoader classLoader = null;
666 //fix to be able to run the DeltaManager
667 //stand alone without a container.
668 //use the Threads context class loader
669 if (container != null)
670 loader = container.getLoader();
671 if (loader != null)
672 classLoader = loader.getClassLoader();
673 else
674 classLoader = Thread.currentThread().getContextClassLoader();
675 //end fix
676 fis = new ByteArrayInputStream(data);
677 ois = new ReplicationStream(fis, classLoader);
678 session.getDeltaRequest().readExternal(ois);
679 ois.close();
680 return session.getDeltaRequest();
681 }
682
683 /**
684 * serialize DeltaRequest
685 * @see DeltaRequest#writeExternal(java.io.ObjectOutput)
686 *
687 * @param deltaRequest
688 * @return serialized delta request
689 * @throws IOException
690 */
691 protected byte[] unloadDeltaRequest(DeltaRequest deltaRequest)
692 throws IOException {
693 ByteArrayOutputStream bos = new ByteArrayOutputStream();
694 ObjectOutputStream oos = new ObjectOutputStream(bos);
695 deltaRequest.writeExternal(oos);
696 oos.flush();
697 oos.close();
698 return bos.toByteArray();
699 }
700
701 /**
702 * Load sessions from other cluster node.
703 * FIXME replace currently sessions with same id without notifcation.
704 * FIXME SSO handling is not really correct with the session replacement!
705 * @exception ClassNotFoundException
706 * if a serialized class cannot be found during the reload
707 * @exception IOException
708 * if an input/output error occurs
709 */
710 protected void deserializeSessions(byte[] data) throws ClassNotFoundException,
711 IOException {
712
713 // Initialize our internal data structures
714 //sessions.clear(); //should not do this
715 // Open an input stream to the specified pathname, if any
716 ClassLoader originalLoader = Thread.currentThread()
717 .getContextClassLoader();
718 ObjectInputStream ois = null;
719 // Load the previously unloaded active sessions
720 try {
721 ois = openDeserializeObjectStream(data);
722 Integer count = (Integer) ois.readObject();
723 int n = count.intValue();
724 for (int i = 0; i < n; i++) {
725 DeltaSession session = (DeltaSession) createEmptySession();
726 session.readObjectData(ois);
727 session.setManager(this);
728 session.setValid(true);
729 session.setPrimarySession(false);
730 //in case the nodes in the cluster are out of
731 //time synch, this will make sure that we have the
732 //correct timestamp, isValid returns true, cause
733 // accessCount=1
734 session.access();
735 //make sure that the session gets ready to expire if
736 // needed
737 session.setAccessCount(0);
738 session.resetDeltaRequest();
739 // FIXME How inform other session id cache like SingleSignOn
740 // increment sessionCounter to correct stats report
741 if (findSession(session.getIdInternal()) == null ) {
742 sessionCounter++;
743 } else {
744 sessionReplaceCounter++;
745 // FIXME better is to grap this sessions again !
746 if (log.isWarnEnabled())
747 log.warn(sm.getString(
748 "deltaManager.loading.existing.session",
749 session.getIdInternal()));
750 }
751 add(session);
752 }
753 } catch (ClassNotFoundException e) {
754 log.error(sm.getString("deltaManager.loading.cnfe", e), e);
755 throw e;
756 } catch (IOException e) {
757 log.error(sm.getString("deltaManager.loading.ioe", e), e);
758 throw e;
759 } finally {
760 // Close the input stream
761 try {
762 if (ois != null)
763 ois.close();
764 } catch (IOException f) {
765 // ignored
766 }
767 ois = null;
768 if (originalLoader != null)
769 Thread.currentThread().setContextClassLoader(originalLoader);
770 }
771
772 }
773
774 /**
775 * Open Stream and use correct ClassLoader (Container) Switch
776 * ThreadClassLoader
777 *
778 * @param data
779 * @return The object input stream
780 * @throws IOException
781 */
782 protected ObjectInputStream openDeserializeObjectStream(byte[] data) throws IOException {
783 ObjectInputStream ois = null;
784 ByteArrayInputStream fis = null;
785 try {
786 Loader loader = null;
787 ClassLoader classLoader = null;
788 fis = new ByteArrayInputStream(data);
789 BufferedInputStream bis = new BufferedInputStream(fis);
790 if (container != null)
791 loader = container.getLoader();
792 if (loader != null)
793 classLoader = loader.getClassLoader();
794 if (classLoader != null) {
795 if (log.isTraceEnabled())
796 log.trace(sm.getString(
797 "deltaManager.loading.withContextClassLoader",
798 getName()));
799 ois = new CustomObjectInputStream(bis, classLoader);
800 Thread.currentThread().setContextClassLoader(classLoader);
801 } else {
802 if (log.isTraceEnabled())
803 log.trace(sm.getString(
804 "deltaManager.loading.withoutClassLoader",
805 getName()));
806 ois = new ObjectInputStream(bis);
807 }
808 } catch (IOException e) {
809 log.error(sm.getString("deltaManager.loading.ioe", e), e);
810 if (ois != null) {
811 try {
812 ois.close();
813 } catch (IOException f) {
814 ;
815 }
816 ois = null;
817 }
818 throw e;
819 }
820 return ois;
821 }
822
823 /**
824 * Save any currently active sessions in the appropriate persistence
825 * mechanism, if any. If persistence is not supported, this method returns
826 * without doing anything.
827 *
828 * @exception IOException
829 * if an input/output error occurs
830 */
831 protected byte[] serializeSessions(Session[] currentSessions) throws IOException {
832
833 // Open an output stream to the specified pathname, if any
834 ByteArrayOutputStream fos = null;
835 ObjectOutputStream oos = null;
836
837 try {
838 fos = new ByteArrayOutputStream();
839 oos = new ObjectOutputStream(new BufferedOutputStream(fos));
840 oos.writeObject(new Integer(currentSessions.length));
841 for(int i=0 ; i < currentSessions.length;i++) {
842 ((DeltaSession)currentSessions[i]).writeObjectData(oos);
843 }
844 // Flush and close the output stream
845 oos.flush();
846 } catch (IOException e) {
847 log.error(sm.getString("deltaManager.unloading.ioe", e), e);
848 throw e;
849 } finally {
850 if (oos != null) {
851 try {
852 oos.close();
853 } catch (IOException f) {
854 ;
855 }
856 oos = null;
857 }
858 }
859 // send object data as byte[]
860 return fos.toByteArray();
861 }
862
863 // ------------------------------------------------------ Lifecycle Methods
864
865 /**
866 * Add a lifecycle event listener to this component.
867 *
868 * @param listener
869 * The listener to add
870 */
871 public void addLifecycleListener(LifecycleListener listener) {
872
873 lifecycle.addLifecycleListener(listener);
874
875 }
876
877 /**
878 * Get the lifecycle listeners associated with this lifecycle. If this
879 * Lifecycle has no listeners registered, a zero-length array is returned.
880 */
881 public LifecycleListener[] findLifecycleListeners() {
882
883 return lifecycle.findLifecycleListeners();
884
885 }
886
887 /**
888 * Remove a lifecycle event listener from this component.
889 *
890 * @param listener
891 * The listener to remove
892 */
893 public void removeLifecycleListener(LifecycleListener listener) {
894
895 lifecycle.removeLifecycleListener(listener);
896
897 }
898
899 /**
900 * Prepare for the beginning of active use of the public methods of this
901 * component. This method should be called after <code>configure()</code>,
902 * and before any of the public methods of the component are utilized.
903 *
904 * @exception LifecycleException
905 * if this component detects a fatal error that prevents this
906 * component from being used
907 */
908 public void start() throws LifecycleException {
909 if (!initialized)
910 init();
911
912 // Validate and update our current component state
913 if (started) {
914 return;
915 }
916 started = true;
917 lifecycle.fireLifecycleEvent(START_EVENT, null);
918
919 // Force initialization of the random number generator
920 String dummy = generateSessionId();
921
922 // Load unloaded sessions, if any
923 try {
924 //the channel is already running
925 Cluster cluster = getCluster() ;
926 // stop remove cluster binding
927 if(cluster == null) {
928 Container context = getContainer() ;
929 if(context != null && context instanceof Context) {
930 Container host = context.getParent() ;
931 if(host != null && host instanceof Host) {
932 cluster = host.getCluster();
933 if(cluster != null && cluster instanceof CatalinaCluster) {
934 setCluster((CatalinaCluster) cluster) ;
935 } else {
936 Container engine = host.getParent() ;
937 if(engine != null && engine instanceof Engine) {
938 cluster = engine.getCluster();
939 if(cluster != null && cluster instanceof CatalinaCluster) {
940 setCluster((CatalinaCluster) cluster) ;
941 }
942 } else {
943 cluster = null ;
944 }
945 }
946 }
947 }
948 }
949 if (cluster == null) {
950 log.error(sm.getString("deltaManager.noCluster", getName()));
951 return;
952 } else {
953 if (log.isInfoEnabled()) {
954 String type = "unknown" ;
955 if( cluster.getContainer() instanceof Host){
956 type = "Host" ;
957 } else if( cluster.getContainer() instanceof Engine){
958 type = "Engine" ;
959 }
960 log.info(sm
961 .getString("deltaManager.registerCluster", getName(), type, cluster.getClusterName()));
962 }
963 }
964 if (log.isInfoEnabled())
965 log.info(sm
966 .getString("deltaManager.startClustering", getName()));
967 //to survice context reloads, as only a stop/start is called, not
968 // createManager
969 ((CatalinaCluster)cluster).addManager(getName(), this);
970
971 getAllClusterSessions();
972
973 } catch (Throwable t) {
974 log.error(sm.getString("deltaManager.managerLoad"), t);
975 }
976 }
977
978 /**
979 * get from first session master the backup from all clustered sessions
980 * @see #findSessionMasterMember()
981 */
982 public synchronized void getAllClusterSessions() {
983 if (cluster != null && cluster.getMembers().length > 0) {
984 long beforeSendTime = System.currentTimeMillis();
985 Member mbr = findSessionMasterMember();
986 if(mbr == null) { // No domain member found
987 return;
988 }
989 SessionMessage msg = new SessionMessageImpl(this.getName(),
990 SessionMessage.EVT_GET_ALL_SESSIONS, null, "GET-ALL",
991 "GET-ALL-" + getName());
992 msg.setResend(ClusterMessage.FLAG_FORBIDDEN);
993 // set reference time
994 msg.setTimestamp(beforeSendTime);
995 stateTransferCreateSendTime = beforeSendTime ;
996 // request session state
997 counterSend_EVT_GET_ALL_SESSIONS++;
998 stateTransferred = false ;
999 // FIXME This send call block the deploy thread, when sender waitForAck is enabled
1000 try {
1001 synchronized(receivedMessageQueue) {
1002 receiverQueue = true ;
1003 }
1004 cluster.send(msg, mbr);
1005 if (log.isWarnEnabled())
1006 log.warn(sm.getString("deltaManager.waitForSessionState",
1007 getName(), mbr));
1008 // FIXME At sender ack mode this method check only the state transfer and resend is a problem!
1009 waitForSendAllSessions(beforeSendTime);
1010 } finally {
1011 synchronized(receivedMessageQueue) {
1012 for (Iterator iter = receivedMessageQueue.iterator(); iter
1013 .hasNext();) {
1014 SessionMessage smsg = (SessionMessage) iter.next();
1015 if (!stateTimestampDrop) {
1016 messageReceived(smsg,
1017 smsg.getAddress() != null ? (Member) smsg
1018 .getAddress() : null);
1019 } else {
1020 if (smsg.getEventType() != SessionMessage.EVT_GET_ALL_SESSIONS
1021 && smsg.getTimestamp() >= stateTransferCreateSendTime) {
1022 // FIXME handle EVT_GET_ALL_SESSIONS later
1023 messageReceived(
1024 smsg,
1025 smsg.getAddress() != null ? (Member) smsg
1026 .getAddress()
1027 : null);
1028 } else {
1029 if (log.isWarnEnabled()) {
1030 log.warn(sm.getString(
1031 "deltaManager.dropMessage",
1032 getName(), smsg
1033 .getEventTypeString(),
1034 new Date(stateTransferCreateSendTime), new Date(
1035 smsg.getTimestamp())));
1036 }
1037 }
1038 }
1039 }
1040 receivedMessageQueue.clear();
1041 receiverQueue = false ;
1042 }
1043 }
1044 } else {
1045 if (log.isInfoEnabled())
1046 log.info(sm.getString("deltaManager.noMembers", getName()));
1047 }
1048 }
1049
1050 /**
1051 * Find the master of the session state
1052 * @return master member of sessions
1053 */
1054 protected Member findSessionMasterMember() {
1055 Member mbr = null;
1056 Member mbrs[] = cluster.getMembers();
1057 String localMemberDomain = cluster.getMembershipService().getLocalMember().getDomain();
1058 if(isSendClusterDomainOnly()) {
1059 for (int i = 0; mbr == null && i < mbrs.length; i++) {
1060 Member member = mbrs[i];
1061 if(localMemberDomain.equals(member.getDomain()))
1062 mbr = member ;
1063 }
1064 } else {
1065 // FIXME Why only the first Member?
1066 if(mbrs.length != 0 )
1067 mbr = mbrs[0];
1068 }
1069 if(mbr == null && log.isWarnEnabled())
1070 log.warn(sm.getString("deltaManager.noMasterMember",
1071 getName(), localMemberDomain));
1072 if(mbr != null && log.isDebugEnabled())
1073 log.warn(sm.getString("deltaManager.foundMasterMember",
1074 getName(), mbr));
1075 return mbr;
1076 }
1077
1078 /**
1079 * Wait that cluster session state is transfer or timeout after 60 Sec
1080 * With stateTransferTimeout == -1 wait that backup is transfered (forever mode)
1081 */
1082 protected void waitForSendAllSessions(long beforeSendTime) {
1083 long reqStart = System.currentTimeMillis();
1084 long reqNow = reqStart ;
1085 boolean isTimeout = false;
1086 if(getStateTransferTimeout() > 0) {
1087 // wait that state is transfered with timeout check
1088 do {
1089 try {
1090 Thread.sleep(100);
1091 } catch (Exception sleep) {
1092 }
1093 reqNow = System.currentTimeMillis();
1094 isTimeout = ((reqNow - reqStart) > (1000 * getStateTransferTimeout()));
1095 } while ((!getStateTransferred()) && (!isTimeout));
1096 } else {
1097 if(getStateTransferTimeout() == -1) {
1098 // wait that state is transfered
1099 do {
1100 try {
1101 Thread.sleep(100);
1102 } catch (Exception sleep) {
1103 }
1104 } while ((!getStateTransferred()));
1105 reqNow = System.currentTimeMillis();
1106 }
1107 }
1108 if (isTimeout || (!getStateTransferred())) {
1109 counterNoStateTransfered++ ;
1110 log.error(sm.getString("deltaManager.noSessionState",
1111 getName(),new Date(beforeSendTime),new Long(reqNow - beforeSendTime)));
1112 } else {
1113 if (log.isInfoEnabled())
1114 log.info(sm.getString("deltaManager.sessionReceived",
1115 getName(), new Date(beforeSendTime), new Long(reqNow - beforeSendTime)));
1116 }
1117 }
1118
1119 /**
1120 * Gracefully terminate the active use of the public methods of this
1121 * component. This method should be the last one called on a given instance
1122 * of this component.
1123 *
1124 * @exception LifecycleException
1125 * if this component detects a fatal error that needs to be
1126 * reported
1127 */
1128 public void stop() throws LifecycleException {
1129
1130 if (log.isDebugEnabled())
1131 log.debug(sm.getString("deltaManager.stopped", getName()));
1132
1133
1134 // Validate and update our current component state
1135 if (!started)
1136 throw new LifecycleException(sm
1137 .getString("deltaManager.notStarted"));
1138 lifecycle.fireLifecycleEvent(STOP_EVENT, null);
1139 started = false;
1140
1141 // Expire all active sessions
1142 if (log.isInfoEnabled())
1143 log.info(sm.getString("deltaManager.expireSessions", getName()));
1144 Session sessions[] = findSessions();
1145 for (int i = 0; i < sessions.length; i++) {
1146 DeltaSession session = (DeltaSession) sessions[i];
1147 if (!session.isValid())
1148 continue;
1149 try {
1150 session.expire(true, isExpireSessionsOnShutdown());
1151 } catch (Throwable ignore) {
1152 ;
1153 }
1154 }
1155
1156 // Require a new random number generator if we are restarted
1157 this.random = null;
1158 getCluster().removeManager(getName(),this);
1159 if (initialized) {
1160 destroy();
1161 }
1162 }
1163
1164 // ----------------------------------------- PropertyChangeListener Methods
1165
1166 /**
1167 * Process property change events from our associated Context.
1168 *
1169 * @param event
1170 * The property change event that has occurred
1171 */
1172 public void propertyChange(PropertyChangeEvent event) {
1173
1174 // Validate the source of this event
1175 if (!(event.getSource() instanceof Context))
1176 return;
1177 Context context = (Context) event.getSource();
1178
1179 // Process a relevant property change
1180 if (event.getPropertyName().equals("sessionTimeout")) {
1181 try {
1182 setMaxInactiveInterval(((Integer) event.getNewValue())
1183 .intValue() * 60);
1184 } catch (NumberFormatException e) {
1185 log.error(sm.getString("deltaManager.sessionTimeout", event
1186 .getNewValue()));
1187 }
1188 }
1189
1190 }
1191
1192 // -------------------------------------------------------- Replication
1193 // Methods
1194
1195 /**
1196 * A message was received from another node, this is the callback method to
1197 * implement if you are interested in receiving replication messages.
1198 *
1199 * @param cmsg -
1200 * the message received.
1201 */
1202 public void messageDataReceived(ClusterMessage cmsg) {
1203 if (cmsg != null && cmsg instanceof SessionMessage) {
1204 SessionMessage msg = (SessionMessage) cmsg;
1205 switch (msg.getEventType()) {
1206 case SessionMessage.EVT_GET_ALL_SESSIONS:
1207 case SessionMessage.EVT_SESSION_CREATED:
1208 case SessionMessage.EVT_SESSION_EXPIRED:
1209 case SessionMessage.EVT_SESSION_ACCESSED:
1210 case SessionMessage.EVT_SESSION_DELTA: {
1211 synchronized(receivedMessageQueue) {
1212 if(receiverQueue) {
1213 receivedMessageQueue.add(msg);
1214 return ;
1215 }
1216 }
1217 break;
1218 }
1219 default: {
1220 //we didn't queue, do nothing
1221 break;
1222 }
1223 } //switch
1224
1225 messageReceived(msg, msg.getAddress() != null ? (Member) msg
1226 .getAddress() : null);
1227 }
1228 }
1229
1230 /**
1231 * When the request has been completed, the replication valve will notify
1232 * the manager, and the manager will decide whether any replication is
1233 * needed or not. If there is a need for replication, the manager will
1234 * create a session message and that will be replicated. The cluster
1235 * determines where it gets sent.
1236 *
1237 * @param sessionId -
1238 * the sessionId that just completed.
1239 * @return a SessionMessage to be sent,
1240 */
1241 public ClusterMessage requestCompleted(String sessionId) {
1242 try {
1243 DeltaSession session = (DeltaSession) findSession(sessionId);
1244 DeltaRequest deltaRequest = session.getDeltaRequest();
1245 SessionMessage msg = null;
1246 if (deltaRequest.getSize() > 0) {
1247
1248 counterSend_EVT_SESSION_DELTA++;
1249 byte[] data = unloadDeltaRequest(deltaRequest);
1250 msg = new SessionMessageImpl(name,
1251 SessionMessage.EVT_SESSION_DELTA, data, sessionId,
1252 sessionId + "-" + System.currentTimeMillis());
1253 session.resetDeltaRequest();
1254 if (log.isDebugEnabled()) {
1255 log.debug(sm.getString(
1256 "deltaManager.createMessage.delta",
1257 getName(), sessionId));
1258 }
1259
1260 } else if (!session.isPrimarySession()) {
1261 counterSend_EVT_SESSION_ACCESSED++;
1262 msg = new SessionMessageImpl(getName(),
1263 SessionMessage.EVT_SESSION_ACCESSED, null, sessionId,
1264 sessionId + "-" + System.currentTimeMillis());
1265 if (log.isDebugEnabled()) {
1266 log.debug(sm.getString(
1267 "deltaManager.createMessage.accessChangePrimary",
1268 getName(), sessionId));
1269 }
1270 }
1271 session.setPrimarySession(true);
1272 //check to see if we need to send out an access message
1273 if ((msg == null)) {
1274 long replDelta = System.currentTimeMillis()
1275 - session.getLastTimeReplicated();
1276 if (replDelta > (getMaxInactiveInterval() * 1000)) {
1277 counterSend_EVT_SESSION_ACCESSED++;
1278 msg = new SessionMessageImpl(getName(),
1279 SessionMessage.EVT_SESSION_ACCESSED, null,
1280 sessionId, sessionId + "-" + System.currentTimeMillis());
1281 if (log.isDebugEnabled()) {
1282 log.debug(sm.getString(
1283 "deltaManager.createMessage.access", getName(),
1284 sessionId));
1285 }
1286 }
1287
1288 }
1289
1290 //update last replicated time
1291 if (msg != null)
1292 session.setLastTimeReplicated(System.currentTimeMillis());
1293 return msg;
1294 } catch (IOException x) {
1295 log.error(sm.getString(
1296 "deltaManager.createMessage.unableCreateDeltaRequest",
1297 sessionId), x);
1298 return null;
1299 }
1300
1301 }
1302 /**
1303 * Reset manager statistics
1304 */
1305 public synchronized void resetStatistics() {
1306 processingTime = 0 ;
1307 expiredSessions = 0 ;
1308 rejectedSessions = 0 ;
1309 sessionReplaceCounter = 0 ;
1310 counterNoStateTransfered = 0 ;
1311 maxActive = getActiveSessions() ;
1312 sessionCounter = getActiveSessions() ;
1313 counterReceive_EVT_ALL_SESSION_DATA = 0;
1314 counterReceive_EVT_GET_ALL_SESSIONS = 0;
1315 counterReceive_EVT_SESSION_ACCESSED = 0 ;
1316 counterReceive_EVT_SESSION_CREATED = 0 ;
1317 counterReceive_EVT_SESSION_DELTA = 0 ;
1318 counterReceive_EVT_SESSION_EXPIRED = 0 ;
1319 counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
1320 counterSend_EVT_ALL_SESSION_DATA = 0;
1321 counterSend_EVT_GET_ALL_SESSIONS = 0;
1322 counterSend_EVT_SESSION_ACCESSED = 0 ;
1323 counterSend_EVT_SESSION_CREATED = 0 ;
1324 counterSend_EVT_SESSION_DELTA = 0 ;
1325 counterSend_EVT_SESSION_EXPIRED = 0 ;
1326 counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
1327
1328 }
1329
1330 // -------------------------------------------------------- persistence handler
1331
1332 public void load() {
1333
1334 }
1335
1336 public void unload() {
1337
1338 }
1339
1340 // -------------------------------------------------------- expire
1341
1342 /**
1343 * send session expired to other cluster nodes
1344 *
1345 * @param id
1346 * session id
1347 */
1348 protected void sessionExpired(String id) {
1349 counterSend_EVT_SESSION_EXPIRED++ ;
1350 SessionMessage msg = new SessionMessageImpl(getName(),
1351 SessionMessage.EVT_SESSION_EXPIRED, null, id, id
1352 + "-EXPIRED-MSG");
1353 if (log.isDebugEnabled())
1354 log.debug(sm.getString("deltaManager.createMessage.expire",
1355 getName(), id));
1356 send(msg);
1357 }
1358
1359 /**
1360 * Exipre all find sessions.
1361 */
1362 public void expireAllLocalSessions()
1363 {
1364 long timeNow = System.currentTimeMillis();
1365 Session sessions[] = findSessions();
1366 int expireDirect = 0 ;
1367 int expireIndirect = 0 ;
1368
1369 if(log.isDebugEnabled())
1370 log.debug("Start expire all sessions " + getName() + " at " + timeNow + " sessioncount " + sessions.length);
1371 for (int i = 0; i < sessions.length; i++) {
1372 if (sessions[i] instanceof DeltaSession) {
1373 DeltaSession session = (DeltaSession) sessions[i];
1374 if (session.isPrimarySession()) {
1375 if (session.isValid()) {
1376 session.expire();
1377 expireDirect++;
1378 } else {
1379 expireIndirect++;
1380 }
1381 }
1382 }
1383 }
1384 long timeEnd = System.currentTimeMillis();
1385 if(log.isDebugEnabled())
1386 log.debug("End expire sessions " + getName() + " exipre processingTime " + (timeEnd - timeNow) + " expired direct sessions: " + expireDirect + " expired direct sessions: " + expireIndirect);
1387
1388 }
1389
1390 /**
1391 * When the manager expires session not tied to a request. The cluster will
1392 * periodically ask for a list of sessions that should expire and that
1393 * should be sent across the wire.
1394 *
1395 * @return The invalidated sessions array
1396 */
1397 public String[] getInvalidatedSessions() {
1398 return new String[0];
1399 }
1400
1401 // -------------------------------------------------------- message receive
1402
1403 /**
1404 * Test that sender and local domain is the same
1405 */
1406 protected boolean checkSenderDomain(SessionMessage msg,Member sender) {
1407 String localMemberDomain = cluster.getMembershipService().getLocalMember().getDomain();
1408 boolean sameDomain= localMemberDomain.equals(sender.getDomain());
1409 if (!sameDomain && log.isWarnEnabled()) {
1410 log.warn(sm.getString("deltaManager.receiveMessage.fromWrongDomain",
1411 new Object[] {getName(),
1412 msg.getEventTypeString(),
1413 sender,
1414 sender.getDomain(),
1415 localMemberDomain }
1416 ));
1417 }
1418 return sameDomain ;
1419 }
1420
1421 /**
1422 * This method is called by the received thread when a SessionMessage has
1423 * been received from one of the other nodes in the cluster.
1424 *
1425 * @param msg -
1426 * the message received
1427 * @param sender -
1428 * the sender of the message, this is used if we receive a
1429 * EVT_GET_ALL_SESSION message, so that we only reply to the
1430 * requesting node
1431 */
1432 protected void messageReceived(SessionMessage msg, Member sender) {
1433 if(isSendClusterDomainOnly() && !checkSenderDomain(msg,sender)) {
1434 return;
1435 }
1436 try {
1437 if (log.isDebugEnabled())
1438 log.debug(sm.getString("deltaManager.receiveMessage.eventType",
1439 getName(), msg.getEventTypeString(), sender));
1440
1441 switch (msg.getEventType()) {
1442 case SessionMessage.EVT_GET_ALL_SESSIONS: {
1443 handleGET_ALL_SESSIONS(msg,sender);
1444 break;
1445 }
1446 case SessionMessage.EVT_ALL_SESSION_DATA: {
1447 handleALL_SESSION_DATA(msg,sender);
1448 break;
1449 }
1450 case SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE: {
1451 handleALL_SESSION_TRANSFERCOMPLETE(msg,sender);
1452 break;
1453 }
1454 case SessionMessage.EVT_SESSION_CREATED: {
1455 handleSESSION_CREATED(msg,sender);
1456 break;
1457 }
1458 case SessionMessage.EVT_SESSION_EXPIRED: {
1459 handleSESSION_EXPIRRED(msg,sender);
1460 break;
1461 }
1462 case SessionMessage.EVT_SESSION_ACCESSED: {
1463 handleSESSION_ACCESSED(msg,sender);
1464 break;
1465 }
1466 case SessionMessage.EVT_SESSION_DELTA: {
1467 handleSESSION_DELTA(msg,sender);
1468 break;
1469 }
1470 default: {
1471 //we didn't recognize the message type, do nothing
1472 break;
1473 }
1474 } //switch
1475 } catch (Exception x) {
1476 log.error(sm.getString("deltaManager.receiveMessage.error",
1477 getName()), x);
1478 }
1479 }
1480
1481 // -------------------------------------------------------- message receiver handler
1482
1483
1484 /**
1485 * handle receive session state is complete transfered
1486 * @param msg
1487 * @param sender
1488 */
1489 protected void handleALL_SESSION_TRANSFERCOMPLETE(SessionMessage msg, Member sender) {
1490 counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE++ ;
1491 if (log.isDebugEnabled())
1492 log.debug(sm.getString(
1493 "deltaManager.receiveMessage.transfercomplete",
1494 getName(), sender.getHost(), new Integer(sender.getPort())));
1495 stateTransferCreateSendTime = msg.getTimestamp() ;
1496 stateTransferred = true ;
1497 }
1498
1499 /**
1500 * handle receive session delta
1501 * @param msg
1502 * @param sender
1503 * @throws IOException
1504 * @throws ClassNotFoundException
1505 */
1506 protected void handleSESSION_DELTA(SessionMessage msg, Member sender)
1507 throws IOException, ClassNotFoundException {
1508 counterReceive_EVT_SESSION_DELTA++;
1509 byte[] delta = msg.getSession();
1510 DeltaSession session = (DeltaSession) findSession(msg.getSessionID());
1511 if (session != null) {
1512 if (log.isDebugEnabled())
1513 log.debug(sm.getString("deltaManager.receiveMessage.delta",
1514 getName(), msg.getSessionID()));
1515 DeltaRequest dreq = loadDeltaRequest(session, delta);
1516 dreq.execute(session, notifyListenersOnReplication);
1517 session.setPrimarySession(false);
1518 }
1519 }
1520
1521 /**
1522 * handle receive session is access at other node ( primary session is now false)
1523 * @param msg
1524 * @param sender
1525 * @throws IOException
1526 */
1527 protected void handleSESSION_ACCESSED(SessionMessage msg,Member sender) throws IOException {
1528 counterReceive_EVT_SESSION_ACCESSED++;
1529 DeltaSession session = (DeltaSession) findSession(msg
1530 .getSessionID());
1531 if (session != null) {
1532 if (log.isDebugEnabled())
1533 log.debug(sm.getString(
1534 "deltaManager.receiveMessage.accessed",
1535 getName(), msg.getSessionID()));
1536 session.access();
1537 session.setPrimarySession(false);
1538 session.endAccess();
1539 }
1540 }
1541
1542 /**
1543 * handle receive session is expire at other node ( expire session also here)
1544 * @param msg
1545 * @param sender
1546 * @throws IOException
1547 */
1548 protected void handleSESSION_EXPIRRED(SessionMessage msg,Member sender) throws IOException {
1549 counterReceive_EVT_SESSION_EXPIRED++;
1550 DeltaSession session = (DeltaSession) findSession(msg
1551 .getSessionID());
1552 if (session != null) {
1553 if (log.isDebugEnabled())
1554 log.debug(sm.getString(
1555 "deltaManager.receiveMessage.expired",
1556 getName(), msg.getSessionID()));
1557 session.expire(notifySessionListenersOnReplication, false);
1558 }
1559 }
1560
1561 /**
1562 * handle receive new session is created at other node (create backup - primary false)
1563 * @param msg
1564 * @param sender
1565 */
1566 protected void handleSESSION_CREATED(SessionMessage msg,Member sender) {
1567 counterReceive_EVT_SESSION_CREATED++;
1568 if (log.isDebugEnabled())
1569 log.debug(sm.getString(
1570 "deltaManager.receiveMessage.createNewSession",
1571 getName(), msg.getSessionID()));
1572 DeltaSession session = (DeltaSession) createEmptySession();
1573 session.setManager(this);
1574 session.setValid(true);
1575 session.setPrimarySession(false);
1576 session.access();
1577 if(notifySessionListenersOnReplication)
1578 session.setId(msg.getSessionID());
1579 else
1580 session.setIdInternal(msg.getSessionID());
1581 session.resetDeltaRequest();
1582 session.endAccess();
1583
1584 }
1585
1586 /**
1587 * handle receive sessions from other not ( restart )
1588 * @param msg
1589 * @param sender
1590 * @throws ClassNotFoundException
1591 * @throws IOException
1592 */
1593 protected void handleALL_SESSION_DATA(SessionMessage msg,Member sender) throws ClassNotFoundException, IOException {
1594 counterReceive_EVT_ALL_SESSION_DATA++;
1595 if (log.isDebugEnabled())
1596 log.debug(sm.getString(
1597 "deltaManager.receiveMessage.allSessionDataBegin",
1598 getName()));
1599 byte[] data = msg.getSession();
1600 deserializeSessions(data);
1601 if (log.isDebugEnabled())
1602 log.debug(sm.getString(
1603 "deltaManager.receiveMessage.allSessionDataAfter",
1604 getName()));
1605 //stateTransferred = true;
1606 }
1607
1608 /**
1609 * handle receive that other node want all sessions ( restart )
1610 * a) send all sessions with one message
1611 * b) send session at blocks
1612 * After sending send state is complete transfered
1613 * @param msg
1614 * @param sender
1615 * @throws IOException
1616 */
1617 protected void handleGET_ALL_SESSIONS(SessionMessage msg, Member sender)
1618 throws IOException {
1619 counterReceive_EVT_GET_ALL_SESSIONS++;
1620 //get a list of all the session from this manager
1621 if (log.isDebugEnabled())
1622 log.debug(sm.getString(
1623 "deltaManager.receiveMessage.unloadingBegin", getName()));
1624 // Write the number of active sessions, followed by the details
1625 // get all sessions and serialize without sync
1626 Session[] currentSessions = findSessions();
1627 long findSessionTimestamp = System.currentTimeMillis() ;
1628 if (isSendAllSessions()) {
1629 sendSessions(sender, currentSessions, findSessionTimestamp);
1630 } else {
1631 // send session at blocks
1632 int len = currentSessions.length < getSendAllSessionsSize() ? currentSessions.length
1633 : getSendAllSessionsSize();
1634 Session[] sendSessions = new Session[len];
1635 for (int i = 0; i < currentSessions.length; i += getSendAllSessionsSize()) {
1636 len = i + getSendAllSessionsSize() > currentSessions.length ? currentSessions.length
1637 - i
1638 : getSendAllSessionsSize();
1639 System.arraycopy(currentSessions, i, sendSessions, 0, len);
1640 sendSessions(sender, sendSessions,findSessionTimestamp);
1641 if (getSendAllSessionsWaitTime() > 0) {
1642 try {
1643 Thread.sleep(getSendAllSessionsWaitTime());
1644 } catch (Exception sleep) {
1645 }
1646 }
1647 }
1648 }
1649
1650 SessionMessage newmsg = new SessionMessageImpl(name,
1651 SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE, null,
1652 "SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED"
1653 + getName());
1654 newmsg.setTimestamp(findSessionTimestamp);
1655 if (log.isDebugEnabled())
1656 log.debug(sm.getString(
1657 "deltaManager.createMessage.allSessionTransfered",
1658 getName()));
1659 counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE++;
1660 cluster.send(newmsg, sender);
1661 }
1662
1663
1664 /**
1665 * send a block of session to sender
1666 * @param sender
1667 * @param currentSessions
1668 * @param sendTimestamp
1669 * @throws IOException
1670 */
1671 protected void sendSessions(Member sender, Session[] currentSessions,
1672 long sendTimestamp) throws IOException {
1673 byte[] data = serializeSessions(currentSessions);
1674 if (log.isDebugEnabled())
1675 log.debug(sm.getString(
1676 "deltaManager.receiveMessage.unloadingAfter",
1677 getName()));
1678 SessionMessage newmsg = new SessionMessageImpl(name,
1679 SessionMessage.EVT_ALL_SESSION_DATA, data,
1680 "SESSION-STATE", "SESSION-STATE-" + getName());
1681 newmsg.setTimestamp(sendTimestamp);
1682 //if(isSendSESSIONSTATEcompressed()) {
1683 // newmsg.setCompress(ClusterMessage.FLAG_ALLOWED);
1684 //}
1685 if (log.isDebugEnabled())
1686 log.debug(sm.getString(
1687 "deltaManager.createMessage.allSessionData",
1688 getName()));
1689 counterSend_EVT_ALL_SESSION_DATA++;
1690 cluster.send(newmsg, sender);
1691 }
1692
1693 }