Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/jgroups/protocols/pbcast/STABLE.java


1   // $Id: STABLE.java,v 1.39 2005/08/11 12:43:46 belaban Exp $
2   
3   package org.jgroups.protocols.pbcast;
4   
5   
6   import org.jgroups.*;
7   import org.jgroups.stack.Protocol;
8   import org.jgroups.util.Streamable;
9   import org.jgroups.util.TimeScheduler;
10  import org.jgroups.util.Util;
11  
12  import java.io.*;
13  import java.util.*;
14  
15  
16  
17  
18  /**
19   * Computes the broadcast messages that are stable; i.e., have been received by all members. Sends
20   * STABLE events up the stack when this is the case. This allows NAKACK to garbage collect messages that
21   * have been seen by all members.<p>
22   * Works as follows: periodically we mcast our highest seqnos (seen for each member) to the group.
23   * A stability vector, which maintains the highest seqno for each member and initially contains no data,
24   * is updated when such a message is received. The entry for a member P is set to
25   * min(entry[P], digest[P]). When messages from all members have been received, a stability
26   * message is mcast, which causes all members to send a STABLE event up the stack (triggering garbage collection
27   * in the NAKACK layer).<p>
28   * The stable task now terminates after max_num_gossips if no messages or view changes have been sent or received
29   * in the meantime. It will resume when messages are received. This effectively suspends sending superfluous
30   * STABLE messages in the face of no activity.<br/>
31   * New: when <code>max_bytes</code> is exceeded (unless disabled by setting it to 0),
32   * a STABLE task will be started (unless it is already running).
33   * @author Bela Ban
34   */
35  public class STABLE extends Protocol {
36      Address             local_addr=null;
37      final Vector        mbrs=new Vector();
38      final Digest        digest=new Digest(10);        // keeps track of the highest seqnos from all members
39      final Digest        latest_local_digest=new Digest(10); // keeps track of the latest digests received from NAKACK
40      final Vector        heard_from=new Vector();      // keeps track of who we already heard from (STABLE_GOSSIP msgs)
41  
42      /** Sends a STABLE gossip every 20 seconds on average. 0 disables gossipping of STABLE messages */
43      long                desired_avg_gossip=20000;
44  
45      /** delay before we send STABILITY msg (give others a change to send first). This should be set to a very
46       * small number (> 0 !) if <code>max_bytes</code> is used */
47      long                stability_delay=6000;
48      StabilitySendTask   stability_task=null;
49      final Object        stability_mutex=new Object(); // to synchronize on stability_task
50      StableTask          stable_task=null;             // bcasts periodic STABLE message (added to timer below)
51      final Object        stable_task_mutex=new Object(); // to sync on stable_task
52      TimeScheduler       timer=null;                   // to send periodic STABLE msgs (and STABILITY messages)
53      static final String name="STABLE";
54  
55      /** Total amount of bytes from incoming messages (default = 0 = disabled). When exceeded, a STABLE
56       * message will be broadcast and <code>num_bytes_received</code> reset to 0 . If this is > 0, then ideally
57       * <code>stability_delay</code> should be set to a low number as well */
58      long                max_bytes=0;
59  
60      /** The total number of bytes received from unicast and multicast messages */
61      long                num_bytes_received=0;
62  
63      /** When true, don't take part in garbage collection protocol: neither send STABLE messages nor
64       * handle STABILITY messages */
65      boolean             suspended=false;
66  
67      boolean             initialized=false;
68  
69      ResumeTask          resume_task=null;
70      final Object        resume_task_mutex=new Object();
71  
72      /** Number of gossip messages */
73      int                 num_gossips=0;
74  
75  
76  
77      public String getName() {
78          return name;
79      }
80  
81      public long getDesiredAverageGossip() {
82          return desired_avg_gossip;
83      }
84  
85      public void setDesiredAverageGossip(long gossip_interval) {
86          desired_avg_gossip=gossip_interval;
87      }
88  
89      public long getMaxBytes() {
90          return max_bytes;
91      }
92  
93      public void setMaxBytes(long max_bytes) {
94          this.max_bytes=max_bytes;
95      }
96  
97      public int getNumberOfGossipMessages() {return num_gossips;}
98  
99      public void resetStats() {
100         super.resetStats();
101         num_gossips=0;
102     }
103 
104 
105     public Vector requiredDownServices() {
106         Vector retval=new Vector();
107         retval.addElement(new Integer(Event.GET_DIGEST_STABLE));  // NAKACK layer
108         return retval;
109     }
110 
111     public boolean setProperties(Properties props) {
112         String str;
113 
114         super.setProperties(props);
115         str=props.getProperty("digest_timeout");
116         if(str != null) {
117             props.remove("digest_timeout");
118             log.error("digest_timeout has been deprecated; it will be ignored");
119         }
120 
121         str=props.getProperty("desired_avg_gossip");
122         if(str != null) {
123             desired_avg_gossip=Long.parseLong(str);
124             props.remove("desired_avg_gossip");
125         }
126 
127         str=props.getProperty("stability_delay");
128         if(str != null) {
129             stability_delay=Long.parseLong(str);
130             props.remove("stability_delay");
131         }
132 
133         str=props.getProperty("max_gossip_runs");
134         if(str != null) {
135             props.remove("max_gossip_runs");
136             log.error("max_gossip_runs has been deprecated and will be ignored");
137         }
138 
139         str=props.getProperty("max_bytes");
140         if(str != null) {
141             max_bytes=Long.parseLong(str);
142             props.remove("max_bytes");
143         }
144 
145         str=props.getProperty("max_suspend_time");
146         if(str != null) {
147             log.error("max_suspend_time is not supported any longer; please remove it (ignoring it)");
148             props.remove("max_suspend_time");
149         }
150 
151         if(props.size() > 0) {
152             log.error("these properties are not recognized: " + props);
153             
154             return false;
155         }
156         return true;
157     }
158 
159 
160     private void suspend(long timeout) {
161         if(!suspended) {
162             suspended=true;
163             if(log.isDebugEnabled())
164                 log.debug("suspending message garbage collection");
165         }
166         startResumeTask(timeout); // will not start task if already running
167     }
168 
169     private void resume() {
170         suspended=false;
171         if(log.isDebugEnabled())
172             log.debug("resuming message garbage collection");
173         stopResumeTask();
174     }
175 
176     public void start() throws Exception {
177         if(stack != null && stack.timer != null)
178             timer=stack.timer;
179         else
180             throw new Exception("timer cannot be retrieved from protocol stack");
181         if(desired_avg_gossip > 0)
182             startStableTask();
183     }
184 
185     public void stop() {
186         stopStableTask();
187         clearDigest();
188     }
189 
190 
191     public void up(Event evt) {
192         Message msg;
193         StableHeader hdr;
194         int type=evt.getType();
195 
196         switch(type) {
197 
198         case Event.MSG:
199             msg=(Message)evt.getArg();
200             if(max_bytes > 0) {  // message counting is enabled
201                 long size=Math.max(msg.getLength(), 24);
202                 num_bytes_received+=size;
203                 if(num_bytes_received >= max_bytes) {
204                     if(trace) {
205                         log.trace(new StringBuffer("max_bytes has been reached (").append(max_bytes).
206                                   append(", bytes received=").append(num_bytes_received).append("): triggers stable msg"));
207                     }
208                     num_bytes_received=0;
209                     // asks the NAKACK protocol for the current digest, reply event is GET_DIGEST_STABLE_OK (arg=digest)
210                     passDown(new Event(Event.GET_DIGEST_STABLE));
211                 }
212             }
213 
214             hdr=(StableHeader)msg.removeHeader(name);
215             if(hdr == null)
216                 break;
217             switch(hdr.type) {
218             case StableHeader.STABLE_GOSSIP:
219                 handleStableMessage(msg.getSrc(), hdr.stableDigest);
220                 break;
221             case StableHeader.STABILITY:
222                 handleStabilityMessage(hdr.stableDigest, msg.getSrc());
223                 break;
224             default:
225                 if(log.isErrorEnabled()) log.error("StableHeader type " + hdr.type + " not known");
226             }
227             return;  // don't pass STABLE or STABILITY messages up the stack
228 
229         case Event.GET_DIGEST_STABLE_OK:
230             Digest d=(Digest)evt.getArg();
231             synchronized(latest_local_digest) {
232                 latest_local_digest.replace(d);
233             }
234             if(trace)
235                 log.trace("setting latest_local_digest from NAKACK: " + d.printHighSeqnos());
236             sendStableMessage(d);
237             break;
238 
239         case Event.VIEW_CHANGE:
240             View view=(View)evt.getArg();
241             handleViewChange(view);
242             break;
243 
244         case Event.SET_LOCAL_ADDRESS:
245             local_addr=(Address)evt.getArg();
246             break;
247         }
248         passUp(evt);
249     }
250 
251 
252 
253 
254     public void down(Event evt) {
255         switch(evt.getType()) {
256         case Event.VIEW_CHANGE:
257             View v=(View)evt.getArg();
258             handleViewChange(v);
259             break;
260 
261         case Event.SUSPEND_STABLE:
262             long timeout=0;
263             Object t=evt.getArg();
264             if(t != null && t instanceof Long)
265                 timeout=((Long)t).longValue();
266             suspend(timeout);
267             break;
268 
269         case Event.RESUME_STABLE:
270             resume();
271             break;
272         }
273         passDown(evt);
274     }
275 
276 
277     public void runMessageGarbageCollection() {
278         Digest copy;
279         synchronized(digest) {
280             copy=digest.copy();
281         }
282         sendStableMessage(copy);
283     }
284 
285 
286 
287     /* --------------------------------------- Private Methods ---------------------------------------- */
288 
289 
290     private void handleViewChange(View v) {
291         Vector tmp=v.getMembers();
292         mbrs.clear();
293         mbrs.addAll(tmp);
294         adjustSenders(digest, tmp);
295         adjustSenders(latest_local_digest, tmp);
296         resetDigest(tmp);
297         if(!initialized)
298             initialized=true;
299     }
300 
301 
302     /** Digest and members are guaranteed to be non-null */
303     private void adjustSenders(Digest d, Vector members) {
304         synchronized(d) {
305             // 1. remove all members from digest who are not in the view
306             Iterator it=d.senders.keySet().iterator();
307             Address mbr;
308             while(it.hasNext()) {
309                 mbr=(Address)it.next();
310                 if(!members.contains(mbr))
311                     it.remove();
312             }
313             // 2. add members to digest which are in the new view but not in the digest
314             for(int i=0; i < members.size(); i++) {
315                 mbr=(Address)members.get(i);
316                 if(!d.contains(mbr))
317                     d.add(mbr, -1, -1);
318             }
319         }
320     }
321 
322 
323     private void clearDigest() {
324         synchronized(digest) {
325             digest.clear();
326         }
327     }
328 
329 
330 
331     /** Update my own digest from a digest received by somebody else. Returns whether the update was successful.
332      *  Needs to be called with a lock on digest */
333     private boolean updateLocalDigest(Digest d, Address sender) {
334         if(d == null || d.size() == 0)
335             return false;
336 
337         if(!initialized) {
338             if(trace)
339                 log.trace("STABLE message will not be handled as I'm not yet initialized");
340             return false;
341         }
342 
343         if(!digest.sameSenders(d)) {
344             if(trace)
345                 log.trace(new StringBuffer("received a digest ").append(d.printHighSeqnos()).append(" from ").
346                           append(sender).append(" which has different members than mine (").
347                           append(digest.printHighSeqnos()).append("), discarding it and resetting heard_from list"));
348             // to avoid sending incorrect stability/stable msgs, we simply reset our heard_from list, see DESIGN
349             resetDigest(mbrs);
350             return false;
351         }
352 
353         StringBuffer sb=null;
354         if(trace)
355             sb=new StringBuffer("my [").append(local_addr).append("] digest before: ").append(digest).
356                     append("\ndigest from ").append(sender).append(": ").append(d);
357         Address mbr;
358         long highest_seqno, my_highest_seqno, new_highest_seqno;
359         long highest_seen_seqno, my_highest_seen_seqno, new_highest_seen_seqno;
360         Map.Entry entry;
361         org.jgroups.protocols.pbcast.Digest.Entry val;
362         for(Iterator it=d.senders.entrySet().iterator(); it.hasNext();) {
363             entry=(Map.Entry)it.next();
364             mbr=(Address)entry.getKey();
365             val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue();
366             highest_seqno=val.high_seqno;
367             highest_seen_seqno=val.high_seqno_seen;
368 
369             // compute the minimum of the highest seqnos deliverable (for garbage collection)
370             my_highest_seqno=digest.highSeqnoAt(mbr);
371             // compute the maximum of the highest seqnos seen (for retransmission of last missing message)
372             my_highest_seen_seqno=digest.highSeqnoSeenAt(mbr);
373 
374             new_highest_seqno=Math.min(my_highest_seqno, highest_seqno);
375             new_highest_seen_seqno=Math.max(my_highest_seen_seqno, highest_seen_seqno);
376             digest.setHighestDeliveredAndSeenSeqnos(mbr, new_highest_seqno, new_highest_seen_seqno);
377         }
378         if(trace) {
379             sb.append("\nmy [").append(local_addr).append("] digest after: ").append(digest).append("\n");
380             log.trace(sb);
381         }
382         return true;
383     }
384 
385 
386 
387     private void resetDigest(Vector new_members) {
388         if(new_members == null || new_members.size() == 0)
389             return;
390         synchronized(heard_from) {
391             heard_from.clear();
392             heard_from.addAll(new_members);
393         }
394 
395         Digest copy_of_latest;
396         synchronized(latest_local_digest) {
397             copy_of_latest=latest_local_digest.copy();
398         }
399         synchronized(digest) {
400             digest.replace(copy_of_latest);
401             if(trace)
402                 log.trace("resetting digest from NAKACK: " + copy_of_latest.printHighSeqnos());
403         }
404     }
405 
406     /**
407      * Removes mbr from heard_from and returns true if this was the last member, otherwise false.
408      * Resets the heard_from list (populates with membership)
409      * @param mbr
410      * @return
411      */
412     private boolean removeFromHeardFromList(Address mbr) {
413         synchronized(heard_from) {
414             heard_from.remove(mbr);
415             if(heard_from.size() == 0) {
416                 resetDigest(this.mbrs);
417                 return true;
418             }
419         }
420         return false;
421     }
422 
423 
424     void startStableTask() {
425         // Here, double-checked locking works: we don't want to synchronize if the task already runs (which is the case
426         // 99% of the time). If stable_task gets nulled after the condition check, we return anyways, but just miss
427         // 1 cycle: on the next message or view, we will start the task
428         if(stable_task != null)
429             return;
430         synchronized(stable_task_mutex) {
431             if(stable_task != null && stable_task.running()) {
432                 return;  // already running
433             }
434             stable_task=new StableTask();
435             timer.add(stable_task, true); // fixed-rate scheduling
436         }
437         if(trace)
438             log.trace("stable task started");
439     }
440 
441 
442     void stopStableTask() {
443         // contrary to startStableTask(), we don't need double-checked locking here because this method is not
444         // called frequently
445         synchronized(stable_task_mutex) {
446             if(stable_task != null) {
447                 stable_task.stop();
448                 stable_task=null;
449             }
450         }
451     }
452 
453 
454     void startResumeTask(long max_suspend_time) {
455         max_suspend_time=(long)(max_suspend_time * 1.1); // little slack
456 
457         synchronized(resume_task_mutex) {
458             if(resume_task != null && resume_task.running()) {
459                 return;  // already running
460             }
461             else {
462                 resume_task=new ResumeTask(max_suspend_time);
463                 timer.add(resume_task, true); // fixed-rate scheduling
464             }
465         }
466         if(log.isDebugEnabled())
467             log.debug("resume task started, max_suspend_time=" + max_suspend_time);
468     }
469 
470 
471     void stopResumeTask() {
472         synchronized(resume_task_mutex) {
473             if(resume_task != null) {
474                 resume_task.stop();
475                 resume_task=null;
476             }
477         }
478     }
479 
480 
481     void startStabilityTask(Digest d, long delay) {
482         synchronized(stability_mutex) {
483             if(stability_task != null && stability_task.running()) {
484             }
485             else {
486                 stability_task=new StabilitySendTask(d, delay); // runs only once
487                 timer.add(stability_task, true);
488             }
489         }
490     }
491 
492 
493     void stopStabilityTask() {
494         synchronized(stability_mutex) {
495             if(stability_task != null) {
496                 stability_task.stop();
497                 stability_task=null;
498             }
499         }
500     }
501 
502 
503     /**
504      Digest d contains (a) the highest seqnos <em>deliverable</em> for each sender and (b) the highest seqnos
505      <em>seen</em> for each member. (Difference: with 1,2,4,5, the highest seqno seen is 5, whereas the highest
506      seqno deliverable is 2). The minimum of all highest seqnos deliverable will be taken to send a stability
507      message, which results in garbage collection of messages lower than the ones in the stability vector. The
508      maximum of all seqnos will be taken to trigger possible retransmission of last missing seqno (see DESIGN
509      for details).
510      */
511     private void handleStableMessage(Address sender, Digest d) {
512         if(d == null || sender == null) {
513             if(log.isErrorEnabled()) log.error("digest or sender is null");
514             return;
515         }
516 
517         if(!initialized) {
518             if(trace)
519                 log.trace("STABLE message will not be handled as I'm not yet initialized");
520             return;
521         }
522 
523         if(suspended) {
524             if(trace)
525                 log.trace("STABLE message will not be handled as I'm suspended");
526             return;
527         }
528 
529         if(trace)
530             log.trace(new StringBuffer("received stable msg from ").append(sender).append(": ").append(d.printHighSeqnos()));
531         if(!heard_from.contains(sender)) {  // already received gossip from sender; discard it
532             if(trace) log.trace("already received stable msg from " + sender);
533             return;
534         }
535 
536         Digest copy;
537         synchronized(digest) {
538             boolean success=updateLocalDigest(d, sender);
539             if(!success) // we can only remove the sender from heard_from if *all* elements of my digest were updated
540                 return;
541             copy=digest.copy();
542         }
543 
544         boolean was_last=removeFromHeardFromList(sender);
545         if(was_last) {
546             sendStabilityMessage(copy);
547         }
548     }
549 
550 
551     /**
552      * Bcasts a STABLE message of the current digest to all members. Message contains highest seqnos of all members
553      * seen by this member. Highest seqnos are retrieved from the NAKACK layer below.
554      * @param Digest A <em>copy</em> of this.digest
555      */
556     private void sendStableMessage(Digest d) {
557         if(suspended) {
558             if(trace)
559                 log.trace("will not send STABLE message as I'm suspended");
560             return;
561         }
562 
563         if(d != null && d.size() > 0) {
564             if(trace)
565                 log.trace("sending stable msg " + d.printHighSeqnos());
566             Message msg=new Message(); // mcast message
567             StableHeader hdr=new StableHeader(StableHeader.STABLE_GOSSIP, d);
568             msg.putHeader(name, hdr);
569             num_gossips++;
570             passDown(new Event(Event.MSG, msg));
571         }
572     }
573 
574 
575 
576     /**
577      Schedules a stability message to be mcast after a random number of milliseconds (range 1-5 secs).
578      The reason for waiting a random amount of time is that, in the worst case, all members receive a
579      STABLE_GOSSIP message from the last outstanding member at the same time and would therefore mcast the
580      STABILITY message at the same time too. To avoid this, each member waits random N msecs. If, before N
581      elapses, some other member sent the STABILITY message, we just cancel our own message. If, during
582      waiting for N msecs to send STABILITY message S1, another STABILITY message S2 is to be sent, we just
583      discard S2.
584      @param tmp A copy of te stability digest, so we don't need to copy it again
585      */
586     void sendStabilityMessage(Digest tmp) {
587         long delay;
588 
589         // give other members a chance to mcast STABILITY message. if we receive STABILITY by the end of
590         // our random sleep, we will not send the STABILITY msg. this prevents that all mbrs mcast a
591         // STABILITY msg at the same time
592         delay=Util.random(stability_delay);
593         startStabilityTask(tmp, delay);
594     }
595 
596 
597     void handleStabilityMessage(Digest d, Address sender) {
598         if(d == null) {
599             if(log.isErrorEnabled()) log.error("stability digest is null");
600             return;
601         }
602 
603         if(!initialized) {
604             if(trace)
605                 log.trace("STABLE message will not be handled as I'm not yet initialized");
606             return;
607         }
608 
609         if(suspended) {
610             if(log.isDebugEnabled()) {
611                 log.debug("stability message will not be handled as I'm suspended");
612             }
613             return;
614         }
615 
616         if(trace)
617             log.trace(new StringBuffer("received stability msg from ").append(sender).append(": ").append(d.printHighSeqnos()));
618         stopStabilityTask();
619 
620         // we won't handle the gossip d, if d's members don't match the membership in my own digest,
621         // this is part of the fix for the NAKACK problem (bugs #943480 and #938584)
622         if(!this.digest.sameSenders(d)) {
623             if(log.isDebugEnabled()) {
624                 log.debug("received digest (digest=" + d + ") which does not match my own digest ("+
625                         this.digest + "): ignoring digest and re-initializing own digest");
626             }
627             return;
628         }
629 
630         resetDigest(mbrs);
631 
632         // pass STABLE event down the stack, so NAKACK can garbage collect old messages
633         passDown(new Event(Event.STABLE, d));
634     }
635 
636 
637 
638     /* ------------------------------------End of Private Methods ------------------------------------- */
639 
640 
641 
642 
643 
644 
645 
646     public static class StableHeader extends Header implements Streamable {
647         public static final int STABLE_GOSSIP=1;
648         public static final int STABILITY=2;
649 
650         int type=0;
651         // Digest digest=new Digest();  // used for both STABLE_GOSSIP and STABILITY message
652         Digest stableDigest=null; // changed by Bela April 4 2004
653 
654         public StableHeader() {
655         } // used for externalizable
656 
657 
658         public StableHeader(int type, Digest digest) {
659             this.type=type;
660             this.stableDigest=digest;
661         }
662 
663 
664         static String type2String(int t) {
665             switch(t) {
666                 case STABLE_GOSSIP:
667                     return "STABLE_GOSSIP";
668                 case STABILITY:
669                     return "STABILITY";
670                 default:
671                     return "<unknown>";
672             }
673         }
674 
675         public String toString() {
676             StringBuffer sb=new StringBuffer();
677             sb.append('[');
678             sb.append(type2String(type));
679             sb.append("]: digest is ");
680             sb.append(stableDigest);
681             return sb.toString();
682         }
683 
684 
685         public void writeExternal(ObjectOutput out) throws IOException {
686             out.writeInt(type);
687             if(stableDigest == null) {
688                 out.writeBoolean(false);
689                 return;
690             }
691             out.writeBoolean(true);
692             stableDigest.writeExternal(out);
693         }
694 
695 
696         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
697             type=in.readInt();
698             boolean digest_not_null=in.readBoolean();
699             if(digest_not_null) {
700                 stableDigest=new Digest();
701                 stableDigest.readExternal(in);
702             }
703         }
704 
705         public long size() {
706             long retval=Global.INT_SIZE + Global.BYTE_SIZE; // type + presence for digest
707             if(stableDigest != null)
708                 retval+=stableDigest.serializedSize();
709             return retval;
710         }
711 
712         public void writeTo(DataOutputStream out) throws IOException {
713             out.writeInt(type);
714             Util.writeStreamable(stableDigest, out);
715         }
716 
717         public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
718             type=in.readInt();
719             stableDigest=(Digest)Util.readStreamable(Digest.class, in);
720         }
721 
722 
723     }
724 
725 
726 
727 
728     /**
729      Mcast periodic STABLE message. Interval between sends varies. Terminates after num_gossip_runs is 0.
730      However, UP or DOWN messages will reset num_gossip_runs to max_gossip_runs. This has the effect that the
731      stable_send task terminates only after a period of time within which no messages were either sent
732      or received
733      */
734     private class StableTask implements TimeScheduler.Task {
735         boolean stopped=false;
736 
737         public void stop() {
738             stopped=true;
739         }
740 
741         public boolean running() { // syntactic sugar
742             return !stopped;
743         }
744 
745         public boolean cancelled() {
746             return stopped;
747         }
748 
749         public long nextInterval() {
750             long interval=computeSleepTime();
751             if(interval <= 0)
752                 return 10000;
753             else
754                 return interval;
755         }
756 
757 
758         public void run() {
759             if(suspended) {
760                 if(trace)
761                     log.trace("stable task will not run as suspended=" + suspended);
762                 return;
763             }
764 
765             // asks the NAKACK protocol for the current digest, reply event is GET_DIGEST_STABLE_OK (arg=digest)
766             passDown(new Event(Event.GET_DIGEST_STABLE));
767         }
768 
769         long computeSleepTime() {
770             return getRandom((mbrs.size() * desired_avg_gossip * 2));
771         }
772 
773         long getRandom(long range) {
774             return (long)((Math.random() * range) % range);
775         }
776     }
777 
778 
779 
780 
781 
782     /**
783      * Multicasts a STABILITY message.
784      */
785     private class StabilitySendTask implements TimeScheduler.Task {
786         Digest   d=null;
787         boolean  stopped=false;
788         long     delay=2000;
789 
790 
791         StabilitySendTask(Digest d, long delay) {
792             this.d=d;
793             this.delay=delay;
794         }
795 
796         public boolean running() {
797             return !stopped;
798         }
799 
800         public void stop() {
801             stopped=true;
802         }
803 
804         public boolean cancelled() {
805             return stopped;
806         }
807 
808 
809         /** wait a random number of msecs (to give other a chance to send the STABILITY msg first) */
810         public long nextInterval() {
811             return delay;
812         }
813 
814 
815         public void run() {
816             Message msg;
817             StableHeader hdr;
818 
819             if(suspended) {
820                 if(log.isDebugEnabled()) {
821                     log.debug("STABILITY message will not be sent as suspended=" + suspended);
822                 }
823                 stopped=true;
824                 return;
825             }
826 
827             if(d != null && !stopped) {
828                 msg=new Message();
829                 hdr=new StableHeader(StableHeader.STABILITY, d);
830                 msg.putHeader(STABLE.name, hdr);
831                 if(trace) log.trace("sending stability msg " + d.printHighSeqnos());
832                 passDown(new Event(Event.MSG, msg));
833                 d=null;
834             }
835             stopped=true; // run only once
836         }
837     }
838 
839 
840     private class ResumeTask implements TimeScheduler.Task {
841         boolean running=true;
842         long max_suspend_time=0;
843 
844         ResumeTask(long max_suspend_time) {
845             this.max_suspend_time=max_suspend_time;
846         }
847 
848         void stop() {
849             running=false;
850         }
851 
852         public boolean running() {
853             return running;
854         }
855 
856         public boolean cancelled() {
857             return running == false;
858         }
859 
860         public long nextInterval() {
861             return max_suspend_time;
862         }
863 
864         public void run() {
865             if(suspended)
866                 log.warn("ResumeTask resumed message garbage collection - this should be done by a RESUME_STABLE event; " +
867                          "check why this event was not received (or increase max_suspend_time for large state transfers)");
868             resume();
869         }
870     }
871 
872 
873 }