Source code: org/jgroups/protocols/pbcast/GMS.java
1 // $Id: GMS.java,v 1.45 2005/11/04 18:40:36 belaban Exp $
2
3 package org.jgroups.protocols.pbcast;
4
5
6 import org.jgroups.*;
7 import org.jgroups.stack.Protocol;
8 import org.jgroups.util.*;
9 import org.jgroups.util.Queue;
10 import org.apache.commons.logging.Log;
11
12 import java.io.*;
13 import java.util.*;
14
15
16 /**
17 * Group membership protocol. Handles joins/leaves/crashes (suspicions) and emits new views
18 * accordingly. Use VIEW_ENFORCER on top of this layer to make sure new members don't receive
19 * any messages until they are members.
20 */
21 public class GMS extends Protocol {
// Current role implementation (ClientGmsImpl, ParticipantGmsImpl or CoordGmsImpl); changed via setImpl()
private GmsImpl impl=null;
// Address of the local member; set when SET_LOCAL_ADDRESS is received in up(), null before that
Address local_addr=null;
final Membership members=new Membership(); // real membership (members of the installed view); also used as lock for view state
private final Membership tmp_members=new Membership(); // base for computing next view: members + joining - leaving

/** Members joined but for which no view has been received yet */
private final Vector joining=new Vector(7);

/** Members excluded from group, but for which no view has been received yet */
private final Vector leaving=new Vector(7);

// The installed view and its id; both are null before the first view is installed and after initState()
View view=null;
ViewId view_id=null;
// Lamport logical time: highest view id seen so far, used by getNextView() to pick the next view id
private long ltime=0;
long join_timeout=5000; // time (ms) to wait for JOIN (property "join_timeout")
long join_retry_timeout=2000; // time (ms) to wait between JOINs (property "join_retry_timeout")
long leave_timeout=5000; // time (ms) to wait until coord responds to LEAVE req. (property "leave_timeout")
private long digest_timeout=0; // time to wait for a digest (from PBCAST) in getDigest(). should be fast. NOTE(review): 0 presumably means "wait forever" - confirm in Promise.getResultWithTimeout()
long merge_timeout=10000; // time to wait for all MERGE_RSPS
private final Object impl_mutex=new Object(); // guards role switches in setImpl()
private final Object digest_mutex=new Object(); // serializes getDigest() calls, which all share digest_promise
private final Promise digest_promise=new Promise(); // holds result of GET_DIGEST event (filled in by receiveUpEvent())
private final Hashtable impls=new Hashtable(3); // cache of role implementations, keyed by CLIENT, COORD or PART
private boolean shun=true; // if true, a former member that is excluded from a new view leaves the group (see installView())
boolean merge_leader=false; // can I initiate a merge ?
private boolean print_local_addr=true; // print the local address to stdout when it is set
boolean disable_initial_coord=false; // can the member become a coord on startup or not ?
/** Setting this to false disables concurrent startups. This is only used by unit testing code
 * for testing merging. To everybody else: don't change it to false! */
boolean handle_concurrent_startup=true;
// Keys of the role implementations cached in impls
static final String CLIENT="Client";
static final String COORD="Coordinator";
static final String PART="Participant";
// Timer of the protocol stack, fetched in init()
TimeScheduler timer=null;

/** Max number of old members to keep in history */
protected int num_prev_mbrs=50;

/** Keeps track of old members (up to num_prev_mbrs); created in init() */
BoundedList prev_members=null;

// Number of views counted by installView() while stats are enabled; reset by resetStats()
int num_views=0;

/** Stores the last 20 views */
BoundedList prev_views=new BoundedList(20);


/** Class to process JOIN, LEAVE and MERGE requests */
public final ViewHandler view_handler=new ViewHandler();

/** To collect VIEW_ACKs from all members */
final AckCollector ack_collector=new AckCollector();

/** Time in ms to wait for all VIEW acks (0 == wait forever) */
long view_ack_collection_timeout=20000;

// Protocol name; also the key under which GmsHeaders are put into / looked up in messages
static final String name="GMS";
79
80
81
/**
 * Creates the protocol in its initial state: client role, no view installed.
 */
public GMS() {
    initState();
}


/**
 * Returns the protocol name. The same string is used as the key of GmsHeaders in messages.
 */
public String getName() {
    return GMS.name;
}
90
91
92 public String getView() {return view_id != null? view_id.toString() : "null";}
93 public int getNumberOfViews() {return num_views;}
94 public String getLocalAddress() {return local_addr != null? local_addr.toString() : "null";}
95 public String getMembers() {return members != null? members.toString() : "[]";}
96 public int getNumMembers() {return members != null? members.size() : 0;}
97 public long getJoinTimeout() {return join_timeout;}
98 public void setJoinTimeout(long t) {join_timeout=t;}
99 public long getJoinRetryTimeout() {return join_retry_timeout;}
100 public void setJoinRetryTimeout(long t) {join_retry_timeout=t;}
101 public boolean isShun() {return shun;}
102 public void setShun(boolean s) {shun=s;}
103 public String printPreviousMembers() {
104 StringBuffer sb=new StringBuffer();
105 if(prev_members != null) {
106 for(Enumeration en=prev_members.elements(); en.hasMoreElements();) {
107 sb.append(en.nextElement()).append("\n");
108 }
109 }
110 return sb.toString();
111 }
112
113 Log getLog() {return log;}
114
115 public String printPreviousViews() {
116 StringBuffer sb=new StringBuffer();
117 for(Enumeration en=prev_views.elements(); en.hasMoreElements();) {
118 sb.append(en.nextElement()).append("\n");
119 }
120 return sb.toString();
121 }
122
123 public boolean isCoordinator() {
124 Address coord=determineCoordinator();
125 return coord != null && local_addr != null && local_addr.equals(coord);
126 }
127
128
129 public void resetStats() {
130 super.resetStats();
131 num_views=0;
132 prev_views.removeAll();
133 }
134
135
136 public Vector requiredDownServices() {
137 Vector retval=new Vector(3);
138 retval.addElement(new Integer(Event.GET_DIGEST));
139 retval.addElement(new Integer(Event.SET_DIGEST));
140 retval.addElement(new Integer(Event.FIND_INITIAL_MBRS));
141 return retval;
142 }
143
144
145 public void setImpl(GmsImpl new_impl) {
146 synchronized(impl_mutex) {
147 if(impl == new_impl) // superfluous
148 return;
149 impl=new_impl;
150 if(log.isDebugEnabled()) {
151 String msg=(local_addr != null? local_addr.toString()+" " : "") + "changed role to " + new_impl.getClass().getName();
152 log.debug(msg);
153 }
154 }
155 }
156
157
158 public GmsImpl getImpl() {
159 return impl;
160 }
161
162
163 public void init() throws Exception {
164 prev_members=new BoundedList(num_prev_mbrs);
165 timer=stack != null? stack.timer : null;
166 if(timer == null)
167 throw new Exception("GMS.init(): timer is null");
168 if(impl != null)
169 impl.init();
170 }
171
172 public void start() throws Exception {
173 if(impl != null) impl.start();
174 }
175
176 public void stop() {
177 view_handler.stop(true);
178 if(impl != null) impl.stop();
179 if(prev_members != null)
180 prev_members.removeAll();
181 }
182
183
184 public void becomeCoordinator() {
185 CoordGmsImpl tmp=(CoordGmsImpl)impls.get(COORD);
186 if(tmp == null) {
187 tmp=new CoordGmsImpl(this);
188 impls.put(COORD, tmp);
189 }
190 try {
191 tmp.init();
192 }
193 catch(Exception e) {
194 log.error("exception switching to coordinator role", e);
195 }
196 setImpl(tmp);
197 }
198
199
200 public void becomeParticipant() {
201 ParticipantGmsImpl tmp=(ParticipantGmsImpl)impls.get(PART);
202
203 if(tmp == null) {
204 tmp=new ParticipantGmsImpl(this);
205 impls.put(PART, tmp);
206 }
207 try {
208 tmp.init();
209 }
210 catch(Exception e) {
211 log.error("exception switching to participant", e);
212 }
213 setImpl(tmp);
214 }
215
216 public void becomeClient() {
217 ClientGmsImpl tmp=(ClientGmsImpl)impls.get(CLIENT);
218 if(tmp == null) {
219 tmp=new ClientGmsImpl(this);
220 impls.put(CLIENT, tmp);
221 }
222 try {
223 tmp.init();
224 }
225 catch(Exception e) {
226 log.error("exception switching to client role", e);
227 }
228 setImpl(tmp);
229 }
230
231
232 boolean haveCoordinatorRole() {
233 return impl != null && impl instanceof CoordGmsImpl;
234 }
235
236
237 /**
238 * Computes the next view. Returns a copy that has <code>old_mbrs</code> and
239 * <code>suspected_mbrs</code> removed and <code>new_mbrs</code> added.
240 */
241 public View getNextView(Vector new_mbrs, Vector old_mbrs, Vector suspected_mbrs) {
242 Vector mbrs;
243 long vid;
244 View v;
245 Membership tmp_mbrs;
246 Address tmp_mbr;
247
248 synchronized(members) {
249 if(view_id == null) {
250 log.error("view_id is null");
251 return null; // this should *never* happen !
252 }
253 vid=Math.max(view_id.getId(), ltime) + 1;
254 ltime=vid;
255 tmp_mbrs=tmp_members.copy(); // always operate on the temporary membership
256 tmp_mbrs.remove(suspected_mbrs);
257 tmp_mbrs.remove(old_mbrs);
258 tmp_mbrs.add(new_mbrs);
259 mbrs=tmp_mbrs.getMembers();
260 v=new View(local_addr, vid, mbrs);
261
262 // Update membership (see DESIGN for explanation):
263 tmp_members.set(mbrs);
264
265 // Update joining list (see DESIGN for explanation)
266 if(new_mbrs != null) {
267 for(int i=0; i < new_mbrs.size(); i++) {
268 tmp_mbr=(Address)new_mbrs.elementAt(i);
269 if(!joining.contains(tmp_mbr))
270 joining.addElement(tmp_mbr);
271 }
272 }
273
274 // Update leaving list (see DESIGN for explanations)
275 if(old_mbrs != null) {
276 for(Iterator it=old_mbrs.iterator(); it.hasNext();) {
277 Address addr=(Address)it.next();
278 if(!leaving.contains(addr))
279 leaving.add(addr);
280 }
281 }
282 if(suspected_mbrs != null) {
283 for(Iterator it=suspected_mbrs.iterator(); it.hasNext();) {
284 Address addr=(Address)it.next();
285 if(!leaving.contains(addr))
286 leaving.add(addr);
287 }
288 }
289
290 if(log.isDebugEnabled()) log.debug("new view is " + v);
291 return v;
292 }
293 }
294
295
296 /**
297 Compute a new view, given the current view, the new members and the suspected/left
298 members. Then simply mcast the view to all members. This is different to the VS GMS protocol,
299 in which we run a FLUSH protocol which tries to achive consensus on the set of messages mcast in
300 the current view before proceeding to install the next view.
301
302 The members for the new view are computed as follows:
303 <pre>
304 existing leaving suspected joining
305
306 1. new_view y n n y
307 2. tmp_view y y n y
308 (view_dest)
309 </pre>
310
311 <ol>
312 <li>
313 The new view to be installed includes the existing members plus the joining ones and
314 excludes the leaving and suspected members.
315 <li>
316 A temporary view is sent down the stack as an <em>event</em>. This allows the bottom layer
317 (e.g. UDP or TCP) to determine the members to which to send a multicast message. Compared
318 to the new view, leaving members are <em>included</em> since they have are waiting for a
319 view in which they are not members any longer before they leave. So, if we did not set a
320 temporary view, joining members would not receive the view (signalling that they have been
321 joined successfully). The temporary view is essentially the current view plus the joining
322 members (old members are still part of the current view).
323 </ol>
324 @return View The new view
325 */
326 public void castViewChange(Vector new_mbrs, Vector old_mbrs, Vector suspected_mbrs) {
327 View new_view;
328
329 // next view: current mbrs + new_mbrs - old_mbrs - suspected_mbrs
330 new_view=getNextView(new_mbrs, old_mbrs, suspected_mbrs);
331 castViewChange(new_view, null);
332 }
333
334
335 public void castViewChange(View new_view, Digest digest) {
336 Message view_change_msg;
337 GmsHeader hdr;
338 long start, stop;
339 ViewId vid=new_view.getVid();
340 int size=-1;
341
342 if(log.isTraceEnabled()) log.trace("mcasting view {" + new_view + "} (" + new_view.size() + " mbrs)\n");
343 start=System.currentTimeMillis();
344 view_change_msg=new Message(); // bcast to all members
345 hdr=new GmsHeader(GmsHeader.VIEW, new_view);
346 hdr.my_digest=digest;
347 view_change_msg.putHeader(name, hdr);
348
349 ack_collector.reset(vid, new_view.getMembers());
350 size=ack_collector.size();
351 passDown(new Event(Event.MSG, view_change_msg));
352 try {
353 ack_collector.waitForAllAcks(view_ack_collection_timeout);
354 stop=System.currentTimeMillis();
355 if(trace)
356 log.trace("received all ACKs (" + size + ") for " + vid + " in " + (stop-start) + "ms");
357 }
358 catch(TimeoutException e) {
359 log.warn("failed to collect all ACKs (" + size + ") for view " + vid + " after " + view_ack_collection_timeout +
360 "ms, missing ACKs from " + ack_collector.getMissing() + " (received=" + ack_collector.getReceived() + ")");
361 }
362 }
363
364
365
366 /**
367 * Sets the new view and sends a VIEW_CHANGE event up and down the stack. If the view is a MergeView (subclass
368 * of View), then digest will be non-null and has to be set before installing the view.
369 */
370 public void installView(View new_view, Digest digest) {
371 if(digest != null)
372 mergeDigest(digest);
373 installView(new_view);
374 }
375
376
377 /**
378 * Sets the new view and sends a VIEW_CHANGE event up and down the stack.
379 */
380 public void installView(View new_view) {
381 Address coord;
382 int rc;
383 ViewId vid=new_view.getVid();
384 Vector mbrs=new_view.getMembers();
385
386 if(log.isDebugEnabled()) log.debug("[local_addr=" + local_addr + "] view is " + new_view);
387 if(stats) {
388 num_views++;
389 prev_views.add(new_view);
390 }
391
392 // Discards view with id lower than our own. Will be installed without check if first view
393 if(view_id != null) {
394 rc=vid.compareTo(view_id);
395 if(rc <= 0) {
396 if(log.isTraceEnabled())
397 log.trace("[" + local_addr + "] received view <= current view;" +
398 " discarding it (current vid: " + view_id + ", new vid: " + vid + ')');
399 return;
400 }
401 }
402
403 ltime=Math.max(vid.getId(), ltime); // compute Lamport logical time
404
405 /* Check for self-inclusion: if I'm not part of the new membership, I just discard it.
406 This ensures that messages sent in view V1 are only received by members of V1 */
407 if(checkSelfInclusion(mbrs) == false) {
408 // only shun if this member was previously part of the group. avoids problem where multiple
409 // members (e.g. X,Y,Z) join {A,B} concurrently, X is joined first, and Y and Z get view
410 // {A,B,X}, which would cause Y and Z to be shunned as they are not part of the membership
411 // bela Nov 20 2003
412 if(shun && local_addr != null && prev_members.contains(local_addr)) {
413 if(warn)
414 log.warn("I (" + local_addr + ") am not a member of view " + new_view +
415 ", shunning myself and leaving the group (prev_members are " + prev_members +
416 ", current view is " + view + ")");
417 if(impl != null)
418 impl.handleExit();
419 passUp(new Event(Event.EXIT));
420 }
421 else {
422 if(warn) log.warn("I (" + local_addr + ") am not a member of view " + new_view + "; discarding view");
423 }
424 return;
425 }
426
427 synchronized(members) { // serialize access to views
428 // assign new_view to view_id
429
430 view=new_view;
431 view_id=vid.copy();
432
433 // Set the membership. Take into account joining members
434 if(mbrs != null && mbrs.size() > 0) {
435 members.set(mbrs);
436 tmp_members.set(members);
437 joining.removeAll(mbrs); // remove all members in mbrs from joining
438 // remove all elements from 'leaving' that are not in 'mbrs'
439 leaving.retainAll(mbrs);
440
441 tmp_members.add(joining); // add members that haven't yet shown up in the membership
442 tmp_members.remove(leaving); // remove members that haven't yet been removed from the membership
443
444 // add to prev_members
445 for(Iterator it=mbrs.iterator(); it.hasNext();) {
446 Address addr=(Address)it.next();
447 if(!prev_members.contains(addr))
448 prev_members.add(addr);
449 }
450 }
451
452 // Send VIEW_CHANGE event up and down the stack:
453 Event view_event=new Event(Event.VIEW_CHANGE, new_view.clone());
454 passDown(view_event); // needed e.g. by failure detector or UDP
455 passUp(view_event);
456
457 coord=determineCoordinator();
458 // if(coord != null && coord.equals(local_addr) && !(coord.equals(vid.getCoordAddress()))) {
459 // changed on suggestion by yaronr and Nicolas Piedeloupe
460 if(coord != null && coord.equals(local_addr) && !haveCoordinatorRole()) {
461 becomeCoordinator();
462 }
463 else {
464 if(haveCoordinatorRole() && !local_addr.equals(coord))
465 becomeParticipant();
466 }
467 }
468 }
469
470
471 protected Address determineCoordinator() {
472 synchronized(members) {
473 return members != null && members.size() > 0? (Address)members.elementAt(0) : null;
474 }
475 }
476
477
478 /** Checks whether the potential_new_coord would be the new coordinator (2nd in line) */
479 protected boolean wouldBeNewCoordinator(Address potential_new_coord) {
480 Address new_coord;
481
482 if(potential_new_coord == null) return false;
483
484 synchronized(members) {
485 if(members.size() < 2) return false;
486 new_coord=(Address)members.elementAt(1); // member at 2nd place
487 return new_coord != null && new_coord.equals(potential_new_coord);
488 }
489 }
490
491
492 /** Returns true if local_addr is member of mbrs, else false */
493 protected boolean checkSelfInclusion(Vector mbrs) {
494 Object mbr;
495 if(mbrs == null)
496 return false;
497 for(int i=0; i < mbrs.size(); i++) {
498 mbr=mbrs.elementAt(i);
499 if(mbr != null && local_addr.equals(mbr))
500 return true;
501 }
502 return false;
503 }
504
505
506 public View makeView(Vector mbrs) {
507 Address coord=null;
508 long id=0;
509
510 if(view_id != null) {
511 coord=view_id.getCoordAddress();
512 id=view_id.getId();
513 }
514 return new View(coord, id, mbrs);
515 }
516
517
518 public View makeView(Vector mbrs, ViewId vid) {
519 Address coord=null;
520 long id=0;
521
522 if(vid != null) {
523 coord=vid.getCoordAddress();
524 id=vid.getId();
525 }
526 return new View(coord, id, mbrs);
527 }
528
529
530 /** Send down a SET_DIGEST event */
531 public void setDigest(Digest d) {
532 passDown(new Event(Event.SET_DIGEST, d));
533 }
534
535
536 /** Send down a MERGE_DIGEST event */
537 public void mergeDigest(Digest d) {
538 passDown(new Event(Event.MERGE_DIGEST, d));
539 }
540
541
542 /** Sends down a GET_DIGEST event and waits for the GET_DIGEST_OK response, or
543 timeout, whichever occurs first */
544 public Digest getDigest() {
545 Digest ret=null;
546
547 synchronized(digest_mutex) {
548 digest_promise.reset();
549 passDown(Event.GET_DIGEST_EVT);
550 try {
551 ret=(Digest)digest_promise.getResultWithTimeout(digest_timeout);
552 }
553 catch(TimeoutException e) {
554 if(log.isErrorEnabled()) log.error("digest could not be fetched from below");
555 }
556 return ret;
557 }
558 }
559
560
/**
 * Handles events coming up the stack.
 * <ul>
 * <li>MSG with a GmsHeader: the header is removed and dispatched by its type. JOIN and LEAVE requests are
 *     queued on view_handler, VIEW_ACKs go to the ACK collector, and everything else goes to the current
 *     role implementation. These messages are never passed further up.
 * <li>CONNECT_OK / DISCONNECT_OK coming from below are dropped, because down() sends them up itself.
 * <li>SET_LOCAL_ADDRESS records (and optionally prints) the local address.
 * <li>SUSPECT queues a LEAVE request for the suspected member and informs the ACK collector.
 * <li>UNSUSPECT and MERGE are handled here and not passed up.
 * </ul>
 * Events not consumed above are offered to impl.handleUpEvent() and passed up only if it returns true.
 */
public void up(Event evt) {
    Object obj;
    Message msg;
    GmsHeader hdr;
    MergeData merge_data;

    switch(evt.getType()) {

        case Event.MSG:
            msg=(Message)evt.getArg();
            obj=msg.getHeader(name);
            if(obj == null || !(obj instanceof GmsHeader))
                break; // not a GMS message: let impl decide below
            hdr=(GmsHeader)msg.removeHeader(name);
            switch(hdr.type) {
                case GmsHeader.JOIN_REQ:
                    view_handler.add(new Request(Request.JOIN, hdr.mbr, false, null));
                    break;
                case GmsHeader.JOIN_RSP:
                    impl.handleJoinResponse(hdr.join_rsp);
                    break;
                case GmsHeader.LEAVE_REQ:
                    if(log.isDebugEnabled())
                        log.debug("received LEAVE_REQ for " + hdr.mbr + " from " + msg.getSrc());
                    if(hdr.mbr == null) {
                        if(log.isErrorEnabled()) log.error("LEAVE_REQ's mbr field is null");
                        return;
                    }
                    view_handler.add(new Request(Request.LEAVE, hdr.mbr, false, null));
                    break;
                case GmsHeader.LEAVE_RSP:
                    impl.handleLeaveResponse();
                    break;
                case GmsHeader.VIEW:
                    if(hdr.view == null) {
                        if(log.isErrorEnabled()) log.error("[VIEW]: view == null");
                        return;
                    }

                    // send VIEW_ACK to sender of view (before the view is handled locally)
                    Address coord=msg.getSrc();
                    Message view_ack=new Message(coord, null, null);
                    GmsHeader tmphdr=new GmsHeader(GmsHeader.VIEW_ACK, hdr.view);
                    view_ack.putHeader(name, tmphdr);
                    passDown(new Event(Event.MSG, view_ack));
                    impl.handleViewChange(hdr.view, hdr.my_digest);
                    break;

                case GmsHeader.VIEW_ACK:
                    // counted by the collector that castViewChange() waits on
                    Object sender=msg.getSrc();
                    ack_collector.ack(sender);
                    return; // don't pass further up

                case GmsHeader.MERGE_REQ:
                    impl.handleMergeRequest(msg.getSrc(), hdr.merge_id);
                    break;

                case GmsHeader.MERGE_RSP:
                    merge_data=new MergeData(msg.getSrc(), hdr.view, hdr.my_digest);
                    merge_data.merge_rejected=hdr.merge_rejected;
                    impl.handleMergeResponse(merge_data, hdr.merge_id);
                    break;

                case GmsHeader.INSTALL_MERGE_VIEW:
                    impl.handleMergeView(new MergeData(msg.getSrc(), hdr.view, hdr.my_digest), hdr.merge_id);
                    break;

                case GmsHeader.CANCEL_MERGE:
                    impl.handleMergeCancelled(hdr.merge_id);
                    break;

                default:
                    if(log.isErrorEnabled()) log.error("GmsHeader with type=" + hdr.type + " not known");
            }
            return; // don't pass up

        case Event.CONNECT_OK: // sent by someone else, but WE are responsible for sending this !
        case Event.DISCONNECT_OK: // ditto (e.g. sent by TP layer). Don't send up the stack
            return;


        case Event.SET_LOCAL_ADDRESS:
            local_addr=(Address)evt.getArg();
            if(print_local_addr) {
                System.out.println("\n-------------------------------------------------------\n" +
                        "GMS: address is " + local_addr +
                        "\n-------------------------------------------------------");
            }
            break; // continue below: passed up if impl.handleUpEvent() agrees

        case Event.SUSPECT:
            Address suspected=(Address)evt.getArg();
            view_handler.add(new Request(Request.LEAVE, suspected, true, null));
            ack_collector.suspect(suspected);
            break; // continue below: passed up if impl.handleUpEvent() agrees

        case Event.UNSUSPECT:
            impl.unsuspect((Address)evt.getArg());
            return; // discard

        case Event.MERGE:
            view_handler.add(new Request(Request.MERGE, null, false, (Vector)evt.getArg()));
            return; // don't pass up
    }

    // the current role implementation decides whether the remaining events travel further up
    if(impl.handleUpEvent(evt))
        passUp(evt);
}
669
670
671 /**
672 This method is overridden to avoid hanging on getDigest(): when a JOIN is received, the coordinator needs
673 to retrieve the digest from the NAKACK layer. It therefore sends down a GET_DIGEST event, to which the NAKACK layer
674 responds with a GET_DIGEST_OK event.<p>
675 However, the GET_DIGEST_OK event will not be processed because the thread handling the JOIN request won't process
676 the GET_DIGEST_OK event until the JOIN event returns. The receiveUpEvent() method is executed by the up-handler
677 thread of the lower protocol and therefore can handle the event. All we do here is unblock the mutex on which
678 JOIN is waiting, allowing JOIN to return with a valid digest. The GET_DIGEST_OK event is then discarded, because
679 it won't be processed twice.
680 */
681 public void receiveUpEvent(Event evt) {
682 switch(evt.getType()) {
683 case Event.GET_DIGEST_OK:
684 digest_promise.setResult(evt.getArg());
685 return; // don't pass further up
686 }
687 super.receiveUpEvent(evt);
688 }
689
690
691 public void down(Event evt) {
692 switch(evt.getType()) {
693
694 case Event.CONNECT:
695 passDown(evt);
696 if(local_addr == null)
697 if(log.isFatalEnabled()) log.fatal("[CONNECT] local_addr is null");
698 impl.join(local_addr);
699 passUp(new Event(Event.CONNECT_OK));
700 return; // don't pass down: was already passed down
701
702 case Event.DISCONNECT:
703 impl.leave((Address)evt.getArg());
704 passUp(new Event(Event.DISCONNECT_OK));
705 initState(); // in case connect() is called again
706 break; // pass down
707 }
708
709 if(impl.handleDownEvent(evt))
710 passDown(evt);
711 }
712
713
714 /** Setup the Protocol instance according to the configuration string */
715 public boolean setProperties(Properties props) {
716 String str;
717
718 super.setProperties(props);
719 str=props.getProperty("shun");
720 if(str != null) {
721 shun=Boolean.valueOf(str).booleanValue();
722 props.remove("shun");
723 }
724
725 str=props.getProperty("merge_leader");
726 if(str != null) {
727 merge_leader=Boolean.valueOf(str).booleanValue();
728 props.remove("merge_leader");
729 }
730
731 str=props.getProperty("print_local_addr");
732 if(str != null) {
733 print_local_addr=Boolean.valueOf(str).booleanValue();
734 props.remove("print_local_addr");
735 }
736
737 str=props.getProperty("join_timeout"); // time to wait for JOIN
738 if(str != null) {
739 join_timeout=Long.parseLong(str);
740 props.remove("join_timeout");
741 }
742
743 str=props.getProperty("join_retry_timeout"); // time to wait between JOINs
744 if(str != null) {
745 join_retry_timeout=Long.parseLong(str);
746 props.remove("join_retry_timeout");
747 }
748
749 str=props.getProperty("leave_timeout"); // time to wait until coord responds to LEAVE req.
750 if(str != null) {
751 leave_timeout=Long.parseLong(str);
752 props.remove("leave_timeout");
753 }
754
755 str=props.getProperty("merge_timeout"); // time to wait for MERGE_RSPS from subgroup coordinators
756 if(str != null) {
757 merge_timeout=Long.parseLong(str);
758 props.remove("merge_timeout");
759 }
760
761 str=props.getProperty("digest_timeout"); // time to wait for GET_DIGEST_OK from PBCAST
762 if(str != null) {
763 digest_timeout=Long.parseLong(str);
764 props.remove("digest_timeout");
765 }
766
767 str=props.getProperty("view_ack_collection_timeout");
768 if(str != null) {
769 view_ack_collection_timeout=Long.parseLong(str);
770 props.remove("view_ack_collection_timeout");
771 }
772
773 str=props.getProperty("disable_initial_coord");
774 if(str != null) {
775 disable_initial_coord=Boolean.valueOf(str).booleanValue();
776 props.remove("disable_initial_coord");
777 }
778
779 str=props.getProperty("handle_concurrent_startup");
780 if(str != null) {
781 handle_concurrent_startup=Boolean.valueOf(str).booleanValue();
782 props.remove("handle_concurrent_startup");
783 }
784
785 str=props.getProperty("num_prev_mbrs");
786 if(str != null) {
787 num_prev_mbrs=Integer.parseInt(str);
788 props.remove("num_prev_mbrs");
789 }
790
791 if(props.size() > 0) {
792 log.error("GMS.setProperties(): the following properties are not recognized: " + props);
793
794 return false;
795 }
796 return true;
797 }
798
799
800
801 /* ------------------------------- Private Methods --------------------------------- */
802
803 void initState() {
804 becomeClient();
805 view_id=null;
806 view=null;
807 }
808
809
810 /* --------------------------- End of Private Methods ------------------------------- */
811
812
813
814 public static class GmsHeader extends Header implements Streamable {
815 public static final byte JOIN_REQ=1;
816 public static final byte JOIN_RSP=2;
817 public static final byte LEAVE_REQ=3;
818 public static final byte LEAVE_RSP=4;
819 public static final byte VIEW=5;
820 public static final byte MERGE_REQ=6;
821 public static final byte MERGE_RSP=7;
822 public static final byte INSTALL_MERGE_VIEW=8;
823 public static final byte CANCEL_MERGE=9;
824 public static final byte VIEW_ACK=10;
825
826 byte type=0;
827 View view=null; // used when type=VIEW or MERGE_RSP or INSTALL_MERGE_VIEW
828 Address mbr=null; // used when type=JOIN_REQ or LEAVE_REQ
829 JoinRsp join_rsp=null; // used when type=JOIN_RSP
830 Digest my_digest=null; // used when type=MERGE_RSP or INSTALL_MERGE_VIEW
831 ViewId merge_id=null; // used when type=MERGE_REQ or MERGE_RSP or INSTALL_MERGE_VIEW or CANCEL_MERGE
832 boolean merge_rejected=false; // used when type=MERGE_RSP
833
834
835 public GmsHeader() {
836 } // used for Externalization
837
838 public GmsHeader(byte type) {
839 this.type=type;
840 }
841
842
843 /** Used for VIEW header */
844 public GmsHeader(byte type, View view) {
845 this.type=type;
846 this.view=view;
847 }
848
849
850 /** Used for JOIN_REQ or LEAVE_REQ header */
851 public GmsHeader(byte type, Address mbr) {
852 this.type=type;
853 this.mbr=mbr;
854 }
855
856 /** Used for JOIN_RSP header */
857 public GmsHeader(byte type, JoinRsp join_rsp) {
858 this.type=type;
859 this.join_rsp=join_rsp;
860 }
861
862 public byte getType() {
863 return type;
864 }
865
866 public Address getMemeber() {
867 return mbr;
868 }
869
870 public String toString() {
871 StringBuffer sb=new StringBuffer("GmsHeader");
872 sb.append('[' + type2String(type) + ']');
873 switch(type) {
874 case JOIN_REQ:
875 sb.append(": mbr=" + mbr);
876 break;
877
878 case JOIN_RSP:
879 sb.append(": join_rsp=" + join_rsp);
880 break;
881
882 case LEAVE_REQ:
883 sb.append(": mbr=" + mbr);
884 break;
885
886 case LEAVE_RSP:
887 break;
888
889 case VIEW:
890 case VIEW_ACK:
891 sb.append(": view=" + view);
892 break;
893
894 case MERGE_REQ:
895 sb.append(": merge_id=" + merge_id);
896 break;
897
898 case MERGE_RSP:
899 sb.append(": view=" + view + ", digest=" + my_digest + ", merge_rejected=" + merge_rejected +
900 ", merge_id=" + merge_id);
901 break;
902
903 case INSTALL_MERGE_VIEW:
904 sb.append(": view=" + view + ", digest=" + my_digest);
905 break;
906
907 case CANCEL_MERGE:
908 sb.append(", <merge cancelled>, merge_id=" + merge_id);
909 break;
910 }
911 return sb.toString();
912 }
913
914
915 public static String type2String(int type) {
916 switch(type) {
917 case JOIN_REQ: return "JOIN_REQ";
918 case JOIN_RSP: return "JOIN_RSP";
919 case LEAVE_REQ: return "LEAVE_REQ";
920 case LEAVE_RSP: return "LEAVE_RSP";
921 case VIEW: return "VIEW";
922 case MERGE_REQ: return "MERGE_REQ";
923 case MERGE_RSP: return "MERGE_RSP";
924 case INSTALL_MERGE_VIEW: return "INSTALL_MERGE_VIEW";
925 case CANCEL_MERGE: return "CANCEL_MERGE";
926 case VIEW_ACK: return "VIEW_ACK";
927 default: return "<unknown>";
928 }
929 }
930
931
932 public void writeExternal(ObjectOutput out) throws IOException {
933 out.writeByte(type);
934 out.writeObject(view);
935 out.writeObject(mbr);
936 out.writeObject(join_rsp);
937 out.writeObject(my_digest);
938 out.writeObject(merge_id);
939 out.writeBoolean(merge_rejected);
940 }
941
942
943 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
944 type=in.readByte();
945 view=(View)in.readObject();
946 mbr=(Address)in.readObject();
947 join_rsp=(JoinRsp)in.readObject();
948 my_digest=(Digest)in.readObject();
949 merge_id=(ViewId)in.readObject();
950 merge_rejected=in.readBoolean();
951 }
952
953
954 public void writeTo(DataOutputStream out) throws IOException {
955 out.writeByte(type);
956 Util.writeStreamable(view, out);
957 Util.writeAddress(mbr, out);
958 Util.writeStreamable(join_rsp, out);
959 Util.writeStreamable(my_digest, out);
960 Util.writeStreamable(merge_id, out); // kludge: we know merge_id is a ViewId
961 out.writeBoolean(merge_rejected);
962 }
963
964 public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
965 type=in.readByte();
966 view=(View)Util.readStreamable(View.class, in);
967 mbr=Util.readAddress(in);
968 join_rsp=(JoinRsp)Util.readStreamable(JoinRsp.class, in);
969 my_digest=(Digest)Util.readStreamable(Digest.class, in);
970 merge_id=(ViewId)Util.readStreamable(ViewId.class, in);
971 merge_rejected=in.readBoolean();
972 }
973
974 public long size() {
975 long retval=Global.BYTE_SIZE *2; // type + merge_rejected
976
977 retval+=Global.BYTE_SIZE; // presence view
978 if(view != null)
979 retval+=view.serializedSize();
980
981 retval+=Util.size(mbr);
982
983 retval+=Global.BYTE_SIZE; // presence of join_rsp
984 if(join_rsp != null)
985 retval+=join_rsp.serializedSize();
986
987 retval+=Global.BYTE_SIZE; // presence for my_digest
988 if(my_digest != null)
989 retval+=my_digest.serializedSize();
990
991 retval+=Global.BYTE_SIZE; // presence for merge_id
992 if(merge_id != null)
993 retval+=merge_id.serializedSize();
994 return retval;
995 }
996
997 }
998
999
1000
1001
1002 public static class Request {
1003 static final int JOIN = 1;
1004 static final int LEAVE = 2;
1005 static final int SUSPECT = 3;
1006 static final int MERGE = 4;
1007
1008
1009 int type=-1;
1010 Address mbr=null;
1011 boolean suspected;
1012 Vector coordinators=null;
1013
1014 Request(int type, Address mbr, boolean suspected, Vector coordinators) {
1015 this.type=type;
1016 this.mbr=mbr;
1017 this.suspected=suspected;
1018 this.coordinators=coordinators;
1019 }
1020
1021 public String toString() {
1022 switch(type) {
1023 case JOIN: return "JOIN(" + mbr + ")";
1024 case LEAVE: return "LEAVE(" + mbr + ", " + suspected + ")";
1025 case SUSPECT: return "SUSPECT(" + mbr + ")";
1026 case MERGE: return "MERGE(" + coordinators + ")";
1027 }
1028 return "<invalid (type=" + type + ")";
1029 }
1030 }
1031
1032
1033
1034
1035 /**
1036 * Class which processes JOIN, LEAVE and MERGE requests. Requests are queued and processed in FIFO order
1037 * @author Bela Ban
1038 * @version $Id: GMS.java,v 1.45 2005/11/04 18:40:36 belaban Exp $
1039 */
1040 class ViewHandler implements Runnable {
1041 Thread t;
1042 Queue q=new Queue(); // Queue<Request>
1043 boolean suspended=false;
1044 final static long INTERVAL=5000;
1045
1046
1047 void add(Request req) {
1048 if(suspended) {
1049 log.warn("queue is suspended; request is discarded");
1050 return;
1051 }
1052 start();
1053 try {
1054 q.add(req);
1055 }
1056 catch(QueueClosedException e) {
1057 if(trace)
1058 log.trace("queue is closed; request " + req + " is discarded");
1059 }
1060 }
1061
1062 synchronized void waitUntilCompleted(long timeout) {
1063 if(t != null) {
1064 try {
1065 t.join(timeout);
1066 }
1067 catch(InterruptedException e) {
1068 }
1069 }
1070
1071 }
1072
1073 public void suspend() {
1074 suspended=true;
1075 q.close(true);
1076 }
1077
1078 public void resume() {
1079 if(q.closed())
1080 q.reset();
1081 suspended=false;
1082 }
1083
1084 public void run() {
1085 Request req;
1086 while(!q.closed() && Thread.currentThread().equals(t)) {
1087 try {
1088 req=(Request)q.remove(INTERVAL);
1089 process(req);
1090 }
1091 catch(QueueClosedException e) {
1092 break;
1093 }
1094 catch(TimeoutException e) {
1095 break;
1096 }
1097 }
1098 }
1099
1100
1101
1102 private void process(Request req) {
1103 if(trace)
1104 log.trace("processing " + req);
1105 switch(req.type) {
1106 case Request.JOIN:
1107 impl.handleJoin(req.mbr);
1108 break;
1109 case Request.LEAVE:
1110 if(req.suspected)
1111 impl.suspect(req.mbr);
1112 else
1113 impl.handleLeave(req.mbr, req.suspected);
1114 break;
1115 case Request.SUSPECT:
1116 impl.suspect(req.mbr);
1117 break;
1118 case Request.MERGE:
1119 impl.merge(req.coordinators);
1120 break;
1121 default:
1122 log.error("Request " + req.type + " is unknown; discarded");
1123 }
1124 }
1125
1126
1127 private void handleMergeRequest(Vector coordinators) {
1128
1129 }
1130
1131
1132 synchronized void start() {
1133 if(q.closed())
1134 q.reset();
1135 suspended=false;
1136 if(t == null || !t.isAlive()) {
1137 t=new Thread(this, "ViewHandler");
1138 t.setDaemon(true);
1139 t.start();
1140 if(trace)
1141 log.trace("ViewHandler started");
1142 }
1143 }
1144
1145 synchronized void stop(boolean flush) {
1146 q.close(flush);
1147 }
1148
1149
1150 }
1151
1152
1153
1154}