Source code: org/jgroups/protocols/pbcast/STABLE.java
1 // $Id: STABLE.java,v 1.39 2005/08/11 12:43:46 belaban Exp $
2
3 package org.jgroups.protocols.pbcast;
4
5
6 import org.jgroups.*;
7 import org.jgroups.stack.Protocol;
8 import org.jgroups.util.Streamable;
9 import org.jgroups.util.TimeScheduler;
10 import org.jgroups.util.Util;
11
12 import java.io.*;
13 import java.util.*;
14
15
16
17
18 /**
19 * Computes the broadcast messages that are stable; i.e., have been received by all members. Sends
20 * STABLE events up the stack when this is the case. This allows NAKACK to garbage collect messages that
21 * have been seen by all members.<p>
22 * Works as follows: periodically we mcast our highest seqnos (seen for each member) to the group.
23 * A stability vector, which maintains the highest seqno for each member and initially contains no data,
24 * is updated when such a message is received. The entry for a member P is set to
25 * min(entry[P], digest[P]). When messages from all members have been received, a stability
26 * message is mcast, which causes all members to send a STABLE event up the stack (triggering garbage collection
27 * in the NAKACK layer).<p>
28 * The stable task now terminates after max_num_gossips if no messages or view changes have been sent or received
29 * in the meantime. It will resume when messages are received. This effectively suspends sending superfluous
30 * STABLE messages in the face of no activity.<br/>
31 * New: when <code>max_bytes</code> is exceeded (unless disabled by setting it to 0),
32 * a STABLE task will be started (unless it is already running).
33 * @author Bela Ban
34 */
35 public class STABLE extends Protocol {
36 Address local_addr=null;
37 final Vector mbrs=new Vector();
38 final Digest digest=new Digest(10); // keeps track of the highest seqnos from all members
39 final Digest latest_local_digest=new Digest(10); // keeps track of the latest digests received from NAKACK
40 final Vector heard_from=new Vector(); // keeps track of who we already heard from (STABLE_GOSSIP msgs)
41
42 /** Sends a STABLE gossip every 20 seconds on average. 0 disables gossipping of STABLE messages */
43 long desired_avg_gossip=20000;
44
45 /** delay before we send STABILITY msg (give others a chance to send first). This should be set to a very
46 * small number (> 0 !) if <code>max_bytes</code> is used */
47 long stability_delay=6000;
48 StabilitySendTask stability_task=null;
49 final Object stability_mutex=new Object(); // to synchronize on stability_task
50 StableTask stable_task=null; // bcasts periodic STABLE message (added to timer below)
51 final Object stable_task_mutex=new Object(); // to sync on stable_task
52 TimeScheduler timer=null; // to send periodic STABLE msgs (and STABILITY messages)
53 static final String name="STABLE";
54
55 /** Total amount of bytes from incoming messages (default = 0 = disabled). When exceeded, a STABLE
56 * message will be broadcast and <code>num_bytes_received</code> reset to 0 . If this is > 0, then ideally
57 * <code>stability_delay</code> should be set to a low number as well */
58 long max_bytes=0;
59
60 /** The total number of bytes received from unicast and multicast messages */
61 long num_bytes_received=0;
62
63 /** When true, don't take part in garbage collection protocol: neither send STABLE messages nor
64 * handle STABILITY messages */
65 boolean suspended=false;
66
67 boolean initialized=false;
68
69 ResumeTask resume_task=null;
70 final Object resume_task_mutex=new Object();
71
72 /** Number of gossip messages */
73 int num_gossips=0;
74
75
76
77 public String getName() {
78 return name;
79 }
80
81 public long getDesiredAverageGossip() {
82 return desired_avg_gossip;
83 }
84
85 public void setDesiredAverageGossip(long gossip_interval) {
86 desired_avg_gossip=gossip_interval;
87 }
88
89 public long getMaxBytes() {
90 return max_bytes;
91 }
92
93 public void setMaxBytes(long max_bytes) {
94 this.max_bytes=max_bytes;
95 }
96
97 public int getNumberOfGossipMessages() {return num_gossips;}
98
99 public void resetStats() {
100 super.resetStats();
101 num_gossips=0;
102 }
103
104
105 public Vector requiredDownServices() {
106 Vector retval=new Vector();
107 retval.addElement(new Integer(Event.GET_DIGEST_STABLE)); // NAKACK layer
108 return retval;
109 }
110
111 public boolean setProperties(Properties props) {
112 String str;
113
114 super.setProperties(props);
115 str=props.getProperty("digest_timeout");
116 if(str != null) {
117 props.remove("digest_timeout");
118 log.error("digest_timeout has been deprecated; it will be ignored");
119 }
120
121 str=props.getProperty("desired_avg_gossip");
122 if(str != null) {
123 desired_avg_gossip=Long.parseLong(str);
124 props.remove("desired_avg_gossip");
125 }
126
127 str=props.getProperty("stability_delay");
128 if(str != null) {
129 stability_delay=Long.parseLong(str);
130 props.remove("stability_delay");
131 }
132
133 str=props.getProperty("max_gossip_runs");
134 if(str != null) {
135 props.remove("max_gossip_runs");
136 log.error("max_gossip_runs has been deprecated and will be ignored");
137 }
138
139 str=props.getProperty("max_bytes");
140 if(str != null) {
141 max_bytes=Long.parseLong(str);
142 props.remove("max_bytes");
143 }
144
145 str=props.getProperty("max_suspend_time");
146 if(str != null) {
147 log.error("max_suspend_time is not supported any longer; please remove it (ignoring it)");
148 props.remove("max_suspend_time");
149 }
150
151 if(props.size() > 0) {
152 log.error("these properties are not recognized: " + props);
153
154 return false;
155 }
156 return true;
157 }
158
159
160 private void suspend(long timeout) {
161 if(!suspended) {
162 suspended=true;
163 if(log.isDebugEnabled())
164 log.debug("suspending message garbage collection");
165 }
166 startResumeTask(timeout); // will not start task if already running
167 }
168
169 private void resume() {
170 suspended=false;
171 if(log.isDebugEnabled())
172 log.debug("resuming message garbage collection");
173 stopResumeTask();
174 }
175
176 public void start() throws Exception {
177 if(stack != null && stack.timer != null)
178 timer=stack.timer;
179 else
180 throw new Exception("timer cannot be retrieved from protocol stack");
181 if(desired_avg_gossip > 0)
182 startStableTask();
183 }
184
185 public void stop() {
186 stopStableTask();
187 clearDigest();
188 }
189
190
191 public void up(Event evt) {
192 Message msg;
193 StableHeader hdr;
194 int type=evt.getType();
195
196 switch(type) {
197
198 case Event.MSG:
199 msg=(Message)evt.getArg();
200 if(max_bytes > 0) { // message counting is enabled
201 long size=Math.max(msg.getLength(), 24);
202 num_bytes_received+=size;
203 if(num_bytes_received >= max_bytes) {
204 if(trace) {
205 log.trace(new StringBuffer("max_bytes has been reached (").append(max_bytes).
206 append(", bytes received=").append(num_bytes_received).append("): triggers stable msg"));
207 }
208 num_bytes_received=0;
209 // asks the NAKACK protocol for the current digest, reply event is GET_DIGEST_STABLE_OK (arg=digest)
210 passDown(new Event(Event.GET_DIGEST_STABLE));
211 }
212 }
213
214 hdr=(StableHeader)msg.removeHeader(name);
215 if(hdr == null)
216 break;
217 switch(hdr.type) {
218 case StableHeader.STABLE_GOSSIP:
219 handleStableMessage(msg.getSrc(), hdr.stableDigest);
220 break;
221 case StableHeader.STABILITY:
222 handleStabilityMessage(hdr.stableDigest, msg.getSrc());
223 break;
224 default:
225 if(log.isErrorEnabled()) log.error("StableHeader type " + hdr.type + " not known");
226 }
227 return; // don't pass STABLE or STABILITY messages up the stack
228
229 case Event.GET_DIGEST_STABLE_OK:
230 Digest d=(Digest)evt.getArg();
231 synchronized(latest_local_digest) {
232 latest_local_digest.replace(d);
233 }
234 if(trace)
235 log.trace("setting latest_local_digest from NAKACK: " + d.printHighSeqnos());
236 sendStableMessage(d);
237 break;
238
239 case Event.VIEW_CHANGE:
240 View view=(View)evt.getArg();
241 handleViewChange(view);
242 break;
243
244 case Event.SET_LOCAL_ADDRESS:
245 local_addr=(Address)evt.getArg();
246 break;
247 }
248 passUp(evt);
249 }
250
251
252
253
254 public void down(Event evt) {
255 switch(evt.getType()) {
256 case Event.VIEW_CHANGE:
257 View v=(View)evt.getArg();
258 handleViewChange(v);
259 break;
260
261 case Event.SUSPEND_STABLE:
262 long timeout=0;
263 Object t=evt.getArg();
264 if(t != null && t instanceof Long)
265 timeout=((Long)t).longValue();
266 suspend(timeout);
267 break;
268
269 case Event.RESUME_STABLE:
270 resume();
271 break;
272 }
273 passDown(evt);
274 }
275
276
277 public void runMessageGarbageCollection() {
278 Digest copy;
279 synchronized(digest) {
280 copy=digest.copy();
281 }
282 sendStableMessage(copy);
283 }
284
285
286
287 /* --------------------------------------- Private Methods ---------------------------------------- */
288
289
290 private void handleViewChange(View v) {
291 Vector tmp=v.getMembers();
292 mbrs.clear();
293 mbrs.addAll(tmp);
294 adjustSenders(digest, tmp);
295 adjustSenders(latest_local_digest, tmp);
296 resetDigest(tmp);
297 if(!initialized)
298 initialized=true;
299 }
300
301
302 /** Digest and members are guaranteed to be non-null */
303 private void adjustSenders(Digest d, Vector members) {
304 synchronized(d) {
305 // 1. remove all members from digest who are not in the view
306 Iterator it=d.senders.keySet().iterator();
307 Address mbr;
308 while(it.hasNext()) {
309 mbr=(Address)it.next();
310 if(!members.contains(mbr))
311 it.remove();
312 }
313 // 2. add members to digest which are in the new view but not in the digest
314 for(int i=0; i < members.size(); i++) {
315 mbr=(Address)members.get(i);
316 if(!d.contains(mbr))
317 d.add(mbr, -1, -1);
318 }
319 }
320 }
321
322
323 private void clearDigest() {
324 synchronized(digest) {
325 digest.clear();
326 }
327 }
328
329
330
331 /** Update my own digest from a digest received by somebody else. Returns whether the update was successful.
332 * Needs to be called with a lock on digest */
333 private boolean updateLocalDigest(Digest d, Address sender) {
334 if(d == null || d.size() == 0)
335 return false;
336
337 if(!initialized) {
338 if(trace)
339 log.trace("STABLE message will not be handled as I'm not yet initialized");
340 return false;
341 }
342
343 if(!digest.sameSenders(d)) {
344 if(trace)
345 log.trace(new StringBuffer("received a digest ").append(d.printHighSeqnos()).append(" from ").
346 append(sender).append(" which has different members than mine (").
347 append(digest.printHighSeqnos()).append("), discarding it and resetting heard_from list"));
348 // to avoid sending incorrect stability/stable msgs, we simply reset our heard_from list, see DESIGN
349 resetDigest(mbrs);
350 return false;
351 }
352
353 StringBuffer sb=null;
354 if(trace)
355 sb=new StringBuffer("my [").append(local_addr).append("] digest before: ").append(digest).
356 append("\ndigest from ").append(sender).append(": ").append(d);
357 Address mbr;
358 long highest_seqno, my_highest_seqno, new_highest_seqno;
359 long highest_seen_seqno, my_highest_seen_seqno, new_highest_seen_seqno;
360 Map.Entry entry;
361 org.jgroups.protocols.pbcast.Digest.Entry val;
362 for(Iterator it=d.senders.entrySet().iterator(); it.hasNext();) {
363 entry=(Map.Entry)it.next();
364 mbr=(Address)entry.getKey();
365 val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue();
366 highest_seqno=val.high_seqno;
367 highest_seen_seqno=val.high_seqno_seen;
368
369 // compute the minimum of the highest seqnos deliverable (for garbage collection)
370 my_highest_seqno=digest.highSeqnoAt(mbr);
371 // compute the maximum of the highest seqnos seen (for retransmission of last missing message)
372 my_highest_seen_seqno=digest.highSeqnoSeenAt(mbr);
373
374 new_highest_seqno=Math.min(my_highest_seqno, highest_seqno);
375 new_highest_seen_seqno=Math.max(my_highest_seen_seqno, highest_seen_seqno);
376 digest.setHighestDeliveredAndSeenSeqnos(mbr, new_highest_seqno, new_highest_seen_seqno);
377 }
378 if(trace) {
379 sb.append("\nmy [").append(local_addr).append("] digest after: ").append(digest).append("\n");
380 log.trace(sb);
381 }
382 return true;
383 }
384
385
386
387 private void resetDigest(Vector new_members) {
388 if(new_members == null || new_members.size() == 0)
389 return;
390 synchronized(heard_from) {
391 heard_from.clear();
392 heard_from.addAll(new_members);
393 }
394
395 Digest copy_of_latest;
396 synchronized(latest_local_digest) {
397 copy_of_latest=latest_local_digest.copy();
398 }
399 synchronized(digest) {
400 digest.replace(copy_of_latest);
401 if(trace)
402 log.trace("resetting digest from NAKACK: " + copy_of_latest.printHighSeqnos());
403 }
404 }
405
406 /**
407 * Removes mbr from heard_from and returns true if this was the last member, otherwise false.
408 * Resets the heard_from list (populates with membership)
409 * @param mbr
410 * @return
411 */
412 private boolean removeFromHeardFromList(Address mbr) {
413 synchronized(heard_from) {
414 heard_from.remove(mbr);
415 if(heard_from.size() == 0) {
416 resetDigest(this.mbrs);
417 return true;
418 }
419 }
420 return false;
421 }
422
423
424 void startStableTask() {
425 // Here, double-checked locking works: we don't want to synchronize if the task already runs (which is the case
426 // 99% of the time). If stable_task gets nulled after the condition check, we return anyways, but just miss
427 // 1 cycle: on the next message or view, we will start the task
428 if(stable_task != null)
429 return;
430 synchronized(stable_task_mutex) {
431 if(stable_task != null && stable_task.running()) {
432 return; // already running
433 }
434 stable_task=new StableTask();
435 timer.add(stable_task, true); // fixed-rate scheduling
436 }
437 if(trace)
438 log.trace("stable task started");
439 }
440
441
442 void stopStableTask() {
443 // contrary to startStableTask(), we don't need double-checked locking here because this method is not
444 // called frequently
445 synchronized(stable_task_mutex) {
446 if(stable_task != null) {
447 stable_task.stop();
448 stable_task=null;
449 }
450 }
451 }
452
453
454 void startResumeTask(long max_suspend_time) {
455 max_suspend_time=(long)(max_suspend_time * 1.1); // little slack
456
457 synchronized(resume_task_mutex) {
458 if(resume_task != null && resume_task.running()) {
459 return; // already running
460 }
461 else {
462 resume_task=new ResumeTask(max_suspend_time);
463 timer.add(resume_task, true); // fixed-rate scheduling
464 }
465 }
466 if(log.isDebugEnabled())
467 log.debug("resume task started, max_suspend_time=" + max_suspend_time);
468 }
469
470
471 void stopResumeTask() {
472 synchronized(resume_task_mutex) {
473 if(resume_task != null) {
474 resume_task.stop();
475 resume_task=null;
476 }
477 }
478 }
479
480
481 void startStabilityTask(Digest d, long delay) {
482 synchronized(stability_mutex) {
483 if(stability_task != null && stability_task.running()) {
484 }
485 else {
486 stability_task=new StabilitySendTask(d, delay); // runs only once
487 timer.add(stability_task, true);
488 }
489 }
490 }
491
492
493 void stopStabilityTask() {
494 synchronized(stability_mutex) {
495 if(stability_task != null) {
496 stability_task.stop();
497 stability_task=null;
498 }
499 }
500 }
501
502
503 /**
504 Digest d contains (a) the highest seqnos <em>deliverable</em> for each sender and (b) the highest seqnos
505 <em>seen</em> for each member. (Difference: with 1,2,4,5, the highest seqno seen is 5, whereas the highest
506 seqno deliverable is 2). The minimum of all highest seqnos deliverable will be taken to send a stability
507 message, which results in garbage collection of messages lower than the ones in the stability vector. The
508 maximum of all seqnos will be taken to trigger possible retransmission of last missing seqno (see DESIGN
509 for details).
510 */
511 private void handleStableMessage(Address sender, Digest d) {
512 if(d == null || sender == null) {
513 if(log.isErrorEnabled()) log.error("digest or sender is null");
514 return;
515 }
516
517 if(!initialized) {
518 if(trace)
519 log.trace("STABLE message will not be handled as I'm not yet initialized");
520 return;
521 }
522
523 if(suspended) {
524 if(trace)
525 log.trace("STABLE message will not be handled as I'm suspended");
526 return;
527 }
528
529 if(trace)
530 log.trace(new StringBuffer("received stable msg from ").append(sender).append(": ").append(d.printHighSeqnos()));
531 if(!heard_from.contains(sender)) { // already received gossip from sender; discard it
532 if(trace) log.trace("already received stable msg from " + sender);
533 return;
534 }
535
536 Digest copy;
537 synchronized(digest) {
538 boolean success=updateLocalDigest(d, sender);
539 if(!success) // we can only remove the sender from heard_from if *all* elements of my digest were updated
540 return;
541 copy=digest.copy();
542 }
543
544 boolean was_last=removeFromHeardFromList(sender);
545 if(was_last) {
546 sendStabilityMessage(copy);
547 }
548 }
549
550
551 /**
552 * Bcasts a STABLE message of the current digest to all members. Message contains highest seqnos of all members
553 * seen by this member. Highest seqnos are retrieved from the NAKACK layer below.
554 * @param Digest A <em>copy</em> of this.digest
555 */
556 private void sendStableMessage(Digest d) {
557 if(suspended) {
558 if(trace)
559 log.trace("will not send STABLE message as I'm suspended");
560 return;
561 }
562
563 if(d != null && d.size() > 0) {
564 if(trace)
565 log.trace("sending stable msg " + d.printHighSeqnos());
566 Message msg=new Message(); // mcast message
567 StableHeader hdr=new StableHeader(StableHeader.STABLE_GOSSIP, d);
568 msg.putHeader(name, hdr);
569 num_gossips++;
570 passDown(new Event(Event.MSG, msg));
571 }
572 }
573
574
575
576 /**
577 Schedules a stability message to be mcast after a random number of milliseconds (range 1-5 secs).
578 The reason for waiting a random amount of time is that, in the worst case, all members receive a
579 STABLE_GOSSIP message from the last outstanding member at the same time and would therefore mcast the
580 STABILITY message at the same time too. To avoid this, each member waits random N msecs. If, before N
581 elapses, some other member sent the STABILITY message, we just cancel our own message. If, during
582 waiting for N msecs to send STABILITY message S1, another STABILITY message S2 is to be sent, we just
583 discard S2.
584 @param tmp A copy of te stability digest, so we don't need to copy it again
585 */
586 void sendStabilityMessage(Digest tmp) {
587 long delay;
588
589 // give other members a chance to mcast STABILITY message. if we receive STABILITY by the end of
590 // our random sleep, we will not send the STABILITY msg. this prevents that all mbrs mcast a
591 // STABILITY msg at the same time
592 delay=Util.random(stability_delay);
593 startStabilityTask(tmp, delay);
594 }
595
596
597 void handleStabilityMessage(Digest d, Address sender) {
598 if(d == null) {
599 if(log.isErrorEnabled()) log.error("stability digest is null");
600 return;
601 }
602
603 if(!initialized) {
604 if(trace)
605 log.trace("STABLE message will not be handled as I'm not yet initialized");
606 return;
607 }
608
609 if(suspended) {
610 if(log.isDebugEnabled()) {
611 log.debug("stability message will not be handled as I'm suspended");
612 }
613 return;
614 }
615
616 if(trace)
617 log.trace(new StringBuffer("received stability msg from ").append(sender).append(": ").append(d.printHighSeqnos()));
618 stopStabilityTask();
619
620 // we won't handle the gossip d, if d's members don't match the membership in my own digest,
621 // this is part of the fix for the NAKACK problem (bugs #943480 and #938584)
622 if(!this.digest.sameSenders(d)) {
623 if(log.isDebugEnabled()) {
624 log.debug("received digest (digest=" + d + ") which does not match my own digest ("+
625 this.digest + "): ignoring digest and re-initializing own digest");
626 }
627 return;
628 }
629
630 resetDigest(mbrs);
631
632 // pass STABLE event down the stack, so NAKACK can garbage collect old messages
633 passDown(new Event(Event.STABLE, d));
634 }
635
636
637
638 /* ------------------------------------End of Private Methods ------------------------------------- */
639
640
641
642
643
644
645
646 public static class StableHeader extends Header implements Streamable {
647 public static final int STABLE_GOSSIP=1;
648 public static final int STABILITY=2;
649
650 int type=0;
651 // Digest digest=new Digest(); // used for both STABLE_GOSSIP and STABILITY message
652 Digest stableDigest=null; // changed by Bela April 4 2004
653
654 public StableHeader() {
655 } // used for externalizable
656
657
658 public StableHeader(int type, Digest digest) {
659 this.type=type;
660 this.stableDigest=digest;
661 }
662
663
664 static String type2String(int t) {
665 switch(t) {
666 case STABLE_GOSSIP:
667 return "STABLE_GOSSIP";
668 case STABILITY:
669 return "STABILITY";
670 default:
671 return "<unknown>";
672 }
673 }
674
675 public String toString() {
676 StringBuffer sb=new StringBuffer();
677 sb.append('[');
678 sb.append(type2String(type));
679 sb.append("]: digest is ");
680 sb.append(stableDigest);
681 return sb.toString();
682 }
683
684
685 public void writeExternal(ObjectOutput out) throws IOException {
686 out.writeInt(type);
687 if(stableDigest == null) {
688 out.writeBoolean(false);
689 return;
690 }
691 out.writeBoolean(true);
692 stableDigest.writeExternal(out);
693 }
694
695
696 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
697 type=in.readInt();
698 boolean digest_not_null=in.readBoolean();
699 if(digest_not_null) {
700 stableDigest=new Digest();
701 stableDigest.readExternal(in);
702 }
703 }
704
705 public long size() {
706 long retval=Global.INT_SIZE + Global.BYTE_SIZE; // type + presence for digest
707 if(stableDigest != null)
708 retval+=stableDigest.serializedSize();
709 return retval;
710 }
711
712 public void writeTo(DataOutputStream out) throws IOException {
713 out.writeInt(type);
714 Util.writeStreamable(stableDigest, out);
715 }
716
717 public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
718 type=in.readInt();
719 stableDigest=(Digest)Util.readStreamable(Digest.class, in);
720 }
721
722
723 }
724
725
726
727
728 /**
729 Mcast periodic STABLE message. Interval between sends varies. Terminates after num_gossip_runs is 0.
730 However, UP or DOWN messages will reset num_gossip_runs to max_gossip_runs. This has the effect that the
731 stable_send task terminates only after a period of time within which no messages were either sent
732 or received
733 */
734 private class StableTask implements TimeScheduler.Task {
735 boolean stopped=false;
736
737 public void stop() {
738 stopped=true;
739 }
740
741 public boolean running() { // syntactic sugar
742 return !stopped;
743 }
744
745 public boolean cancelled() {
746 return stopped;
747 }
748
749 public long nextInterval() {
750 long interval=computeSleepTime();
751 if(interval <= 0)
752 return 10000;
753 else
754 return interval;
755 }
756
757
758 public void run() {
759 if(suspended) {
760 if(trace)
761 log.trace("stable task will not run as suspended=" + suspended);
762 return;
763 }
764
765 // asks the NAKACK protocol for the current digest, reply event is GET_DIGEST_STABLE_OK (arg=digest)
766 passDown(new Event(Event.GET_DIGEST_STABLE));
767 }
768
769 long computeSleepTime() {
770 return getRandom((mbrs.size() * desired_avg_gossip * 2));
771 }
772
773 long getRandom(long range) {
774 return (long)((Math.random() * range) % range);
775 }
776 }
777
778
779
780
781
782 /**
783 * Multicasts a STABILITY message.
784 */
785 private class StabilitySendTask implements TimeScheduler.Task {
786 Digest d=null;
787 boolean stopped=false;
788 long delay=2000;
789
790
791 StabilitySendTask(Digest d, long delay) {
792 this.d=d;
793 this.delay=delay;
794 }
795
796 public boolean running() {
797 return !stopped;
798 }
799
800 public void stop() {
801 stopped=true;
802 }
803
804 public boolean cancelled() {
805 return stopped;
806 }
807
808
809 /** wait a random number of msecs (to give other a chance to send the STABILITY msg first) */
810 public long nextInterval() {
811 return delay;
812 }
813
814
815 public void run() {
816 Message msg;
817 StableHeader hdr;
818
819 if(suspended) {
820 if(log.isDebugEnabled()) {
821 log.debug("STABILITY message will not be sent as suspended=" + suspended);
822 }
823 stopped=true;
824 return;
825 }
826
827 if(d != null && !stopped) {
828 msg=new Message();
829 hdr=new StableHeader(StableHeader.STABILITY, d);
830 msg.putHeader(STABLE.name, hdr);
831 if(trace) log.trace("sending stability msg " + d.printHighSeqnos());
832 passDown(new Event(Event.MSG, msg));
833 d=null;
834 }
835 stopped=true; // run only once
836 }
837 }
838
839
840 private class ResumeTask implements TimeScheduler.Task {
841 boolean running=true;
842 long max_suspend_time=0;
843
844 ResumeTask(long max_suspend_time) {
845 this.max_suspend_time=max_suspend_time;
846 }
847
848 void stop() {
849 running=false;
850 }
851
852 public boolean running() {
853 return running;
854 }
855
856 public boolean cancelled() {
857 return running == false;
858 }
859
860 public long nextInterval() {
861 return max_suspend_time;
862 }
863
864 public void run() {
865 if(suspended)
866 log.warn("ResumeTask resumed message garbage collection - this should be done by a RESUME_STABLE event; " +
867 "check why this event was not received (or increase max_suspend_time for large state transfers)");
868 resume();
869 }
870 }
871
872
873 }