Source code: org/jgroups/protocols/FD_SOCK.java
1 // $Id: FD_SOCK.java,v 1.30 2005/10/19 12:12:56 belaban Exp $
2
3 package org.jgroups.protocols;
4
5 import org.jgroups.*;
6 import org.jgroups.stack.IpAddress;
7 import org.jgroups.stack.Protocol;
8 import org.jgroups.util.*;
9
10 import java.io.*;
11 import java.net.ServerSocket;
12 import java.net.Socket;
13 import java.net.InetAddress;
14 import java.net.UnknownHostException;
15 import java.util.*;
16 import java.util.List;
17
18
19 /**
20 * Failure detection protocol based on sockets. Failure detection is ring-based. Each member creates a
21 * server socket and announces its address together with the server socket's address in a multicast. A
22 * pinger thread will be started when the membership goes above 1 and will be stopped when it drops below
23 * 2. The pinger thread connects to its neighbor on the right and waits until the socket is closed. When
24 * the socket is closed by the monitored peer in an abnormal fashion (IOException), the neighbor will be
25 * suspected.<p> The main feature of this protocol is that no ping messages need to be exchanged between
26 * any 2 peers, and failure detection relies entirely on TCP sockets. The advantage is that no activity
27 * will take place between 2 peers as long as they are alive (i.e. have their server sockets open).
28 * The disadvantage is that hung servers or crashed routers will not cause sockets to be closed, therefore
29 * they won't be detected.
30 * The FD_SOCK protocol will work for groups where members are on different hosts<p>
31 * The costs involved are 2 additional threads: one that
32 * monitors the client side of the socket connection (to monitor a peer) and another one that manages the
33 * server socket. However, those threads will be idle as long as both peers are running.
34 * @author Bela Ban May 29 2001
35 */
36 public class FD_SOCK extends Protocol implements Runnable {
// ---- configuration (several of these can be overridden in setProperties()) ----
long get_cache_timeout=3000;            // msecs to wait for the coordinator's GET_CACHE_RSP on each attempt
final long get_cache_retry_timeout=500; // msecs to sleep before the next attempt to get the cache from coord
long suspect_msg_interval=5000;         // (BroadcastTask): interval in msecs between re-sent SUSPECT mcasts
int num_tries=3;                        // attempts coord is solicited for socket cache until we give up

// ---- membership / cache state ----
final Vector members=new Vector(11);        // list of group members (replaced on every VIEW_CHANGE)
boolean srv_sock_sent=false;                // has own server socket address been broadcast yet ?
final Vector pingable_mbrs=new Vector(11);  // mbrs from which we select ping_dest. may be subset of 'members'
final Promise get_cache_promise=new Promise(); // used for rendezvous on GET_CACHE and GET_CACHE_RSP
boolean got_cache_from_coord=false;         // set after the first attempt to fetch the cache (successful or not)
Address local_addr=null;                    // our own address (set on SET_LOCAL_ADDRESS)

// ---- server side: the socket our left-hand neighbor connects to ----
ServerSocket srv_sock=null;                 // server socket to which another member connects to monitor me
InetAddress srv_sock_bind_addr=null;        // the NIC on which the ServerSocket should listen
ServerSocketHandler srv_sock_handler=null;  // accepts new connections on srv_sock
IpAddress srv_sock_addr=null;               // pair of server_socket:port, announced via I_HAVE_SOCK

// ---- client side: the connection to the member we monitor ----
Address ping_dest=null;                     // address of the member we monitor
Socket ping_sock=null;                      // socket to the member we monitor (guarded by sock_mutex)
InputStream ping_input=null;                // input stream of ping_sock (guarded by sock_mutex)
Thread pinger_thread=null;                  // blocks reading ping_input, suspects member if socket is closed
final Hashtable cache=new Hashtable(11);    // keys=Addresses, vals=IpAddresses (socket:port)

/** Start port for server socket (uses first available port starting at start_port). A value of 0 (default)
 * picks a random port */
int start_port=0;
final Promise ping_addr_promise=new Promise(); // to fetch the ping_addr for ping_dest (set on I_HAVE_SOCK)
final Object sock_mutex=new Object();          // for access to ping_sock, ping_input
TimeScheduler timer=null;                      // taken from the protocol stack in init()
final BroadcastTask bcast_task=new BroadcastTask(); // to transmit SUSPECT message (until view change)
boolean regular_sock_close=false;  // set by stopPingerThread()/interruptPingerThread() so that
                                   // handleSocketClose() does not suspect the peer for a close we caused
int num_suspect_events=0;          // number of suspicions raised (only counted while stats are enabled)

// Byte written by ClientConnectionHandler.stopThread() before it closes the connection; the pinger
// treats it as a regular shutdown of the peer. (Identifier spelling kept: it is referenced elsewhere.)
private static final int NORMAL_TEMINATION=9;
// Same value InputStream.read() returns on end-of-stream: the peer closed without announcing it.
private static final int ABNORMAL_TEMINATION=-1;
private static final String name="FD_SOCK";    // protocol name, also the key of FdHeader in messages

BoundedList suspect_history=new BoundedList(20); // recently suspected members; presumably bounded to 20 entries - TODO confirm
71
72
73 public String getName() {
74 return name;
75 }
76
77 public String getLocalAddress() {return local_addr != null? local_addr.toString() : "null";}
78 public String getMembers() {return members != null? members.toString() : "null";}
79 public String getPingableMembers() {return pingable_mbrs != null? pingable_mbrs.toString() : "null";}
80 public String getPingDest() {return ping_dest != null? ping_dest.toString() : "null";}
81 public int getNumSuspectEventsGenerated() {return num_suspect_events;}
82 public String printSuspectHistory() {
83 StringBuffer sb=new StringBuffer();
84 for(Enumeration en=suspect_history.elements(); en.hasMoreElements();) {
85 sb.append(new Date()).append(": ").append(en.nextElement()).append("\n");
86 }
87 return sb.toString();
88 }
89
90 public boolean setProperties(Properties props) {
91 String str, tmp=null;
92
93 super.setProperties(props);
94 str=props.getProperty("get_cache_timeout");
95 if(str != null) {
96 get_cache_timeout=Long.parseLong(str);
97 props.remove("get_cache_timeout");
98 }
99
100 str=props.getProperty("suspect_msg_interval");
101 if(str != null) {
102 suspect_msg_interval=Long.parseLong(str);
103 props.remove("suspect_msg_interval");
104 }
105
106 str=props.getProperty("num_tries");
107 if(str != null) {
108 num_tries=Integer.parseInt(str);
109 props.remove("num_tries");
110 }
111
112 str=props.getProperty("start_port");
113 if(str != null) {
114 start_port=Integer.parseInt(str);
115 props.remove("start_port");
116 }
117
118
119 // PropertyPermission not granted if running in an untrusted environment with JNLP.
120 try {
121 tmp=System.getProperty("bind.address");
122 if(Util.isBindAddressPropertyIgnored()) {
123 tmp=null;
124 }
125 }
126 catch (SecurityException ex){
127 }
128
129 if(tmp != null)
130 str=tmp;
131 else
132 str=props.getProperty("srv_sock_bind_addr");
133 if(str != null) {
134 try {
135 srv_sock_bind_addr=InetAddress.getByName(str);
136 }
137 catch(UnknownHostException unknown) {
138 if(log.isFatalEnabled()) log.fatal("(srv_sock_bind_addr): host " + str + " not known");
139 return false;
140 }
141 props.remove("srv_sock_bind_addr");
142 }
143
144
145 if(props.size() > 0) {
146 log.error("FD_SOCK.setProperties(): the following properties are not recognized: " + props);
147 return false;
148 }
149 return true;
150 }
151
152
153 public void init() throws Exception {
154 srv_sock_handler=new ServerSocketHandler();
155 timer=stack != null ? stack.timer : null;
156 if(timer == null)
157 throw new Exception("FD_SOCK.init(): timer == null");
158 }
159
160
/**
 * Shuts failure detection down: forgets all suspected members (which also marks the broadcast task
 * as stopped), stops monitoring the neighbor and stops the server socket handler.
 */
public void stop() {
    bcast_task.removeAll();   // no more periodic SUSPECT broadcasts
    stopPingerThread();       // closes our ping socket, flagged as a regular close
    stopServerSocket();       // closes srv_sock and all accepted client connections
}
166
167 public void resetStats() {
168 super.resetStats();
169 num_suspect_events=0;
170 suspect_history.removeAll();
171 }
172
173
174 public void up(Event evt) {
175 Message msg;
176 FdHeader hdr;
177
178 switch(evt.getType()) {
179
180 case Event.SET_LOCAL_ADDRESS:
181 local_addr=(Address) evt.getArg();
182 break;
183
184 case Event.MSG:
185 msg=(Message) evt.getArg();
186 hdr=(FdHeader) msg.removeHeader(name);
187 if(hdr == null)
188 break; // message did not originate from FD_SOCK layer, just pass up
189
190 switch(hdr.type) {
191
192 case FdHeader.SUSPECT:
193 if(hdr.mbrs != null) {
194 if(log.isDebugEnabled()) log.debug("[SUSPECT] hdr=" + hdr);
195 for(int i=0; i < hdr.mbrs.size(); i++) {
196 passUp(new Event(Event.SUSPECT, hdr.mbrs.elementAt(i)));
197 passDown(new Event(Event.SUSPECT, hdr.mbrs.elementAt(i)));
198 }
199 }
200 else
201 if(warn) log.warn("[SUSPECT]: hdr.mbrs == null");
202 break;
203
204 // If I have the sock for 'hdr.mbr', return it. Otherwise look it up in my cache and return it
205 case FdHeader.WHO_HAS_SOCK:
206 if(local_addr != null && local_addr.equals(msg.getSrc()))
207 return; // don't reply to WHO_HAS bcasts sent by me !
208
209 if(hdr.mbr == null) {
210 if(log.isErrorEnabled()) log.error("hdr.mbr is null");
211 return;
212 }
213
214 if(trace) log.trace("who-has-sock " + hdr.mbr);
215
216 // 1. Try my own address, maybe it's me whose socket is wanted
217 if(local_addr != null && local_addr.equals(hdr.mbr) && srv_sock_addr != null) {
218 sendIHaveSockMessage(msg.getSrc(), local_addr, srv_sock_addr); // unicast message to msg.getSrc()
219 return;
220 }
221
222 // 2. If I don't have it, maybe it is in the cache
223 if(cache.containsKey(hdr.mbr))
224 sendIHaveSockMessage(msg.getSrc(), hdr.mbr, (IpAddress) cache.get(hdr.mbr)); // ucast msg
225 break;
226
227
228 // Update the cache with the addr:sock_addr entry (if on the same host)
229 case FdHeader.I_HAVE_SOCK:
230 if(hdr.mbr == null || hdr.sock_addr == null) {
231 if(log.isErrorEnabled()) log.error("[I_HAVE_SOCK]: hdr.mbr is null or hdr.sock_addr == null");
232 return;
233 }
234
235 // if(!cache.containsKey(hdr.mbr))
236 cache.put(hdr.mbr, hdr.sock_addr); // update the cache
237 if(trace) log.trace("i-have-sock: " + hdr.mbr + " --> " +
238 hdr.sock_addr + " (cache is " + cache + ')');
239
240 if(ping_dest != null && hdr.mbr.equals(ping_dest))
241 ping_addr_promise.setResult(hdr.sock_addr);
242 break;
243
244 // Return the cache to the sender of this message
245 case FdHeader.GET_CACHE:
246 if(hdr.mbr == null) {
247 if(log.isErrorEnabled()) log.error("(GET_CACHE): hdr.mbr == null");
248 return;
249 }
250 hdr=new FdHeader(FdHeader.GET_CACHE_RSP);
251 hdr.cachedAddrs=(Hashtable) cache.clone();
252 msg=new Message(hdr.mbr, null, null);
253 msg.putHeader(name, hdr);
254 passDown(new Event(Event.MSG, msg));
255 break;
256
257 case FdHeader.GET_CACHE_RSP:
258 if(hdr.cachedAddrs == null) {
259 if(log.isErrorEnabled()) log.error("(GET_CACHE_RSP): cache is null");
260 return;
261 }
262 get_cache_promise.setResult(hdr.cachedAddrs);
263 break;
264 }
265 return;
266 }
267
268 passUp(evt); // pass up to the layer above us
269 }
270
271
272 public void down(Event evt) {
273 Address mbr, tmp_ping_dest;
274 View v;
275
276 switch(evt.getType()) {
277
278 case Event.UNSUSPECT:
279 bcast_task.removeSuspectedMember((Address)evt.getArg());
280 break;
281
282 case Event.CONNECT:
283 passDown(evt);
284 srv_sock=Util.createServerSocket(srv_sock_bind_addr, start_port); // grab a random unused port above 10000
285 srv_sock_addr=new IpAddress(srv_sock_bind_addr, srv_sock.getLocalPort());
286 startServerSocket();
287 //if(pinger_thread == null)
288 // startPingerThread();
289 break;
290
291 case Event.VIEW_CHANGE:
292 synchronized(this) {
293 v=(View) evt.getArg();
294 members.removeAllElements();
295 members.addAll(v.getMembers());
296 bcast_task.adjustSuspectedMembers(members);
297 pingable_mbrs.removeAllElements();
298 pingable_mbrs.addAll(members);
299 passDown(evt);
300
301 if(log.isDebugEnabled()) log.debug("VIEW_CHANGE received: " + members);
302
303 // 1. Get the addr:pid cache from the coordinator (only if not already fetched)
304 if(!got_cache_from_coord) {
305 getCacheFromCoordinator();
306 got_cache_from_coord=true;
307 }
308
309
310 // 2. Broadcast my own addr:sock to all members so they can update their cache
311 if(!srv_sock_sent) {
312 if(srv_sock_addr != null) {
313 sendIHaveSockMessage(null, // send to all members
314 local_addr,
315 srv_sock_addr);
316 srv_sock_sent=true;
317 }
318 else
319 if(warn) log.warn("(VIEW_CHANGE): srv_sock_addr == null");
320 }
321
322 // 3. Remove all entries in 'cache' which are not in the new membership
323 for(Enumeration e=cache.keys(); e.hasMoreElements();) {
324 mbr=(Address) e.nextElement();
325 if(!members.contains(mbr))
326 cache.remove(mbr);
327 }
328
329 if(members.size() > 1) {
330 if(pinger_thread != null && pinger_thread.isAlive()) {
331 tmp_ping_dest=determinePingDest();
332 if(ping_dest != null && tmp_ping_dest != null && !ping_dest.equals(tmp_ping_dest)) {
333 interruptPingerThread(); // allows the thread to use the new socket
334 }
335 }
336 else
337 startPingerThread(); // only starts if not yet running
338 }
339 else {
340 ping_dest=null;
341 stopPingerThread();
342 }
343 }
344 break;
345
346 default:
347 passDown(evt);
348 break;
349 }
350 }
351
352
353 /**
354 * Runs as long as there are 2 members and more. Determines the member to be monitored and fetches its
355 * server socket address (if n/a, sends a message to obtain it). The creates a client socket and listens on
356 * it until the connection breaks. If it breaks, emits a SUSPECT message. It the connection is closed regularly,
357 * nothing happens. In both cases, a new member to be monitored will be chosen and monitoring continues (unless
358 * there are fewer than 2 members).
359 */
360 public void run() {
361 Address tmp_ping_dest;
362 IpAddress ping_addr;
363 int max_fetch_tries=10; // number of times a socket address is to be requested before giving up
364
365 if(trace) log.trace("pinger_thread started"); // +++ remove
366
367 while(pinger_thread != null) {
368 tmp_ping_dest=determinePingDest(); // gets the neighbor to our right
369 if(log.isDebugEnabled())
370 log.debug("determinePingDest()=" + tmp_ping_dest + ", pingable_mbrs=" + pingable_mbrs);
371 if(tmp_ping_dest == null) {
372 ping_dest=null;
373 pinger_thread=null;
374 break;
375 }
376 ping_dest=tmp_ping_dest;
377 ping_addr=fetchPingAddress(ping_dest);
378 if(ping_addr == null) {
379 if(log.isErrorEnabled()) log.error("socket address for " + ping_dest + " could not be fetched, retrying");
380 if(--max_fetch_tries <= 0)
381 break;
382 Util.sleep(2000);
383 continue;
384 }
385
386 if(!setupPingSocket(ping_addr)) {
387 // covers use cases #7 and #8 in GmsTests.txt
388 if(log.isDebugEnabled()) log.debug("could not create socket to " + ping_dest + "; suspecting " + ping_dest);
389 broadcastSuspectMessage(ping_dest);
390 pingable_mbrs.removeElement(ping_dest);
391 continue;
392 }
393
394 if(log.isDebugEnabled()) log.debug("ping_dest=" + ping_dest + ", ping_sock=" + ping_sock + ", cache=" + cache);
395
396 // at this point ping_input must be non-null, otherwise setupPingSocket() would have thrown an exception
397 try {
398 if(ping_input != null) {
399 int c=ping_input.read();
400 switch(c) {
401 case NORMAL_TEMINATION:
402 if(log.isDebugEnabled())
403 log.debug("peer closed socket normally");
404 pinger_thread=null;
405 break;
406 case ABNORMAL_TEMINATION:
407 handleSocketClose(null);
408 break;
409 default:
410 break;
411 }
412 }
413 }
414 catch(IOException ex) { // we got here when the peer closed the socket --> suspect peer and then continue
415 handleSocketClose(ex);
416 }
417 catch(Throwable catch_all_the_rest) {
418 log.error("exception", catch_all_the_rest);
419 }
420 }
421 if(log.isDebugEnabled()) log.debug("pinger thread terminated");
422 pinger_thread=null;
423 }
424
425
426
427
428 /* ----------------------------------- Private Methods -------------------------------------- */
429
430
431 void handleSocketClose(Exception ex) {
432 teardownPingSocket(); // make sure we have no leftovers
433 if(!regular_sock_close) { // only suspect if socket was not closed regularly (by interruptPingerThread())
434 if(log.isDebugEnabled())
435 log.debug("peer " + ping_dest + " closed socket (" + (ex != null ? ex.getClass().getName() : "eof") + ')');
436 broadcastSuspectMessage(ping_dest);
437 pingable_mbrs.removeElement(ping_dest);
438 }
439 else {
440 if(log.isDebugEnabled()) log.debug("socket to " + ping_dest + " was reset");
441 regular_sock_close=false;
442 }
443 }
444
445
446 void startPingerThread() {
447 if(pinger_thread == null) {
448 pinger_thread=new Thread(this, "FD_SOCK Ping thread");
449 pinger_thread.setDaemon(true);
450 pinger_thread.start();
451 }
452 }
453
454
455 void stopPingerThread() {
456 if(pinger_thread != null && pinger_thread.isAlive()) {
457 regular_sock_close=true;
458 teardownPingSocket();
459 }
460 pinger_thread=null;
461 }
462
463
464 /**
465 * Interrupts the pinger thread. The Thread.interrupt() method doesn't seem to work under Linux with JDK 1.3.1
466 * (JDK 1.2.2 had no problems here), therefore we close the socket (setSoLinger has to be set !) if we are
467 * running under Linux. This should be tested under Windows. (Solaris 8 and JDK 1.3.1 definitely works).<p>
468 * Oct 29 2001 (bela): completely removed Thread.interrupt(), but used socket close on all OSs. This makes this
469 * code portable and we don't have to check for OSs.
470 * @see org.jgroups.tests.InterruptTest to determine whether Thread.interrupt() works for InputStream.read().
471 */
472 void interruptPingerThread() {
473 if(pinger_thread != null && pinger_thread.isAlive()) {
474 regular_sock_close=true;
475 teardownPingSocket(); // will wake up the pinger thread. less elegant than Thread.interrupt(), but does the job
476 }
477 }
478
479 void startServerSocket() {
480 if(srv_sock_handler != null)
481 srv_sock_handler.start(); // won't start if already running
482 }
483
484 void stopServerSocket() {
485 if(srv_sock_handler != null)
486 srv_sock_handler.stop();
487 }
488
489
490 /**
491 * Creates a socket to <code>dest</code>, and assigns it to ping_sock. Also assigns ping_input
492 */
493 boolean setupPingSocket(IpAddress dest) {
494 synchronized(sock_mutex) {
495 if(dest == null) {
496 if(log.isErrorEnabled()) log.error("destination address is null");
497 return false;
498 }
499 try {
500 ping_sock=new Socket(dest.getIpAddress(), dest.getPort());
501 ping_sock.setSoLinger(true, 1);
502 ping_input=ping_sock.getInputStream();
503 return true;
504 }
505 catch(Throwable ex) {
506 return false;
507 }
508 }
509 }
510
511
512 void teardownPingSocket() {
513 synchronized(sock_mutex) {
514 if(ping_sock != null) {
515 try {
516 ping_sock.shutdownInput();
517 ping_sock.close();
518 }
519 catch(Exception ex) {
520 }
521 ping_sock=null;
522 }
523 if(ping_input != null) {
524 try {
525 ping_input.close();
526 }
527 catch(Exception ex) {
528 }
529 ping_input=null;
530 }
531 }
532 }
533
534
535 /**
536 * Determines coordinator C. If C is null and we are the first member, return. Else loop: send GET_CACHE message
537 * to coordinator and wait for GET_CACHE_RSP response. Loop until valid response has been received.
538 */
539 void getCacheFromCoordinator() {
540 Address coord;
541 int attempts=num_tries;
542 Message msg;
543 FdHeader hdr;
544 Hashtable result;
545
546 get_cache_promise.reset();
547 while(attempts > 0) {
548 if((coord=determineCoordinator()) != null) {
549 if(coord.equals(local_addr)) { // we are the first member --> empty cache
550 if(log.isDebugEnabled()) log.debug("first member; cache is empty");
551 return;
552 }
553 hdr=new FdHeader(FdHeader.GET_CACHE);
554 hdr.mbr=local_addr;
555 msg=new Message(coord, null, null);
556 msg.putHeader(name, hdr);
557 passDown(new Event(Event.MSG, msg));
558 result=(Hashtable) get_cache_promise.getResult(get_cache_timeout);
559 if(result != null) {
560 cache.putAll(result); // replace all entries (there should be none !) in cache with the new values
561 if(trace) log.trace("got cache from " + coord + ": cache is " + cache);
562 return;
563 }
564 else {
565 if(log.isErrorEnabled()) log.error("received null cache; retrying");
566 }
567 }
568
569 Util.sleep(get_cache_retry_timeout);
570 --attempts;
571 }
572 }
573
574
575 /**
576 * Sends a SUSPECT message to all group members. Only the coordinator (or the next member in line if the coord
577 * itself is suspected) will react to this message by installing a new view. To overcome the unreliability
578 * of the SUSPECT message (it may be lost because we are not above any retransmission layer), the following scheme
579 * is used: after sending the SUSPECT message, it is also added to the broadcast task, which will periodically
580 * re-send the SUSPECT until a view is received in which the suspected process is not a member anymore. The reason is
581 * that - at one point - either the coordinator or another participant taking over for a crashed coordinator, will
582 * react to the SUSPECT message and issue a new view, at which point the broadcast task stops.
583 */
584 void broadcastSuspectMessage(Address suspected_mbr) {
585 Message suspect_msg;
586 FdHeader hdr;
587
588 if(suspected_mbr == null) return;
589
590 if(trace) log.trace("suspecting " + suspected_mbr + " (own address is " + local_addr + ')');
591
592 // 1. Send a SUSPECT message right away; the broadcast task will take some time to send it (sleeps first)
593 hdr=new FdHeader(FdHeader.SUSPECT);
594 hdr.mbrs=new Vector(1);
595 hdr.mbrs.addElement(suspected_mbr);
596 suspect_msg=new Message();
597 suspect_msg.putHeader(name, hdr);
598 passDown(new Event(Event.MSG, suspect_msg));
599
600 // 2. Add to broadcast task and start latter (if not yet running). The task will end when
601 // suspected members are removed from the membership
602 bcast_task.addSuspectedMember(suspected_mbr);
603 if(stats) {
604 num_suspect_events++;
605 suspect_history.add(suspected_mbr);
606 }
607 }
608
609
610 void broadcastWhoHasSockMessage(Address mbr) {
611 Message msg;
612 FdHeader hdr;
613
614 if(local_addr != null && mbr != null)
615 if(log.isDebugEnabled()) log.debug("[" + local_addr + "]: who-has " + mbr);
616
617 msg=new Message(); // bcast msg
618 hdr=new FdHeader(FdHeader.WHO_HAS_SOCK);
619 hdr.mbr=mbr;
620 msg.putHeader(name, hdr);
621 passDown(new Event(Event.MSG, msg));
622 }
623
624
625 /**
626 Sends or broadcasts a I_HAVE_SOCK response. If 'dst' is null, the reponse will be broadcast, otherwise
627 it will be unicast back to the requester
628 */
629 void sendIHaveSockMessage(Address dst, Address mbr, IpAddress addr) {
630 Message msg=new Message(dst, null, null);
631 FdHeader hdr=new FdHeader(FdHeader.I_HAVE_SOCK);
632 hdr.mbr=mbr;
633 hdr.sock_addr=addr;
634 msg.putHeader(name, hdr);
635
636 if(trace) // +++ remove
637 log.trace("hdr=" + hdr);
638
639 passDown(new Event(Event.MSG, msg));
640 }
641
642
643 /**
644 Attempts to obtain the ping_addr first from the cache, then by unicasting q request to <code>mbr</code>,
645 then by multicasting a request to all members.
646 */
647 IpAddress fetchPingAddress(Address mbr) {
648 IpAddress ret;
649 Message ping_addr_req;
650 FdHeader hdr;
651
652 if(mbr == null) {
653 if(log.isErrorEnabled()) log.error("mbr == null");
654 return null;
655 }
656 // 1. Try to get from cache. Add a little delay so that joining mbrs can send their socket address before
657 // we ask them to do so
658 ret=(IpAddress)cache.get(mbr);
659 if(ret != null) {
660 return ret;
661 }
662
663 Util.sleep(300);
664 if((ret=(IpAddress)cache.get(mbr)) != null)
665 return ret;
666
667
668 // 2. Try to get from mbr
669 ping_addr_promise.reset();
670 ping_addr_req=new Message(mbr, null, null); // unicast
671 hdr=new FdHeader(FdHeader.WHO_HAS_SOCK);
672 hdr.mbr=mbr;
673 ping_addr_req.putHeader(name, hdr);
674 passDown(new Event(Event.MSG, ping_addr_req));
675 ret=(IpAddress) ping_addr_promise.getResult(3000);
676 if(ret != null) {
677 return ret;
678 }
679
680
681 // 3. Try to get from all members
682 ping_addr_req=new Message(null, null, null); // multicast
683 hdr=new FdHeader(FdHeader.WHO_HAS_SOCK);
684 hdr.mbr=mbr;
685 ping_addr_req.putHeader(name, hdr);
686 passDown(new Event(Event.MSG, ping_addr_req));
687 ret=(IpAddress) ping_addr_promise.getResult(3000);
688 return ret;
689 }
690
691
692 Address determinePingDest() {
693 Address tmp;
694
695 if(pingable_mbrs == null || pingable_mbrs.size() < 2 || local_addr == null)
696 return null;
697 for(int i=0; i < pingable_mbrs.size(); i++) {
698 tmp=(Address) pingable_mbrs.elementAt(i);
699 if(local_addr.equals(tmp)) {
700 if(i + 1 >= pingable_mbrs.size())
701 return (Address) pingable_mbrs.elementAt(0);
702 else
703 return (Address) pingable_mbrs.elementAt(i + 1);
704 }
705 }
706 return null;
707 }
708
709
710 Address determineCoordinator() {
711 return members.size() > 0 ? (Address) members.elementAt(0) : null;
712 }
713
714
715
716
717
718 /* ------------------------------- End of Private Methods ------------------------------------ */
719
720
721 public static class FdHeader extends Header implements Streamable {
722 public static final byte SUSPECT=10;
723 public static final byte WHO_HAS_SOCK=11;
724 public static final byte I_HAVE_SOCK=12;
725 public static final byte GET_CACHE=13; // sent by joining member to coordinator
726 public static final byte GET_CACHE_RSP=14; // sent by coordinator to joining member in response to GET_CACHE
727
728
729 byte type=SUSPECT;
730 Address mbr=null; // set on WHO_HAS_SOCK (requested mbr), I_HAVE_SOCK
731 IpAddress sock_addr; // set on I_HAVE_SOCK
732
733 // Hashtable<Address,IpAddress>
734 Hashtable cachedAddrs=null; // set on GET_CACHE_RSP
735 Vector mbrs=null; // set on SUSPECT (list of suspected members)
736
737
738 public FdHeader() {
739 } // used for externalization
740
741 public FdHeader(byte type) {
742 this.type=type;
743 }
744
745 public FdHeader(byte type, Address mbr) {
746 this.type=type;
747 this.mbr=mbr;
748 }
749
750 public FdHeader(byte type, Vector mbrs) {
751 this.type=type;
752 this.mbrs=mbrs;
753 }
754
755 public FdHeader(byte type, Hashtable cachedAddrs) {
756 this.type=type;
757 this.cachedAddrs=cachedAddrs;
758 }
759
760
761 public String toString() {
762 StringBuffer sb=new StringBuffer();
763 sb.append(type2String(type));
764 if(mbr != null)
765 sb.append(", mbr=").append(mbr);
766 if(sock_addr != null)
767 sb.append(", sock_addr=").append(sock_addr);
768 if(cachedAddrs != null)
769 sb.append(", cache=").append(cachedAddrs);
770 if(mbrs != null)
771 sb.append(", mbrs=").append(mbrs);
772 return sb.toString();
773 }
774
775
776 public static String type2String(byte type) {
777 switch(type) {
778 case SUSPECT:
779 return "SUSPECT";
780 case WHO_HAS_SOCK:
781 return "WHO_HAS_SOCK";
782 case I_HAVE_SOCK:
783 return "I_HAVE_SOCK";
784 case GET_CACHE:
785 return "GET_CACHE";
786 case GET_CACHE_RSP:
787 return "GET_CACHE_RSP";
788 default:
789 return "unknown type (" + type + ')';
790 }
791 }
792
793 public void writeExternal(ObjectOutput out) throws IOException {
794 out.writeByte(type);
795 out.writeObject(mbr);
796 out.writeObject(sock_addr);
797 out.writeObject(cachedAddrs);
798 out.writeObject(mbrs);
799 }
800
801
802 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
803 type=in.readByte();
804 mbr=(Address) in.readObject();
805 sock_addr=(IpAddress) in.readObject();
806 cachedAddrs=(Hashtable) in.readObject();
807 mbrs=(Vector) in.readObject();
808 }
809
810 public long size() {
811 long retval=Global.BYTE_SIZE; // type
812 retval+=Util.size(mbr);
813 retval+=Util.size(sock_addr);
814
815 retval+=Global.INT_SIZE; // cachedAddrs size
816 Map.Entry entry;
817 Address key;
818 IpAddress val;
819 if(cachedAddrs != null) {
820 for(Iterator it=cachedAddrs.entrySet().iterator(); it.hasNext();) {
821 entry=(Map.Entry)it.next();
822 if((key=(Address)entry.getKey()) != null)
823 retval+=Util.size(key);
824 retval+=Global.BYTE_SIZE; // presence for val
825 if((val=(IpAddress)entry.getValue()) != null)
826 retval+=val.size();
827 }
828 }
829
830 retval+=Global.INT_SIZE; // mbrs size
831 if(mbrs != null) {
832 for(int i=0; i < mbrs.size(); i++) {
833 retval+=Util.size((Address)mbrs.elementAt(i));
834 }
835 }
836
837 return retval;
838 }
839
840 public void writeTo(DataOutputStream out) throws IOException {
841 int size;
842 out.writeByte(type);
843 Util.writeAddress(mbr, out);
844 Util.writeStreamable(sock_addr, out);
845 size=cachedAddrs != null? cachedAddrs.size() : 0;
846 out.writeInt(size);
847 if(size > 0) {
848 for(Iterator it=cachedAddrs.entrySet().iterator(); it.hasNext();) {
849 Map.Entry entry=(Map.Entry)it.next();
850 Address key=(Address)entry.getKey();
851 IpAddress val=(IpAddress)entry.getValue();
852 Util.writeAddress(key, out);
853 Util.writeStreamable(val, out);
854 }
855 }
856 size=mbrs != null? mbrs.size() : 0;
857 out.writeInt(size);
858 if(size > 0) {
859 for(Iterator it=mbrs.iterator(); it.hasNext();) {
860 Address address=(Address)it.next();
861 Util.writeAddress(address, out);
862 }
863 }
864 }
865
866 public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
867 int size;
868 type=in.readByte();
869 mbr=Util.readAddress(in);
870 sock_addr=(IpAddress)Util.readStreamable(IpAddress.class, in);
871 size=in.readInt();
872 if(size > 0) {
873 if(cachedAddrs == null)
874 cachedAddrs=new Hashtable();
875 for(int i=0; i < size; i++) {
876 Address key=Util.readAddress(in);
877 IpAddress val=(IpAddress)Util.readStreamable(IpAddress.class, in);
878 cachedAddrs.put(key, val);
879 }
880 }
881 size=in.readInt();
882 if(size > 0) {
883 if(mbrs == null)
884 mbrs=new Vector();
885 for(int i=0; i < size; i++) {
886 Address addr=Util.readAddress(in);
887 mbrs.add(addr);
888 }
889 }
890 }
891
892 }
893
894
895 /**
896 * Handles the server-side of a client-server socket connection. Waits until a client connects, and then loops
897 * until that client closes the connection. Note that there is no new thread spawned for the listening on the
898 * client socket, therefore there can only be 1 client connection at the same time. Subsequent clients attempting
899 * to create a connection will be blocked until the first client closes its connection. This should not be a problem
900 * as the ring nature of the FD_SOCK protocol always has only 1 client connect to its right-hand-side neighbor.
901 */
902 private class ServerSocketHandler implements Runnable {
903 Thread acceptor=null;
904 /** List<ClientConnectionHandler> */
905 final List clients=new ArrayList();
906
907
908
909 ServerSocketHandler() {
910 start();
911 }
912
913 void start() {
914 if(acceptor == null) {
915 acceptor=new Thread(this, "ServerSocket acceptor thread");
916 acceptor.setDaemon(true);
917 acceptor.start();
918 }
919 }
920
921
922 void stop() {
923 if(acceptor != null && acceptor.isAlive()) {
924 try {
925 srv_sock.close(); // this will terminate thread, peer will receive SocketException (socket close)
926 }
927 catch(Exception ex) {
928 }
929 }
930 synchronized(clients) {
931 for(Iterator it=clients.iterator(); it.hasNext();) {
932 ClientConnectionHandler handler=(ClientConnectionHandler)it.next();
933 handler.stopThread();
934 }
935 clients.clear();
936 }
937 acceptor=null;
938 }
939
940
941 /** Only accepts 1 client connection at a time (saving threads) */
942 public void run() {
943 Socket client_sock;
944 while(acceptor != null && srv_sock != null) {
945 try {
946 if(trace) // +++ remove
947 log.trace("waiting for client connections on " + srv_sock.getInetAddress() + ":" +
948 srv_sock.getLocalPort());
949 client_sock=srv_sock.accept();
950 if(trace) // +++ remove
951 log.trace("accepted connection from " + client_sock.getInetAddress() + ':' + client_sock.getPort());
952 ClientConnectionHandler client_conn_handler=new ClientConnectionHandler(client_sock, clients);
953 synchronized(clients) {
954 clients.add(client_conn_handler);
955 }
956 client_conn_handler.start();
957 }
958 catch(IOException io_ex2) {
959 break;
960 }
961 }
962 acceptor=null;
963 }
964 }
965
966
967
968 /** Handles a client connection; multiple client can connect at the same time */
969 private static class ClientConnectionHandler extends Thread {
970 Socket client_sock=null;
971 InputStream in;
972 final Object mutex=new Object();
973 List clients=new ArrayList();
974
975 ClientConnectionHandler(Socket client_sock, List clients) {
976 setName("ClientConnectionHandler");
977 setDaemon(true);
978 this.client_sock=client_sock;
979 this.clients.addAll(clients);
980 }
981
982 void stopThread() {
983 synchronized(mutex) {
984 if(client_sock != null) {
985 try {
986 OutputStream out=client_sock.getOutputStream();
987 out.write(NORMAL_TEMINATION);
988 }
989 catch(Throwable t) {
990 }
991 }
992 }
993 closeClientSocket();
994 }
995
996 void closeClientSocket() {
997 synchronized(mutex) {
998 if(client_sock != null) {
999 try {
1000 client_sock.close();
1001 }
1002 catch(Exception ex) {
1003 }
1004 client_sock=null;
1005 }
1006 }
1007 }
1008
1009 public void run() {
1010 try {
1011 synchronized(mutex) {
1012 if(client_sock == null)
1013 return;
1014 in=client_sock.getInputStream();
1015 }
1016 while((in.read()) != -1) {
1017 }
1018 }
1019 catch(IOException io_ex1) {
1020 }
1021 finally {
1022 closeClientSocket();
1023 synchronized(clients) {
1024 clients.remove(this);
1025 }
1026 }
1027 }
1028 }
1029
1030
1031 /**
1032 * Task that periodically broadcasts a list of suspected members to the group. Goal is not to lose
1033 * a SUSPECT message: since these are bcast unreliably, they might get dropped. The BroadcastTask makes
1034 * sure they are retransmitted until a view has been received which doesn't contain the suspected members
1035 * any longer. Then the task terminates.
1036 */
1037 private class BroadcastTask implements TimeScheduler.Task {
1038 final Vector suspected_mbrs=new Vector(7);
1039 boolean stopped=false;
1040
1041
1042 /** Adds a suspected member. Starts the task if not yet running */
1043 public void addSuspectedMember(Address mbr) {
1044 if(mbr == null) return;
1045 if(!members.contains(mbr)) return;
1046 synchronized(suspected_mbrs) {
1047 if(!suspected_mbrs.contains(mbr)) {
1048 suspected_mbrs.addElement(mbr);
1049 if(log.isDebugEnabled()) log.debug("mbr=" + mbr + " (size=" + suspected_mbrs.size() + ')');
1050 }
1051 if(stopped && suspected_mbrs.size() > 0) {
1052 stopped=false;
1053 timer.add(this, true);
1054 }
1055 }
1056 }
1057
1058
1059 public void removeSuspectedMember(Address suspected_mbr) {
1060 if(suspected_mbr == null) return;
1061 if(log.isDebugEnabled()) log.debug("member is " + suspected_mbr);
1062 synchronized(suspected_mbrs) {
1063 suspected_mbrs.removeElement(suspected_mbr);
1064 if(suspected_mbrs.size() == 0)
1065 stopped=true;
1066 }
1067 }
1068
1069
1070 public void removeAll() {
1071 synchronized(suspected_mbrs) {
1072 suspected_mbrs.removeAllElements();
1073 stopped=true;
1074 }
1075 }
1076
1077
1078 /**
1079 * Removes all elements from suspected_mbrs that are <em>not</em> in the new membership
1080 */
1081 public void adjustSuspectedMembers(Vector new_mbrship) {
1082 Address suspected_mbr;
1083
1084 if(new_mbrship == null || new_mbrship.size() == 0) return;
1085 synchronized(suspected_mbrs) {
1086 for(Iterator it=suspected_mbrs.iterator(); it.hasNext();) {
1087 suspected_mbr=(Address) it.next();
1088 if(!new_mbrship.contains(suspected_mbr)) {
1089 it.remove();
1090 if(log.isDebugEnabled())
1091 log.debug("removed " + suspected_mbr + " (size=" + suspected_mbrs.size() + ')');
1092 }
1093 }
1094 if(suspected_mbrs.size() == 0)
1095 stopped=true;
1096 }
1097 }
1098
1099
1100 public boolean cancelled() {
1101 return stopped;
1102 }
1103
1104
1105 public long nextInterval() {
1106 return suspect_msg_interval;
1107 }
1108
1109
1110 public void run() {
1111 Message suspect_msg;
1112 FdHeader hdr;
1113
1114 if(log.isDebugEnabled())
1115 log.debug("broadcasting SUSPECT message (suspected_mbrs=" + suspected_mbrs + ") to group");
1116
1117 synchronized(suspected_mbrs) {
1118 if(suspected_mbrs.size() == 0) {
1119 stopped=true;
1120 if(log.isDebugEnabled()) log.debug("task done (no suspected members)");
1121 return;
1122 }
1123
1124 hdr=new FdHeader(FdHeader.SUSPECT);
1125 hdr.mbrs=(Vector) suspected_mbrs.clone();
1126 }
1127 suspect_msg=new Message(); // mcast SUSPECT to all members
1128 suspect_msg.putHeader(name, hdr);
1129 passDown(new Event(Event.MSG, suspect_msg));
1130 if(log.isDebugEnabled()) log.debug("task done");
1131 }
1132 }
1133
1134
1135}