Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/jgroups/protocols/pbcast/NAKACK.java


1   // $Id: NAKACK.java,v 1.60 2005/11/08 11:08:08 belaban Exp $
2   
3   package org.jgroups.protocols.pbcast;
4   
5   import org.jgroups.*;
6   import org.jgroups.stack.NakReceiverWindow;
7   import org.jgroups.stack.Protocol;
8   import org.jgroups.stack.Retransmitter;
9   import org.jgroups.util.*;
10  
11  import java.util.*;
12  import java.io.*;
13  
14  
15  /**
16   * Negative AcKnowledgement layer (NAKs). Messages are assigned a monotonically increasing sequence number (seqno).
17   * Receivers deliver messages ordered according to seqno and request retransmission of missing messages. Retransmitted
18   * messages are bundled into bigger ones, e.g. when getting an xmit request for messages 1-10, instead of sending 10
19   * unicast messages, we bundle all 10 messages into 1 and send it. However, since this protocol typically sits below
20   * FRAG, we cannot count on FRAG to fragment/defragment the (possibly) large message into smaller ones. Therefore we
21   * only bundle messages up to max_xmit_size bytes to prevent too large messages. For example, if the bundled message
22   * size was a total of 34000 bytes, and max_xmit_size=16000, we'd send 3 messages: 2 16K and a 2K message. <em>Note that
23   * max_xmit_size should be the same value as FRAG.frag_size (or smaller).</em><br/> Retransmit requests are always sent
24   * to the sender. If the sender dies, and not everyone has received its messages, they will be lost. In the future, this
25   * may be changed to have receivers store all messages, so that retransmit requests can be answered by any member.
26   * Trivial to implement, but not done yet. For most apps, the default retransmit properties are sufficient, if not use
27   * vsync.
28   *
29   * @author Bela Ban
30   */
31  public class NAKACK extends Protocol implements Retransmitter.RetransmitCommand, NakReceiverWindow.Listener {
32      private long[]  retransmit_timeout={600, 1200, 2400, 4800}; // time(s) to wait before requesting retransmission
33      private boolean is_server=false;
34      private Address local_addr=null;
35      private final Vector  members=new Vector(11);
36      private long    seqno=0;                                   // current message sequence number (starts with 0)
37      private long    max_xmit_size=8192;                        // max size of a retransmit message (otherwise send multiple)
38      private int     gc_lag=20;                                 // number of msgs garbage collection lags behind
39  
40      /**
41       * Retransmit messages using multicast rather than unicast. This has the advantage that, if many receivers lost a
42       * message, the sender only retransmits once.
43       */
44      private boolean use_mcast_xmit=false;
45  
46      /**
47       * Ask a random member for retransmission of a missing message. If set to true, discard_delivered_msgs will be
48       * set to false
49       */
50      private boolean xmit_from_random_member=false;
51  
52  
53      /**
54       * Messages that have been received in order are sent up the stack (= delivered to the application). Delivered
55       * messages are removed from NakReceiverWindow.received_msgs and moved to NakReceiverWindow.delivered_msgs, where
56       * they are later garbage collected (by STABLE). Since we do retransmits only from sent messages, never
57       * received or delivered messages, we can turn the moving to delivered_msgs off, so we don't keep the message
58       * around, and don't need to wait for garbage collection to remove them.
59       */
60      private boolean discard_delivered_msgs=false;
61  
62      /** If value is > 0, the retransmit buffer is bounded: only the max_xmit_buf_size latest messages are kept,
63       * older ones are discarded when the buffer size is exceeded. A value <= 0 means unbounded buffers
64       */
65      private int max_xmit_buf_size=0;
66  
67  
68      /**
69       * Hashtable<Address,NakReceiverWindow>. Stores received messages (keyed by sender). Note that this is no long term
70       * storage; messages are just stored until they can be delivered (ie., until the correct FIFO order is established)
71       */
72      private final HashMap received_msgs=new HashMap(11);
73  
74      /** TreeMap<Long,Message>. Map of messages sent by me (keyed and sorted on sequence number) */
75      private final TreeMap sent_msgs=new TreeMap();
76  
77      private boolean leaving=false;
78      private TimeScheduler timer=null;
79      private static final String name="NAKACK";
80  
81      private long xmit_reqs_received;
82      private long xmit_reqs_sent;
83      private long xmit_rsps_received;
84      private long xmit_rsps_sent;
85      private long missing_msgs_received;
86  
87      /** Captures stats on XMIT_REQS, XMIT_RSPS per sender */
88      private HashMap sent=new HashMap();
89  
90      /** Captures stats on XMIT_REQS, XMIT_RSPS per receiver */
91      private HashMap received=new HashMap();
92  
93      private int stats_list_size=20;
94  
95      /** BoundedList<MissingMessage>. Keeps track of the last stats_list_size missing messages received */
96      private BoundedList receive_history;
97  
98      /** BoundedList<XmitRequest>. Keeps track of the last stats_list_size XMIT requests sent */
99      private BoundedList send_history;
100 
101 
102 
103 
104 
105     public NAKACK() {
106     }
107 
108 
109     public String getName() {
110         return name;
111     }
112 
113     public long getXmitRequestsReceived() {return xmit_reqs_received;}
114     public long getXmitRequestsSent() {return xmit_reqs_sent;}
115     public long getXmitResponsesReceived() {return xmit_rsps_received;}
116     public long getXmitResponsesSent() {return xmit_rsps_sent;}
117     public long getMissingMessagesReceived() {return missing_msgs_received;}
118 
119     public int getPendingRetransmissionRequests() {
120         int num=0;
121         NakReceiverWindow win;
122         synchronized(received_msgs) {
123             for(Iterator it=received_msgs.values().iterator(); it.hasNext();) {
124                 win=(NakReceiverWindow)it.next();
125                 num+=win.size();
126             }
127         }
128         return num;
129     }
130 
131     public int getSentTableSize() {
132         return sent_msgs.size();
133     }
134 
135     public int getReceivedTableSize() {
136         int ret=0;
137         NakReceiverWindow win;
138         Set s=new LinkedHashSet(received_msgs.values());
139         for(Iterator it=s.iterator(); it.hasNext();) {
140             win=(NakReceiverWindow)it.next();
141             ret+=win.size();
142         }
143         return ret;
144     }
145 
146     public void resetStats() {
147         xmit_reqs_received=xmit_reqs_sent=xmit_rsps_received=xmit_rsps_sent=missing_msgs_received=0;
148         sent.clear();
149         received.clear();
150         if(receive_history !=null)
151             receive_history.removeAll();
152         if(send_history != null)
153             send_history.removeAll();
154     }
155 
156     public void init() throws Exception {
157         if(stats) {
158             send_history=new BoundedList(stats_list_size);
159             receive_history=new BoundedList(stats_list_size);
160         }
161     }
162 
163 
164     public int getGcLag() {
165         return gc_lag;
166     }
167 
168     public void setGcLag(int gc_lag) {
169         this.gc_lag=gc_lag;
170     }
171 
172     public boolean isUseMcastXmit() {
173         return use_mcast_xmit;
174     }
175 
176     public void setUseMcastXmit(boolean use_mcast_xmit) {
177         this.use_mcast_xmit=use_mcast_xmit;
178     }
179 
180     public boolean isXmitFromRandomMember() {
181         return xmit_from_random_member;
182     }
183 
184     public void setXmitFromRandomMember(boolean xmit_from_random_member) {
185         this.xmit_from_random_member=xmit_from_random_member;
186     }
187 
188     public boolean isDiscardDeliveredMsgs() {
189         return discard_delivered_msgs;
190     }
191 
192     public void setDiscardDeliveredMsgs(boolean discard_delivered_msgs) {
193         this.discard_delivered_msgs=discard_delivered_msgs;
194     }
195 
196     public int getMaxXmitBufSize() {
197         return max_xmit_buf_size;
198     }
199 
200     public void setMaxXmitBufSize(int max_xmit_buf_size) {
201         this.max_xmit_buf_size=max_xmit_buf_size;
202     }
203 
204     public long getMaxXmitSize() {
205         return max_xmit_size;
206     }
207 
208     public void setMaxXmitSize(long max_xmit_size) {
209         this.max_xmit_size=max_xmit_size;
210     }
211 
212     public boolean setProperties(Properties props) {
213         String str;
214         long[] tmp;
215 
216         super.setProperties(props);
217         str=props.getProperty("retransmit_timeout");
218         if(str != null) {
219             tmp=Util.parseCommaDelimitedLongs(str);
220             props.remove("retransmit_timeout");
221             if(tmp != null && tmp.length > 0) {
222                 retransmit_timeout=tmp;
223             }
224         }
225 
226         str=props.getProperty("gc_lag");
227         if(str != null) {
228             gc_lag=Integer.parseInt(str);
229             if(gc_lag < 0) {
230                 log.error("NAKACK.setProperties(): gc_lag cannot be negative, setting it to 0");
231             }
232             props.remove("gc_lag");
233         }
234 
235         str=props.getProperty("max_xmit_size");
236         if(str != null) {
237             max_xmit_size=Long.parseLong(str);
238             props.remove("max_xmit_size");
239         }
240 
241         str=props.getProperty("use_mcast_xmit");
242         if(str != null) {
243             use_mcast_xmit=Boolean.valueOf(str).booleanValue();
244             props.remove("use_mcast_xmit");
245         }
246 
247         str=props.getProperty("discard_delivered_msgs");
248         if(str != null) {
249             discard_delivered_msgs=Boolean.valueOf(str).booleanValue();
250             props.remove("discard_delivered_msgs");
251         }
252 
253         str=props.getProperty("xmit_from_random_member");
254         if(str != null) {
255             xmit_from_random_member=Boolean.valueOf(str).booleanValue();
256             props.remove("xmit_from_random_member");
257         }
258 
259         str=props.getProperty("max_xmit_buf_size");
260         if(str != null) {
261             max_xmit_buf_size=Integer.parseInt(str);
262             props.remove("max_xmit_buf_size");
263         }
264 
265         str=props.getProperty("stats_list_size");
266         if(str != null) {
267             stats_list_size=Integer.parseInt(str);
268             props.remove("stats_list_size");
269         }
270 
271         if(xmit_from_random_member) {
272             if(discard_delivered_msgs) {
273                 discard_delivered_msgs=false;
274                 log.warn("xmit_from_random_member set to true: changed discard_delivered_msgs to false");
275             }
276         }
277 
278         if(props.size() > 0) {
279             log.error("NAKACK.setProperties(): these properties are not recognized: " + props);
280 
281             return false;
282         }
283         return true;
284     }
285 
286     public Map dumpStats() {
287         Map retval=super.dumpStats();
288         if(retval == null)
289             retval=new HashMap();
290 
291         retval.put("xmit_reqs_received", new Long(xmit_reqs_received));
292         retval.put("xmit_reqs_sent", new Long(xmit_reqs_sent));
293         retval.put("xmit_rsps_received", new Long(xmit_rsps_received));
294         retval.put("xmit_rsps_sent", new Long(xmit_rsps_sent));
295         retval.put("missing_msgs_received", new Long(missing_msgs_received));
296 
297         retval.put("sent_msgs", printSentMsgs());
298 
299         StringBuffer sb=new StringBuffer();
300         Map.Entry entry;
301         Address addr;
302         Object w;
303         synchronized(received_msgs) {
304             for(Iterator it=received_msgs.entrySet().iterator(); it.hasNext();) {
305                 entry=(Map.Entry)it.next();
306                 addr=(Address)entry.getKey();
307                 w=entry.getValue();
308                 sb.append(addr).append(": ").append(w.toString()).append('\n');
309             }
310         }
311 
312         retval.put("received_msgs", sb.toString());
313         return retval;        
314     }
315 
316     public String printStats() {
317         Map.Entry entry;
318         Object key, val;
319         StringBuffer sb=new StringBuffer();
320         sb.append("sent:\n");
321         for(Iterator it=sent.entrySet().iterator(); it.hasNext();) {
322             entry=(Map.Entry)it.next();
323             key=entry.getKey();
324             if(key == null) key="<mcast dest>";
325             val=entry.getValue();
326             sb.append(key).append(": ").append(val).append("\n");
327         }
328         sb.append("\nreceived:\n");
329         for(Iterator it=received.entrySet().iterator(); it.hasNext();) {
330             entry=(Map.Entry)it.next();
331             key=entry.getKey();
332             val=entry.getValue();
333             sb.append(key).append(": ").append(val).append("\n");
334         }
335 
336         sb.append("\nXMIT_REQS sent:\n");
337         XmitRequest tmp;
338         for(Enumeration en=send_history.elements(); en.hasMoreElements();) {
339             tmp=(XmitRequest)en.nextElement();
340             sb.append(tmp).append("\n");
341         }
342 
343         sb.append("\nMissing messages received\n");
344         MissingMessage missing;
345         for(Enumeration en=receive_history.elements(); en.hasMoreElements();) {
346             missing=(MissingMessage)en.nextElement();
347             sb.append(missing).append("\n");
348         }
349 
350         return sb.toString();
351     }
352 
353 
354 
355     public Vector providedUpServices() {
356         Vector retval=new Vector(5);
357         retval.addElement(new Integer(Event.GET_DIGEST));
358         retval.addElement(new Integer(Event.GET_DIGEST_STABLE));
359         retval.addElement(new Integer(Event.GET_DIGEST_STATE));
360         retval.addElement(new Integer(Event.SET_DIGEST));
361         retval.addElement(new Integer(Event.MERGE_DIGEST));
362         return retval;
363     }
364 
365 
366     public Vector providedDownServices() {
367         Vector retval=new Vector(2);
368         retval.addElement(new Integer(Event.GET_DIGEST));
369         retval.addElement(new Integer(Event.GET_DIGEST_STABLE));
370         return retval;
371     }
372 
373 
374     public void start() throws Exception {
375         timer=stack != null ? stack.timer : null;
376         if(timer == null) {
377             throw new Exception("NAKACK.up(): timer is null");
378         }
379     }
380 
381     public void stop() {
382         removeAll();  // clears sent_msgs and destroys all NakReceiverWindows
383     }
384 
385 
386     /**
387      * <b>Callback</b>. Called by superclass when event may be handled.<p> <b>Do not use <code>passDown()</code> in this
388      * method as the event is passed down by default by the superclass after this method returns !</b>
389      */
390     public void down(Event evt) {
391         Digest  digest;
392         Vector  mbrs;
393 
394         switch(evt.getType()) {
395 
396         case Event.MSG:
397             Message msg=(Message)evt.getArg();
398             Address dest=msg.getDest();
399             if(dest != null && !dest.isMulticastAddress()) {
400                 break; // unicast address: not null and not mcast, pass down unchanged
401             }
402             send(evt, msg);
403             return;    // don't pass down the stack
404 
405         case Event.STABLE:  // generated by STABLE layer. Delete stable messages passed in arg
406             stable((Digest)evt.getArg());
407             return;  // do not pass down further (Bela Aug 7 2001)
408 
409         case Event.GET_DIGEST:
410             digest=getDigest();
411             passUp(new Event(Event.GET_DIGEST_OK, digest != null ? digest.copy() : null));
412             return;
413 
414         case Event.GET_DIGEST_STABLE:
415             digest=getDigestHighestDeliveredMsgs();
416             passUp(new Event(Event.GET_DIGEST_STABLE_OK, digest != null ? digest.copy() : null));
417             return;
418 
419         case Event.GET_DIGEST_STATE:
420             digest=getDigest();
421             passUp(new Event(Event.GET_DIGEST_STATE_OK, digest != null ? digest.copy() : null));
422             return;
423 
424         case Event.SET_DIGEST:
425             setDigest((Digest)evt.getArg());
426             return;
427 
428         case Event.MERGE_DIGEST:
429             mergeDigest((Digest)evt.getArg());
430             return;
431 
432         case Event.CONFIG:
433             passDown(evt);
434             if(log.isDebugEnabled()) {
435                 log.debug("received CONFIG event: " + evt.getArg());
436             }
437             handleConfigEvent((HashMap)evt.getArg());
438             return;
439 
440         case Event.TMP_VIEW:
441             mbrs=((View)evt.getArg()).getMembers();
442             members.removeAllElements();
443             members.addAll(mbrs);
444             adjustReceivers();
445             break;
446 
447         case Event.VIEW_CHANGE:
448             mbrs=((View)evt.getArg()).getMembers();
449             members.removeAllElements();
450             members.addAll(mbrs);
451             adjustReceivers();
452             is_server=true;  // check vids from now on
453 
454             Set tmp=new LinkedHashSet(members);
455             tmp.add(null); // for null destination (= mcast)
456             sent.keySet().retainAll(tmp);
457             received.keySet().retainAll(tmp);
458             break;
459 
460         case Event.BECOME_SERVER:
461             is_server=true;
462             break;
463 
464         case Event.DISCONNECT:
465             leaving=true;
466             removeAll();
467             seqno=0;
468             break;
469         }
470 
471         passDown(evt);
472     }
473 
474 
475 
476     /**
477      * <b>Callback</b>. Called by superclass when event may be handled.<p> <b>Do not use <code>PassUp</code> in this
478      * method as the event is passed up by default by the superclass after this method returns !</b>
479      */
480     public void up(Event evt) {
481         NakAckHeader hdr;
482         Message msg;
483         Digest digest;
484 
485         switch(evt.getType()) {
486 
487 
488         case Event.MSG:
489             msg=(Message)evt.getArg();
490             hdr=(NakAckHeader)msg.getHeader(name);
491             if(hdr == null)
492                 break;  // pass up (e.g. unicast msg)
493 
494             // discard messages while not yet server (i.e., until JOIN has returned)
495             if(!is_server) {
496                 if(trace)
497                     log.trace("message was discarded (not yet server)");
498                 return;
499             }
500 
501             // Changed by bela Jan 29 2003: we must not remove the header, otherwise
502             // further xmit requests will fail !
503             //hdr=(NakAckHeader)msg.removeHeader(getName());
504 
505             switch(hdr.type) {
506 
507             case NakAckHeader.MSG:
508                 handleMessage(msg, hdr);
509                 return;        // transmitter passes message up for us !
510 
511             case NakAckHeader.XMIT_REQ:
512                 if(hdr.range == null) {
513                     if(log.isErrorEnabled()) {
514                         log.error("XMIT_REQ: range of xmit msg is null; discarding request from " + msg.getSrc());
515                     }
516                     return;
517                 }
518                 handleXmitReq(msg.getSrc(), hdr.range.low, hdr.range.high, hdr.sender);
519                 return;
520 
521             case NakAckHeader.XMIT_RSP:
522                 if(trace)
523                     log.trace("received missing messages " + hdr.range);
524                 handleXmitRsp(msg);
525                 return;
526 
527             default:
528                 if(log.isErrorEnabled()) {
529                     log.error("NakAck header type " + hdr.type + " not known !");
530                 }
531                 return;
532             }
533 
534         case Event.STABLE:  // generated by STABLE layer. Delete stable messages passed in arg
535             stable((Digest)evt.getArg());
536             return;  // do not pass up further (Bela Aug 7 2001)
537 
538         case Event.GET_DIGEST:
539             digest=getDigestHighestDeliveredMsgs();
540             passDown(new Event(Event.GET_DIGEST_OK, digest));
541             return;
542 
543         case Event.GET_DIGEST_STABLE:
544             digest=getDigestHighestDeliveredMsgs();
545             passDown(new Event(Event.GET_DIGEST_STABLE_OK, digest));
546             return;
547 
548         case Event.SET_LOCAL_ADDRESS:
549             local_addr=(Address)evt.getArg();
550             break;
551 
552         case Event.CONFIG:
553             passUp(evt);
554             if(log.isDebugEnabled()) {
555                 log.debug("received CONFIG event: " + evt.getArg());
556             }
557             handleConfigEvent((HashMap)evt.getArg());
558             return;
559         }
560         passUp(evt);
561     }
562 
563 
564 
565 
566 
567     /* --------------------------------- Private Methods --------------------------------------- */
568 
569     private synchronized long getNextSeqno() {
570         return seqno++;
571     }
572 
573 
574     /**
575      * Adds the message to the sent_msgs table and then passes it down the stack. Change Bela Ban May 26 2002: we don't
576      * store a copy of the message, but a reference ! This saves us a lot of memory. However, this also means that a
577      * message should not be changed after storing it in the sent-table ! See protocols/DESIGN for details.
578      */
579     private void send(Event evt, Message msg) {
580         long msg_id=getNextSeqno();
581         if(trace)
582             log.trace("sending msg #" + msg_id);
583 
584         msg.putHeader(name, new NakAckHeader(NakAckHeader.MSG, msg_id));
585         synchronized(sent_msgs) {
586             if(Global.copy) {
587                 sent_msgs.put(new Long(msg_id), msg.copy());
588             }
589             else {
590                 sent_msgs.put(new Long(msg_id), msg);
591             }
592         }
593         passDown(evt);
594     }
595 
596 
597     /**
598      * Finds the corresponding NakReceiverWindow and adds the message to it (according to seqno). Then removes as many
599      * messages as possible from the NRW and passes them up the stack. Discards messages from non-members.
600      */
601     private void handleMessage(Message msg, NakAckHeader hdr) {
602         NakReceiverWindow win;
603         Message msg_to_deliver;
604         Address sender=msg.getSrc();
605 
606         if(sender == null) {
607             if(log.isErrorEnabled())
608                 log.error("sender of message is null");
609             return;
610         }
611 
612         if(trace) {
613             StringBuffer sb=new StringBuffer('[');
614             sb.append(local_addr).append("] received ").append(sender).append('#').append(hdr.seqno);
615             log.trace(sb.toString());
616         }
617 
618         // msg is potentially re-sent later as result of XMIT_REQ reception; that's why hdr is added !
619 
620         // Changed by bela Jan 29 2003: we currently don't resend from received msgs, just from sent_msgs !
621         // msg.putHeader(getName(), hdr);
622 
623         synchronized(received_msgs) {
624             win=(NakReceiverWindow)received_msgs.get(sender);
625         }
626         if(win == null) {  // discard message if there is no entry for sender
627             if(leaving)
628                 return;
629             if(warn) {
630                 StringBuffer sb=new StringBuffer('[');
631                 sb.append(local_addr).append("] discarded message from non-member ").append(sender);
632                 if(warn)
633                     log.warn(sb.toString());
634             }
635             return;
636         }
637         win.add(hdr.seqno, msg);  // add in order, then remove and pass up as many msgs as possible
638 
639         while((msg_to_deliver=win.remove()) != null) {
640 
641             // Changed by bela Jan 29 2003: not needed (see above)
642             //msg_to_deliver.removeHeader(getName());
643             passUp(new Event(Event.MSG, msg_to_deliver));
644         }
645     }
646 
647 
648     /**
649      * Retransmit from sent-table, called when XMIT_REQ is received. Bundles all messages to be xmitted into one large
650      * message and sends them back with an XMIT_RSP header. Note that since we cannot count on a fragmentation layer
651      * below us, we have to make sure the message doesn't exceed max_xmit_size bytes. If this is the case, we split the
652      * message into multiple, smaller-chunked messages. But in most cases this still yields fewer messages than if each
653      * requested message was retransmitted separately.
654      *
655      * @param xmit_requester        The sender of the XMIT_REQ, we have to send the requested copy of the message to this address
656      * @param first_seqno The first sequence number to be retransmitted (<= last_seqno)
657      * @param last_seqno  The last sequence number to be retransmitted (>= first_seqno)
658      * @param original_sender The member who originally sent the messsage. Guaranteed to be non-null
659      */
660     private void handleXmitReq(Address xmit_requester, long first_seqno, long last_seqno, Address original_sender) {
661         Message m, tmp;
662         LinkedList list;
663         long size=0, marker=first_seqno, len;
664         NakReceiverWindow win=null;
665         boolean      amISender; // am I the original sender ?
666 
667         if(trace) {
668             StringBuffer sb=new StringBuffer();
669             sb.append(local_addr).append(": received xmit request from ").append(xmit_requester).append(" for ");
670             sb.append(original_sender).append(" [").append(first_seqno).append(" - ").append(last_seqno).append("]");
671             log.trace(sb.toString());
672         }
673 
674         if(first_seqno > last_seqno) {
675             if(log.isErrorEnabled())
676                 log.error("first_seqno (" + first_seqno + ") > last_seqno (" + last_seqno + "): not able to retransmit");
677             return;
678         }
679 
680         if(stats) {
681             xmit_reqs_received+=last_seqno - first_seqno +1;
682             updateStats(received, xmit_requester, 1, 0, 0);
683         }
684 
685         amISender=local_addr.equals(original_sender);
686         if(!amISender)
687             win=(NakReceiverWindow)received_msgs.get(original_sender);
688 
689         list=new LinkedList();
690         for(long i=first_seqno; i <= last_seqno; i++) {
691             if(amISender) {
692                 m=(Message)sent_msgs.get(new Long(i)); // no need to synchronize
693             }
694             else {
695                 m=win != null? win.get(i) : null;
696             }
697             if(m == null) {
698                 if(log.isErrorEnabled()) {
699                     StringBuffer sb=new StringBuffer();
700                     sb.append("(requester=").append(xmit_requester).append(", local_addr=").append(this.local_addr);
701                     sb.append(") message ").append(original_sender).append("::").append(i);
702                     sb.append(" not found in ").append((amISender? "sent" : "received")).append(" msgs. ");
703                     if(win != null) {
704                         sb.append("Received messages from ").append(original_sender).append(": ").append(win.toString());
705                     }
706                     else {
707                         sb.append("\nSent messages: ").append(printSentMsgs());
708                     }
709                     log.error(sb.toString());
710                 }
711                 continue;
712             }
713             len=m.size();
714             size+=len;
715             if(size > max_xmit_size && list.size() > 0) { // changed from >= to > (yaron-r, bug #943709)
716                 // yaronr: added &&listSize()>0 since protocols between FRAG and NAKACK add headers, and message exceeds size.
717 
718                 // size has reached max_xmit_size. go ahead and send message (excluding the current message)
719                 if(trace)
720                     log.trace("xmitting msgs [" + marker + '-' + (i - 1) + "] to " + xmit_requester);
721                 sendXmitRsp(xmit_requester, (LinkedList)list.clone(), marker, i - 1);
722                 marker=i;
723                 list.clear();
724                 // fixed Dec 15 2003 (bela, patch from Joel Dice (dicej)), see explanantion under
725                 // bug report #854887
726                 size=len;
727             }
728             if(Global.copy) {
729                 tmp=m.copy();
730             }
731             else {
732                 tmp=m;
733             }
734             // tmp.setDest(xmit_requester);
735             // tmp.setSrc(local_addr);
736             if(tmp.getSrc() == null)
737                 tmp.setSrc(local_addr);
738             list.add(tmp);
739         }
740 
741         if(list.size() > 0) {
742             if(trace)
743                 log.trace("xmitting msgs [" + marker + '-' + last_seqno + "] to " + xmit_requester);
744             sendXmitRsp(xmit_requester, (LinkedList)list.clone(), marker, last_seqno);
745             list.clear();
746         }
747     }
748 
749     private static void updateStats(HashMap map, Address key, int req, int rsp, int missing) {
750         Entry entry=(Entry)map.get(key);
751         if(entry == null) {
752             entry=new Entry();
753             map.put(key, entry);
754         }
755         entry.xmit_reqs+=req;
756         entry.xmit_rsps+=rsp;
757         entry.missing_msgs_rcvd+=missing;
758     }
759 
760     private void sendXmitRsp(Address dest, LinkedList xmit_list, long first_seqno, long last_seqno) {
761         Buffer buf;
762         if(xmit_list == null || xmit_list.size() == 0) {
763             if(log.isErrorEnabled())
764                 log.error("xmit_list is empty");
765             return;
766         }
767         if(use_mcast_xmit)
768             dest=null;
769 
770         if(stats) {
771             xmit_rsps_sent+=xmit_list.size();
772             updateStats(sent, dest, 0, 1, 0);
773         }
774 
775         try {
776             buf=Util.msgListToByteBuffer(xmit_list);
777             Message msg=new Message(dest, null, buf.getBuf(), buf.getOffset(), buf.getLength());
778             msg.putHeader(name, new NakAckHeader(NakAckHeader.XMIT_RSP, first_seqno, last_seqno));
779             passDown(new Event(Event.MSG, msg));
780         }
781         catch(IOException ex) {
782             log.error("failed marshalling xmit list", ex);
783         }
784     }
785 
786 
787     private void handleXmitRsp(Message msg) {
788         LinkedList list;
789         Message m;
790 
791         if(msg == null) {
792             if(warn)
793                 log.warn("message is null");
794             return;
795         }
796         try {
797             list=Util.byteBufferToMessageList(msg.getRawBuffer(), msg.getOffset(), msg.getLength());
798             if(list != null) {
799                 if(stats) {
800                     xmit_rsps_received+=list.size();
801                     updateStats(received, msg.getSrc(), 0, 1, 0);
802                 }
803                 for(Iterator it=list.iterator(); it.hasNext();) {
804                     m=(Message)it.next();
805                     up(new Event(Event.MSG, m));
806                 }
807                 list.clear();
808             }
809         }
810         catch(Exception ex) {
811             if(log.isErrorEnabled()) {
812                 log.error("message did not contain a list (LinkedList) of retransmitted messages: " + ex);
813             }
814         }
815     }
816 
817 
818 
819 
820     /**
821      * Remove old members from NakReceiverWindows and add new members (starting seqno=0). Essentially removes all
822      * entries from received_msgs that are not in <code>members</code>
823      */
824     private void adjustReceivers() {
825         Address sender;
826         NakReceiverWindow win;
827 
828         synchronized(received_msgs) {
829 
830             // 1. Remove all senders in received_msgs that are not members anymore
831             for(Iterator it=received_msgs.keySet().iterator(); it.hasNext();) {
832                 sender=(Address)it.next();
833                 if(!members.contains(sender)) {
834                     win=(NakReceiverWindow)received_msgs.get(sender);
835                     win.reset();
836                     if(log.isDebugEnabled()) {
837                         log.debug("removing " + sender + " from received_msgs (not member anymore)");
838                     }
839                     it.remove();
840                 }
841             }
842 
843             // 2. Add newly joined members to received_msgs (starting seqno=0)
844             for(int i=0; i < members.size(); i++) {
845                 sender=(Address)members.elementAt(i);
846                 if(!received_msgs.containsKey(sender)) {
847                     win=createNakReceiverWindow(sender, 0);
848                     received_msgs.put(sender, win);
849                 }
850             }
851         }
852     }
853 
854 
855     /**
856      * Returns a message digest: for each member P the highest seqno received from P is added to the digest.
857      */
858     private Digest getDigest() {
859         Digest digest;
860         Address sender;
861         Range range;
862 
863         digest=new Digest(members.size());
864         for(int i=0; i < members.size(); i++) {
865             sender=(Address)members.elementAt(i);
866             range=getLowestAndHighestSeqno(sender, false);  // get the highest received seqno
867             if(range == null) {
868                 if(log.isErrorEnabled()) {
869                     log.error("range is null");
870                 }
871                 continue;
872             }
873             digest.add(sender, range.low, range.high);  // add another entry to the digest
874         }
875         return digest;
876     }
877 
878 
879     /**
880      * Returns a message digest: for each member P the highest seqno received from P <em>without a gap</em> is added to
881      * the digest. E.g. if the seqnos received from P are [+3 +4 +5 -6 +7 +8], then 5 will be returned. Also, the
882      * highest seqno <em>seen</em> is added. The max of all highest seqnos seen will be used (in STABLE) to determine
883      * whether the last seqno from a sender was received (see "Last Message Dropped" topic in DESIGN).
884      */
885     private Digest getDigestHighestDeliveredMsgs() {
886         Digest digest;
887         Address sender;
888         Range range;
889         long high_seqno_seen;
890 
891         digest=new Digest(members.size());
892         for(int i=0; i < members.size(); i++) {
893             sender=(Address)members.elementAt(i);
894             range=getLowestAndHighestSeqno(sender, true);  // get the highest deliverable seqno
895             if(range == null) {
896                 if(log.isErrorEnabled()) {
897                     log.error("range is null");
898                 }
899                 continue;
900             }
901             high_seqno_seen=getHighSeqnoSeen(sender);
902             digest.add(sender, range.low, range.high, high_seqno_seen);  // add another entry to the digest
903         }
904         return digest;
905     }
906 
907 
908     /**
909      * Creates a NakReceiverWindow for each sender in the digest according to the sender's seqno. If NRW already exists,
910      * reset it.
911      */
912     private void setDigest(Digest d) {
913         if(d == null || d.senders == null) {
914             if(log.isErrorEnabled()) {
915                 log.error("digest or digest.senders is null");
916             }
917             return;
918         }
919 
920         clear();
921 
922         Map.Entry entry;
923         Address sender;
924         org.jgroups.protocols.pbcast.Digest.Entry val;
925         long initial_seqno;
926         NakReceiverWindow win;
927 
928         for(Iterator it=d.senders.entrySet().iterator(); it.hasNext();) {
929             entry=(Map.Entry)it.next();
930             sender=(Address)entry.getKey();
931             val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue();
932 
933             if(sender == null || val == null) {
934                 if(warn) {
935                     log.warn("sender or value is null");
936                 }
937                 continue;
938             }
939             initial_seqno=val.high_seqno;
940             win=createNakReceiverWindow(sender, initial_seqno);
941             synchronized(received_msgs) {
942                 received_msgs.put(sender, win);
943             }
944         }
945     }
946 
947 
948     /**
949      * For all members of the digest, adjust the NakReceiverWindows in the received_msgs hashtable. If the member
950      * already exists, sets its seqno to be the max of the seqno and the seqno of the member in the digest. If no entry
951      * exists, create one with the initial seqno set to the seqno of the member in the digest.
952      */
953     private void mergeDigest(Digest d) {
954         if(d == null || d.senders == null) {
955             if(log.isErrorEnabled()) {
956                 log.error("digest or digest.senders is null");
957             }
958             return;
959         }
960 
961         Map.Entry entry;
962         Address sender;
963         org.jgroups.protocols.pbcast.Digest.Entry val;
964         NakReceiverWindow win;
965         long initial_seqno;
966 
967         for(Iterator it=d.senders.entrySet().iterator(); it.hasNext();) {
968             entry=(Map.Entry)it.next();
969             sender=(Address)entry.getKey();
970             val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue();
971 
972             if(sender == null || val == null) {
973                 if(warn) {
974                     log.warn("sender or value is null");
975                 }
976                 continue;
977             }
978             initial_seqno=val.high_seqno;
979             synchronized(received_msgs) {
980                 win=(NakReceiverWindow)received_msgs.get(sender);
981                 if(win == null) {
982                     win=createNakReceiverWindow(sender, initial_seqno);
983                     received_msgs.put(sender, win);
984                 }
985                 else {
986                     if(win.getHighestReceived() < initial_seqno) {
987                         win.reset();
988                         received_msgs.remove(sender);
989                         win=createNakReceiverWindow(sender, initial_seqno);
990                         received_msgs.put(sender, win);
991                     }
992                 }
993             }
994         }
995     }
996 
997 
998     private NakReceiverWindow createNakReceiverWindow(Address sender, long initial_seqno) {
999         NakReceiverWindow win=new NakReceiverWindow(sender, this, initial_seqno, timer);
1000        win.setRetransmitTimeouts(retransmit_timeout);
1001        win.setDiscardDeliveredMessages(discard_delivered_msgs);
1002        win.setMaxXmitBufSize(this.max_xmit_buf_size);
1003        if(stats)
1004            win.setListener(this);
1005        return win;
1006    }
1007
1008
1009    /**
1010     * Returns the lowest seqno still in cache (so it can be retransmitted) and the highest seqno received so far.
1011     *
1012     * @param sender       The address for which the highest and lowest seqnos are to be retrieved
1013     * @param stop_at_gaps If true, the highest seqno *deliverable* will be returned. If false, the highest seqno
1014     *                     *received* will be returned. E.g. for [+3 +4 +5 -6 +7 +8], the highest_seqno_received is 8,
1015     *                     whereas the higheset_seqno_seen (deliverable) is 5.
1016     */
1017    private Range getLowestAndHighestSeqno(Address sender, boolean stop_at_gaps) {
1018        Range r=null;
1019        NakReceiverWindow win;
1020
1021        if(sender == null) {
1022            if(log.isErrorEnabled()) {
1023                log.error("sender is null");
1024            }
1025            return r;
1026        }
1027        synchronized(received_msgs) {
1028            win=(NakReceiverWindow)received_msgs.get(sender);
1029        }
1030        if(win == null) {
1031            if(log.isErrorEnabled()) {
1032                log.error("sender " + sender + " not found in received_msgs");
1033            }
1034            return r;
1035        }
1036        if(stop_at_gaps) {
1037            r=new Range(win.getLowestSeen(), win.getHighestSeen());       // deliverable messages (no gaps)
1038        }
1039        else {
1040            r=new Range(win.getLowestSeen(), win.getHighestReceived() + 1); // received messages
1041        }
1042        return r;
1043    }
1044
1045
1046    /**
1047     * Returns the highest seqno seen from sender. E.g. if we received 1, 2, 4, 5 from P, then 5 will be returned
1048     * (doesn't take gaps into account). If we are the sender, we will return the highest seqno <em>sent</em> rather
1049     * then <em>received</em>
1050     */
1051    private long getHighSeqnoSeen(Address sender) {
1052        NakReceiverWindow win;
1053        long ret=0;
1054
1055        if(sender == null) {
1056            if(log.isErrorEnabled()) {
1057                log.error("sender is null");
1058            }
1059            return ret;
1060        }
1061        if(sender.equals(local_addr)) {
1062            return seqno - 1;
1063        }
1064
1065        synchronized(received_msgs) {
1066            win=(NakReceiverWindow)received_msgs.get(sender);
1067        }
1068        if(win == null) {
1069            if(log.isErrorEnabled()) {
1070                log.error("sender " + sender + " not found in received_msgs");
1071            }
1072            return ret;
1073        }
1074        ret=win.getHighestReceived();
1075        return ret;
1076    }
1077
1078
1079    /**
1080     * Garbage collect messages that have been seen by all members. Update sent_msgs: for the sender P in the digest
1081     * which is equal to the local address, garbage collect all messages <= seqno at digest[P]. Update received_msgs:
1082     * for each sender P in the digest and its highest seqno seen SEQ, garbage collect all delivered_msgs in the
1083     * NakReceiverWindow corresponding to P which are <= seqno at digest[P].
1084     */
1085    private void stable(Digest d) {
1086        NakReceiverWindow recv_win;
1087        long my_highest_rcvd;        // highest seqno received in my digest for a sender P
1088        long stability_highest_rcvd; // highest seqno received in the stability vector for a sender P
1089
1090        if(members == null || local_addr == null || d == null) {
1091            if(warn)
1092                log.warn("members, local_addr or digest are null !");
1093            return;
1094        }
1095
1096        if(trace) {
1097            log.trace("received stable digest " + d);
1098        }
1099
1100        Map.Entry entry;
1101        Address sender;
1102        org.jgroups.protocols.pbcast.Digest.Entry val;
1103        long high_seqno_delivered, high_seqno_received;
1104
1105        for(Iterator it=d.senders.entrySet().iterator(); it.hasNext();) {
1106            entry=(Map.Entry)it.next();
1107            sender=(Address)entry.getKey();
1108            if(sender == null)
1109                continue;
1110            val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue();
1111            high_seqno_delivered=val.high_seqno;
1112            high_seqno_received=val.high_seqno_seen;
1113
1114
1115            // check whether the last seqno received for a sender P in the stability vector is > last seqno
1116            // received for P in my digest. if yes, request retransmission (see "Last Message Dropped" topic
1117            // in DESIGN)
1118            synchronized(received_msgs) {
1119                recv_win=(NakReceiverWindow)received_msgs.get(sender);
1120            }
1121            if(recv_win != null) {
1122                my_highest_rcvd=recv_win.getHighestReceived();
1123                stability_highest_rcvd=high_seqno_received;
1124
1125                if(stability_highest_rcvd >= 0 && stability_highest_rcvd > my_highest_rcvd) {
1126                    if(trace) {
1127                        log.trace("my_highest_rcvd (" + my_highest_rcvd + ") < stability_highest_rcvd (" +
1128                                stability_highest_rcvd + "): requesting retransmission of " +
1129                                sender + '#' + stability_highest_rcvd);
1130                    }
1131                    retransmit(stability_highest_rcvd, stability_highest_rcvd, sender);
1132                }
1133            }
1134
1135            high_seqno_delivered-=gc_lag;
1136            if(high_seqno_delivered < 0) {
1137                continue;
1138            }
1139
1140            if(trace)
1141                log.trace("deleting msgs <= " + high_seqno_delivered + " from " + sender);
1142
1143            // garbage collect from sent_msgs if sender was myself
1144            if(sender.equals(local_addr)) {
1145                synchronized(sent_msgs) {
1146                    // gets us a subset from [lowest seqno - seqno]
1147                    SortedMap stable_keys=sent_msgs.headMap(new Long(high_seqno_delivered));
1148                    if(stable_keys != null) {
1149                        stable_keys.clear(); // this will modify sent_msgs directly
1150                    }
1151                }
1152            }
1153
1154            // delete *delivered* msgs that are stable
1155            // recv_win=(NakReceiverWindow)received_msgs.get(sender);
1156            if(recv_win != null)
1157                recv_win.stable(high_seqno_delivered);  // delete all messages with seqnos <= seqno
1158        }
1159    }
1160
1161
1162
1163    /* ---------------------- Interface Retransmitter.RetransmitCommand ---------------------- */
1164
1165
1166    /**
1167     * Implementation of Retransmitter.RetransmitCommand. Called by retransmission thread when gap is detected.
1168     */
1169    public void retransmit(long first_seqno, long last_seqno, Address sender) {
1170        NakAckHeader hdr;
1171        Message retransmit_msg;
1172        Address dest=sender; // to whom do we send the XMIT request ?
1173
1174        if(xmit_from_random_member && !local_addr.equals(sender)) {
1175            Address random_member=(Address)Util.pickRandomElement(members);
1176            if(random_member != null && !local_addr.equals(random_member)) {
1177                dest=random_member;
1178                if(trace)
1179                    log.trace("picked random member " + dest + " to send XMIT request to");
1180            }
1181        }
1182
1183        hdr=new NakAckHeader(NakAckHeader.XMIT_REQ, first_seqno, last_seqno, sender);
1184        retransmit_msg=new Message(dest, null, null);
1185        if(trace)
1186            log.trace(local_addr + ": sending XMIT_REQ ([" + first_seqno + ", " + last_seqno + "]) to " + dest);
1187        retransmit_msg.putHeader(name, hdr);
1188        passDown(new Event(Event.MSG, retransmit_msg));
1189        if(stats) {
1190            xmit_reqs_sent+=last_seqno - first_seqno +1;
1191            updateStats(sent, dest, 1, 0, 0);
1192            for(long i=first_seqno; i <= last_seqno; i++) {
1193                XmitRequest req=new XmitRequest(sender, i, dest);
1194                send_history.add(req);
1195            }
1196        }
1197    }
1198    /* ------------------- End of Interface Retransmitter.RetransmitCommand -------------------- */
1199
1200
1201
1202    /* ----------------------- Interface NakReceiverWindow.Listener ---------------------- */
1203    public void missingMessageReceived(long seqno, Message msg) {
1204        if(stats) {
1205            missing_msgs_received++;
1206            updateStats(received, msg.getSrc(), 0, 0, 1);
1207            MissingMessage missing=new MissingMessage(msg.getSrc(), seqno);
1208            receive_history.add(missing);
1209        }
1210    }
1211    /* ------------------- End of Interface NakReceiverWindow.Listener ------------------- */
1212
1213    private void clear() {
1214        NakReceiverWindow win;
1215
1216        // changed April 21 2004 (bela): SourceForge bug# 938584. We cannot delete our own messages sent between
1217        // a join() and a getState(). Otherwise retransmission requests from members who missed those msgs might
1218        // fail. Not to worry though: those msgs will be cleared by STABLE (message garbage collection)
1219
1220        // sent_msgs.clear();
1221
1222        synchronized(received_msgs) {
1223            for(Iterator it=received_msgs.values().iterator(); it.hasNext();) {
1224                win=(NakReceiverWindow)it.next();
1225                win.reset();
1226            }
1227            received_msgs.clear();
1228        }
1229    }
1230
1231
1232    private void removeAll() {
1233        NakReceiverWindow win;
1234
1235        synchronized(sent_msgs) {
1236            sent_msgs.clear();
1237        }
1238
1239        synchronized(received_msgs) {
1240            for(Iterator it=received_msgs.values().iterator(); it.hasNext();) {
1241                win=(NakReceiverWindow)it.next();
1242                win.destroy();
1243            }
1244            received_msgs.clear();
1245        }
1246    }
1247
1248
1249   public String printMessages() {
1250        StringBuffer ret=new StringBuffer();
1251        Map.Entry entry;
1252        Address addr;
1253        Object w;
1254
1255       ret.append("\nsent_msgs: ").append(printSentMsgs());
1256        ret.append("\nreceived_msgs:\n");
1257        synchronized(received_msgs) {
1258            for(Iterator it=received_msgs.entrySet().iterator(); it.hasNext();) {
1259                entry=(Map.Entry)it.next();
1260                addr=(Address)entry.getKey();
1261                w=entry.getValue();
1262                ret.append(addr).append(": ").append(w.toString()).append('\n');
1263            }
1264        }
1265        return ret.toString();
1266    }
1267
1268
1269    public String printSentMsgs() {
1270        StringBuffer sb=new StringBuffer();
1271        Long min_seqno, max_seqno;
1272        synchronized(sent_msgs) {
1273            min_seqno=sent_msgs.size() > 0 ? (Long)sent_msgs.firstKey() : new Long(0);
1274            max_seqno=sent_msgs.size() > 0 ? (Long)sent_msgs.lastKey() : new Long(0);
1275        }
1276        sb.append('[').append(min_seqno).append(" - ").append(max_seqno).append("] (" + sent_msgs.size() + ")");
1277        return sb.toString();
1278    }
1279
1280
1281    private void handleConfigEvent(HashMap map) {
1282        if(map == null) {
1283            return;
1284        }
1285        if(map.containsKey("frag_size")) {
1286            max_xmit_size=((Integer)map.get("frag_size")).intValue();
1287            if(log.isInfoEnabled()) {
1288                log.info("max_xmit_size=" + max_xmit_size);
1289            }
1290        }
1291    }
1292
1293
1294    static class Entry {
1295        long xmit_reqs, xmit_rsps, missing_msgs_rcvd;
1296
1297        public String toString() {
1298            StringBuffer sb=new StringBuffer();
1299            sb.append(xmit_reqs).append(" xmit_reqs").append(", ").append(xmit_rsps).append(" xmit_rsps");
1300            sb.append(", ").append(missing_msgs_rcvd).append(" missing msgs");
1301            return sb.toString();
1302        }
1303    }
1304
1305    static class XmitRequest {
1306        Address original_sender; // original sender of message
1307        long    seq, timestamp=System.currentTimeMillis();
1308        Address xmit_dest;       // destination to which XMIT_REQ is sent, usually the original sender
1309
1310        XmitRequest(Address original_sender, long seqno, Address xmit_dest) {
1311            this.original_sender=original_sender;
1312            this.xmit_dest=xmit_dest;
1313            this.seq=seqno;
1314        }
1315
1316        public String toString() {
1317            StringBuffer sb=new StringBuffer();
1318            sb.append(new Date(timestamp)).append(": ").append(original_sender).append(" #").append(seq);
1319            sb.append(" (XMIT_REQ sent to ").append(xmit_dest).append(")");
1320            return sb.toString();
1321        }
1322    }
1323
1324    static class MissingMessage {
1325        Address original_sender;
1326        long    seq, timestamp=System.currentTimeMillis();
1327
1328        MissingMessage(Address original_sender, long seqno) {
1329            this.original_sender=original_sender;
1330            this.seq=seqno;
1331        }
1332
1333        public String toString() {
1334            StringBuffer sb=new StringBuffer();
1335            sb.append(new Date(timestamp)).append(": ").append(original_sender).append(" #").append(seq);
1336            return sb.toString();
1337        }
1338    }
1339
1340    /* ----------------------------- End of Private Methods ------------------------------------ */
1341
1342
1343}