Source code: org/jgroups/protocols/VIEW_SYNC.java
1 package org.jgroups.protocols;
2
3
4 import org.jgroups.*;
5 import org.jgroups.stack.Protocol;
6 import org.jgroups.util.Streamable;
7 import org.jgroups.util.TimeScheduler;
8 import org.jgroups.util.Util;
9
10 import java.io.*;
11 import java.util.Properties;
12 import java.util.Vector;
13
14
15
16
17 /**
18 * Periodically sends the view to the group. When a view is received which is greater than the current view, we
19 * install it. Otherwise we simply discard it. This is used to solve the problem for unreliable view
20 * dissemination outlined in JGroups/doc/ReliableViewInstallation.txt. This protocol is supposed to be just below GMS.
21 * @author Bela Ban
22 * @version $Id: VIEW_SYNC.java,v 1.4 2005/10/27 08:30:14 belaban Exp $
23 */
24 public class VIEW_SYNC extends Protocol {
25 Address local_addr=null;
26 final Vector mbrs=new Vector();
27 View my_view=null;
28 ViewId my_vid=null;
29
30 /** Sends a VIEW_SYNC message to the group every 20 seconds on average. 0 disables sending of VIEW_SYNC messages */
31 long avg_send_interval=60000;
32
33 private int num_views_sent=0;
34 private int num_views_adjusted=0;
35
36 ViewSendTask view_send_task=null; // bcasts periodic STABLE message (added to timer below)
37 final Object view_send_task_mutex=new Object(); // to sync on stable_task
38 TimeScheduler timer=null; // to send periodic STABLE msgs (and STABILITY messages)
39 static final String name="VIEW_SYNC";
40
41
42
43 public String getName() {
44 return name;
45 }
46
47 public long getAverageSendInterval() {
48 return avg_send_interval;
49 }
50
51 public void setAverageSendInterval(long gossip_interval) {
52 avg_send_interval=gossip_interval;
53 }
54
55 public int getNumViewsSent() {
56 return num_views_sent;
57 }
58
59 public int getNumViewsAdjusted() {
60 return num_views_adjusted;
61 }
62
63 public void resetStats() {
64 super.resetStats();
65 num_views_adjusted=num_views_sent=0;
66 }
67
68
69
70 public boolean setProperties(Properties props) {
71 String str;
72
73 super.setProperties(props);
74
75 str=props.getProperty("avg_send_interval");
76 if(str != null) {
77 avg_send_interval=Long.parseLong(str);
78 props.remove("avg_send_interval");
79 }
80
81 if(props.size() > 0) {
82 log.error("these properties are not recognized: " + props);
83 return false;
84 }
85 return true;
86 }
87
88
89 public void start() throws Exception {
90 if(stack != null && stack.timer != null)
91 timer=stack.timer;
92 else
93 throw new Exception("timer cannot be retrieved from protocol stack");
94 }
95
96 public void stop() {
97 stopViewSender();
98 }
99
100 /** Sends a VIEW_SYNC_REQ to all members, every member replies with a VIEW multicast */
101 public void sendViewRequest() {
102 Message msg=new Message(null, null, null);
103 ViewSyncHeader hdr=new ViewSyncHeader(ViewSyncHeader.VIEW_SYNC_REQ, null);
104 msg.putHeader(name, hdr);
105 passDown(new Event(Event.MSG, msg));
106 }
107
108 // public void sendFakeViewForTestingOnly() {
109 // ViewId fake_vid=new ViewId(local_addr, my_vid.getId() +2);
110 // View fake_view=new View(fake_vid, new Vector(my_view.getMembers()));
111 // System.out.println("sending fake view " + fake_view);
112 // my_view=fake_view;
113 // my_vid=fake_vid;
114 // sendView();
115 // }
116
117
118 public void up(Event evt) {
119 Message msg;
120 ViewSyncHeader hdr;
121 int type=evt.getType();
122
123 switch(type) {
124
125 case Event.MSG:
126 msg=(Message)evt.getArg();
127 hdr=(ViewSyncHeader)msg.removeHeader(name);
128 if(hdr == null)
129 break;
130 Address sender=msg.getSrc();
131 switch(hdr.type) {
132 case ViewSyncHeader.VIEW_SYNC:
133 handleView(hdr.view, sender);
134 break;
135 case ViewSyncHeader.VIEW_SYNC_REQ:
136 if(!sender.equals(local_addr))
137 sendView();
138 break;
139 default:
140 if(log.isErrorEnabled()) log.error("ViewSyncHeader type " + hdr.type + " not known");
141 }
142 return;
143
144
145 case Event.VIEW_CHANGE:
146 View view=(View)evt.getArg();
147 handleViewChange(view);
148 break;
149
150 case Event.SET_LOCAL_ADDRESS:
151 local_addr=(Address)evt.getArg();
152 break;
153 }
154 passUp(evt);
155 }
156
157
158
159 public void down(Event evt) {
160 switch(evt.getType()) {
161 case Event.VIEW_CHANGE:
162 View v=(View)evt.getArg();
163 handleViewChange(v);
164 break;
165
166 }
167 passDown(evt);
168 }
169
170
171
172 /* --------------------------------------- Private Methods ---------------------------------------- */
173
174 private void handleView(View v, Address sender) {
175 Vector members=v.getMembers();
176 if(!members.contains(local_addr)) {
177 if(log.isWarnEnabled())
178 log.warn("discarding view as I (" + local_addr + ") am not member of view (" + v + ")");
179 return;
180 }
181
182 ViewId vid=v.getVid();
183 int rc=vid.compareTo(my_vid);
184 if(rc > 0) { // foreign view is greater than my own view; update my own view !
185 if(log.isTraceEnabled())
186 log.trace("view from " + sender + " (" + vid + ") is greater than my own view (" + my_vid + ");" +
187 " will update my own view");
188
189 Message view_change=new Message(local_addr, local_addr, null);
190 org.jgroups.protocols.pbcast.GMS.GmsHeader hdr;
191 hdr=new org.jgroups.protocols.pbcast.GMS.GmsHeader(org.jgroups.protocols.pbcast.GMS.GmsHeader.VIEW, v);
192 view_change.putHeader(GMS.name, hdr);
193 passUp(new Event(Event.MSG, view_change));
194 num_views_adjusted++;
195 }
196 }
197
198 private void handleViewChange(View view) {
199 my_view=(View)view.clone();
200 my_vid=my_view.getVid();
201 if(my_view.size() > 1 && (view_send_task == null || !view_send_task.running()))
202 startViewSender();
203 }
204
205 private void sendView() {
206 View tmp=(View)(my_view != null? my_view.clone() : null);
207 if(tmp == null) return;
208 Message msg=new Message(null, null, null); // send to the group
209 ViewSyncHeader hdr=new ViewSyncHeader(ViewSyncHeader.VIEW_SYNC, tmp);
210 msg.putHeader(name, hdr);
211 passDown(new Event(Event.MSG, msg));
212 num_views_sent++;
213 }
214
215 void startViewSender() {
216 // Here, double-checked locking works: we don't want to synchronize if the task already runs (which is the case
217 // 99% of the time). If stable_task gets nulled after the condition check, we return anyways, but just miss
218 // 1 cycle: on the next message or view, we will start the task
219 if(view_send_task != null)
220 return;
221 synchronized(view_send_task_mutex) {
222 if(view_send_task != null && view_send_task.running()) {
223 return; // already running
224 }
225 view_send_task=new ViewSendTask();
226 timer.add(view_send_task, true); // fixed-rate scheduling
227 }
228 if(trace)
229 log.trace("view send task started");
230 }
231
232
233 void stopViewSender() {
234 // contrary to startViewSender(), we don't need double-checked locking here because this method is not
235 // called frequently
236 synchronized(view_send_task_mutex) {
237 if(view_send_task != null) {
238 view_send_task.stop();
239 if(trace)
240 log.trace("view send task stopped");
241 view_send_task=null;
242 }
243 }
244 }
245
246
247
248
249
250
251 /* ------------------------------------End of Private Methods ------------------------------------- */
252
253
254
255
256
257
258
259 public static class ViewSyncHeader extends Header implements Streamable {
260 public static final int VIEW_SYNC = 1; // contains a view
261 public static final int VIEW_SYNC_REQ = 2; // request to all members to send their views
262
263 int type=0;
264 View view=null;
265
266 public ViewSyncHeader() {
267 }
268
269
270 public ViewSyncHeader(int type, View view) {
271 this.type=type;
272 this.view=view;
273 }
274
275
276 static String type2String(int t) {
277 switch(t) {
278 case VIEW_SYNC:
279 return "VIEW_SYNC";
280 case VIEW_SYNC_REQ:
281 return "VIEW_SYNC_REQ";
282 default:
283 return "<unknown>";
284 }
285 }
286
287 public String toString() {
288 StringBuffer sb=new StringBuffer();
289 sb.append('[');
290 sb.append(type2String(type));
291 sb.append("]");
292 if(view != null)
293 sb.append(", view= ").append(view);
294 return sb.toString();
295 }
296
297
298 public void writeExternal(ObjectOutput out) throws IOException {
299 out.writeInt(type);
300 if(view == null) {
301 out.writeBoolean(false);
302 return;
303 }
304 out.writeBoolean(true);
305 view.writeExternal(out);
306 }
307
308
309 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
310 type=in.readInt();
311 boolean available=in.readBoolean();
312 if(available) {
313 view=new View();
314 view.readExternal(in);
315 }
316 }
317
318 public long size() {
319 long retval=Global.INT_SIZE + Global.BYTE_SIZE; // type + presence for digest
320 if(view != null)
321 retval+=view.serializedSize();
322 return retval;
323 }
324
325 public void writeTo(DataOutputStream out) throws IOException {
326 out.writeInt(type);
327 Util.writeStreamable(view, out);
328 }
329
330 public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
331 type=in.readInt();
332 view=(View)Util.readStreamable(View.class, in);
333 }
334
335
336 }
337
338
339
340
341 /**
342 Periodically multicasts a View_SYNC message
343 */
344 private class ViewSendTask implements TimeScheduler.Task {
345 boolean stopped=false;
346
347 public void stop() {
348 stopped=true;
349 }
350
351 public boolean running() { // syntactic sugar
352 return !stopped;
353 }
354
355 public boolean cancelled() {
356 return stopped;
357 }
358
359 public long nextInterval() {
360 long interval=computeSleepTime();
361 if(interval <= 0)
362 return 10000;
363 else
364 return interval;
365 }
366
367
368 public void run() {
369 sendView();
370 }
371
372 long computeSleepTime() {
373 return getRandom((mbrs.size() * avg_send_interval * 2));
374 }
375
376 long getRandom(long range) {
377 return (long)((Math.random() * range) % range);
378 }
379 }
380
381
382
383 }