Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/jgroups/protocols/VERIFY_SUSPECT.java


1   // $Id: VERIFY_SUSPECT.java,v 1.15 2005/08/11 12:43:47 belaban Exp $
2   
3   package org.jgroups.protocols;
4   
5   import org.jgroups.Address;
6   import org.jgroups.Event;
7   import org.jgroups.Header;
8   import org.jgroups.Message;
9   import org.jgroups.stack.Protocol;
10  import org.jgroups.util.Util;
11  import org.jgroups.util.Streamable;
12  
13  import java.io.*;
14  import java.util.Enumeration;
15  import java.util.Hashtable;
16  import java.util.Properties;
17  import java.util.Vector;
18  
19  
20  /**
21   * Catches SUSPECT events traveling up the stack. Verifies that the suspected member is really dead. If yes,
22   * passes SUSPECT event up the stack, otherwise discards it. Has to be placed somewhere above the FD layer and
23   * below the GMS layer (receiver of the SUSPECT event). Note that SUSPECT events may be reordered by this protocol.
24   */
25  public class VERIFY_SUSPECT extends Protocol implements Runnable {
26      Address local_addr=null;
27      long timeout=2000;   // number of millisecs to wait for an are-you-dead msg
28      int num_msgs=1;     // number of are-you-alive msgs and i-am-not-dead responses (for redundancy)
29      final Vector members=null;
30      final Hashtable suspects=new Hashtable();  // keys=Addresses, vals=time in mcses since added
31      Thread timer=null;
32      final String name="VERIFY_SUSPECT";
33  
34  
35      public String getName() {
36          return name;
37      }
38  
39  
40      public boolean setProperties(Properties props) {
41          String str;
42  
43          super.setProperties(props);
44          str=props.getProperty("timeout");
45          if(str != null) {
46              timeout=Long.parseLong(str);
47              props.remove("timeout");
48          }
49  
50          str=props.getProperty("num_msgs");
51          if(str != null) {
52              num_msgs=Integer.parseInt(str);
53              if(num_msgs <= 0) {
54                  if(warn) log.warn("num_msgs is invalid (" +
55                          num_msgs + "): setting it to 1");
56                  num_msgs=1;
57              }
58              props.remove("num_msgs");
59          }
60  
61          if(props.size() > 0) {
62              log.error("VERIFY_SUSPECT.setProperties(): the following properties are not recognized: " + props);
63  
64              return false;
65          }
66          return true;
67      }
68  
69  
70      public void up(Event evt) {
71          Address suspected_mbr;
72          Message msg, rsp;
73          Object obj;
74          VerifyHeader hdr;
75  
76          switch(evt.getType()) {
77  
78          case Event.SET_LOCAL_ADDRESS:
79              local_addr=(Address)evt.getArg();
80              break;
81  
82          case Event.SUSPECT:  // it all starts here ...
83              suspected_mbr=(Address)evt.getArg();
84              if(suspected_mbr == null) {
85                  if(log.isErrorEnabled()) log.error("suspected member is null");
86                  return;
87              }
88              suspect(suspected_mbr);
89              return;  // don't pass up; we will decide later (after verification) whether to pass it up
90  
91  
92          case Event.MSG:
93              msg=(Message)evt.getArg();
94              obj=msg.getHeader(name);
95              if(obj == null || !(obj instanceof VerifyHeader))
96                  break;
97              hdr=(VerifyHeader)msg.removeHeader(name);
98              switch(hdr.type) {
99              case VerifyHeader.ARE_YOU_DEAD:
100                 if(hdr.from == null) {
101                     if(log.isErrorEnabled()) log.error("ARE_YOU_DEAD: hdr.from is null");
102                 }
103                 else {
104                     for(int i=0; i < num_msgs; i++) {
105                         rsp=new Message(hdr.from, null, null);
106                         rsp.putHeader(name, new VerifyHeader(VerifyHeader.I_AM_NOT_DEAD, local_addr));
107                         passDown(new Event(Event.MSG, rsp));
108                     }
109                 }
110                 return;
111             case VerifyHeader.I_AM_NOT_DEAD:
112                 if(hdr.from == null) {
113                     if(log.isErrorEnabled()) log.error("I_AM_NOT_DEAD: hdr.from is null");
114                     return;
115                 }
116                 unsuspect(hdr.from);
117                 return;
118             }
119             return;
120         }
121         passUp(evt);
122     }
123 
124 
125     /**
126      * Will be started when a suspect is added to the suspects hashtable. Continually iterates over the
127      * entries and removes entries whose time have elapsed. For each removed entry, a SUSPECT event is passed
128      * up the stack (because elapsed time means verification of member's liveness failed). Computes the shortest
129      * time to wait (min of all timeouts) and waits(time) msecs. Will be woken up when entry is removed (in case
130      * of successful verification of that member's liveness). Terminates when no entry remains in the hashtable.
131      */
132     public void run() {
133         Address mbr;
134         long val, curr_time, diff;
135 
136         while(timer != null && Thread.currentThread().equals(timer) && suspects.size() > 0) {
137             diff=0;
138 
139             synchronized(suspects) {
140                 for(Enumeration e=suspects.keys(); e.hasMoreElements();) {
141                     mbr=(Address)e.nextElement();
142                     val=((Long)suspects.get(mbr)).longValue();
143                     curr_time=System.currentTimeMillis();
144                     diff=curr_time - val;
145                     if(diff >= timeout) {  // haven't been unsuspected, pass up SUSPECT
146                         if(trace)
147                             log.trace("diff=" + diff + ", mbr " + mbr + " is dead (passing up SUSPECT event)");
148                         passUp(new Event(Event.SUSPECT, mbr));
149                         suspects.remove(mbr);
150                         continue;
151                     }
152                     diff=Math.max(diff, timeout - diff);
153                 }
154             }
155 
156             if(diff > 0)
157                 Util.sleep(diff);
158         }
159         timer=null;
160     }
161 
162 
163 
164     /* --------------------------------- Private Methods ----------------------------------- */
165 
166 
167     /**
168      * Sends ARE_YOU_DEAD message to suspected_mbr, wait for return or timeout
169      */
170     void suspect(Address mbr) {
171         Message msg;
172         if(mbr == null) return;
173 
174         synchronized(suspects) {
175             if(suspects.containsKey(mbr))
176                 return;
177             suspects.put(mbr, new Long(System.currentTimeMillis()));
178             if(trace) log.trace("verifying that " + mbr + " is dead");
179             for(int i=0; i < num_msgs; i++) {
180                 msg=new Message(mbr, null, null);
181                 msg.putHeader(name, new VerifyHeader(VerifyHeader.ARE_YOU_DEAD, local_addr));
182                 passDown(new Event(Event.MSG, msg));
183             }
184         }
185         if(timer == null)
186             startTimer();
187     }
188 
189     void unsuspect(Address mbr) {
190         if(mbr == null) return;
191         synchronized(suspects) {
192             if(suspects.containsKey(mbr)) {
193                 if(trace) log.trace("member " + mbr + " is not dead !");
194                 suspects.remove(mbr);
195                 passDown(new Event(Event.UNSUSPECT, mbr));
196                 passUp(new Event(Event.UNSUSPECT, mbr));
197             }
198         }
199     }
200 
201 
202     void startTimer() {
203         if(timer == null || !timer.isAlive()) {
204             timer=new Thread(this, "VERIFY_SUSPECT.TimerThread");
205             timer.setDaemon(true);
206             timer.start();
207         }
208     }
209 
210     public void stop() {
211         Thread tmp;
212         if(timer != null && timer.isAlive()) {
213             tmp=timer;
214             timer=null;
215             tmp.interrupt();
216             tmp=null;
217         }
218         timer=null;
219     }
220     /* ----------------------------- End of Private Methods -------------------------------- */
221 
222 
223 
224 
225 
226     public static class VerifyHeader extends Header implements Streamable {
227         static final short ARE_YOU_DEAD=1;  // 'from' is sender of verify msg
228         static final short I_AM_NOT_DEAD=2;  // 'from' is suspected member
229 
230         short type=ARE_YOU_DEAD;
231         Address from=null;     // member who wants to verify that suspected_mbr is dead
232 
233 
234         public VerifyHeader() {
235         } // used for externalization
236 
237         VerifyHeader(short type) {
238             this.type=type;
239         }
240 
241         VerifyHeader(short type, Address from) {
242             this(type);
243             this.from=from;
244         }
245 
246 
247         public String toString() {
248             switch(type) {
249                 case ARE_YOU_DEAD:
250                     return "[VERIFY_SUSPECT: ARE_YOU_DEAD]";
251                 case I_AM_NOT_DEAD:
252                     return "[VERIFY_SUSPECT: I_AM_NOT_DEAD]";
253                 default:
254                     return "[VERIFY_SUSPECT: unknown type (" + type + ")]";
255             }
256         }
257 
258         public void writeExternal(ObjectOutput out) throws IOException {
259             out.writeShort(type);
260             out.writeObject(from);
261         }
262 
263 
264         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
265             type=in.readShort();
266             from=(Address)in.readObject();
267         }
268 
269         public void writeTo(DataOutputStream out) throws IOException {
270             out.writeShort(type);
271             Util.writeAddress(from, out);
272         }
273 
274         public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
275             type=in.readShort();
276             from=Util.readAddress(in);
277         }
278 
279     }
280 
281 
282 }
283