Source code: org/jgroups/protocols/VERIFY_SUSPECT.java
1 // $Id: VERIFY_SUSPECT.java,v 1.15 2005/08/11 12:43:47 belaban Exp $
2
3 package org.jgroups.protocols;
4
5 import org.jgroups.Address;
6 import org.jgroups.Event;
7 import org.jgroups.Header;
8 import org.jgroups.Message;
9 import org.jgroups.stack.Protocol;
10 import org.jgroups.util.Util;
11 import org.jgroups.util.Streamable;
12
13 import java.io.*;
14 import java.util.Enumeration;
15 import java.util.Hashtable;
16 import java.util.Properties;
17 import java.util.Vector;
18
19
20 /**
21 * Catches SUSPECT events traveling up the stack. Verifies that the suspected member is really dead. If yes,
22 * passes SUSPECT event up the stack, otherwise discards it. Has to be placed somewhere above the FD layer and
23 * below the GMS layer (receiver of the SUSPECT event). Note that SUSPECT events may be reordered by this protocol.
24 */
25 public class VERIFY_SUSPECT extends Protocol implements Runnable {
26 Address local_addr=null;
27 long timeout=2000; // number of millisecs to wait for an are-you-dead msg
28 int num_msgs=1; // number of are-you-alive msgs and i-am-not-dead responses (for redundancy)
29 final Vector members=null;
30 final Hashtable suspects=new Hashtable(); // keys=Addresses, vals=time in mcses since added
31 Thread timer=null;
32 final String name="VERIFY_SUSPECT";
33
34
35 public String getName() {
36 return name;
37 }
38
39
40 public boolean setProperties(Properties props) {
41 String str;
42
43 super.setProperties(props);
44 str=props.getProperty("timeout");
45 if(str != null) {
46 timeout=Long.parseLong(str);
47 props.remove("timeout");
48 }
49
50 str=props.getProperty("num_msgs");
51 if(str != null) {
52 num_msgs=Integer.parseInt(str);
53 if(num_msgs <= 0) {
54 if(warn) log.warn("num_msgs is invalid (" +
55 num_msgs + "): setting it to 1");
56 num_msgs=1;
57 }
58 props.remove("num_msgs");
59 }
60
61 if(props.size() > 0) {
62 log.error("VERIFY_SUSPECT.setProperties(): the following properties are not recognized: " + props);
63
64 return false;
65 }
66 return true;
67 }
68
69
70 public void up(Event evt) {
71 Address suspected_mbr;
72 Message msg, rsp;
73 Object obj;
74 VerifyHeader hdr;
75
76 switch(evt.getType()) {
77
78 case Event.SET_LOCAL_ADDRESS:
79 local_addr=(Address)evt.getArg();
80 break;
81
82 case Event.SUSPECT: // it all starts here ...
83 suspected_mbr=(Address)evt.getArg();
84 if(suspected_mbr == null) {
85 if(log.isErrorEnabled()) log.error("suspected member is null");
86 return;
87 }
88 suspect(suspected_mbr);
89 return; // don't pass up; we will decide later (after verification) whether to pass it up
90
91
92 case Event.MSG:
93 msg=(Message)evt.getArg();
94 obj=msg.getHeader(name);
95 if(obj == null || !(obj instanceof VerifyHeader))
96 break;
97 hdr=(VerifyHeader)msg.removeHeader(name);
98 switch(hdr.type) {
99 case VerifyHeader.ARE_YOU_DEAD:
100 if(hdr.from == null) {
101 if(log.isErrorEnabled()) log.error("ARE_YOU_DEAD: hdr.from is null");
102 }
103 else {
104 for(int i=0; i < num_msgs; i++) {
105 rsp=new Message(hdr.from, null, null);
106 rsp.putHeader(name, new VerifyHeader(VerifyHeader.I_AM_NOT_DEAD, local_addr));
107 passDown(new Event(Event.MSG, rsp));
108 }
109 }
110 return;
111 case VerifyHeader.I_AM_NOT_DEAD:
112 if(hdr.from == null) {
113 if(log.isErrorEnabled()) log.error("I_AM_NOT_DEAD: hdr.from is null");
114 return;
115 }
116 unsuspect(hdr.from);
117 return;
118 }
119 return;
120 }
121 passUp(evt);
122 }
123
124
125 /**
126 * Will be started when a suspect is added to the suspects hashtable. Continually iterates over the
127 * entries and removes entries whose time have elapsed. For each removed entry, a SUSPECT event is passed
128 * up the stack (because elapsed time means verification of member's liveness failed). Computes the shortest
129 * time to wait (min of all timeouts) and waits(time) msecs. Will be woken up when entry is removed (in case
130 * of successful verification of that member's liveness). Terminates when no entry remains in the hashtable.
131 */
132 public void run() {
133 Address mbr;
134 long val, curr_time, diff;
135
136 while(timer != null && Thread.currentThread().equals(timer) && suspects.size() > 0) {
137 diff=0;
138
139 synchronized(suspects) {
140 for(Enumeration e=suspects.keys(); e.hasMoreElements();) {
141 mbr=(Address)e.nextElement();
142 val=((Long)suspects.get(mbr)).longValue();
143 curr_time=System.currentTimeMillis();
144 diff=curr_time - val;
145 if(diff >= timeout) { // haven't been unsuspected, pass up SUSPECT
146 if(trace)
147 log.trace("diff=" + diff + ", mbr " + mbr + " is dead (passing up SUSPECT event)");
148 passUp(new Event(Event.SUSPECT, mbr));
149 suspects.remove(mbr);
150 continue;
151 }
152 diff=Math.max(diff, timeout - diff);
153 }
154 }
155
156 if(diff > 0)
157 Util.sleep(diff);
158 }
159 timer=null;
160 }
161
162
163
164 /* --------------------------------- Private Methods ----------------------------------- */
165
166
167 /**
168 * Sends ARE_YOU_DEAD message to suspected_mbr, wait for return or timeout
169 */
170 void suspect(Address mbr) {
171 Message msg;
172 if(mbr == null) return;
173
174 synchronized(suspects) {
175 if(suspects.containsKey(mbr))
176 return;
177 suspects.put(mbr, new Long(System.currentTimeMillis()));
178 if(trace) log.trace("verifying that " + mbr + " is dead");
179 for(int i=0; i < num_msgs; i++) {
180 msg=new Message(mbr, null, null);
181 msg.putHeader(name, new VerifyHeader(VerifyHeader.ARE_YOU_DEAD, local_addr));
182 passDown(new Event(Event.MSG, msg));
183 }
184 }
185 if(timer == null)
186 startTimer();
187 }
188
189 void unsuspect(Address mbr) {
190 if(mbr == null) return;
191 synchronized(suspects) {
192 if(suspects.containsKey(mbr)) {
193 if(trace) log.trace("member " + mbr + " is not dead !");
194 suspects.remove(mbr);
195 passDown(new Event(Event.UNSUSPECT, mbr));
196 passUp(new Event(Event.UNSUSPECT, mbr));
197 }
198 }
199 }
200
201
202 void startTimer() {
203 if(timer == null || !timer.isAlive()) {
204 timer=new Thread(this, "VERIFY_SUSPECT.TimerThread");
205 timer.setDaemon(true);
206 timer.start();
207 }
208 }
209
210 public void stop() {
211 Thread tmp;
212 if(timer != null && timer.isAlive()) {
213 tmp=timer;
214 timer=null;
215 tmp.interrupt();
216 tmp=null;
217 }
218 timer=null;
219 }
220 /* ----------------------------- End of Private Methods -------------------------------- */
221
222
223
224
225
226 public static class VerifyHeader extends Header implements Streamable {
227 static final short ARE_YOU_DEAD=1; // 'from' is sender of verify msg
228 static final short I_AM_NOT_DEAD=2; // 'from' is suspected member
229
230 short type=ARE_YOU_DEAD;
231 Address from=null; // member who wants to verify that suspected_mbr is dead
232
233
234 public VerifyHeader() {
235 } // used for externalization
236
237 VerifyHeader(short type) {
238 this.type=type;
239 }
240
241 VerifyHeader(short type, Address from) {
242 this(type);
243 this.from=from;
244 }
245
246
247 public String toString() {
248 switch(type) {
249 case ARE_YOU_DEAD:
250 return "[VERIFY_SUSPECT: ARE_YOU_DEAD]";
251 case I_AM_NOT_DEAD:
252 return "[VERIFY_SUSPECT: I_AM_NOT_DEAD]";
253 default:
254 return "[VERIFY_SUSPECT: unknown type (" + type + ")]";
255 }
256 }
257
258 public void writeExternal(ObjectOutput out) throws IOException {
259 out.writeShort(type);
260 out.writeObject(from);
261 }
262
263
264 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
265 type=in.readShort();
266 from=(Address)in.readObject();
267 }
268
269 public void writeTo(DataOutputStream out) throws IOException {
270 out.writeShort(type);
271 Util.writeAddress(from, out);
272 }
273
274 public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
275 type=in.readShort();
276 from=Util.readAddress(in);
277 }
278
279 }
280
281
282 }
283