Source code: org/jgroups/protocols/pbcast/NAKACK.java
1 // $Id: NAKACK.java,v 1.60 2005/11/08 11:08:08 belaban Exp $
2
3 package org.jgroups.protocols.pbcast;
4
5 import org.jgroups.*;
6 import org.jgroups.stack.NakReceiverWindow;
7 import org.jgroups.stack.Protocol;
8 import org.jgroups.stack.Retransmitter;
9 import org.jgroups.util.*;
10
11 import java.util.*;
12 import java.io.*;
13
14
15 /**
16 * Negative AcKnowledgement layer (NAKs). Messages are assigned a monotonically increasing sequence number (seqno).
17 * Receivers deliver messages ordered according to seqno and request retransmission of missing messages. Retransmitted
18 * messages are bundled into bigger ones, e.g. when getting an xmit request for messages 1-10, instead of sending 10
19 * unicast messages, we bundle all 10 messages into 1 and send it. However, since this protocol typically sits below
 * FRAG, we cannot count on FRAG to fragment/defragment the (possibly) large message into smaller ones. Therefore we
21 * only bundle messages up to max_xmit_size bytes to prevent too large messages. For example, if the bundled message
22 * size was a total of 34000 bytes, and max_xmit_size=16000, we'd send 3 messages: 2 16K and a 2K message. <em>Note that
23 * max_xmit_size should be the same value as FRAG.frag_size (or smaller).</em><br/> Retransmit requests are always sent
24 * to the sender. If the sender dies, and not everyone has received its messages, they will be lost. In the future, this
25 * may be changed to have receivers store all messages, so that retransmit requests can be answered by any member.
26 * Trivial to implement, but not done yet. For most apps, the default retransmit properties are sufficient, if not use
27 * vsync.
28 *
29 * @author Bela Ban
30 */
31 public class NAKACK extends Protocol implements Retransmitter.RetransmitCommand, NakReceiverWindow.Listener {
32 private long[] retransmit_timeout={600, 1200, 2400, 4800}; // time(s) to wait before requesting retransmission
33 private boolean is_server=false;
34 private Address local_addr=null;
35 private final Vector members=new Vector(11);
36 private long seqno=0; // current message sequence number (starts with 0)
37 private long max_xmit_size=8192; // max size of a retransmit message (otherwise send multiple)
38 private int gc_lag=20; // number of msgs garbage collection lags behind
39
40 /**
41 * Retransmit messages using multicast rather than unicast. This has the advantage that, if many receivers lost a
42 * message, the sender only retransmits once.
43 */
44 private boolean use_mcast_xmit=false;
45
46 /**
47 * Ask a random member for retransmission of a missing message. If set to true, discard_delivered_msgs will be
48 * set to false
49 */
50 private boolean xmit_from_random_member=false;
51
52
53 /**
54 * Messages that have been received in order are sent up the stack (= delivered to the application). Delivered
55 * messages are removed from NakReceiverWindow.received_msgs and moved to NakReceiverWindow.delivered_msgs, where
56 * they are later garbage collected (by STABLE). Since we do retransmits only from sent messages, never
57 * received or delivered messages, we can turn the moving to delivered_msgs off, so we don't keep the message
58 * around, and don't need to wait for garbage collection to remove them.
59 */
60 private boolean discard_delivered_msgs=false;
61
62 /** If value is > 0, the retransmit buffer is bounded: only the max_xmit_buf_size latest messages are kept,
63 * older ones are discarded when the buffer size is exceeded. A value <= 0 means unbounded buffers
64 */
65 private int max_xmit_buf_size=0;
66
67
68 /**
69 * Hashtable<Address,NakReceiverWindow>. Stores received messages (keyed by sender). Note that this is no long term
70 * storage; messages are just stored until they can be delivered (ie., until the correct FIFO order is established)
71 */
72 private final HashMap received_msgs=new HashMap(11);
73
74 /** TreeMap<Long,Message>. Map of messages sent by me (keyed and sorted on sequence number) */
75 private final TreeMap sent_msgs=new TreeMap();
76
77 private boolean leaving=false;
78 private TimeScheduler timer=null;
79 private static final String name="NAKACK";
80
81 private long xmit_reqs_received;
82 private long xmit_reqs_sent;
83 private long xmit_rsps_received;
84 private long xmit_rsps_sent;
85 private long missing_msgs_received;
86
87 /** Captures stats on XMIT_REQS, XMIT_RSPS per sender */
88 private HashMap sent=new HashMap();
89
90 /** Captures stats on XMIT_REQS, XMIT_RSPS per receiver */
91 private HashMap received=new HashMap();
92
93 private int stats_list_size=20;
94
/** BoundedList<MissingMessage>. Keeps track of the last stats_list_size missing messages received */
96 private BoundedList receive_history;
97
/** BoundedList<XmitRequest>. Keeps track of the last stats_list_size XMIT requests sent */
99 private BoundedList send_history;
100
101
102
103
104
/** Creates an instance that starts out with the default field values declared in this class. */
public NAKACK() {
    super();
}
107
108
109 public String getName() {
110 return name;
111 }
112
113 public long getXmitRequestsReceived() {return xmit_reqs_received;}
114 public long getXmitRequestsSent() {return xmit_reqs_sent;}
115 public long getXmitResponsesReceived() {return xmit_rsps_received;}
116 public long getXmitResponsesSent() {return xmit_rsps_sent;}
117 public long getMissingMessagesReceived() {return missing_msgs_received;}
118
119 public int getPendingRetransmissionRequests() {
120 int num=0;
121 NakReceiverWindow win;
122 synchronized(received_msgs) {
123 for(Iterator it=received_msgs.values().iterator(); it.hasNext();) {
124 win=(NakReceiverWindow)it.next();
125 num+=win.size();
126 }
127 }
128 return num;
129 }
130
131 public int getSentTableSize() {
132 return sent_msgs.size();
133 }
134
135 public int getReceivedTableSize() {
136 int ret=0;
137 NakReceiverWindow win;
138 Set s=new LinkedHashSet(received_msgs.values());
139 for(Iterator it=s.iterator(); it.hasNext();) {
140 win=(NakReceiverWindow)it.next();
141 ret+=win.size();
142 }
143 return ret;
144 }
145
146 public void resetStats() {
147 xmit_reqs_received=xmit_reqs_sent=xmit_rsps_received=xmit_rsps_sent=missing_msgs_received=0;
148 sent.clear();
149 received.clear();
150 if(receive_history !=null)
151 receive_history.removeAll();
152 if(send_history != null)
153 send_history.removeAll();
154 }
155
156 public void init() throws Exception {
157 if(stats) {
158 send_history=new BoundedList(stats_list_size);
159 receive_history=new BoundedList(stats_list_size);
160 }
161 }
162
163
164 public int getGcLag() {
165 return gc_lag;
166 }
167
168 public void setGcLag(int gc_lag) {
169 this.gc_lag=gc_lag;
170 }
171
172 public boolean isUseMcastXmit() {
173 return use_mcast_xmit;
174 }
175
176 public void setUseMcastXmit(boolean use_mcast_xmit) {
177 this.use_mcast_xmit=use_mcast_xmit;
178 }
179
180 public boolean isXmitFromRandomMember() {
181 return xmit_from_random_member;
182 }
183
184 public void setXmitFromRandomMember(boolean xmit_from_random_member) {
185 this.xmit_from_random_member=xmit_from_random_member;
186 }
187
188 public boolean isDiscardDeliveredMsgs() {
189 return discard_delivered_msgs;
190 }
191
192 public void setDiscardDeliveredMsgs(boolean discard_delivered_msgs) {
193 this.discard_delivered_msgs=discard_delivered_msgs;
194 }
195
196 public int getMaxXmitBufSize() {
197 return max_xmit_buf_size;
198 }
199
200 public void setMaxXmitBufSize(int max_xmit_buf_size) {
201 this.max_xmit_buf_size=max_xmit_buf_size;
202 }
203
204 public long getMaxXmitSize() {
205 return max_xmit_size;
206 }
207
208 public void setMaxXmitSize(long max_xmit_size) {
209 this.max_xmit_size=max_xmit_size;
210 }
211
212 public boolean setProperties(Properties props) {
213 String str;
214 long[] tmp;
215
216 super.setProperties(props);
217 str=props.getProperty("retransmit_timeout");
218 if(str != null) {
219 tmp=Util.parseCommaDelimitedLongs(str);
220 props.remove("retransmit_timeout");
221 if(tmp != null && tmp.length > 0) {
222 retransmit_timeout=tmp;
223 }
224 }
225
226 str=props.getProperty("gc_lag");
227 if(str != null) {
228 gc_lag=Integer.parseInt(str);
229 if(gc_lag < 0) {
230 log.error("NAKACK.setProperties(): gc_lag cannot be negative, setting it to 0");
231 }
232 props.remove("gc_lag");
233 }
234
235 str=props.getProperty("max_xmit_size");
236 if(str != null) {
237 max_xmit_size=Long.parseLong(str);
238 props.remove("max_xmit_size");
239 }
240
241 str=props.getProperty("use_mcast_xmit");
242 if(str != null) {
243 use_mcast_xmit=Boolean.valueOf(str).booleanValue();
244 props.remove("use_mcast_xmit");
245 }
246
247 str=props.getProperty("discard_delivered_msgs");
248 if(str != null) {
249 discard_delivered_msgs=Boolean.valueOf(str).booleanValue();
250 props.remove("discard_delivered_msgs");
251 }
252
253 str=props.getProperty("xmit_from_random_member");
254 if(str != null) {
255 xmit_from_random_member=Boolean.valueOf(str).booleanValue();
256 props.remove("xmit_from_random_member");
257 }
258
259 str=props.getProperty("max_xmit_buf_size");
260 if(str != null) {
261 max_xmit_buf_size=Integer.parseInt(str);
262 props.remove("max_xmit_buf_size");
263 }
264
265 str=props.getProperty("stats_list_size");
266 if(str != null) {
267 stats_list_size=Integer.parseInt(str);
268 props.remove("stats_list_size");
269 }
270
271 if(xmit_from_random_member) {
272 if(discard_delivered_msgs) {
273 discard_delivered_msgs=false;
274 log.warn("xmit_from_random_member set to true: changed discard_delivered_msgs to false");
275 }
276 }
277
278 if(props.size() > 0) {
279 log.error("NAKACK.setProperties(): these properties are not recognized: " + props);
280
281 return false;
282 }
283 return true;
284 }
285
286 public Map dumpStats() {
287 Map retval=super.dumpStats();
288 if(retval == null)
289 retval=new HashMap();
290
291 retval.put("xmit_reqs_received", new Long(xmit_reqs_received));
292 retval.put("xmit_reqs_sent", new Long(xmit_reqs_sent));
293 retval.put("xmit_rsps_received", new Long(xmit_rsps_received));
294 retval.put("xmit_rsps_sent", new Long(xmit_rsps_sent));
295 retval.put("missing_msgs_received", new Long(missing_msgs_received));
296
297 retval.put("sent_msgs", printSentMsgs());
298
299 StringBuffer sb=new StringBuffer();
300 Map.Entry entry;
301 Address addr;
302 Object w;
303 synchronized(received_msgs) {
304 for(Iterator it=received_msgs.entrySet().iterator(); it.hasNext();) {
305 entry=(Map.Entry)it.next();
306 addr=(Address)entry.getKey();
307 w=entry.getValue();
308 sb.append(addr).append(": ").append(w.toString()).append('\n');
309 }
310 }
311
312 retval.put("received_msgs", sb.toString());
313 return retval;
314 }
315
316 public String printStats() {
317 Map.Entry entry;
318 Object key, val;
319 StringBuffer sb=new StringBuffer();
320 sb.append("sent:\n");
321 for(Iterator it=sent.entrySet().iterator(); it.hasNext();) {
322 entry=(Map.Entry)it.next();
323 key=entry.getKey();
324 if(key == null) key="<mcast dest>";
325 val=entry.getValue();
326 sb.append(key).append(": ").append(val).append("\n");
327 }
328 sb.append("\nreceived:\n");
329 for(Iterator it=received.entrySet().iterator(); it.hasNext();) {
330 entry=(Map.Entry)it.next();
331 key=entry.getKey();
332 val=entry.getValue();
333 sb.append(key).append(": ").append(val).append("\n");
334 }
335
336 sb.append("\nXMIT_REQS sent:\n");
337 XmitRequest tmp;
338 for(Enumeration en=send_history.elements(); en.hasMoreElements();) {
339 tmp=(XmitRequest)en.nextElement();
340 sb.append(tmp).append("\n");
341 }
342
343 sb.append("\nMissing messages received\n");
344 MissingMessage missing;
345 for(Enumeration en=receive_history.elements(); en.hasMoreElements();) {
346 missing=(MissingMessage)en.nextElement();
347 sb.append(missing).append("\n");
348 }
349
350 return sb.toString();
351 }
352
353
354
355 public Vector providedUpServices() {
356 Vector retval=new Vector(5);
357 retval.addElement(new Integer(Event.GET_DIGEST));
358 retval.addElement(new Integer(Event.GET_DIGEST_STABLE));
359 retval.addElement(new Integer(Event.GET_DIGEST_STATE));
360 retval.addElement(new Integer(Event.SET_DIGEST));
361 retval.addElement(new Integer(Event.MERGE_DIGEST));
362 return retval;
363 }
364
365
366 public Vector providedDownServices() {
367 Vector retval=new Vector(2);
368 retval.addElement(new Integer(Event.GET_DIGEST));
369 retval.addElement(new Integer(Event.GET_DIGEST_STABLE));
370 return retval;
371 }
372
373
374 public void start() throws Exception {
375 timer=stack != null ? stack.timer : null;
376 if(timer == null) {
377 throw new Exception("NAKACK.up(): timer is null");
378 }
379 }
380
381 public void stop() {
382 removeAll(); // clears sent_msgs and destroys all NakReceiverWindows
383 }
384
385
386 /**
387 * <b>Callback</b>. Called by superclass when event may be handled.<p> <b>Do not use <code>passDown()</code> in this
388 * method as the event is passed down by default by the superclass after this method returns !</b>
389 */
390 public void down(Event evt) {
391 Digest digest;
392 Vector mbrs;
393
394 switch(evt.getType()) {
395
396 case Event.MSG:
397 Message msg=(Message)evt.getArg();
398 Address dest=msg.getDest();
399 if(dest != null && !dest.isMulticastAddress()) {
400 break; // unicast address: not null and not mcast, pass down unchanged
401 }
402 send(evt, msg);
403 return; // don't pass down the stack
404
405 case Event.STABLE: // generated by STABLE layer. Delete stable messages passed in arg
406 stable((Digest)evt.getArg());
407 return; // do not pass down further (Bela Aug 7 2001)
408
409 case Event.GET_DIGEST:
410 digest=getDigest();
411 passUp(new Event(Event.GET_DIGEST_OK, digest != null ? digest.copy() : null));
412 return;
413
414 case Event.GET_DIGEST_STABLE:
415 digest=getDigestHighestDeliveredMsgs();
416 passUp(new Event(Event.GET_DIGEST_STABLE_OK, digest != null ? digest.copy() : null));
417 return;
418
419 case Event.GET_DIGEST_STATE:
420 digest=getDigest();
421 passUp(new Event(Event.GET_DIGEST_STATE_OK, digest != null ? digest.copy() : null));
422 return;
423
424 case Event.SET_DIGEST:
425 setDigest((Digest)evt.getArg());
426 return;
427
428 case Event.MERGE_DIGEST:
429 mergeDigest((Digest)evt.getArg());
430 return;
431
432 case Event.CONFIG:
433 passDown(evt);
434 if(log.isDebugEnabled()) {
435 log.debug("received CONFIG event: " + evt.getArg());
436 }
437 handleConfigEvent((HashMap)evt.getArg());
438 return;
439
440 case Event.TMP_VIEW:
441 mbrs=((View)evt.getArg()).getMembers();
442 members.removeAllElements();
443 members.addAll(mbrs);
444 adjustReceivers();
445 break;
446
447 case Event.VIEW_CHANGE:
448 mbrs=((View)evt.getArg()).getMembers();
449 members.removeAllElements();
450 members.addAll(mbrs);
451 adjustReceivers();
452 is_server=true; // check vids from now on
453
454 Set tmp=new LinkedHashSet(members);
455 tmp.add(null); // for null destination (= mcast)
456 sent.keySet().retainAll(tmp);
457 received.keySet().retainAll(tmp);
458 break;
459
460 case Event.BECOME_SERVER:
461 is_server=true;
462 break;
463
464 case Event.DISCONNECT:
465 leaving=true;
466 removeAll();
467 seqno=0;
468 break;
469 }
470
471 passDown(evt);
472 }
473
474
475
476 /**
477 * <b>Callback</b>. Called by superclass when event may be handled.<p> <b>Do not use <code>PassUp</code> in this
478 * method as the event is passed up by default by the superclass after this method returns !</b>
479 */
480 public void up(Event evt) {
481 NakAckHeader hdr;
482 Message msg;
483 Digest digest;
484
485 switch(evt.getType()) {
486
487
488 case Event.MSG:
489 msg=(Message)evt.getArg();
490 hdr=(NakAckHeader)msg.getHeader(name);
491 if(hdr == null)
492 break; // pass up (e.g. unicast msg)
493
494 // discard messages while not yet server (i.e., until JOIN has returned)
495 if(!is_server) {
496 if(trace)
497 log.trace("message was discarded (not yet server)");
498 return;
499 }
500
501 // Changed by bela Jan 29 2003: we must not remove the header, otherwise
502 // further xmit requests will fail !
503 //hdr=(NakAckHeader)msg.removeHeader(getName());
504
505 switch(hdr.type) {
506
507 case NakAckHeader.MSG:
508 handleMessage(msg, hdr);
509 return; // transmitter passes message up for us !
510
511 case NakAckHeader.XMIT_REQ:
512 if(hdr.range == null) {
513 if(log.isErrorEnabled()) {
514 log.error("XMIT_REQ: range of xmit msg is null; discarding request from " + msg.getSrc());
515 }
516 return;
517 }
518 handleXmitReq(msg.getSrc(), hdr.range.low, hdr.range.high, hdr.sender);
519 return;
520
521 case NakAckHeader.XMIT_RSP:
522 if(trace)
523 log.trace("received missing messages " + hdr.range);
524 handleXmitRsp(msg);
525 return;
526
527 default:
528 if(log.isErrorEnabled()) {
529 log.error("NakAck header type " + hdr.type + " not known !");
530 }
531 return;
532 }
533
534 case Event.STABLE: // generated by STABLE layer. Delete stable messages passed in arg
535 stable((Digest)evt.getArg());
536 return; // do not pass up further (Bela Aug 7 2001)
537
538 case Event.GET_DIGEST:
539 digest=getDigestHighestDeliveredMsgs();
540 passDown(new Event(Event.GET_DIGEST_OK, digest));
541 return;
542
543 case Event.GET_DIGEST_STABLE:
544 digest=getDigestHighestDeliveredMsgs();
545 passDown(new Event(Event.GET_DIGEST_STABLE_OK, digest));
546 return;
547
548 case Event.SET_LOCAL_ADDRESS:
549 local_addr=(Address)evt.getArg();
550 break;
551
552 case Event.CONFIG:
553 passUp(evt);
554 if(log.isDebugEnabled()) {
555 log.debug("received CONFIG event: " + evt.getArg());
556 }
557 handleConfigEvent((HashMap)evt.getArg());
558 return;
559 }
560 passUp(evt);
561 }
562
563
564
565
566
567 /* --------------------------------- Private Methods --------------------------------------- */
568
569 private synchronized long getNextSeqno() {
570 return seqno++;
571 }
572
573
574 /**
575 * Adds the message to the sent_msgs table and then passes it down the stack. Change Bela Ban May 26 2002: we don't
576 * store a copy of the message, but a reference ! This saves us a lot of memory. However, this also means that a
577 * message should not be changed after storing it in the sent-table ! See protocols/DESIGN for details.
578 */
579 private void send(Event evt, Message msg) {
580 long msg_id=getNextSeqno();
581 if(trace)
582 log.trace("sending msg #" + msg_id);
583
584 msg.putHeader(name, new NakAckHeader(NakAckHeader.MSG, msg_id));
585 synchronized(sent_msgs) {
586 if(Global.copy) {
587 sent_msgs.put(new Long(msg_id), msg.copy());
588 }
589 else {
590 sent_msgs.put(new Long(msg_id), msg);
591 }
592 }
593 passDown(evt);
594 }
595
596
597 /**
598 * Finds the corresponding NakReceiverWindow and adds the message to it (according to seqno). Then removes as many
599 * messages as possible from the NRW and passes them up the stack. Discards messages from non-members.
600 */
601 private void handleMessage(Message msg, NakAckHeader hdr) {
602 NakReceiverWindow win;
603 Message msg_to_deliver;
604 Address sender=msg.getSrc();
605
606 if(sender == null) {
607 if(log.isErrorEnabled())
608 log.error("sender of message is null");
609 return;
610 }
611
612 if(trace) {
613 StringBuffer sb=new StringBuffer('[');
614 sb.append(local_addr).append("] received ").append(sender).append('#').append(hdr.seqno);
615 log.trace(sb.toString());
616 }
617
618 // msg is potentially re-sent later as result of XMIT_REQ reception; that's why hdr is added !
619
620 // Changed by bela Jan 29 2003: we currently don't resend from received msgs, just from sent_msgs !
621 // msg.putHeader(getName(), hdr);
622
623 synchronized(received_msgs) {
624 win=(NakReceiverWindow)received_msgs.get(sender);
625 }
626 if(win == null) { // discard message if there is no entry for sender
627 if(leaving)
628 return;
629 if(warn) {
630 StringBuffer sb=new StringBuffer('[');
631 sb.append(local_addr).append("] discarded message from non-member ").append(sender);
632 if(warn)
633 log.warn(sb.toString());
634 }
635 return;
636 }
637 win.add(hdr.seqno, msg); // add in order, then remove and pass up as many msgs as possible
638
639 while((msg_to_deliver=win.remove()) != null) {
640
641 // Changed by bela Jan 29 2003: not needed (see above)
642 //msg_to_deliver.removeHeader(getName());
643 passUp(new Event(Event.MSG, msg_to_deliver));
644 }
645 }
646
647
648 /**
649 * Retransmit from sent-table, called when XMIT_REQ is received. Bundles all messages to be xmitted into one large
650 * message and sends them back with an XMIT_RSP header. Note that since we cannot count on a fragmentation layer
651 * below us, we have to make sure the message doesn't exceed max_xmit_size bytes. If this is the case, we split the
652 * message into multiple, smaller-chunked messages. But in most cases this still yields fewer messages than if each
653 * requested message was retransmitted separately.
654 *
655 * @param xmit_requester The sender of the XMIT_REQ, we have to send the requested copy of the message to this address
656 * @param first_seqno The first sequence number to be retransmitted (<= last_seqno)
657 * @param last_seqno The last sequence number to be retransmitted (>= first_seqno)
 * @param original_sender The member who originally sent the message. Guaranteed to be non-null
659 */
660 private void handleXmitReq(Address xmit_requester, long first_seqno, long last_seqno, Address original_sender) {
661 Message m, tmp;
662 LinkedList list;
663 long size=0, marker=first_seqno, len;
664 NakReceiverWindow win=null;
665 boolean amISender; // am I the original sender ?
666
667 if(trace) {
668 StringBuffer sb=new StringBuffer();
669 sb.append(local_addr).append(": received xmit request from ").append(xmit_requester).append(" for ");
670 sb.append(original_sender).append(" [").append(first_seqno).append(" - ").append(last_seqno).append("]");
671 log.trace(sb.toString());
672 }
673
674 if(first_seqno > last_seqno) {
675 if(log.isErrorEnabled())
676 log.error("first_seqno (" + first_seqno + ") > last_seqno (" + last_seqno + "): not able to retransmit");
677 return;
678 }
679
680 if(stats) {
681 xmit_reqs_received+=last_seqno - first_seqno +1;
682 updateStats(received, xmit_requester, 1, 0, 0);
683 }
684
685 amISender=local_addr.equals(original_sender);
686 if(!amISender)
687 win=(NakReceiverWindow)received_msgs.get(original_sender);
688
689 list=new LinkedList();
690 for(long i=first_seqno; i <= last_seqno; i++) {
691 if(amISender) {
692 m=(Message)sent_msgs.get(new Long(i)); // no need to synchronize
693 }
694 else {
695 m=win != null? win.get(i) : null;
696 }
697 if(m == null) {
698 if(log.isErrorEnabled()) {
699 StringBuffer sb=new StringBuffer();
700 sb.append("(requester=").append(xmit_requester).append(", local_addr=").append(this.local_addr);
701 sb.append(") message ").append(original_sender).append("::").append(i);
702 sb.append(" not found in ").append((amISender? "sent" : "received")).append(" msgs. ");
703 if(win != null) {
704 sb.append("Received messages from ").append(original_sender).append(": ").append(win.toString());
705 }
706 else {
707 sb.append("\nSent messages: ").append(printSentMsgs());
708 }
709 log.error(sb.toString());
710 }
711 continue;
712 }
713 len=m.size();
714 size+=len;
715 if(size > max_xmit_size && list.size() > 0) { // changed from >= to > (yaron-r, bug #943709)
716 // yaronr: added &&listSize()>0 since protocols between FRAG and NAKACK add headers, and message exceeds size.
717
718 // size has reached max_xmit_size. go ahead and send message (excluding the current message)
719 if(trace)
720 log.trace("xmitting msgs [" + marker + '-' + (i - 1) + "] to " + xmit_requester);
721 sendXmitRsp(xmit_requester, (LinkedList)list.clone(), marker, i - 1);
722 marker=i;
723 list.clear();
724 // fixed Dec 15 2003 (bela, patch from Joel Dice (dicej)), see explanantion under
725 // bug report #854887
726 size=len;
727 }
728 if(Global.copy) {
729 tmp=m.copy();
730 }
731 else {
732 tmp=m;
733 }
734 // tmp.setDest(xmit_requester);
735 // tmp.setSrc(local_addr);
736 if(tmp.getSrc() == null)
737 tmp.setSrc(local_addr);
738 list.add(tmp);
739 }
740
741 if(list.size() > 0) {
742 if(trace)
743 log.trace("xmitting msgs [" + marker + '-' + last_seqno + "] to " + xmit_requester);
744 sendXmitRsp(xmit_requester, (LinkedList)list.clone(), marker, last_seqno);
745 list.clear();
746 }
747 }
748
749 private static void updateStats(HashMap map, Address key, int req, int rsp, int missing) {
750 Entry entry=(Entry)map.get(key);
751 if(entry == null) {
752 entry=new Entry();
753 map.put(key, entry);
754 }
755 entry.xmit_reqs+=req;
756 entry.xmit_rsps+=rsp;
757 entry.missing_msgs_rcvd+=missing;
758 }
759
760 private void sendXmitRsp(Address dest, LinkedList xmit_list, long first_seqno, long last_seqno) {
761 Buffer buf;
762 if(xmit_list == null || xmit_list.size() == 0) {
763 if(log.isErrorEnabled())
764 log.error("xmit_list is empty");
765 return;
766 }
767 if(use_mcast_xmit)
768 dest=null;
769
770 if(stats) {
771 xmit_rsps_sent+=xmit_list.size();
772 updateStats(sent, dest, 0, 1, 0);
773 }
774
775 try {
776 buf=Util.msgListToByteBuffer(xmit_list);
777 Message msg=new Message(dest, null, buf.getBuf(), buf.getOffset(), buf.getLength());
778 msg.putHeader(name, new NakAckHeader(NakAckHeader.XMIT_RSP, first_seqno, last_seqno));
779 passDown(new Event(Event.MSG, msg));
780 }
781 catch(IOException ex) {
782 log.error("failed marshalling xmit list", ex);
783 }
784 }
785
786
787 private void handleXmitRsp(Message msg) {
788 LinkedList list;
789 Message m;
790
791 if(msg == null) {
792 if(warn)
793 log.warn("message is null");
794 return;
795 }
796 try {
797 list=Util.byteBufferToMessageList(msg.getRawBuffer(), msg.getOffset(), msg.getLength());
798 if(list != null) {
799 if(stats) {
800 xmit_rsps_received+=list.size();
801 updateStats(received, msg.getSrc(), 0, 1, 0);
802 }
803 for(Iterator it=list.iterator(); it.hasNext();) {
804 m=(Message)it.next();
805 up(new Event(Event.MSG, m));
806 }
807 list.clear();
808 }
809 }
810 catch(Exception ex) {
811 if(log.isErrorEnabled()) {
812 log.error("message did not contain a list (LinkedList) of retransmitted messages: " + ex);
813 }
814 }
815 }
816
817
818
819
820 /**
821 * Remove old members from NakReceiverWindows and add new members (starting seqno=0). Essentially removes all
822 * entries from received_msgs that are not in <code>members</code>
823 */
824 private void adjustReceivers() {
825 Address sender;
826 NakReceiverWindow win;
827
828 synchronized(received_msgs) {
829
830 // 1. Remove all senders in received_msgs that are not members anymore
831 for(Iterator it=received_msgs.keySet().iterator(); it.hasNext();) {
832 sender=(Address)it.next();
833 if(!members.contains(sender)) {
834 win=(NakReceiverWindow)received_msgs.get(sender);
835 win.reset();
836 if(log.isDebugEnabled()) {
837 log.debug("removing " + sender + " from received_msgs (not member anymore)");
838 }
839 it.remove();
840 }
841 }
842
843 // 2. Add newly joined members to received_msgs (starting seqno=0)
844 for(int i=0; i < members.size(); i++) {
845 sender=(Address)members.elementAt(i);
846 if(!received_msgs.containsKey(sender)) {
847 win=createNakReceiverWindow(sender, 0);
848 received_msgs.put(sender, win);
849 }
850 }
851 }
852 }
853
854
855 /**
856 * Returns a message digest: for each member P the highest seqno received from P is added to the digest.
857 */
858 private Digest getDigest() {
859 Digest digest;
860 Address sender;
861 Range range;
862
863 digest=new Digest(members.size());
864 for(int i=0; i < members.size(); i++) {
865 sender=(Address)members.elementAt(i);
866 range=getLowestAndHighestSeqno(sender, false); // get the highest received seqno
867 if(range == null) {
868 if(log.isErrorEnabled()) {
869 log.error("range is null");
870 }
871 continue;
872 }
873 digest.add(sender, range.low, range.high); // add another entry to the digest
874 }
875 return digest;
876 }
877
878
879 /**
880 * Returns a message digest: for each member P the highest seqno received from P <em>without a gap</em> is added to
881 * the digest. E.g. if the seqnos received from P are [+3 +4 +5 -6 +7 +8], then 5 will be returned. Also, the
882 * highest seqno <em>seen</em> is added. The max of all highest seqnos seen will be used (in STABLE) to determine
883 * whether the last seqno from a sender was received (see "Last Message Dropped" topic in DESIGN).
884 */
885 private Digest getDigestHighestDeliveredMsgs() {
886 Digest digest;
887 Address sender;
888 Range range;
889 long high_seqno_seen;
890
891 digest=new Digest(members.size());
892 for(int i=0; i < members.size(); i++) {
893 sender=(Address)members.elementAt(i);
894 range=getLowestAndHighestSeqno(sender, true); // get the highest deliverable seqno
895 if(range == null) {
896 if(log.isErrorEnabled()) {
897 log.error("range is null");
898 }
899 continue;
900 }
901 high_seqno_seen=getHighSeqnoSeen(sender);
902 digest.add(sender, range.low, range.high, high_seqno_seen); // add another entry to the digest
903 }
904 return digest;
905 }
906
907
908 /**
909 * Creates a NakReceiverWindow for each sender in the digest according to the sender's seqno. If NRW already exists,
910 * reset it.
911 */
912 private void setDigest(Digest d) {
913 if(d == null || d.senders == null) {
914 if(log.isErrorEnabled()) {
915 log.error("digest or digest.senders is null");
916 }
917 return;
918 }
919
920 clear();
921
922 Map.Entry entry;
923 Address sender;
924 org.jgroups.protocols.pbcast.Digest.Entry val;
925 long initial_seqno;
926 NakReceiverWindow win;
927
928 for(Iterator it=d.senders.entrySet().iterator(); it.hasNext();) {
929 entry=(Map.Entry)it.next();
930 sender=(Address)entry.getKey();
931 val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue();
932
933 if(sender == null || val == null) {
934 if(warn) {
935 log.warn("sender or value is null");
936 }
937 continue;
938 }
939 initial_seqno=val.high_seqno;
940 win=createNakReceiverWindow(sender, initial_seqno);
941 synchronized(received_msgs) {
942 received_msgs.put(sender, win);
943 }
944 }
945 }
946
947
948 /**
949 * For all members of the digest, adjust the NakReceiverWindows in the received_msgs hashtable. If the member
950 * already exists, sets its seqno to be the max of the seqno and the seqno of the member in the digest. If no entry
951 * exists, create one with the initial seqno set to the seqno of the member in the digest.
952 */
953 private void mergeDigest(Digest d) {
954 if(d == null || d.senders == null) {
955 if(log.isErrorEnabled()) {
956 log.error("digest or digest.senders is null");
957 }
958 return;
959 }
960
961 Map.Entry entry;
962 Address sender;
963 org.jgroups.protocols.pbcast.Digest.Entry val;
964 NakReceiverWindow win;
965 long initial_seqno;
966
967 for(Iterator it=d.senders.entrySet().iterator(); it.hasNext();) {
968 entry=(Map.Entry)it.next();
969 sender=(Address)entry.getKey();
970 val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue();
971
972 if(sender == null || val == null) {
973 if(warn) {
974 log.warn("sender or value is null");
975 }
976 continue;
977 }
978 initial_seqno=val.high_seqno;
979 synchronized(received_msgs) {
980 win=(NakReceiverWindow)received_msgs.get(sender);
981 if(win == null) {
982 win=createNakReceiverWindow(sender, initial_seqno);
983 received_msgs.put(sender, win);
984 }
985 else {
986 if(win.getHighestReceived() < initial_seqno) {
987 win.reset();
988 received_msgs.remove(sender);
989 win=createNakReceiverWindow(sender, initial_seqno);
990 received_msgs.put(sender, win);
991 }
992 }
993 }
994 }
995 }
996
997
998 private NakReceiverWindow createNakReceiverWindow(Address sender, long initial_seqno) {
999 NakReceiverWindow win=new NakReceiverWindow(sender, this, initial_seqno, timer);
1000 win.setRetransmitTimeouts(retransmit_timeout);
1001 win.setDiscardDeliveredMessages(discard_delivered_msgs);
1002 win.setMaxXmitBufSize(this.max_xmit_buf_size);
1003 if(stats)
1004 win.setListener(this);
1005 return win;
1006 }
1007
1008
1009 /**
1010 * Returns the lowest seqno still in cache (so it can be retransmitted) and the highest seqno received so far.
1011 *
1012 * @param sender The address for which the highest and lowest seqnos are to be retrieved
1013 * @param stop_at_gaps If true, the highest seqno *deliverable* will be returned. If false, the highest seqno
1014 * *received* will be returned. E.g. for [+3 +4 +5 -6 +7 +8], the highest_seqno_received is 8,
1015 * whereas the higheset_seqno_seen (deliverable) is 5.
1016 */
1017 private Range getLowestAndHighestSeqno(Address sender, boolean stop_at_gaps) {
1018 Range r=null;
1019 NakReceiverWindow win;
1020
1021 if(sender == null) {
1022 if(log.isErrorEnabled()) {
1023 log.error("sender is null");
1024 }
1025 return r;
1026 }
1027 synchronized(received_msgs) {
1028 win=(NakReceiverWindow)received_msgs.get(sender);
1029 }
1030 if(win == null) {
1031 if(log.isErrorEnabled()) {
1032 log.error("sender " + sender + " not found in received_msgs");
1033 }
1034 return r;
1035 }
1036 if(stop_at_gaps) {
1037 r=new Range(win.getLowestSeen(), win.getHighestSeen()); // deliverable messages (no gaps)
1038 }
1039 else {
1040 r=new Range(win.getLowestSeen(), win.getHighestReceived() + 1); // received messages
1041 }
1042 return r;
1043 }
1044
1045
1046 /**
1047 * Returns the highest seqno seen from sender. E.g. if we received 1, 2, 4, 5 from P, then 5 will be returned
1048 * (doesn't take gaps into account). If we are the sender, we will return the highest seqno <em>sent</em> rather
1049 * then <em>received</em>
1050 */
1051 private long getHighSeqnoSeen(Address sender) {
1052 NakReceiverWindow win;
1053 long ret=0;
1054
1055 if(sender == null) {
1056 if(log.isErrorEnabled()) {
1057 log.error("sender is null");
1058 }
1059 return ret;
1060 }
1061 if(sender.equals(local_addr)) {
1062 return seqno - 1;
1063 }
1064
1065 synchronized(received_msgs) {
1066 win=(NakReceiverWindow)received_msgs.get(sender);
1067 }
1068 if(win == null) {
1069 if(log.isErrorEnabled()) {
1070 log.error("sender " + sender + " not found in received_msgs");
1071 }
1072 return ret;
1073 }
1074 ret=win.getHighestReceived();
1075 return ret;
1076 }
1077
1078
1079 /**
1080 * Garbage collect messages that have been seen by all members. Update sent_msgs: for the sender P in the digest
1081 * which is equal to the local address, garbage collect all messages <= seqno at digest[P]. Update received_msgs:
1082 * for each sender P in the digest and its highest seqno seen SEQ, garbage collect all delivered_msgs in the
1083 * NakReceiverWindow corresponding to P which are <= seqno at digest[P].
1084 */
1085 private void stable(Digest d) {
1086 NakReceiverWindow recv_win;
1087 long my_highest_rcvd; // highest seqno received in my digest for a sender P
1088 long stability_highest_rcvd; // highest seqno received in the stability vector for a sender P
1089
1090 if(members == null || local_addr == null || d == null) {
1091 if(warn)
1092 log.warn("members, local_addr or digest are null !");
1093 return;
1094 }
1095
1096 if(trace) {
1097 log.trace("received stable digest " + d);
1098 }
1099
1100 Map.Entry entry;
1101 Address sender;
1102 org.jgroups.protocols.pbcast.Digest.Entry val;
1103 long high_seqno_delivered, high_seqno_received;
1104
1105 for(Iterator it=d.senders.entrySet().iterator(); it.hasNext();) {
1106 entry=(Map.Entry)it.next();
1107 sender=(Address)entry.getKey();
1108 if(sender == null)
1109 continue;
1110 val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue();
1111 high_seqno_delivered=val.high_seqno;
1112 high_seqno_received=val.high_seqno_seen;
1113
1114
1115 // check whether the last seqno received for a sender P in the stability vector is > last seqno
1116 // received for P in my digest. if yes, request retransmission (see "Last Message Dropped" topic
1117 // in DESIGN)
1118 synchronized(received_msgs) {
1119 recv_win=(NakReceiverWindow)received_msgs.get(sender);
1120 }
1121 if(recv_win != null) {
1122 my_highest_rcvd=recv_win.getHighestReceived();
1123 stability_highest_rcvd=high_seqno_received;
1124
1125 if(stability_highest_rcvd >= 0 && stability_highest_rcvd > my_highest_rcvd) {
1126 if(trace) {
1127 log.trace("my_highest_rcvd (" + my_highest_rcvd + ") < stability_highest_rcvd (" +
1128 stability_highest_rcvd + "): requesting retransmission of " +
1129 sender + '#' + stability_highest_rcvd);
1130 }
1131 retransmit(stability_highest_rcvd, stability_highest_rcvd, sender);
1132 }
1133 }
1134
1135 high_seqno_delivered-=gc_lag;
1136 if(high_seqno_delivered < 0) {
1137 continue;
1138 }
1139
1140 if(trace)
1141 log.trace("deleting msgs <= " + high_seqno_delivered + " from " + sender);
1142
1143 // garbage collect from sent_msgs if sender was myself
1144 if(sender.equals(local_addr)) {
1145 synchronized(sent_msgs) {
1146 // gets us a subset from [lowest seqno - seqno]
1147 SortedMap stable_keys=sent_msgs.headMap(new Long(high_seqno_delivered));
1148 if(stable_keys != null) {
1149 stable_keys.clear(); // this will modify sent_msgs directly
1150 }
1151 }
1152 }
1153
1154 // delete *delivered* msgs that are stable
1155 // recv_win=(NakReceiverWindow)received_msgs.get(sender);
1156 if(recv_win != null)
1157 recv_win.stable(high_seqno_delivered); // delete all messages with seqnos <= seqno
1158 }
1159 }
1160
1161
1162
1163 /* ---------------------- Interface Retransmitter.RetransmitCommand ---------------------- */
1164
1165
1166 /**
1167 * Implementation of Retransmitter.RetransmitCommand. Called by retransmission thread when gap is detected.
1168 */
1169 public void retransmit(long first_seqno, long last_seqno, Address sender) {
1170 NakAckHeader hdr;
1171 Message retransmit_msg;
1172 Address dest=sender; // to whom do we send the XMIT request ?
1173
1174 if(xmit_from_random_member && !local_addr.equals(sender)) {
1175 Address random_member=(Address)Util.pickRandomElement(members);
1176 if(random_member != null && !local_addr.equals(random_member)) {
1177 dest=random_member;
1178 if(trace)
1179 log.trace("picked random member " + dest + " to send XMIT request to");
1180 }
1181 }
1182
1183 hdr=new NakAckHeader(NakAckHeader.XMIT_REQ, first_seqno, last_seqno, sender);
1184 retransmit_msg=new Message(dest, null, null);
1185 if(trace)
1186 log.trace(local_addr + ": sending XMIT_REQ ([" + first_seqno + ", " + last_seqno + "]) to " + dest);
1187 retransmit_msg.putHeader(name, hdr);
1188 passDown(new Event(Event.MSG, retransmit_msg));
1189 if(stats) {
1190 xmit_reqs_sent+=last_seqno - first_seqno +1;
1191 updateStats(sent, dest, 1, 0, 0);
1192 for(long i=first_seqno; i <= last_seqno; i++) {
1193 XmitRequest req=new XmitRequest(sender, i, dest);
1194 send_history.add(req);
1195 }
1196 }
1197 }
1198 /* ------------------- End of Interface Retransmitter.RetransmitCommand -------------------- */
1199
1200
1201
1202 /* ----------------------- Interface NakReceiverWindow.Listener ---------------------- */
1203 public void missingMessageReceived(long seqno, Message msg) {
1204 if(stats) {
1205 missing_msgs_received++;
1206 updateStats(received, msg.getSrc(), 0, 0, 1);
1207 MissingMessage missing=new MissingMessage(msg.getSrc(), seqno);
1208 receive_history.add(missing);
1209 }
1210 }
1211 /* ------------------- End of Interface NakReceiverWindow.Listener ------------------- */
1212
1213 private void clear() {
1214 NakReceiverWindow win;
1215
1216 // changed April 21 2004 (bela): SourceForge bug# 938584. We cannot delete our own messages sent between
1217 // a join() and a getState(). Otherwise retransmission requests from members who missed those msgs might
1218 // fail. Not to worry though: those msgs will be cleared by STABLE (message garbage collection)
1219
1220 // sent_msgs.clear();
1221
1222 synchronized(received_msgs) {
1223 for(Iterator it=received_msgs.values().iterator(); it.hasNext();) {
1224 win=(NakReceiverWindow)it.next();
1225 win.reset();
1226 }
1227 received_msgs.clear();
1228 }
1229 }
1230
1231
1232 private void removeAll() {
1233 NakReceiverWindow win;
1234
1235 synchronized(sent_msgs) {
1236 sent_msgs.clear();
1237 }
1238
1239 synchronized(received_msgs) {
1240 for(Iterator it=received_msgs.values().iterator(); it.hasNext();) {
1241 win=(NakReceiverWindow)it.next();
1242 win.destroy();
1243 }
1244 received_msgs.clear();
1245 }
1246 }
1247
1248
1249 public String printMessages() {
1250 StringBuffer ret=new StringBuffer();
1251 Map.Entry entry;
1252 Address addr;
1253 Object w;
1254
1255 ret.append("\nsent_msgs: ").append(printSentMsgs());
1256 ret.append("\nreceived_msgs:\n");
1257 synchronized(received_msgs) {
1258 for(Iterator it=received_msgs.entrySet().iterator(); it.hasNext();) {
1259 entry=(Map.Entry)it.next();
1260 addr=(Address)entry.getKey();
1261 w=entry.getValue();
1262 ret.append(addr).append(": ").append(w.toString()).append('\n');
1263 }
1264 }
1265 return ret.toString();
1266 }
1267
1268
1269 public String printSentMsgs() {
1270 StringBuffer sb=new StringBuffer();
1271 Long min_seqno, max_seqno;
1272 synchronized(sent_msgs) {
1273 min_seqno=sent_msgs.size() > 0 ? (Long)sent_msgs.firstKey() : new Long(0);
1274 max_seqno=sent_msgs.size() > 0 ? (Long)sent_msgs.lastKey() : new Long(0);
1275 }
1276 sb.append('[').append(min_seqno).append(" - ").append(max_seqno).append("] (" + sent_msgs.size() + ")");
1277 return sb.toString();
1278 }
1279
1280
1281 private void handleConfigEvent(HashMap map) {
1282 if(map == null) {
1283 return;
1284 }
1285 if(map.containsKey("frag_size")) {
1286 max_xmit_size=((Integer)map.get("frag_size")).intValue();
1287 if(log.isInfoEnabled()) {
1288 log.info("max_xmit_size=" + max_xmit_size);
1289 }
1290 }
1291 }
1292
1293
1294 static class Entry {
1295 long xmit_reqs, xmit_rsps, missing_msgs_rcvd;
1296
1297 public String toString() {
1298 StringBuffer sb=new StringBuffer();
1299 sb.append(xmit_reqs).append(" xmit_reqs").append(", ").append(xmit_rsps).append(" xmit_rsps");
1300 sb.append(", ").append(missing_msgs_rcvd).append(" missing msgs");
1301 return sb.toString();
1302 }
1303 }
1304
1305 static class XmitRequest {
1306 Address original_sender; // original sender of message
1307 long seq, timestamp=System.currentTimeMillis();
1308 Address xmit_dest; // destination to which XMIT_REQ is sent, usually the original sender
1309
1310 XmitRequest(Address original_sender, long seqno, Address xmit_dest) {
1311 this.original_sender=original_sender;
1312 this.xmit_dest=xmit_dest;
1313 this.seq=seqno;
1314 }
1315
1316 public String toString() {
1317 StringBuffer sb=new StringBuffer();
1318 sb.append(new Date(timestamp)).append(": ").append(original_sender).append(" #").append(seq);
1319 sb.append(" (XMIT_REQ sent to ").append(xmit_dest).append(")");
1320 return sb.toString();
1321 }
1322 }
1323
1324 static class MissingMessage {
1325 Address original_sender;
1326 long seq, timestamp=System.currentTimeMillis();
1327
1328 MissingMessage(Address original_sender, long seqno) {
1329 this.original_sender=original_sender;
1330 this.seq=seqno;
1331 }
1332
1333 public String toString() {
1334 StringBuffer sb=new StringBuffer();
1335 sb.append(new Date(timestamp)).append(": ").append(original_sender).append(" #").append(seq);
1336 return sb.toString();
1337 }
1338 }
1339
1340 /* ----------------------------- End of Private Methods ------------------------------------ */
1341
1342
1343}