Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/jgroups/protocols/pbcast/GMS.java


1   // $Id: GMS.java,v 1.45 2005/11/04 18:40:36 belaban Exp $
2   
3   package org.jgroups.protocols.pbcast;
4   
5   
6   import org.jgroups.*;
7   import org.jgroups.stack.Protocol;
8   import org.jgroups.util.*;
9   import org.jgroups.util.Queue;
10  import org.apache.commons.logging.Log;
11  
12  import java.io.*;
13  import java.util.*;
14  
15  
16  /**
17   * Group membership protocol. Handles joins/leaves/crashes (suspicions) and emits new views
18   * accordingly. Use VIEW_ENFORCER on top of this layer to make sure new members don't receive
19   * any messages until they are members.
20   */
21  public class GMS extends Protocol {
    /** Current role implementation (client, participant or coordinator); replaced through setImpl() */
    private GmsImpl           impl=null;
    /** Own address; assigned when a SET_LOCAL_ADDRESS event is seen in up() */
    Address                   local_addr=null;
    final Membership          members=new Membership();     // real membership
    private final Membership  tmp_members=new Membership(); // base for computing next view

    /** Members joined but for which no view has been received yet */
    private final Vector      joining=new Vector(7);

    /** Members excluded from group, but for which no view has been received yet */
    private final Vector      leaving=new Vector(7);

    /** Most recently installed view (null until a view is installed; reset by initState()) */
    View                      view=null;
    /** Id of the most recently installed view (null until a view is installed; reset by initState()) */
    ViewId                    view_id=null;
    /** Lamport logical time, used to generate the id of the next view */
    private long              ltime=0;
    long                      join_timeout=5000;             // time to wait for JOIN
    long                      join_retry_timeout=2000;       // time to wait between JOINs
    long                      leave_timeout=5000;            // time to wait until coord responds to LEAVE req.
    // NOTE(review): the default of 0 is passed straight to Promise.getResultWithTimeout(); presumably 0 means
    // "wait without timeout" - confirm against Promise
    private long              digest_timeout=0;              // time to wait for a digest (from PBCAST). should be fast
    long                      merge_timeout=10000;           // time to wait for all MERGE_RSPS
    private final Object      impl_mutex=new Object();       // synchronizes event entry into impl
    private final Object      digest_mutex=new Object();     // held for the whole of getDigest()
    private final Promise     digest_promise=new Promise();  // holds result of GET_DIGEST event
    /** Cache of the role implementations created so far, keyed by CLIENT, COORD and PART */
    private final Hashtable   impls=new Hashtable(3);
    /** If true, a former member that receives a view it is not part of exits the group (see installView()) */
    private boolean           shun=true;
    boolean                   merge_leader=false;         // can I initiate a merge ?
    /** If true, the local address is printed to System.out when SET_LOCAL_ADDRESS is received */
    private boolean           print_local_addr=true;
    boolean                   disable_initial_coord=false; // can the member become a coord on startup or not ?
    /** Setting this to false disables concurrent startups. This is only used by unit testing code
     * for testing merging. To everybody else: don't change it to false ! */
    boolean                   handle_concurrent_startup=true;
    // Keys under which the role implementations are stored in 'impls'
    static final String       CLIENT="Client";
    static final String       COORD="Coordinator";
    static final String       PART="Participant";
    /** Timer taken from the protocol stack in init() */
    TimeScheduler             timer=null;

    /** Max number of old members to keep in history */
    protected int             num_prev_mbrs=50;

    /** Keeps track of old members (up to num_prev_mbrs) */
    BoundedList               prev_members=null;

    /** Number of views counted by installView() while stats are enabled; cleared by resetStats() */
    int num_views=0;

    /** Stores the last 20 views */
    BoundedList               prev_views=new BoundedList(20);


    /** Class to process JOIN, LEAVE and MERGE requests */
    public final ViewHandler  view_handler=new ViewHandler();

    /** To collect VIEW_ACKs from all members */
    final AckCollector ack_collector=new AckCollector();

    /** Time in ms to wait for all VIEW acks (0 == wait forever) */
    long                      view_ack_collection_timeout=20000;

    /** Protocol name; also the key under which GmsHeaders are attached to messages */
    static final String       name="GMS";
79  
80  
81  
82      public GMS() {
83          initState();
84      }
85  
86  
87      public String getName() {
88          return name;
89      }
90  
91  
92      public String getView() {return view_id != null? view_id.toString() : "null";}
93      public int getNumberOfViews() {return num_views;}
94      public String getLocalAddress() {return local_addr != null? local_addr.toString() : "null";}
95      public String getMembers() {return members != null? members.toString() : "[]";}
96      public int getNumMembers() {return members != null? members.size() : 0;}
97      public long getJoinTimeout() {return join_timeout;}
98      public void setJoinTimeout(long t) {join_timeout=t;}
99      public long getJoinRetryTimeout() {return join_retry_timeout;}
100     public void setJoinRetryTimeout(long t) {join_retry_timeout=t;}
101     public boolean isShun() {return shun;}
102     public void setShun(boolean s) {shun=s;}
103     public String printPreviousMembers() {
104         StringBuffer sb=new StringBuffer();
105         if(prev_members != null) {
106             for(Enumeration en=prev_members.elements(); en.hasMoreElements();) {
107                 sb.append(en.nextElement()).append("\n");
108             }
109         }
110         return sb.toString();
111     }
112 
113     Log getLog() {return log;}
114 
115     public String printPreviousViews() {
116         StringBuffer sb=new StringBuffer();
117         for(Enumeration en=prev_views.elements(); en.hasMoreElements();) {
118             sb.append(en.nextElement()).append("\n");
119         }
120         return sb.toString();
121     }
122 
123     public boolean isCoordinator() {
124         Address coord=determineCoordinator();
125         return coord != null && local_addr != null && local_addr.equals(coord);
126     }
127 
128 
129     public void resetStats() {
130         super.resetStats();
131         num_views=0;
132         prev_views.removeAll();
133     }
134 
135 
136     public Vector requiredDownServices() {
137         Vector retval=new Vector(3);
138         retval.addElement(new Integer(Event.GET_DIGEST));
139         retval.addElement(new Integer(Event.SET_DIGEST));
140         retval.addElement(new Integer(Event.FIND_INITIAL_MBRS));
141         return retval;
142     }
143 
144 
145     public void setImpl(GmsImpl new_impl) {
146         synchronized(impl_mutex) {
147             if(impl == new_impl) // superfluous
148                 return;
149             impl=new_impl;
150             if(log.isDebugEnabled()) {
151                 String msg=(local_addr != null? local_addr.toString()+" " : "") + "changed role to " + new_impl.getClass().getName();
152                 log.debug(msg);
153             }
154         }
155     }
156 
157 
158     public GmsImpl getImpl() {
159         return impl;
160     }
161 
162 
163     public void init() throws Exception {
164         prev_members=new BoundedList(num_prev_mbrs);
165         timer=stack != null? stack.timer : null;
166         if(timer == null)
167             throw new Exception("GMS.init(): timer is null");
168         if(impl != null)
169             impl.init();
170     }
171 
172     public void start() throws Exception {
173         if(impl != null) impl.start();
174     }
175 
176     public void stop() {
177         view_handler.stop(true);
178         if(impl != null) impl.stop();
179         if(prev_members != null)
180             prev_members.removeAll();
181     }
182 
183 
184     public void becomeCoordinator() {
185         CoordGmsImpl tmp=(CoordGmsImpl)impls.get(COORD);
186         if(tmp == null) {
187             tmp=new CoordGmsImpl(this);
188             impls.put(COORD, tmp);
189         }
190         try {
191             tmp.init();
192         }
193         catch(Exception e) {
194             log.error("exception switching to coordinator role", e);
195         }
196         setImpl(tmp);
197     }
198 
199 
200     public void becomeParticipant() {
201         ParticipantGmsImpl tmp=(ParticipantGmsImpl)impls.get(PART);
202 
203         if(tmp == null) {
204             tmp=new ParticipantGmsImpl(this);
205             impls.put(PART, tmp);
206         }
207         try {
208             tmp.init();
209         }
210         catch(Exception e) {
211             log.error("exception switching to participant", e);
212         }
213         setImpl(tmp);
214     }
215 
216     public void becomeClient() {
217         ClientGmsImpl tmp=(ClientGmsImpl)impls.get(CLIENT);
218         if(tmp == null) {
219             tmp=new ClientGmsImpl(this);
220             impls.put(CLIENT, tmp);
221         }
222         try {
223             tmp.init();
224         }
225         catch(Exception e) {
226             log.error("exception switching to client role", e);
227         }
228         setImpl(tmp);
229     }
230 
231 
232     boolean haveCoordinatorRole() {
233         return impl != null && impl instanceof CoordGmsImpl;
234     }
235 
236 
237     /**
238      * Computes the next view. Returns a copy that has <code>old_mbrs</code> and
239      * <code>suspected_mbrs</code> removed and <code>new_mbrs</code> added.
240      */
241     public View getNextView(Vector new_mbrs, Vector old_mbrs, Vector suspected_mbrs) {
242         Vector mbrs;
243         long vid;
244         View v;
245         Membership tmp_mbrs;
246         Address tmp_mbr;
247 
248         synchronized(members) {
249             if(view_id == null) {
250                 log.error("view_id is null");
251                 return null; // this should *never* happen !
252             }
253             vid=Math.max(view_id.getId(), ltime) + 1;
254             ltime=vid;
255             tmp_mbrs=tmp_members.copy();  // always operate on the temporary membership
256             tmp_mbrs.remove(suspected_mbrs);
257             tmp_mbrs.remove(old_mbrs);
258             tmp_mbrs.add(new_mbrs);
259             mbrs=tmp_mbrs.getMembers();
260             v=new View(local_addr, vid, mbrs);
261 
262             // Update membership (see DESIGN for explanation):
263             tmp_members.set(mbrs);
264 
265             // Update joining list (see DESIGN for explanation)
266             if(new_mbrs != null) {
267                 for(int i=0; i < new_mbrs.size(); i++) {
268                     tmp_mbr=(Address)new_mbrs.elementAt(i);
269                     if(!joining.contains(tmp_mbr))
270                         joining.addElement(tmp_mbr);
271                 }
272             }
273 
274             // Update leaving list (see DESIGN for explanations)
275             if(old_mbrs != null) {
276                 for(Iterator it=old_mbrs.iterator(); it.hasNext();) {
277                     Address addr=(Address)it.next();
278                     if(!leaving.contains(addr))
279                         leaving.add(addr);
280                 }
281             }
282             if(suspected_mbrs != null) {
283                 for(Iterator it=suspected_mbrs.iterator(); it.hasNext();) {
284                     Address addr=(Address)it.next();
285                     if(!leaving.contains(addr))
286                         leaving.add(addr);
287                 }
288             }
289 
290             if(log.isDebugEnabled()) log.debug("new view is " + v);
291             return v;
292         }
293     }
294 
295 
296     /**
297      Compute a new view, given the current view, the new members and the suspected/left
298      members. Then simply mcast the view to all members. This is different to the VS GMS protocol,
299      in which we run a FLUSH protocol which tries to achive consensus on the set of messages mcast in
300      the current view before proceeding to install the next view.
301 
302      The members for the new view are computed as follows:
303      <pre>
304      existing          leaving        suspected          joining
305 
306      1. new_view      y                 n               n                 y
307      2. tmp_view      y                 y               n                 y
308      (view_dest)
309      </pre>
310 
311      <ol>
312      <li>
313      The new view to be installed includes the existing members plus the joining ones and
314      excludes the leaving and suspected members.
315      <li>
316      A temporary view is sent down the stack as an <em>event</em>. This allows the bottom layer
317      (e.g. UDP or TCP) to determine the members to which to send a multicast message. Compared
318      to the new view, leaving members are <em>included</em> since they have are waiting for a
319      view in which they are not members any longer before they leave. So, if we did not set a
320      temporary view, joining members would not receive the view (signalling that they have been
321      joined successfully). The temporary view is essentially the current view plus the joining
322      members (old members are still part of the current view).
323      </ol>
324      @return View The new view
325      */
326     public void castViewChange(Vector new_mbrs, Vector old_mbrs, Vector suspected_mbrs) {
327         View new_view;
328 
329         // next view: current mbrs + new_mbrs - old_mbrs - suspected_mbrs
330         new_view=getNextView(new_mbrs, old_mbrs, suspected_mbrs);
331         castViewChange(new_view, null);
332     }
333 
334 
335     public void castViewChange(View new_view, Digest digest) {
336         Message   view_change_msg;
337         GmsHeader hdr;
338         long      start, stop;
339         ViewId    vid=new_view.getVid();
340         int       size=-1;
341 
342         if(log.isTraceEnabled()) log.trace("mcasting view {" + new_view + "} (" + new_view.size() + " mbrs)\n");
343         start=System.currentTimeMillis();
344         view_change_msg=new Message(); // bcast to all members
345         hdr=new GmsHeader(GmsHeader.VIEW, new_view);
346         hdr.my_digest=digest;
347         view_change_msg.putHeader(name, hdr);
348 
349         ack_collector.reset(vid, new_view.getMembers());
350         size=ack_collector.size();
351         passDown(new Event(Event.MSG, view_change_msg));
352         try {
353             ack_collector.waitForAllAcks(view_ack_collection_timeout);
354             stop=System.currentTimeMillis();
355             if(trace)
356                 log.trace("received all ACKs (" + size + ") for " + vid + " in " + (stop-start) + "ms");
357         }
358         catch(TimeoutException e) {
359             log.warn("failed to collect all ACKs (" + size + ") for view " + vid + " after " + view_ack_collection_timeout +
360                     "ms, missing ACKs from " + ack_collector.getMissing() + " (received=" + ack_collector.getReceived() + ")");
361         }
362     }
363 
364 
365 
366     /**
367      * Sets the new view and sends a VIEW_CHANGE event up and down the stack. If the view is a MergeView (subclass
368      * of View), then digest will be non-null and has to be set before installing the view.
369      */
370     public void installView(View new_view, Digest digest) {
371         if(digest != null)
372             mergeDigest(digest);
373         installView(new_view);
374     }
375 
376 
    /**
     * Sets the new view and sends a VIEW_CHANGE event up and down the stack.
     * <p>
     * The view is ignored if its id is not greater than the id of the current view, or if the
     * local member is not part of it (in the latter case the member may shun itself, see below).
     * Otherwise the memberships and the joining/leaving lists are updated, the VIEW_CHANGE event
     * is passed down and up, and the role (coordinator/participant) is adjusted.
     *
     * @param new_view the view to install; must not be null
     */
    public void installView(View new_view) {
        Address coord;
        int rc;
        ViewId vid=new_view.getVid();
        Vector mbrs=new_view.getMembers();

        if(log.isDebugEnabled()) log.debug("[local_addr=" + local_addr + "] view is " + new_view);
        // NOTE(review): the statistics are updated before the checks below, so views that end up
        // being discarded are counted and recorded as well - confirm that this is intended
        if(stats) {
            num_views++;
            prev_views.add(new_view);
        }

        // Discards view with id lower than our own. Will be installed without check if first view
        if(view_id != null) {
            rc=vid.compareTo(view_id);
            if(rc <= 0) {
                if(log.isTraceEnabled())
                    log.trace("[" + local_addr + "] received view <= current view;" +
                              " discarding it (current vid: " + view_id + ", new vid: " + vid + ')');
                return;
            }
        }

        ltime=Math.max(vid.getId(), ltime);  // compute Lamport logical time

        /* Check for self-inclusion: if I'm not part of the new membership, I just discard it.
        This ensures that messages sent in view V1 are only received by members of V1 */
        if(checkSelfInclusion(mbrs) == false) {
            // only shun if this member was previously part of the group. avoids problem where multiple
            // members (e.g. X,Y,Z) join {A,B} concurrently, X is joined first, and Y and Z get view
            // {A,B,X}, which would cause Y and Z to be shunned as they are not part of the membership
            // bela Nov 20 2003
            if(shun && local_addr != null && prev_members.contains(local_addr)) {
                if(warn)
                    log.warn("I (" + local_addr + ") am not a member of view " + new_view +
                            ", shunning myself and leaving the group (prev_members are " + prev_members +
                            ", current view is " + view + ")");
                // let the current role implementation handle the exit, then notify the layers above
                if(impl != null)
                    impl.handleExit();
                passUp(new Event(Event.EXIT));
            }
            else {
                if(warn) log.warn("I (" + local_addr + ") am not a member of view " + new_view + "; discarding view");
            }
            return;
        }

        synchronized(members) {   // serialize access to views
            // remember the new view and a copy of its id as the current view

            view=new_view;
            view_id=vid.copy();

            // Set the membership. Take into account joining members
            if(mbrs != null && mbrs.size() > 0) {
                members.set(mbrs);
                tmp_members.set(members);
                joining.removeAll(mbrs);  // remove all members in mbrs from joining
                // remove all elements from 'leaving' that are not in 'mbrs'
                leaving.retainAll(mbrs);

                tmp_members.add(joining);    // add members that haven't yet shown up in the membership
                tmp_members.remove(leaving); // remove members that haven't yet been removed from the membership

                // add to prev_members
                for(Iterator it=mbrs.iterator(); it.hasNext();) {
                    Address addr=(Address)it.next();
                    if(!prev_members.contains(addr))
                        prev_members.add(addr);
                }
            }

            // Send VIEW_CHANGE event up and down the stack (the same event object, carrying a clone of the view):
            Event view_event=new Event(Event.VIEW_CHANGE, new_view.clone());
            passDown(view_event); // needed e.g. by failure detector or UDP
            passUp(view_event);

            // Adjust the role: the first member of the new membership is the coordinator
            coord=determineCoordinator();
            // if(coord != null && coord.equals(local_addr) && !(coord.equals(vid.getCoordAddress()))) {
            // changed on suggestion by yaronr and Nicolas Piedeloupe
            if(coord != null && coord.equals(local_addr) && !haveCoordinatorRole()) {
                becomeCoordinator();
            }
            else {
                // we hold the coordinator role but are no longer first in the membership: step down
                if(haveCoordinatorRole() && !local_addr.equals(coord))
                    becomeParticipant();
            }
        }
    }
469 
470 
471     protected Address determineCoordinator() {
472         synchronized(members) {
473             return members != null && members.size() > 0? (Address)members.elementAt(0) : null;
474         }
475     }
476 
477 
478     /** Checks whether the potential_new_coord would be the new coordinator (2nd in line) */
479     protected boolean wouldBeNewCoordinator(Address potential_new_coord) {
480         Address new_coord;
481 
482         if(potential_new_coord == null) return false;
483 
484         synchronized(members) {
485             if(members.size() < 2) return false;
486             new_coord=(Address)members.elementAt(1);  // member at 2nd place
487             return new_coord != null && new_coord.equals(potential_new_coord);
488         }
489     }
490 
491 
492     /** Returns true if local_addr is member of mbrs, else false */
493     protected boolean checkSelfInclusion(Vector mbrs) {
494         Object mbr;
495         if(mbrs == null)
496             return false;
497         for(int i=0; i < mbrs.size(); i++) {
498             mbr=mbrs.elementAt(i);
499             if(mbr != null && local_addr.equals(mbr))
500                 return true;
501         }
502         return false;
503     }
504 
505 
506     public View makeView(Vector mbrs) {
507         Address coord=null;
508         long id=0;
509 
510         if(view_id != null) {
511             coord=view_id.getCoordAddress();
512             id=view_id.getId();
513         }
514         return new View(coord, id, mbrs);
515     }
516 
517 
518     public View makeView(Vector mbrs, ViewId vid) {
519         Address coord=null;
520         long id=0;
521 
522         if(vid != null) {
523             coord=vid.getCoordAddress();
524             id=vid.getId();
525         }
526         return new View(coord, id, mbrs);
527     }
528 
529 
530     /** Send down a SET_DIGEST event */
531     public void setDigest(Digest d) {
532         passDown(new Event(Event.SET_DIGEST, d));
533     }
534 
535 
536     /** Send down a MERGE_DIGEST event */
537     public void mergeDigest(Digest d) {
538         passDown(new Event(Event.MERGE_DIGEST, d));
539     }
540 
541 
542     /** Sends down a GET_DIGEST event and waits for the GET_DIGEST_OK response, or
543      timeout, whichever occurs first */
544     public Digest getDigest() {
545         Digest ret=null;
546 
547         synchronized(digest_mutex) {
548             digest_promise.reset();
549             passDown(Event.GET_DIGEST_EVT);
550             try {
551                 ret=(Digest)digest_promise.getResultWithTimeout(digest_timeout);
552             }
553             catch(TimeoutException e) {
554                 if(log.isErrorEnabled()) log.error("digest could not be fetched from below");
555             }
556             return ret;
557         }
558     }
559 
560 
    /**
     * Handles an event travelling up the stack.
     * <p>
     * Messages carrying a GmsHeader are consumed here: join, leave and merge requests are queued
     * in the view handler, VIEW_ACKs are fed to the ack collector and everything else is handed
     * to the current role implementation. Such messages are never passed further up. All other
     * events, unless consumed in the switch below, are offered to the role implementation and
     * passed up if it returns true from handleUpEvent().
     *
     * @param evt the event received from the protocol below
     */
    public void up(Event evt) {
        Object obj;
        Message msg;
        GmsHeader hdr;
        MergeData merge_data;

        switch(evt.getType()) {

            case Event.MSG:
                msg=(Message)evt.getArg();
                obj=msg.getHeader(name);
                // not a GMS message: leave the switch and treat it like any other event
                if(obj == null || !(obj instanceof GmsHeader))
                    break;
                hdr=(GmsHeader)msg.removeHeader(name);
                switch(hdr.type) {
                    case GmsHeader.JOIN_REQ:
                        view_handler.add(new Request(Request.JOIN, hdr.mbr, false, null));
                        break;
                    case GmsHeader.JOIN_RSP:
                        impl.handleJoinResponse(hdr.join_rsp);
                        break;
                    case GmsHeader.LEAVE_REQ:
                        if(log.isDebugEnabled())
                            log.debug("received LEAVE_REQ for " + hdr.mbr + " from " + msg.getSrc());
                        if(hdr.mbr == null) {
                            if(log.isErrorEnabled()) log.error("LEAVE_REQ's mbr field is null");
                            return;
                        }
                        view_handler.add(new Request(Request.LEAVE, hdr.mbr, false, null));
                        break;
                    case GmsHeader.LEAVE_RSP:
                        impl.handleLeaveResponse();
                        break;
                    case GmsHeader.VIEW:
                        if(hdr.view == null) {
                            if(log.isErrorEnabled()) log.error("[VIEW]: view == null");
                            return;
                        }

                        // send VIEW_ACK to sender of view (before the view is handled locally)
                        Address coord=msg.getSrc();
                        Message view_ack=new Message(coord, null, null);
                        GmsHeader tmphdr=new GmsHeader(GmsHeader.VIEW_ACK, hdr.view);
                        view_ack.putHeader(name, tmphdr);
                        passDown(new Event(Event.MSG, view_ack));
                        impl.handleViewChange(hdr.view, hdr.my_digest);
                        break;

                    case GmsHeader.VIEW_ACK:
                        // record the ack; castViewChange() waits on the collector
                        Object sender=msg.getSrc();
                        ack_collector.ack(sender);
                        return; // don't pass further up

                    case GmsHeader.MERGE_REQ:
                        impl.handleMergeRequest(msg.getSrc(), hdr.merge_id);
                        break;

                    case GmsHeader.MERGE_RSP:
                        merge_data=new MergeData(msg.getSrc(), hdr.view, hdr.my_digest);
                        merge_data.merge_rejected=hdr.merge_rejected;
                        impl.handleMergeResponse(merge_data, hdr.merge_id);
                        break;

                    case GmsHeader.INSTALL_MERGE_VIEW:
                        impl.handleMergeView(new MergeData(msg.getSrc(), hdr.view, hdr.my_digest), hdr.merge_id);
                        break;

                    case GmsHeader.CANCEL_MERGE:
                        impl.handleMergeCancelled(hdr.merge_id);
                        break;

                    default:
                        if(log.isErrorEnabled()) log.error("GmsHeader with type=" + hdr.type + " not known");
                }
                return;  // don't pass up

            case Event.CONNECT_OK:     // sent by someone else, but WE are responsible for sending this !
            case Event.DISCONNECT_OK:  // ditto (e.g. sent by TP layer). Don't send up the stack
                return;


            case Event.SET_LOCAL_ADDRESS:
                local_addr=(Address)evt.getArg();
                if(print_local_addr) {
                    System.out.println("\n-------------------------------------------------------\n" +
                                       "GMS: address is " + local_addr +
                                       "\n-------------------------------------------------------");
                }
                break;                               // pass up

            case Event.SUSPECT:
                // a suspected member is queued as a LEAVE request flagged as suspected, and the
                // ack collector is told about it as well
                Address suspected=(Address)evt.getArg();
                view_handler.add(new Request(Request.LEAVE, suspected, true, null));
                ack_collector.suspect(suspected);
                break;                               // pass up

            case Event.UNSUSPECT:
                impl.unsuspect((Address)evt.getArg());
                return;                              // discard

            case Event.MERGE:
                view_handler.add(new Request(Request.MERGE, null, false, (Vector)evt.getArg()));
                return;                              // don't pass up
        }

        // the role implementation decides whether the event travels further up
        if(impl.handleUpEvent(evt))
            passUp(evt);
    }
669 
670 
671     /**
672      This method is overridden to avoid hanging on getDigest(): when a JOIN is received, the coordinator needs
673      to retrieve the digest from the NAKACK layer. It therefore sends down a GET_DIGEST event, to which the NAKACK layer
674      responds with a GET_DIGEST_OK event.<p>
675      However, the GET_DIGEST_OK event will not be processed because the thread handling the JOIN request won't process
676      the GET_DIGEST_OK event until the JOIN event returns. The receiveUpEvent() method is executed by the up-handler
677      thread of the lower protocol and therefore can handle the event. All we do here is unblock the mutex on which
678      JOIN is waiting, allowing JOIN to return with a valid digest. The GET_DIGEST_OK event is then discarded, because
679      it won't be processed twice.
680      */
681     public void receiveUpEvent(Event evt) {
682         switch(evt.getType()) {
683             case Event.GET_DIGEST_OK:
684                 digest_promise.setResult(evt.getArg());
685                 return; // don't pass further up
686         }
687         super.receiveUpEvent(evt);
688     }
689 
690 
691     public void down(Event evt) {
692         switch(evt.getType()) {
693 
694             case Event.CONNECT:
695                 passDown(evt);
696                 if(local_addr == null)
697                     if(log.isFatalEnabled()) log.fatal("[CONNECT] local_addr is null");
698                 impl.join(local_addr);
699                 passUp(new Event(Event.CONNECT_OK));
700                 return;                              // don't pass down: was already passed down
701 
702             case Event.DISCONNECT:
703                 impl.leave((Address)evt.getArg());
704                 passUp(new Event(Event.DISCONNECT_OK));
705                 initState(); // in case connect() is called again
706                 break;       // pass down
707         }
708 
709         if(impl.handleDownEvent(evt))
710             passDown(evt);
711     }
712 
713 
714     /** Setup the Protocol instance according to the configuration string */
715     public boolean setProperties(Properties props) {
716         String str;
717 
718         super.setProperties(props);
719         str=props.getProperty("shun");
720         if(str != null) {
721             shun=Boolean.valueOf(str).booleanValue();
722             props.remove("shun");
723         }
724 
725         str=props.getProperty("merge_leader");
726         if(str != null) {
727             merge_leader=Boolean.valueOf(str).booleanValue();
728             props.remove("merge_leader");
729         }
730 
731         str=props.getProperty("print_local_addr");
732         if(str != null) {
733             print_local_addr=Boolean.valueOf(str).booleanValue();
734             props.remove("print_local_addr");
735         }
736 
737         str=props.getProperty("join_timeout");           // time to wait for JOIN
738         if(str != null) {
739             join_timeout=Long.parseLong(str);
740             props.remove("join_timeout");
741         }
742 
743         str=props.getProperty("join_retry_timeout");     // time to wait between JOINs
744         if(str != null) {
745             join_retry_timeout=Long.parseLong(str);
746             props.remove("join_retry_timeout");
747         }
748 
749         str=props.getProperty("leave_timeout");           // time to wait until coord responds to LEAVE req.
750         if(str != null) {
751             leave_timeout=Long.parseLong(str);
752             props.remove("leave_timeout");
753         }
754 
755         str=props.getProperty("merge_timeout");           // time to wait for MERGE_RSPS from subgroup coordinators
756         if(str != null) {
757             merge_timeout=Long.parseLong(str);
758             props.remove("merge_timeout");
759         }
760 
761         str=props.getProperty("digest_timeout");          // time to wait for GET_DIGEST_OK from PBCAST
762         if(str != null) {
763             digest_timeout=Long.parseLong(str);
764             props.remove("digest_timeout");
765         }
766 
767         str=props.getProperty("view_ack_collection_timeout");
768         if(str != null) {
769             view_ack_collection_timeout=Long.parseLong(str);
770             props.remove("view_ack_collection_timeout");
771         }
772 
773         str=props.getProperty("disable_initial_coord");
774         if(str != null) {
775             disable_initial_coord=Boolean.valueOf(str).booleanValue();
776             props.remove("disable_initial_coord");
777         }
778 
779         str=props.getProperty("handle_concurrent_startup");
780         if(str != null) {
781             handle_concurrent_startup=Boolean.valueOf(str).booleanValue();
782             props.remove("handle_concurrent_startup");
783         }
784 
785         str=props.getProperty("num_prev_mbrs");
786         if(str != null) {
787             num_prev_mbrs=Integer.parseInt(str);
788             props.remove("num_prev_mbrs");
789         }
790 
791         if(props.size() > 0) {
792             log.error("GMS.setProperties(): the following properties are not recognized: " + props);
793 
794             return false;
795         }
796         return true;
797     }
798 
799 
800 
801     /* ------------------------------- Private Methods --------------------------------- */
802 
803     void initState() {
804         becomeClient();
805         view_id=null;
806         view=null;
807     }
808 
809 
810     /* --------------------------- End of Private Methods ------------------------------- */
811 
812 
813 
814     public static class GmsHeader extends Header implements Streamable {
815         public static final byte JOIN_REQ=1;
816         public static final byte JOIN_RSP=2;
817         public static final byte LEAVE_REQ=3;
818         public static final byte LEAVE_RSP=4;
819         public static final byte VIEW=5;
820         public static final byte MERGE_REQ=6;
821         public static final byte MERGE_RSP=7;
822         public static final byte INSTALL_MERGE_VIEW=8;
823         public static final byte CANCEL_MERGE=9;
824         public static final byte VIEW_ACK=10;
825 
826         byte type=0;
827         View view=null;            // used when type=VIEW or MERGE_RSP or INSTALL_MERGE_VIEW
828         Address mbr=null;             // used when type=JOIN_REQ or LEAVE_REQ
829         JoinRsp join_rsp=null;        // used when type=JOIN_RSP
830         Digest my_digest=null;          // used when type=MERGE_RSP or INSTALL_MERGE_VIEW
831         ViewId merge_id=null;        // used when type=MERGE_REQ or MERGE_RSP or INSTALL_MERGE_VIEW or CANCEL_MERGE
832         boolean merge_rejected=false; // used when type=MERGE_RSP
833 
834 
835         public GmsHeader() {
836         } // used for Externalization
837 
838         public GmsHeader(byte type) {
839             this.type=type;
840         }
841 
842 
843         /** Used for VIEW header */
844         public GmsHeader(byte type, View view) {
845             this.type=type;
846             this.view=view;
847         }
848 
849 
850         /** Used for JOIN_REQ or LEAVE_REQ header */
851         public GmsHeader(byte type, Address mbr) {
852             this.type=type;
853             this.mbr=mbr;
854         }
855 
856         /** Used for JOIN_RSP header */
857         public GmsHeader(byte type, JoinRsp join_rsp) {
858             this.type=type;
859             this.join_rsp=join_rsp;
860         }
861 
862         public byte getType() {
863             return type;
864         }
865 
866         public Address getMemeber() {
867             return mbr;
868         }
869 
870         public String toString() {
871             StringBuffer sb=new StringBuffer("GmsHeader");
872             sb.append('[' + type2String(type) + ']');
873             switch(type) {
874                 case JOIN_REQ:
875                     sb.append(": mbr=" + mbr);
876                     break;
877 
878                 case JOIN_RSP:
879                     sb.append(": join_rsp=" + join_rsp);
880                     break;
881 
882                 case LEAVE_REQ:
883                     sb.append(": mbr=" + mbr);
884                     break;
885 
886                 case LEAVE_RSP:
887                     break;
888 
889                 case VIEW:
890                 case VIEW_ACK:
891                     sb.append(": view=" + view);
892                     break;
893 
894                 case MERGE_REQ:
895                     sb.append(": merge_id=" + merge_id);
896                     break;
897 
898                 case MERGE_RSP:
899                     sb.append(": view=" + view + ", digest=" + my_digest + ", merge_rejected=" + merge_rejected +
900                             ", merge_id=" + merge_id);
901                     break;
902 
903                 case INSTALL_MERGE_VIEW:
904                     sb.append(": view=" + view + ", digest=" + my_digest);
905                     break;
906 
907                 case CANCEL_MERGE:
908                     sb.append(", <merge cancelled>, merge_id=" + merge_id);
909                     break;
910             }
911             return sb.toString();
912         }
913 
914 
915         public static String type2String(int type) {
916             switch(type) {
917                 case JOIN_REQ: return "JOIN_REQ";
918                 case JOIN_RSP: return "JOIN_RSP";
919                 case LEAVE_REQ: return "LEAVE_REQ";
920                 case LEAVE_RSP: return "LEAVE_RSP";
921                 case VIEW: return "VIEW";
922                 case MERGE_REQ: return "MERGE_REQ";
923                 case MERGE_RSP: return "MERGE_RSP";
924                 case INSTALL_MERGE_VIEW: return "INSTALL_MERGE_VIEW";
925                 case CANCEL_MERGE: return "CANCEL_MERGE";
926                 case VIEW_ACK: return "VIEW_ACK";
927                 default: return "<unknown>";
928             }
929         }
930 
931 
932         public void writeExternal(ObjectOutput out) throws IOException {
933             out.writeByte(type);
934             out.writeObject(view);
935             out.writeObject(mbr);
936             out.writeObject(join_rsp);
937             out.writeObject(my_digest);
938             out.writeObject(merge_id);
939             out.writeBoolean(merge_rejected);
940         }
941 
942 
943         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
944             type=in.readByte();
945             view=(View)in.readObject();
946             mbr=(Address)in.readObject();
947             join_rsp=(JoinRsp)in.readObject();
948             my_digest=(Digest)in.readObject();
949             merge_id=(ViewId)in.readObject();
950             merge_rejected=in.readBoolean();
951         }
952 
953 
954         public void writeTo(DataOutputStream out) throws IOException {
955             out.writeByte(type);
956             Util.writeStreamable(view, out);
957             Util.writeAddress(mbr, out);
958             Util.writeStreamable(join_rsp, out);
959             Util.writeStreamable(my_digest, out);
960             Util.writeStreamable(merge_id, out); // kludge: we know merge_id is a ViewId
961             out.writeBoolean(merge_rejected);
962         }
963 
964         public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
965             type=in.readByte();
966             view=(View)Util.readStreamable(View.class, in);
967             mbr=Util.readAddress(in);
968             join_rsp=(JoinRsp)Util.readStreamable(JoinRsp.class, in);
969             my_digest=(Digest)Util.readStreamable(Digest.class, in);
970             merge_id=(ViewId)Util.readStreamable(ViewId.class, in);
971             merge_rejected=in.readBoolean();
972         }
973 
974         public long size() {
975             long retval=Global.BYTE_SIZE *2; // type + merge_rejected
976 
977             retval+=Global.BYTE_SIZE; // presence view
978             if(view != null)
979                 retval+=view.serializedSize();
980 
981             retval+=Util.size(mbr);
982 
983             retval+=Global.BYTE_SIZE; // presence of join_rsp
984             if(join_rsp != null)
985                 retval+=join_rsp.serializedSize();
986 
987             retval+=Global.BYTE_SIZE; // presence for my_digest
988             if(my_digest != null)
989                 retval+=my_digest.serializedSize();
990 
991             retval+=Global.BYTE_SIZE; // presence for merge_id
992             if(merge_id != null)
993                 retval+=merge_id.serializedSize();
994             return retval;
995         }
996 
997     }
998 
999 
1000
1001
1002    public static class Request {
1003        static final int JOIN    = 1;
1004        static final int LEAVE   = 2;
1005        static final int SUSPECT = 3;
1006        static final int MERGE   = 4;
1007
1008
1009        int     type=-1;
1010        Address mbr=null;
1011        boolean suspected;
1012        Vector  coordinators=null;
1013
1014        Request(int type, Address mbr, boolean suspected, Vector coordinators) {
1015            this.type=type;
1016            this.mbr=mbr;
1017            this.suspected=suspected;
1018            this.coordinators=coordinators;
1019        }
1020
1021        public String toString() {
1022            switch(type) {
1023                case JOIN:    return "JOIN(" + mbr + ")";
1024                case LEAVE:   return "LEAVE(" + mbr + ", " + suspected + ")";
1025                case SUSPECT: return "SUSPECT(" + mbr + ")";
1026                case MERGE:   return "MERGE(" + coordinators + ")";
1027            }
1028            return "<invalid (type=" + type + ")";
1029        }
1030    }
1031
1032
1033
1034
1035    /**
1036     * Class which processes JOIN, LEAVE and MERGE requests. Requests are queued and processed in FIFO order
1037     * @author Bela Ban
1038     * @version $Id: GMS.java,v 1.45 2005/11/04 18:40:36 belaban Exp $
1039     */
1040    class ViewHandler implements Runnable {
1041        Thread                t;
1042        Queue                 q=new Queue(); // Queue<Request>
1043        boolean               suspended=false;
1044        final static long     INTERVAL=5000;
1045
1046
1047        void add(Request req) {
1048            if(suspended) {
1049                log.warn("queue is suspended; request is discarded");
1050                return;
1051            }
1052            start();
1053            try {
1054                q.add(req);
1055            }
1056            catch(QueueClosedException e) {
1057                if(trace)
1058                    log.trace("queue is closed; request " + req + " is discarded");
1059            }
1060        }
1061
1062        synchronized void waitUntilCompleted(long timeout) {
1063            if(t != null) {
1064                try {
1065                    t.join(timeout);
1066                }
1067                catch(InterruptedException e) {
1068                }
1069            }
1070
1071        }
1072
1073        public void suspend() {
1074            suspended=true;
1075            q.close(true);
1076        }
1077
1078        public void resume() {
1079            if(q.closed())
1080                q.reset();
1081            suspended=false;
1082        }
1083
1084        public void run() {
1085            Request req;
1086            while(!q.closed() && Thread.currentThread().equals(t)) {
1087                try {
1088                    req=(Request)q.remove(INTERVAL);
1089                    process(req);
1090                }
1091                catch(QueueClosedException e) {
1092                    break;
1093                }
1094                catch(TimeoutException e) {
1095                    break;
1096                }
1097            }
1098        }
1099
1100
1101
1102        private void process(Request req) {
1103            if(trace)
1104                log.trace("processing " + req);
1105            switch(req.type) {
1106                case Request.JOIN:
1107                    impl.handleJoin(req.mbr);
1108                    break;
1109                case Request.LEAVE:
1110                    if(req.suspected)
1111                        impl.suspect(req.mbr);
1112                    else
1113                        impl.handleLeave(req.mbr, req.suspected);
1114                    break;
1115                case Request.SUSPECT:
1116                    impl.suspect(req.mbr);
1117                    break;
1118                case Request.MERGE:
1119                    impl.merge(req.coordinators);
1120                    break;
1121                default:
1122                    log.error("Request " + req.type + " is unknown; discarded");
1123            }
1124        }
1125
1126
1127        private void handleMergeRequest(Vector coordinators) {
1128
1129        }
1130
1131
1132        synchronized void start() {
1133            if(q.closed())
1134                q.reset();
1135            suspended=false;
1136            if(t == null || !t.isAlive()) {
1137                t=new Thread(this, "ViewHandler");
1138                t.setDaemon(true);
1139                t.start();
1140                if(trace)
1141                    log.trace("ViewHandler started");
1142            }
1143        }
1144
1145        synchronized void stop(boolean flush) {
1146            q.close(flush);
1147        }
1148
1149
1150    }
1151
1152
1153
1154}