Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/jgroups/protocols/VIEW_SYNC.java


1   package org.jgroups.protocols;
2   
3   
4   import org.jgroups.*;
5   import org.jgroups.stack.Protocol;
6   import org.jgroups.util.Streamable;
7   import org.jgroups.util.TimeScheduler;
8   import org.jgroups.util.Util;
9   
10  import java.io.*;
11  import java.util.Properties;
12  import java.util.Vector;
13  
14  
15  
16  
17  /**
18   * Periodically sends the view to the group. When a view is received which is greater than the current view, we
19   * install it. Otherwise we simply discard it. This is used to solve the problem for unreliable view
20   * dissemination outlined in JGroups/doc/ReliableViewInstallation.txt. This protocol is supposed to be just below GMS.
21   * @author Bela Ban
22   * @version $Id: VIEW_SYNC.java,v 1.4 2005/10/27 08:30:14 belaban Exp $
23   */
24  public class VIEW_SYNC extends Protocol {
25      Address             local_addr=null;
26      final Vector        mbrs=new Vector();
27      View                my_view=null;
28      ViewId              my_vid=null;
29  
30      /** Sends a VIEW_SYNC message to the group every 20 seconds on average. 0 disables sending of VIEW_SYNC messages */
31      long                avg_send_interval=60000;
32  
33      private int         num_views_sent=0;
34      private int         num_views_adjusted=0;
35  
36      ViewSendTask        view_send_task=null;             // bcasts periodic STABLE message (added to timer below)
37      final Object        view_send_task_mutex=new Object(); // to sync on stable_task
38      TimeScheduler       timer=null;                   // to send periodic STABLE msgs (and STABILITY messages)
39      static final String name="VIEW_SYNC";
40  
41  
42  
43      public String getName() {
44          return name;
45      }
46  
47      public long getAverageSendInterval() {
48          return avg_send_interval;
49      }
50  
51      public void setAverageSendInterval(long gossip_interval) {
52          avg_send_interval=gossip_interval;
53      }
54  
55      public int getNumViewsSent() {
56          return num_views_sent;
57      }
58  
59      public int getNumViewsAdjusted() {
60          return num_views_adjusted;
61      }
62  
63      public void resetStats() {
64          super.resetStats();
65          num_views_adjusted=num_views_sent=0;
66      }
67  
68  
69  
70      public boolean setProperties(Properties props) {
71          String str;
72  
73          super.setProperties(props);
74  
75          str=props.getProperty("avg_send_interval");
76          if(str != null) {
77              avg_send_interval=Long.parseLong(str);
78              props.remove("avg_send_interval");
79          }
80  
81          if(props.size() > 0) {
82              log.error("these properties are not recognized: " + props);
83              return false;
84          }
85          return true;
86      }
87  
88  
89      public void start() throws Exception {
90          if(stack != null && stack.timer != null)
91              timer=stack.timer;
92          else
93              throw new Exception("timer cannot be retrieved from protocol stack");
94      }
95  
96      public void stop() {
97          stopViewSender();
98      }
99  
100     /** Sends a VIEW_SYNC_REQ to all members, every member replies with a VIEW multicast */
101     public void sendViewRequest() {
102         Message msg=new Message(null, null, null);
103         ViewSyncHeader hdr=new ViewSyncHeader(ViewSyncHeader.VIEW_SYNC_REQ, null);
104         msg.putHeader(name, hdr);
105         passDown(new Event(Event.MSG, msg));
106     }
107 
108 //    public void sendFakeViewForTestingOnly() {
109 //        ViewId fake_vid=new ViewId(local_addr, my_vid.getId() +2);
110 //        View fake_view=new View(fake_vid, new Vector(my_view.getMembers()));
111 //        System.out.println("sending fake view " + fake_view);
112 //        my_view=fake_view;
113 //        my_vid=fake_vid;
114 //        sendView();
115 //    }
116 
117 
118     public void up(Event evt) {
119         Message msg;
120         ViewSyncHeader hdr;
121         int type=evt.getType();
122 
123         switch(type) {
124 
125         case Event.MSG:
126             msg=(Message)evt.getArg();
127             hdr=(ViewSyncHeader)msg.removeHeader(name);
128             if(hdr == null)
129                 break;
130             Address sender=msg.getSrc();
131             switch(hdr.type) {
132             case ViewSyncHeader.VIEW_SYNC:
133                 handleView(hdr.view, sender);
134                 break;
135             case ViewSyncHeader.VIEW_SYNC_REQ:
136                 if(!sender.equals(local_addr))
137                     sendView();
138                 break;
139             default:
140                 if(log.isErrorEnabled()) log.error("ViewSyncHeader type " + hdr.type + " not known");
141             }
142             return;
143 
144 
145         case Event.VIEW_CHANGE:
146             View view=(View)evt.getArg();
147             handleViewChange(view);
148             break;
149 
150         case Event.SET_LOCAL_ADDRESS:
151             local_addr=(Address)evt.getArg();
152             break;
153         }
154         passUp(evt);
155     }
156 
157 
158 
159     public void down(Event evt) {
160         switch(evt.getType()) {
161         case Event.VIEW_CHANGE:
162             View v=(View)evt.getArg();
163             handleViewChange(v);
164             break;
165 
166         }
167         passDown(evt);
168     }
169 
170 
171 
172     /* --------------------------------------- Private Methods ---------------------------------------- */
173 
174     private void handleView(View v, Address sender) {
175         Vector members=v.getMembers();
176         if(!members.contains(local_addr)) {
177             if(log.isWarnEnabled())
178             log.warn("discarding view as I (" + local_addr + ") am not member of view (" + v + ")");
179             return;
180         }
181 
182         ViewId vid=v.getVid();
183         int rc=vid.compareTo(my_vid);
184         if(rc > 0) { // foreign view is greater than my own view; update my own view !
185             if(log.isTraceEnabled())
186                 log.trace("view from " + sender + " (" + vid + ") is greater than my own view (" + my_vid + ");" +
187                 " will update my own view");
188 
189             Message view_change=new Message(local_addr, local_addr, null);
190             org.jgroups.protocols.pbcast.GMS.GmsHeader hdr;
191             hdr=new org.jgroups.protocols.pbcast.GMS.GmsHeader(org.jgroups.protocols.pbcast.GMS.GmsHeader.VIEW, v);
192             view_change.putHeader(GMS.name, hdr);
193             passUp(new Event(Event.MSG, view_change));
194             num_views_adjusted++;
195         }
196     }
197 
198     private void handleViewChange(View view) {
199         my_view=(View)view.clone();
200         my_vid=my_view.getVid();
201         if(my_view.size() > 1 && (view_send_task == null || !view_send_task.running()))
202             startViewSender();
203     }
204 
205     private void sendView() {
206         View tmp=(View)(my_view != null? my_view.clone() : null);
207         if(tmp == null) return;
208         Message msg=new Message(null, null, null); // send to the group
209         ViewSyncHeader hdr=new ViewSyncHeader(ViewSyncHeader.VIEW_SYNC, tmp);
210         msg.putHeader(name, hdr);
211         passDown(new Event(Event.MSG, msg));
212         num_views_sent++;
213     }
214 
215     void startViewSender() {
216         // Here, double-checked locking works: we don't want to synchronize if the task already runs (which is the case
217         // 99% of the time). If stable_task gets nulled after the condition check, we return anyways, but just miss
218         // 1 cycle: on the next message or view, we will start the task
219         if(view_send_task != null)
220             return;
221         synchronized(view_send_task_mutex) {
222             if(view_send_task != null && view_send_task.running()) {
223                 return;  // already running
224             }
225             view_send_task=new ViewSendTask();
226             timer.add(view_send_task, true); // fixed-rate scheduling
227         }
228         if(trace)
229             log.trace("view send task started");
230     }
231 
232 
233     void stopViewSender() {
234         // contrary to startViewSender(), we don't need double-checked locking here because this method is not
235         // called frequently
236         synchronized(view_send_task_mutex) {
237             if(view_send_task != null) {
238                 view_send_task.stop();
239                 if(trace)
240                     log.trace("view send task stopped");
241                 view_send_task=null;
242             }
243         }
244     }
245 
246 
247 
248 
249 
250 
251     /* ------------------------------------End of Private Methods ------------------------------------- */
252 
253 
254 
255 
256 
257 
258 
259     public static class ViewSyncHeader extends Header implements Streamable {
260         public static final int VIEW_SYNC     = 1; // contains a view
261         public static final int VIEW_SYNC_REQ = 2; // request to all members to send their views
262 
263         int   type=0;
264         View  view=null;
265 
266         public ViewSyncHeader() {
267         }
268 
269 
270         public ViewSyncHeader(int type, View view) {
271             this.type=type;
272             this.view=view;
273         }
274 
275 
276         static String type2String(int t) {
277             switch(t) {
278                 case VIEW_SYNC:
279                     return "VIEW_SYNC";
280                 case VIEW_SYNC_REQ:
281                     return "VIEW_SYNC_REQ";
282                 default:
283                     return "<unknown>";
284             }
285         }
286 
287         public String toString() {
288             StringBuffer sb=new StringBuffer();
289             sb.append('[');
290             sb.append(type2String(type));
291             sb.append("]");
292             if(view != null)
293                 sb.append(", view= ").append(view);
294             return sb.toString();
295         }
296 
297 
298         public void writeExternal(ObjectOutput out) throws IOException {
299             out.writeInt(type);
300             if(view == null) {
301                 out.writeBoolean(false);
302                 return;
303             }
304             out.writeBoolean(true);
305             view.writeExternal(out);
306         }
307 
308 
309         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
310             type=in.readInt();
311             boolean available=in.readBoolean();
312             if(available) {
313                 view=new View();
314                 view.readExternal(in);
315             }
316         }
317 
318         public long size() {
319             long retval=Global.INT_SIZE + Global.BYTE_SIZE; // type + presence for digest
320             if(view != null)
321                 retval+=view.serializedSize();
322             return retval;
323         }
324 
325         public void writeTo(DataOutputStream out) throws IOException {
326             out.writeInt(type);
327             Util.writeStreamable(view, out);
328         }
329 
330         public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
331             type=in.readInt();
332             view=(View)Util.readStreamable(View.class, in);
333         }
334 
335 
336     }
337 
338 
339 
340 
341     /**
342      Periodically multicasts a View_SYNC message
343      */
344     private class ViewSendTask implements TimeScheduler.Task {
345         boolean stopped=false;
346 
347         public void stop() {
348             stopped=true;
349         }
350 
351         public boolean running() { // syntactic sugar
352             return !stopped;
353         }
354 
355         public boolean cancelled() {
356             return stopped;
357         }
358 
359         public long nextInterval() {
360             long interval=computeSleepTime();
361             if(interval <= 0)
362                 return 10000;
363             else
364                 return interval;
365         }
366 
367 
368         public void run() {
369             sendView();
370         }
371 
372         long computeSleepTime() {
373             return getRandom((mbrs.size() * avg_send_interval * 2));
374         }
375 
376         long getRandom(long range) {
377             return (long)((Math.random() * range) % range);
378         }
379     }
380 
381 
382 
383 }