Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/jgroups/protocols/FD_SOCK.java


1   // $Id: FD_SOCK.java,v 1.30 2005/10/19 12:12:56 belaban Exp $
2   
3   package org.jgroups.protocols;
4   
5   import org.jgroups.*;
6   import org.jgroups.stack.IpAddress;
7   import org.jgroups.stack.Protocol;
8   import org.jgroups.util.*;
9   
10  import java.io.*;
11  import java.net.ServerSocket;
12  import java.net.Socket;
13  import java.net.InetAddress;
14  import java.net.UnknownHostException;
15  import java.util.*;
16  import java.util.List;
17  
18  
19  /**
20   * Failure detection protocol based on sockets. Failure detection is ring-based. Each member creates a
21   * server socket and announces its address together with the server socket's address in a multicast. A
22   * pinger thread will be started when the membership goes above 1 and will be stopped when it drops below
23   * 2. The pinger thread connects to its neighbor on the right and waits until the socket is closed. When
24   * the socket is closed by the monitored peer in an abnormal fashion (IOException), the neighbor will be
25   * suspected.<p> The main feature of this protocol is that no ping messages need to be exchanged between
26   * any 2 peers, and failure detection relies entirely on TCP sockets. The advantage is that no activity
27   * will take place between 2 peers as long as they are alive (i.e. have their server sockets open).
28   * The disadvantage is that hung servers or crashed routers will not cause sockets to be closed, therefore
29   * they won't be detected.
30   * The FD_SOCK protocol will work for groups where members are on different hosts<p>
31   * The costs involved are 2 additional threads: one that
32   * monitors the client side of the socket connection (to monitor a peer) and another one that manages the
33   * server socket. However, those threads will be idle as long as both peers are running.
34   * @author Bela Ban May 29 2001
35   */
36  public class FD_SOCK extends Protocol implements Runnable {
37      long                get_cache_timeout=3000;            // msecs to wait for the socket cache from the coordinator
38      final long          get_cache_retry_timeout=500;       // msecs to wait until we retry getting the cache from coord
39      long                suspect_msg_interval=5000;         // (BroadcastTask): mcast SUSPECT every 5000 msecs
40      int                 num_tries=3;                       // attempts coord is solicited for socket cache until we give up
41      final Vector        members=new Vector(11);            // list of group members (updated on VIEW_CHANGE)
42      boolean             srv_sock_sent=false;               // has own socket been broadcast yet ?
43      final Vector        pingable_mbrs=new Vector(11);      // mbrs from which we select ping_dest. may be subset of 'members'
44      final Promise       get_cache_promise=new Promise();   // used for rendezvous on GET_CACHE and GET_CACHE_RSP
45      boolean             got_cache_from_coord=false;        // was cache already fetched ?
46      Address             local_addr=null;                   // our own address
47      ServerSocket        srv_sock=null;                     // server socket to which another member connects to monitor me
48      InetAddress         srv_sock_bind_addr=null;           // the NIC on which the ServerSocket should listen
49      ServerSocketHandler srv_sock_handler=null;             // accepts new connections on srv_sock
50      IpAddress           srv_sock_addr=null;                // pair of server_socket:port
51      Address             ping_dest=null;                    // address of the member we monitor
52      Socket              ping_sock=null;                    // socket to the member we monitor
53      InputStream         ping_input=null;                   // input stream of the socket to the member we monitor
54      Thread              pinger_thread=null;                // listens on ping_sock, suspects member if socket is closed
55      final Hashtable     cache=new Hashtable(11);           // keys=Addresses, vals=IpAddresses (socket:port)
56  
57      /** Start port for server socket (uses first available port starting at start_port). A value of 0 (default)
58       * picks a random port */
59      int                 start_port=0;
60      final Promise       ping_addr_promise=new Promise();   // to fetch the ping_addr for ping_dest
61      final Object        sock_mutex=new Object();           // for access to ping_sock, ping_input
62      TimeScheduler       timer=null;
63      final BroadcastTask bcast_task=new BroadcastTask();    // to transmit SUSPECT message (until view change)
64      boolean             regular_sock_close=false;         // used by interruptPingerThread() when new ping_dest is computed
65      int                 num_suspect_events=0;
66      private static final int NORMAL_TEMINATION=9;
67      private static final int ABNORMAL_TEMINATION=-1;
68      private static final String name="FD_SOCK";
69  
70      BoundedList          suspect_history=new BoundedList(20);
71  
72  
73      public String getName() {
74          return name;
75      }
76  
77      public String getLocalAddress() {return local_addr != null? local_addr.toString() : "null";}
78      public String getMembers() {return members != null? members.toString() : "null";}
79      public String getPingableMembers() {return pingable_mbrs != null? pingable_mbrs.toString() : "null";}
80      public String getPingDest() {return ping_dest != null? ping_dest.toString() : "null";}
81      public int getNumSuspectEventsGenerated() {return num_suspect_events;}
82      public String printSuspectHistory() {
83          StringBuffer sb=new StringBuffer();
84          for(Enumeration en=suspect_history.elements(); en.hasMoreElements();) {
85              sb.append(new Date()).append(": ").append(en.nextElement()).append("\n");
86          }
87          return sb.toString();
88      }
89  
90      public boolean setProperties(Properties props) {
91          String str, tmp=null;
92  
93          super.setProperties(props);
94          str=props.getProperty("get_cache_timeout");
95          if(str != null) {
96              get_cache_timeout=Long.parseLong(str);
97              props.remove("get_cache_timeout");
98          }
99  
100         str=props.getProperty("suspect_msg_interval");
101         if(str != null) {
102             suspect_msg_interval=Long.parseLong(str);
103             props.remove("suspect_msg_interval");
104         }
105 
106         str=props.getProperty("num_tries");
107         if(str != null) {
108             num_tries=Integer.parseInt(str);
109             props.remove("num_tries");
110         }
111 
112         str=props.getProperty("start_port");
113         if(str != null) {
114             start_port=Integer.parseInt(str);
115             props.remove("start_port");
116         }
117 
118 
119         // PropertyPermission not granted if running in an untrusted environment with JNLP.
120         try {
121             tmp=System.getProperty("bind.address");
122             if(Util.isBindAddressPropertyIgnored()) {
123                 tmp=null;
124             }
125         }
126         catch (SecurityException ex){
127         }
128 
129         if(tmp != null)
130             str=tmp;
131         else
132             str=props.getProperty("srv_sock_bind_addr");
133         if(str != null) {
134             try {
135                 srv_sock_bind_addr=InetAddress.getByName(str);
136             }
137             catch(UnknownHostException unknown) {
138                 if(log.isFatalEnabled()) log.fatal("(srv_sock_bind_addr): host " + str + " not known");
139                 return false;
140             }
141             props.remove("srv_sock_bind_addr");
142         }
143 
144 
145         if(props.size() > 0) {
146             log.error("FD_SOCK.setProperties(): the following properties are not recognized: " + props);
147             return false;
148         }
149         return true;
150     }
151 
152 
153     public void init() throws Exception {
154         srv_sock_handler=new ServerSocketHandler();
155         timer=stack != null ? stack.timer : null;
156         if(timer == null)
157             throw new Exception("FD_SOCK.init(): timer == null");
158     }
159 
160 
161     public void stop() {
162         bcast_task.removeAll();
163         stopPingerThread();
164         stopServerSocket();
165     }
166 
167     public void resetStats() {
168         super.resetStats();
169         num_suspect_events=0;
170         suspect_history.removeAll();
171     }
172 
173 
174     public void up(Event evt) {
175         Message msg;
176         FdHeader hdr;
177 
178         switch(evt.getType()) {
179 
180         case Event.SET_LOCAL_ADDRESS:
181             local_addr=(Address) evt.getArg();
182             break;
183 
184         case Event.MSG:
185             msg=(Message) evt.getArg();
186             hdr=(FdHeader) msg.removeHeader(name);
187             if(hdr == null)
188                 break;  // message did not originate from FD_SOCK layer, just pass up
189 
190             switch(hdr.type) {
191 
192             case FdHeader.SUSPECT:
193                 if(hdr.mbrs != null) {
194                     if(log.isDebugEnabled()) log.debug("[SUSPECT] hdr=" + hdr);
195                     for(int i=0; i < hdr.mbrs.size(); i++) {
196                         passUp(new Event(Event.SUSPECT, hdr.mbrs.elementAt(i)));
197                         passDown(new Event(Event.SUSPECT, hdr.mbrs.elementAt(i)));
198                     }
199                 }
200                 else
201                     if(warn) log.warn("[SUSPECT]: hdr.mbrs == null");
202                 break;
203 
204                 // If I have the sock for 'hdr.mbr', return it. Otherwise look it up in my cache and return it
205             case FdHeader.WHO_HAS_SOCK:
206                 if(local_addr != null && local_addr.equals(msg.getSrc()))
207                     return; // don't reply to WHO_HAS bcasts sent by me !
208 
209                 if(hdr.mbr == null) {
210                     if(log.isErrorEnabled()) log.error("hdr.mbr is null");
211                     return;
212                 }
213 
214                 if(trace) log.trace("who-has-sock " + hdr.mbr);
215 
216                 // 1. Try my own address, maybe it's me whose socket is wanted
217                 if(local_addr != null && local_addr.equals(hdr.mbr) && srv_sock_addr != null) {
218                     sendIHaveSockMessage(msg.getSrc(), local_addr, srv_sock_addr);  // unicast message to msg.getSrc()
219                     return;
220                 }
221 
222                 // 2. If I don't have it, maybe it is in the cache
223                 if(cache.containsKey(hdr.mbr))
224                     sendIHaveSockMessage(msg.getSrc(), hdr.mbr, (IpAddress) cache.get(hdr.mbr));  // ucast msg
225                 break;
226 
227 
228                 // Update the cache with the addr:sock_addr entry (if on the same host)
229             case FdHeader.I_HAVE_SOCK:
230                 if(hdr.mbr == null || hdr.sock_addr == null) {
231                     if(log.isErrorEnabled()) log.error("[I_HAVE_SOCK]: hdr.mbr is null or hdr.sock_addr == null");
232                     return;
233                 }
234 
235                 // if(!cache.containsKey(hdr.mbr))
236                 cache.put(hdr.mbr, hdr.sock_addr); // update the cache
237                 if(trace) log.trace("i-have-sock: " + hdr.mbr + " --> " +
238                                                    hdr.sock_addr + " (cache is " + cache + ')');
239 
240                 if(ping_dest != null && hdr.mbr.equals(ping_dest))
241                     ping_addr_promise.setResult(hdr.sock_addr);
242                 break;
243 
244                 // Return the cache to the sender of this message
245             case FdHeader.GET_CACHE:
246                 if(hdr.mbr == null) {
247                     if(log.isErrorEnabled()) log.error("(GET_CACHE): hdr.mbr == null");
248                     return;
249                 }
250                 hdr=new FdHeader(FdHeader.GET_CACHE_RSP);
251                 hdr.cachedAddrs=(Hashtable) cache.clone();
252                 msg=new Message(hdr.mbr, null, null);
253                 msg.putHeader(name, hdr);
254                 passDown(new Event(Event.MSG, msg));
255                 break;
256 
257             case FdHeader.GET_CACHE_RSP:
258                 if(hdr.cachedAddrs == null) {
259                     if(log.isErrorEnabled()) log.error("(GET_CACHE_RSP): cache is null");
260                     return;
261                 }
262                 get_cache_promise.setResult(hdr.cachedAddrs);
263                 break;
264             }
265             return;
266         }
267 
268         passUp(evt);                                        // pass up to the layer above us
269     }
270 
271 
272     public void down(Event evt) {
273         Address mbr, tmp_ping_dest;
274         View v;
275 
276         switch(evt.getType()) {
277 
278             case Event.UNSUSPECT:
279                 bcast_task.removeSuspectedMember((Address)evt.getArg());
280                 break;
281 
282             case Event.CONNECT:
283                 passDown(evt);
284                 srv_sock=Util.createServerSocket(srv_sock_bind_addr, start_port); // grab a random unused port above 10000
285                 srv_sock_addr=new IpAddress(srv_sock_bind_addr, srv_sock.getLocalPort());
286                 startServerSocket();
287                 //if(pinger_thread == null)
288                   //  startPingerThread();
289                 break;
290 
291             case Event.VIEW_CHANGE:
292                 synchronized(this) {
293                     v=(View) evt.getArg();
294                     members.removeAllElements();
295                     members.addAll(v.getMembers());
296                     bcast_task.adjustSuspectedMembers(members);
297                     pingable_mbrs.removeAllElements();
298                     pingable_mbrs.addAll(members);
299                     passDown(evt);
300 
301                     if(log.isDebugEnabled()) log.debug("VIEW_CHANGE received: " + members);
302 
303                     // 1. Get the addr:pid cache from the coordinator (only if not already fetched)
304                     if(!got_cache_from_coord) {
305                         getCacheFromCoordinator();
306                         got_cache_from_coord=true;
307                     }
308 
309 
310                     // 2. Broadcast my own addr:sock to all members so they can update their cache
311                     if(!srv_sock_sent) {
312                         if(srv_sock_addr != null) {
313                             sendIHaveSockMessage(null, // send to all members
314                                     local_addr,
315                                     srv_sock_addr);
316                             srv_sock_sent=true;
317                         }
318                         else
319                             if(warn) log.warn("(VIEW_CHANGE): srv_sock_addr == null");
320                     }
321 
322                     // 3. Remove all entries in 'cache' which are not in the new membership
323                     for(Enumeration e=cache.keys(); e.hasMoreElements();) {
324                         mbr=(Address) e.nextElement();
325                         if(!members.contains(mbr))
326                             cache.remove(mbr);
327                     }
328 
329                     if(members.size() > 1) {
330                         if(pinger_thread != null && pinger_thread.isAlive()) {
331                             tmp_ping_dest=determinePingDest();
332                             if(ping_dest != null && tmp_ping_dest != null && !ping_dest.equals(tmp_ping_dest)) {
333                                 interruptPingerThread(); // allows the thread to use the new socket
334                             }
335                         }
336                         else
337                             startPingerThread(); // only starts if not yet running
338                     }
339                     else {
340                         ping_dest=null;
341                         stopPingerThread();
342                     }
343                 }
344                 break;
345 
346             default:
347                 passDown(evt);
348                 break;
349         }
350     }
351 
352 
353     /**
354      * Runs as long as there are 2 members and more. Determines the member to be monitored and fetches its
355      * server socket address (if n/a, sends a message to obtain it). The creates a client socket and listens on
356      * it until the connection breaks. If it breaks, emits a SUSPECT message. It the connection is closed regularly,
357      * nothing happens. In both cases, a new member to be monitored will be chosen and monitoring continues (unless
358      * there are fewer than 2 members).
359      */
360     public void run() {
361         Address tmp_ping_dest;
362         IpAddress ping_addr;
363         int max_fetch_tries=10;  // number of times a socket address is to be requested before giving up
364 
365         if(trace) log.trace("pinger_thread started"); // +++ remove
366 
367         while(pinger_thread != null) {
368             tmp_ping_dest=determinePingDest(); // gets the neighbor to our right
369             if(log.isDebugEnabled())
370                 log.debug("determinePingDest()=" + tmp_ping_dest + ", pingable_mbrs=" + pingable_mbrs);
371             if(tmp_ping_dest == null) {
372                 ping_dest=null;
373                 pinger_thread=null;
374                 break;
375             }
376             ping_dest=tmp_ping_dest;
377             ping_addr=fetchPingAddress(ping_dest);
378             if(ping_addr == null) {
379                 if(log.isErrorEnabled()) log.error("socket address for " + ping_dest + " could not be fetched, retrying");
380                 if(--max_fetch_tries <= 0)
381                     break;
382                 Util.sleep(2000);
383                 continue;
384             }
385 
386             if(!setupPingSocket(ping_addr)) {
387                 // covers use cases #7 and #8 in GmsTests.txt
388                 if(log.isDebugEnabled()) log.debug("could not create socket to " + ping_dest + "; suspecting " + ping_dest);
389                 broadcastSuspectMessage(ping_dest);
390                 pingable_mbrs.removeElement(ping_dest);
391                 continue;
392             }
393 
394             if(log.isDebugEnabled()) log.debug("ping_dest=" + ping_dest + ", ping_sock=" + ping_sock + ", cache=" + cache);
395 
396             // at this point ping_input must be non-null, otherwise setupPingSocket() would have thrown an exception
397             try {
398                 if(ping_input != null) {
399                     int c=ping_input.read();
400                     switch(c) {
401                         case NORMAL_TEMINATION:
402                             if(log.isDebugEnabled())
403                                 log.debug("peer closed socket normally");
404                             pinger_thread=null;
405                             break;
406                         case ABNORMAL_TEMINATION:
407                             handleSocketClose(null);
408                             break;
409                         default:
410                             break;
411                     }
412                 }
413             }
414             catch(IOException ex) {  // we got here when the peer closed the socket --> suspect peer and then continue
415                 handleSocketClose(ex);
416             }
417             catch(Throwable catch_all_the_rest) {
418                 log.error("exception", catch_all_the_rest);
419             }
420         }
421         if(log.isDebugEnabled()) log.debug("pinger thread terminated");
422         pinger_thread=null;
423     }
424 
425 
426 
427 
428     /* ----------------------------------- Private Methods -------------------------------------- */
429 
430 
431     void handleSocketClose(Exception ex) {
432         teardownPingSocket();     // make sure we have no leftovers
433         if(!regular_sock_close) { // only suspect if socket was not closed regularly (by interruptPingerThread())
434             if(log.isDebugEnabled())
435                 log.debug("peer " + ping_dest + " closed socket (" + (ex != null ? ex.getClass().getName() : "eof") + ')');
436             broadcastSuspectMessage(ping_dest);
437             pingable_mbrs.removeElement(ping_dest);
438         }
439         else {
440             if(log.isDebugEnabled()) log.debug("socket to " + ping_dest + " was reset");
441             regular_sock_close=false;
442         }
443     }
444 
445 
446     void startPingerThread() {
447         if(pinger_thread == null) {
448             pinger_thread=new Thread(this, "FD_SOCK Ping thread");
449             pinger_thread.setDaemon(true);
450             pinger_thread.start();
451         }
452     }
453 
454 
455     void stopPingerThread() {
456         if(pinger_thread != null && pinger_thread.isAlive()) {
457             regular_sock_close=true;
458             teardownPingSocket();
459         }
460         pinger_thread=null;
461     }
462 
463 
464     /**
465      * Interrupts the pinger thread. The Thread.interrupt() method doesn't seem to work under Linux with JDK 1.3.1
466      * (JDK 1.2.2 had no problems here), therefore we close the socket (setSoLinger has to be set !) if we are
467      * running under Linux. This should be tested under Windows. (Solaris 8 and JDK 1.3.1 definitely works).<p>
468      * Oct 29 2001 (bela): completely removed Thread.interrupt(), but used socket close on all OSs. This makes this
469      * code portable and we don't have to check for OSs.
470      * @see org.jgroups.tests.InterruptTest to determine whether Thread.interrupt() works for InputStream.read().
471      */
472     void interruptPingerThread() {
473         if(pinger_thread != null && pinger_thread.isAlive()) {
474             regular_sock_close=true;
475             teardownPingSocket(); // will wake up the pinger thread. less elegant than Thread.interrupt(), but does the job
476         }
477     }
478 
479     void startServerSocket() {
480         if(srv_sock_handler != null)
481             srv_sock_handler.start(); // won't start if already running
482     }
483 
484     void stopServerSocket() {
485         if(srv_sock_handler != null)
486             srv_sock_handler.stop();
487     }
488 
489 
490     /**
491      * Creates a socket to <code>dest</code>, and assigns it to ping_sock. Also assigns ping_input
492      */
493     boolean setupPingSocket(IpAddress dest) {
494         synchronized(sock_mutex) {
495             if(dest == null) {
496                 if(log.isErrorEnabled()) log.error("destination address is null");
497                 return false;
498             }
499             try {
500                 ping_sock=new Socket(dest.getIpAddress(), dest.getPort());
501                 ping_sock.setSoLinger(true, 1);
502                 ping_input=ping_sock.getInputStream();
503                 return true;
504             }
505             catch(Throwable ex) {
506                 return false;
507             }
508         }
509     }
510 
511 
512     void teardownPingSocket() {
513         synchronized(sock_mutex) {
514             if(ping_sock != null) {
515                 try {
516                     ping_sock.shutdownInput();
517                     ping_sock.close();
518                 }
519                 catch(Exception ex) {
520                 }
521                 ping_sock=null;
522             }
523             if(ping_input != null) {
524                 try {
525                     ping_input.close();
526                 }
527                 catch(Exception ex) {
528                 }
529                 ping_input=null;
530             }
531         }
532     }
533 
534 
535     /**
536      * Determines coordinator C. If C is null and we are the first member, return. Else loop: send GET_CACHE message
537      * to coordinator and wait for GET_CACHE_RSP response. Loop until valid response has been received.
538      */
539     void getCacheFromCoordinator() {
540         Address coord;
541         int attempts=num_tries;
542         Message msg;
543         FdHeader hdr;
544         Hashtable result;
545 
546         get_cache_promise.reset();
547         while(attempts > 0) {
548             if((coord=determineCoordinator()) != null) {
549                 if(coord.equals(local_addr)) { // we are the first member --> empty cache
550                     if(log.isDebugEnabled()) log.debug("first member; cache is empty");
551                     return;
552                 }
553                 hdr=new FdHeader(FdHeader.GET_CACHE);
554                 hdr.mbr=local_addr;
555                 msg=new Message(coord, null, null);
556                 msg.putHeader(name, hdr);
557                 passDown(new Event(Event.MSG, msg));
558                 result=(Hashtable) get_cache_promise.getResult(get_cache_timeout);
559                 if(result != null) {
560                     cache.putAll(result); // replace all entries (there should be none !) in cache with the new values
561                     if(trace) log.trace("got cache from " + coord + ": cache is " + cache);
562                     return;
563                 }
564                 else {
565                     if(log.isErrorEnabled()) log.error("received null cache; retrying");
566                 }
567             }
568 
569             Util.sleep(get_cache_retry_timeout);
570             --attempts;
571         }
572     }
573 
574 
575     /**
576      * Sends a SUSPECT message to all group members. Only the coordinator (or the next member in line if the coord
577      * itself is suspected) will react to this message by installing a new view. To overcome the unreliability
578      * of the SUSPECT message (it may be lost because we are not above any retransmission layer), the following scheme
579      * is used: after sending the SUSPECT message, it is also added to the broadcast task, which will periodically
580      * re-send the SUSPECT until a view is received in which the suspected process is not a member anymore. The reason is
581      * that - at one point - either the coordinator or another participant taking over for a crashed coordinator, will
582      * react to the SUSPECT message and issue a new view, at which point the broadcast task stops.
583      */
584     void broadcastSuspectMessage(Address suspected_mbr) {
585         Message suspect_msg;
586         FdHeader hdr;
587 
588         if(suspected_mbr == null) return;
589 
590         if(trace) log.trace("suspecting " + suspected_mbr + " (own address is " + local_addr + ')');
591 
592         // 1. Send a SUSPECT message right away; the broadcast task will take some time to send it (sleeps first)
593         hdr=new FdHeader(FdHeader.SUSPECT);
594         hdr.mbrs=new Vector(1);
595         hdr.mbrs.addElement(suspected_mbr);
596         suspect_msg=new Message();
597         suspect_msg.putHeader(name, hdr);
598         passDown(new Event(Event.MSG, suspect_msg));
599 
600         // 2. Add to broadcast task and start latter (if not yet running). The task will end when
601         //    suspected members are removed from the membership
602         bcast_task.addSuspectedMember(suspected_mbr);
603         if(stats) {
604             num_suspect_events++;
605             suspect_history.add(suspected_mbr);
606         }
607     }
608 
609 
610     void broadcastWhoHasSockMessage(Address mbr) {
611         Message msg;
612         FdHeader hdr;
613 
614         if(local_addr != null && mbr != null)
615             if(log.isDebugEnabled()) log.debug("[" + local_addr + "]: who-has " + mbr);
616 
617         msg=new Message();  // bcast msg
618         hdr=new FdHeader(FdHeader.WHO_HAS_SOCK);
619         hdr.mbr=mbr;
620         msg.putHeader(name, hdr);
621         passDown(new Event(Event.MSG, msg));
622     }
623 
624 
625     /**
626      Sends or broadcasts a I_HAVE_SOCK response. If 'dst' is null, the reponse will be broadcast, otherwise
627      it will be unicast back to the requester
628      */
629     void sendIHaveSockMessage(Address dst, Address mbr, IpAddress addr) {
630         Message msg=new Message(dst, null, null);
631         FdHeader hdr=new FdHeader(FdHeader.I_HAVE_SOCK);
632         hdr.mbr=mbr;
633         hdr.sock_addr=addr;
634         msg.putHeader(name, hdr);
635 
636         if(trace) // +++ remove
637             log.trace("hdr=" + hdr);
638 
639         passDown(new Event(Event.MSG, msg));
640     }
641 
642 
643     /**
644      Attempts to obtain the ping_addr first from the cache, then by unicasting q request to <code>mbr</code>,
645      then by multicasting a request to all members.
646      */
647     IpAddress fetchPingAddress(Address mbr) {
648         IpAddress ret;
649         Message ping_addr_req;
650         FdHeader hdr;
651 
652         if(mbr == null) {
653             if(log.isErrorEnabled()) log.error("mbr == null");
654             return null;
655         }
656         // 1. Try to get from cache. Add a little delay so that joining mbrs can send their socket address before
657         //    we ask them to do so
658         ret=(IpAddress)cache.get(mbr);
659         if(ret != null) {
660             return ret;
661         }
662 
663         Util.sleep(300);
664         if((ret=(IpAddress)cache.get(mbr)) != null)
665             return ret;
666 
667 
668         // 2. Try to get from mbr
669         ping_addr_promise.reset();
670         ping_addr_req=new Message(mbr, null, null); // unicast
671         hdr=new FdHeader(FdHeader.WHO_HAS_SOCK);
672         hdr.mbr=mbr;
673         ping_addr_req.putHeader(name, hdr);
674         passDown(new Event(Event.MSG, ping_addr_req));
675         ret=(IpAddress) ping_addr_promise.getResult(3000);
676         if(ret != null) {
677             return ret;
678         }
679 
680 
681         // 3. Try to get from all members
682         ping_addr_req=new Message(null, null, null); // multicast
683         hdr=new FdHeader(FdHeader.WHO_HAS_SOCK);
684         hdr.mbr=mbr;
685         ping_addr_req.putHeader(name, hdr);
686         passDown(new Event(Event.MSG, ping_addr_req));
687         ret=(IpAddress) ping_addr_promise.getResult(3000);
688         return ret;
689     }
690 
691 
692     Address determinePingDest() {
693         Address tmp;
694 
695         if(pingable_mbrs == null || pingable_mbrs.size() < 2 || local_addr == null)
696             return null;
697         for(int i=0; i < pingable_mbrs.size(); i++) {
698             tmp=(Address) pingable_mbrs.elementAt(i);
699             if(local_addr.equals(tmp)) {
700                 if(i + 1 >= pingable_mbrs.size())
701                     return (Address) pingable_mbrs.elementAt(0);
702                 else
703                     return (Address) pingable_mbrs.elementAt(i + 1);
704             }
705         }
706         return null;
707     }
708 
709 
710     Address determineCoordinator() {
711         return members.size() > 0 ? (Address) members.elementAt(0) : null;
712     }
713 
714 
715 
716 
717 
718     /* ------------------------------- End of Private Methods ------------------------------------ */
719 
720 
721     public static class FdHeader extends Header implements Streamable {
722         public static final byte SUSPECT=10;
723         public static final byte WHO_HAS_SOCK=11;
724         public static final byte I_HAVE_SOCK=12;
725         public static final byte GET_CACHE=13; // sent by joining member to coordinator
726         public static final byte GET_CACHE_RSP=14; // sent by coordinator to joining member in response to GET_CACHE
727 
728 
729         byte      type=SUSPECT;
730         Address   mbr=null;           // set on WHO_HAS_SOCK (requested mbr), I_HAVE_SOCK
731         IpAddress sock_addr;          // set on I_HAVE_SOCK
732 
733         // Hashtable<Address,IpAddress>
734         Hashtable cachedAddrs=null;   // set on GET_CACHE_RSP
735         Vector    mbrs=null;          // set on SUSPECT (list of suspected members)
736 
737 
738         public FdHeader() {
739         } // used for externalization
740 
741         public FdHeader(byte type) {
742             this.type=type;
743         }
744 
745         public FdHeader(byte type, Address mbr) {
746             this.type=type;
747             this.mbr=mbr;
748         }
749 
750         public FdHeader(byte type, Vector mbrs) {
751             this.type=type;
752             this.mbrs=mbrs;
753         }
754 
755         public FdHeader(byte type, Hashtable cachedAddrs) {
756             this.type=type;
757             this.cachedAddrs=cachedAddrs;
758         }
759 
760 
761         public String toString() {
762             StringBuffer sb=new StringBuffer();
763             sb.append(type2String(type));
764             if(mbr != null)
765                 sb.append(", mbr=").append(mbr);
766             if(sock_addr != null)
767                 sb.append(", sock_addr=").append(sock_addr);
768             if(cachedAddrs != null)
769                 sb.append(", cache=").append(cachedAddrs);
770             if(mbrs != null)
771                 sb.append(", mbrs=").append(mbrs);
772             return sb.toString();
773         }
774 
775 
776         public static String type2String(byte type) {
777             switch(type) {
778                 case SUSPECT:
779                     return "SUSPECT";
780                 case WHO_HAS_SOCK:
781                     return "WHO_HAS_SOCK";
782                 case I_HAVE_SOCK:
783                     return "I_HAVE_SOCK";
784                 case GET_CACHE:
785                     return "GET_CACHE";
786                 case GET_CACHE_RSP:
787                     return "GET_CACHE_RSP";
788                 default:
789                     return "unknown type (" + type + ')';
790             }
791         }
792 
793         public void writeExternal(ObjectOutput out) throws IOException {
794             out.writeByte(type);
795             out.writeObject(mbr);
796             out.writeObject(sock_addr);
797             out.writeObject(cachedAddrs);
798             out.writeObject(mbrs);
799         }
800 
801 
802         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
803             type=in.readByte();
804             mbr=(Address) in.readObject();
805             sock_addr=(IpAddress) in.readObject();
806             cachedAddrs=(Hashtable) in.readObject();
807             mbrs=(Vector) in.readObject();
808         }
809 
810         public long size() {
811             long retval=Global.BYTE_SIZE; // type
812             retval+=Util.size(mbr);
813             retval+=Util.size(sock_addr);
814 
815             retval+=Global.INT_SIZE; // cachedAddrs size
816             Map.Entry entry;
817             Address key;
818             IpAddress val;
819             if(cachedAddrs != null) {
820                 for(Iterator it=cachedAddrs.entrySet().iterator(); it.hasNext();) {
821                     entry=(Map.Entry)it.next();
822                     if((key=(Address)entry.getKey()) != null)
823                         retval+=Util.size(key);
824                     retval+=Global.BYTE_SIZE; // presence for val
825                     if((val=(IpAddress)entry.getValue()) != null)
826                         retval+=val.size();
827                 }
828             }
829 
830             retval+=Global.INT_SIZE; // mbrs size
831             if(mbrs != null) {
832                 for(int i=0; i < mbrs.size(); i++) {
833                     retval+=Util.size((Address)mbrs.elementAt(i));
834                 }
835             }
836 
837             return retval;
838         }
839 
840         public void writeTo(DataOutputStream out) throws IOException {
841             int size;
842             out.writeByte(type);
843             Util.writeAddress(mbr, out);
844             Util.writeStreamable(sock_addr, out);
845             size=cachedAddrs != null? cachedAddrs.size() : 0;
846             out.writeInt(size);
847             if(size > 0) {
848                 for(Iterator it=cachedAddrs.entrySet().iterator(); it.hasNext();) {
849                     Map.Entry entry=(Map.Entry)it.next();
850                     Address key=(Address)entry.getKey();
851                     IpAddress val=(IpAddress)entry.getValue();
852                     Util.writeAddress(key, out);
853                     Util.writeStreamable(val, out);
854                 }
855             }
856             size=mbrs != null? mbrs.size() : 0;
857             out.writeInt(size);
858             if(size > 0) {
859                 for(Iterator it=mbrs.iterator(); it.hasNext();) {
860                     Address address=(Address)it.next();
861                     Util.writeAddress(address, out);
862                 }
863             }
864         }
865 
866         public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
867             int size;
868             type=in.readByte();
869             mbr=Util.readAddress(in);
870             sock_addr=(IpAddress)Util.readStreamable(IpAddress.class, in);
871             size=in.readInt();
872             if(size > 0) {
873                 if(cachedAddrs == null)
874                     cachedAddrs=new Hashtable();
875                 for(int i=0; i < size; i++) {
876                     Address key=Util.readAddress(in);
877                     IpAddress val=(IpAddress)Util.readStreamable(IpAddress.class, in);
878                     cachedAddrs.put(key, val);
879                 }
880             }
881             size=in.readInt();
882             if(size > 0) {
883                 if(mbrs == null)
884                     mbrs=new Vector();
885                 for(int i=0; i < size; i++) {
886                     Address addr=Util.readAddress(in);
887                     mbrs.add(addr);
888                 }
889             }
890         }
891 
892     }
893 
894 
895     /**
896      * Handles the server-side of a client-server socket connection. Waits until a client connects, and then loops
897      * until that client closes the connection. Note that there is no new thread spawned for the listening on the
898      * client socket, therefore there can only be 1 client connection at the same time. Subsequent clients attempting
899      * to create a connection will be blocked until the first client closes its connection. This should not be a problem
900      * as the ring nature of the FD_SOCK protocol always has only 1 client connect to its right-hand-side neighbor.
901      */
902     private class ServerSocketHandler implements Runnable {
903         Thread acceptor=null;
904         /** List<ClientConnectionHandler> */
905         final List clients=new ArrayList();
906 
907 
908 
909         ServerSocketHandler() {
910             start();
911         }
912 
913         void start() {
914             if(acceptor == null) {
915                 acceptor=new Thread(this, "ServerSocket acceptor thread");
916                 acceptor.setDaemon(true);
917                 acceptor.start();
918             }
919         }
920 
921 
922         void stop() {
923             if(acceptor != null && acceptor.isAlive()) {
924                 try {
925                     srv_sock.close(); // this will terminate thread, peer will receive SocketException (socket close)
926                 }
927                 catch(Exception ex) {
928                 }
929             }
930             synchronized(clients) {
931                 for(Iterator it=clients.iterator(); it.hasNext();) {
932                     ClientConnectionHandler handler=(ClientConnectionHandler)it.next();
933                     handler.stopThread();
934                 }
935                 clients.clear();
936             }
937             acceptor=null;
938         }
939 
940 
941         /** Only accepts 1 client connection at a time (saving threads) */
942         public void run() {
943             Socket client_sock;
944             while(acceptor != null && srv_sock != null) {
945                 try {
946                     if(trace) // +++ remove
947                         log.trace("waiting for client connections on " + srv_sock.getInetAddress() + ":" +
948                                   srv_sock.getLocalPort());
949                     client_sock=srv_sock.accept();
950                     if(trace) // +++ remove
951                         log.trace("accepted connection from " + client_sock.getInetAddress() + ':' + client_sock.getPort());
952                     ClientConnectionHandler client_conn_handler=new ClientConnectionHandler(client_sock, clients);
953                     synchronized(clients) {
954                         clients.add(client_conn_handler);
955                     }
956                     client_conn_handler.start();
957                 }
958                 catch(IOException io_ex2) {
959                     break;
960                 }
961             }
962             acceptor=null;
963         }
964     }
965 
966 
967 
968     /** Handles a client connection; multiple client can connect at the same time */
969     private static class ClientConnectionHandler extends Thread {
970         Socket      client_sock=null;
971         InputStream in;
972         final Object mutex=new Object();
973         List clients=new ArrayList();
974 
975         ClientConnectionHandler(Socket client_sock, List clients) {
976             setName("ClientConnectionHandler");
977             setDaemon(true);
978             this.client_sock=client_sock;
979             this.clients.addAll(clients);
980         }
981 
982         void stopThread() {
983             synchronized(mutex) {
984                 if(client_sock != null) {
985                     try {
986                         OutputStream out=client_sock.getOutputStream();
987                         out.write(NORMAL_TEMINATION);
988                     }
989                     catch(Throwable t) {
990                     }
991                 }
992             }
993             closeClientSocket();
994         }
995 
996         void closeClientSocket() {
997             synchronized(mutex) {
998                 if(client_sock != null) {
999                     try {
1000                        client_sock.close();
1001                    }
1002                    catch(Exception ex) {
1003                    }
1004                    client_sock=null;
1005                }
1006            }
1007        }
1008
1009        public void run() {
1010            try {
1011                synchronized(mutex) {
1012                    if(client_sock == null)
1013                        return;
1014                    in=client_sock.getInputStream();
1015                }
1016                while((in.read()) != -1) {
1017                }
1018            }
1019            catch(IOException io_ex1) {
1020            }
1021            finally {
1022                closeClientSocket();
1023                synchronized(clients) {
1024                    clients.remove(this);
1025                }
1026            }
1027        }
1028    }
1029
1030
1031    /**
1032     * Task that periodically broadcasts a list of suspected members to the group. Goal is not to lose
1033     * a SUSPECT message: since these are bcast unreliably, they might get dropped. The BroadcastTask makes
1034     * sure they are retransmitted until a view has been received which doesn't contain the suspected members
1035     * any longer. Then the task terminates.
1036     */
1037    private class BroadcastTask implements TimeScheduler.Task {
1038        final Vector suspected_mbrs=new Vector(7);
1039        boolean stopped=false;
1040
1041
1042        /** Adds a suspected member. Starts the task if not yet running */
1043        public void addSuspectedMember(Address mbr) {
1044            if(mbr == null) return;
1045            if(!members.contains(mbr)) return;
1046            synchronized(suspected_mbrs) {
1047                if(!suspected_mbrs.contains(mbr)) {
1048                    suspected_mbrs.addElement(mbr);
1049                    if(log.isDebugEnabled()) log.debug("mbr=" + mbr + " (size=" + suspected_mbrs.size() + ')');
1050                }
1051                if(stopped && suspected_mbrs.size() > 0) {
1052                    stopped=false;
1053                    timer.add(this, true);
1054                }
1055            }
1056        }
1057
1058
1059        public void removeSuspectedMember(Address suspected_mbr) {
1060            if(suspected_mbr == null) return;
1061            if(log.isDebugEnabled()) log.debug("member is " + suspected_mbr);
1062            synchronized(suspected_mbrs) {
1063                suspected_mbrs.removeElement(suspected_mbr);
1064                if(suspected_mbrs.size() == 0)
1065                    stopped=true;
1066            }
1067        }
1068
1069
1070        public void removeAll() {
1071            synchronized(suspected_mbrs) {
1072                suspected_mbrs.removeAllElements();
1073                stopped=true;
1074            }
1075        }
1076
1077
1078        /**
1079         * Removes all elements from suspected_mbrs that are <em>not</em> in the new membership
1080         */
1081        public void adjustSuspectedMembers(Vector new_mbrship) {
1082            Address suspected_mbr;
1083
1084            if(new_mbrship == null || new_mbrship.size() == 0) return;
1085            synchronized(suspected_mbrs) {
1086                for(Iterator it=suspected_mbrs.iterator(); it.hasNext();) {
1087                    suspected_mbr=(Address) it.next();
1088                    if(!new_mbrship.contains(suspected_mbr)) {
1089                        it.remove();
1090                        if(log.isDebugEnabled())
1091                            log.debug("removed " + suspected_mbr + " (size=" + suspected_mbrs.size() + ')');
1092                    }
1093                }
1094                if(suspected_mbrs.size() == 0)
1095                    stopped=true;
1096            }
1097        }
1098
1099
1100        public boolean cancelled() {
1101            return stopped;
1102        }
1103
1104
1105        public long nextInterval() {
1106            return suspect_msg_interval;
1107        }
1108
1109
1110        public void run() {
1111            Message suspect_msg;
1112            FdHeader hdr;
1113
1114            if(log.isDebugEnabled())
1115                log.debug("broadcasting SUSPECT message (suspected_mbrs=" + suspected_mbrs + ") to group");
1116
1117            synchronized(suspected_mbrs) {
1118                if(suspected_mbrs.size() == 0) {
1119                    stopped=true;
1120                    if(log.isDebugEnabled()) log.debug("task done (no suspected members)");
1121                    return;
1122                }
1123
1124                hdr=new FdHeader(FdHeader.SUSPECT);
1125                hdr.mbrs=(Vector) suspected_mbrs.clone();
1126            }
1127            suspect_msg=new Message();       // mcast SUSPECT to all members
1128            suspect_msg.putHeader(name, hdr);
1129            passDown(new Event(Event.MSG, suspect_msg));
1130            if(log.isDebugEnabled()) log.debug("task done");
1131        }
1132    }
1133
1134
1135}