Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/jgroups/protocols/UNICAST.java


1   // $Id: UNICAST.java,v 1.45 2005/10/28 13:47:52 belaban Exp $
2   
3   package org.jgroups.protocols;
4   
5   import org.jgroups.*;
6   import org.jgroups.stack.AckReceiverWindow;
7   import org.jgroups.stack.AckSenderWindow;
8   import org.jgroups.stack.Protocol;
9   import org.jgroups.util.BoundedList;
10  import org.jgroups.util.Streamable;
11  import org.jgroups.util.TimeScheduler;
12  import org.jgroups.util.Util;
13  
14  import java.io.*;
15  import java.util.*;
16  
17  
18  /**
19   * Reliable unicast layer. Uses acknowledgement scheme similar to TCP to provide lossless transmission
20   * of unicast messages (for reliable multicast see NAKACK layer). When a message is sent to a peer for
21   * the first time, we add the pair <peer_addr, Entry> to the hashtable (peer address is the key). All
22   * messages sent to that peer will be added to hashtable.peer_addr.sent_msgs. When we receive a
23   * message from a peer for the first time, another entry will be created and added to the hashtable
24   * (unless already existing). Msgs will then be added to hashtable.peer_addr.received_msgs.<p> This
25   * layer is used to reliably transmit point-to-point messages, that is, either messages sent to a
26   * single receiver (vs. messages multicast to a group) or for example replies to a multicast message. The 
27   * sender uses an <code>AckSenderWindow</code> which retransmits messages for which it hasn't received
28   * an ACK, the receiver uses <code>AckReceiverWindow</code> which keeps track of the lowest seqno
29   * received so far, and keeps messages in order.<p>
30   * Messages in both AckSenderWindows and AckReceiverWindows will be removed. A message will be removed from
31   * AckSenderWindow when an ACK has been received for it and messages will be removed from AckReceiverWindow
32   * whenever a message is received: the new message is added and then we try to remove as many messages as
33   * possible (until we stop at a gap, or there are no more messages).
34   * @author Bela Ban
35   */
36  public class UNICAST extends Protocol implements AckSenderWindow.RetransmitCommand {
37      private final Vector     members=new Vector(11);
38      private final HashMap    connections=new HashMap(11);   // Object (sender or receiver) -- Entries
39      private long[]           timeout={400,800,1600,3200};  // for AckSenderWindow: max time to wait for missing acks
40      private Address          local_addr=null;
41      private TimeScheduler    timer=null;                    // used for retransmissions (passed to AckSenderWindow)
42  
43      // if UNICAST is used without GMS, don't consult the membership on retransmit() if use_gms=false
44      // default is true
45      private boolean          use_gms=true;
46  
47      /** A list of members who left, used to determine when to prevent sending messages to left mbrs */
48      private final BoundedList previous_members=new BoundedList(50);
49  
50      private final static String name="UNICAST";
51      private static final long DEFAULT_FIRST_SEQNO=1;
52  
53      private long num_msgs_sent=0, num_msgs_received=0, num_bytes_sent=0, num_bytes_received=0;
54      private long num_acks_sent=0, num_acks_received=0, num_xmit_requests_received=0;
55  
56  
57      /** All protocol names have to be unique ! */
58      public String  getName() {return name;}
59  
60      public String getLocalAddress() {return local_addr != null? local_addr.toString() : "null";}
61      public String getMembers() {return members != null? members.toString() : "[]";}
62      public String printConnections() {
63          StringBuffer sb=new StringBuffer();
64          Map.Entry entry;
65          for(Iterator it=connections.entrySet().iterator(); it.hasNext();) {
66              entry=(Map.Entry)it.next();
67              sb.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n");
68          }
69          return sb.toString();
70      }
71  
72  
73      public long getNumMessagesSent() {
74          return num_msgs_sent;
75      }
76  
77      public long getNumMessagesReceived() {
78          return num_msgs_received;
79      }
80  
81      public long getNumBytesSent() {
82          return num_bytes_sent;
83      }
84  
85      public long getNumBytesReceived() {
86          return num_bytes_received;
87      }
88  
89      public long getNumAcksSent() {
90          return num_acks_sent;
91      }
92  
93      public long getNumAcksReceived() {
94          return num_acks_received;
95      }
96  
97      public long getNumberOfRetransmitRequestsReceived() {
98          return num_xmit_requests_received;
99      }
100 
101     /** The number of messages in all Entry.sent_msgs tables (haven't received an ACK yet) */
102     public int getNumberOfUnackedMessages() {
103         int num=0;
104         Entry entry;
105         synchronized(connections) {
106             for(Iterator it=connections.values().iterator(); it.hasNext();) {
107                 entry=(Entry)it.next();
108                 if(entry.sent_msgs != null)
109                 num+=entry.sent_msgs.size();
110             }
111         }
112         return num;
113     }
114 
115     public void resetStats() {
116         num_msgs_sent=num_msgs_received=num_bytes_sent=num_bytes_received=num_acks_sent=num_acks_received=0;
117         num_xmit_requests_received=0;
118     }
119 
120     public Map dumpStats() {
121         Map m=new HashMap();
122         m.put("num_msgs_sent", new Long(num_msgs_sent));
123         m.put("num_msgs_received", new Long(num_msgs_received));
124         m.put("num_bytes_sent", new Long(num_bytes_sent));
125         m.put("num_bytes_received", new Long(num_bytes_received));
126         m.put("num_acks_sent", new Long(num_acks_sent));
127         m.put("num_acks_received", new Long(num_acks_received));
128         m.put("num_xmit_requests_received", new Long(num_xmit_requests_received));
129         return m;
130     }
131 
132     public boolean setProperties(Properties props) {
133         String     str;
134         long[]     tmp;
135 
136         super.setProperties(props);
137         str=props.getProperty("timeout");
138         if(str != null) {
139         tmp=Util.parseCommaDelimitedLongs(str);
140         if(tmp != null && tmp.length > 0)
141         timeout=tmp;
142             props.remove("timeout");
143         }
144 
145         str=props.getProperty("window_size");
146         if(str != null) {
147             props.remove("window_size");
148             log.error("window_size is deprecated and will be ignored");
149         }
150 
151         str=props.getProperty("min_threshold");
152         if(str != null) {
153             props.remove("min_threshold");
154             log.error("min_threshold is deprecated and will be ignored");
155         }
156 
157         str=props.getProperty("use_gms");
158         if(str != null) {
159             use_gms=Boolean.valueOf(str).booleanValue();
160             props.remove("use_gms");
161         }
162 
163         if(props.size() > 0) {
164             log.error("UNICAST.setProperties(): these properties are not recognized: " + props);
165 
166             return false;
167         }
168         return true;
169     }
170 
171     public void start() throws Exception {
172         timer=stack != null ? stack.timer : null;
173         if(timer == null)
174             throw new Exception("timer is null");
175     }
176 
177     public void stop() {
178         removeAllConnections();
179     }
180 
181 
182     public void up(Event evt) {
183         Message        msg;
184         Address        dst, src;
185         UnicastHeader  hdr;
186 
187         switch(evt.getType()) {
188 
189         case Event.MSG:
190             msg=(Message)evt.getArg();
191             dst=msg.getDest();
192 
193             if(dst == null || dst.isMulticastAddress())  // only handle unicast messages
194                 break;  // pass up
195 
196             // changed from removeHeader(): we cannot remove the header because if we do loopback=true at the
197             // transport level, we will not have the header on retransmit ! (bela Aug 22 2006)
198             hdr=(UnicastHeader)msg.getHeader(name);
199             if(hdr == null) break;
200             src=msg.getSrc();
201             switch(hdr.type) {
202             case UnicastHeader.DATA:      // received regular message
203                 handleDataReceived(src, hdr.seqno, msg);
204                 sendAck(src, hdr.seqno); // only send an ACK if added to the received_msgs table (bela Aug 2006)
205                 return; // we pass the deliverable message up in handleDataReceived()
206             case UnicastHeader.ACK:  // received ACK for previously sent message
207                 handleAckReceived(src, hdr.seqno);
208                 break;
209             default:
210                 log.error("UnicastHeader type " + hdr.type + " not known !");
211                 break;
212             }
213             return;
214 
215         case Event.SET_LOCAL_ADDRESS:
216             local_addr=(Address)evt.getArg();
217             break;
218         }
219 
220         passUp(evt);   // Pass up to the layer above us
221     }
222 
223 
224 
225 
226 
227     public void down(Event evt) {
228         switch (evt.getType()) {
229 
230         case Event.MSG: // Add UnicastHeader, add to AckSenderWindow and pass down
231             Message msg = (Message) evt.getArg();
232             Object dst = msg.getDest();
233 
234             /* only handle unicast messages */
235             if (dst == null || ((Address) dst).isMulticastAddress()) {
236                 break;
237             }
238 
239             if(previous_members.contains(dst)) {
240                 if(log.isTraceEnabled())
241                     log.trace("discarding message to " + dst + " as this member left the group");
242                 return;
243             }
244 
245             Entry entry;
246             synchronized(connections) {
247                 entry=(Entry)connections.get(dst);
248                 if(entry == null) {
249                     entry=new Entry();
250                     connections.put(dst, entry);
251                 }
252             }
253 
254             synchronized(entry) { // threads will only sync if they access the same entry
255                 long seqno=entry.sent_msgs_seqno;
256                 UnicastHeader hdr=new UnicastHeader(UnicastHeader.DATA, seqno);
257                 if(entry.sent_msgs == null) { // first msg to peer 'dst'
258                     entry.sent_msgs=new AckSenderWindow(this, timeout, timer); // use the protocol stack's timer
259                 }
260                 msg.putHeader(name, hdr);
261                 if(trace)
262                     log.trace(new StringBuffer().append(local_addr).append(" --> DATA(").append(dst).append(": #").
263                             append(seqno));
264 
265                 entry.sent_msgs_seqno++;
266                 Message tmp=Global.copy? msg.copy() : msg;
267 
268                 try {
269                     passDown(new Event(Event.MSG, tmp));
270                     num_msgs_sent++;
271                     num_bytes_sent+=msg.getLength();
272                 }
273                 finally {
274                     entry.sent_msgs.add(seqno, tmp);  // add *including* UnicastHeader, adds to retransmitter
275                 }
276             }
277             msg=null;
278             return; // AckSenderWindow will send message for us
279 
280         case Event.VIEW_CHANGE:  // remove connections to peers that are not members anymore !
281             Vector new_members=((View)evt.getArg()).getMembers();
282             Vector left_members;
283             synchronized(members) {
284                 left_members=Util.determineLeftMembers(members, new_members);
285                 members.removeAllElements();
286                 if(new_members != null)
287                     members.addAll(new_members);
288             }
289 
290             // Remove all connections for members that left between the current view and the new view
291             // See DESIGN for details
292             boolean rc;
293             if(use_gms && left_members.size() > 0) {
294                 Object mbr;
295                 for(int i=0; i < left_members.size(); i++) {
296                     mbr=left_members.elementAt(i);
297                     rc=removeConnection(mbr);
298                     if(rc && trace)
299                         log.trace("removed " + mbr + " from connection table, member(s) " + left_members + " left");
300                 }
301             }
302             break;
303         }
304 
305         passDown(evt);          // Pass on to the layer below us
306     }
307 
308 
309     /** Removes and resets from connection table (which is already locked). Returns true if member was found, otherwise false */
310     private boolean removeConnection(Object mbr) {
311         Entry entry;
312 
313         synchronized(connections) {
314             entry=(Entry)connections.remove(mbr);
315             if(!previous_members.contains(mbr))
316                 previous_members.add(mbr);
317         }
318         if(entry != null) {
319             entry.reset();
320             return true;
321         }
322         else
323             return false;
324     }
325 
326 
327     private void removeAllConnections() {
328         Entry entry;
329 
330         synchronized(connections) {
331             for(Iterator it=connections.values().iterator(); it.hasNext();) {
332                 entry=(Entry)it.next();
333                 entry.reset();
334             }
335             connections.clear();
336         }
337     }
338 
339 
340 
341     /** Called by AckSenderWindow to resend messages for which no ACK has been received yet */
342     public void retransmit(long seqno, Message msg) {
343         Object  dst=msg.getDest();
344 
345         // bela Dec 23 2002:
346         // this will remove a member on a MERGE request, e.g. A and B merge: when A sends the unicast
347         // request to B and there's a retransmit(), B will be removed !
348 
349         //          if(use_gms && !members.contains(dst) && !prev_members.contains(dst)) {
350         //
351         //                  if(warn) log.warn("UNICAST.retransmit()", "seqno=" + seqno + ":  dest " + dst +
352         //                             " is not member any longer; removing entry !");
353 
354         //              synchronized(connections) {
355         //                  removeConnection(dst);
356         //              }
357         //              return;
358         //          }
359 
360         if(trace)
361             log.trace("[" + local_addr + "] --> XMIT(" + dst + ": #" + seqno + ')');
362 
363         if(Global.copy)
364             passDown(new Event(Event.MSG, msg.copy()));
365         else
366             passDown(new Event(Event.MSG, msg));
367         num_xmit_requests_received++;
368     }
369 
370 
371 
372 
373 
374     /**
375      * Check whether the hashtable contains an entry e for <code>sender</code> (create if not). If
376      * e.received_msgs is null and <code>first</code> is true: create a new AckReceiverWindow(seqno) and
377      * add message. Set e.received_msgs to the new window. Else just add the message.
378      */
379     private void handleDataReceived(Object sender, long seqno, Message msg) {
380         Entry    entry;
381 
382         if(trace)
383             log.trace(new StringBuffer().append(local_addr).append(" <-- DATA(").append(sender).append(": #").append(seqno));
384 
385         if(previous_members.contains(sender)) {
386             if(log.isTraceEnabled())
387                 log.trace("discarding message from " + sender + " as this member left the group");
388             return;
389         }
390 
391         synchronized(connections) {
392             entry=(Entry)connections.get(sender);
393             if(entry == null) {
394                 entry=new Entry();
395                 connections.put(sender, entry);
396             }
397             if(entry.received_msgs == null)
398                 entry.received_msgs=new AckReceiverWindow(DEFAULT_FIRST_SEQNO);
399         }
400 
401         entry.received_msgs.add(seqno, msg); // entry.received_msgs is guaranteed to be non-null if we get here
402         num_msgs_received++;
403         num_bytes_received+=msg.getLength();
404 
405         // Try to remove (from the AckReceiverWindow) as many messages as possible as pass them up
406         Message  m;
407         while((m=entry.received_msgs.remove()) != null)
408             passUp(new Event(Event.MSG, m));
409     }
410 
411 
412     /** Add the ACK to hashtable.sender.sent_msgs */
413     private void handleAckReceived(Object sender, long seqno) {
414         Entry           entry;
415         AckSenderWindow win;
416 
417         if(trace)
418             log.trace(new StringBuffer().append(local_addr).append(" <-- ACK(").append(sender).
419                       append(": #").append(seqno).append(')'));
420         synchronized(connections) {
421             entry=(Entry)connections.get(sender);
422         }
423         if(entry == null || entry.sent_msgs == null)
424             return;
425         win=entry.sent_msgs;
426         win.ack(seqno); // removes message from retransmission
427         num_acks_received++;
428     }
429 
430 
431 
432     private void sendAck(Address dst, long seqno) {
433         Message ack=new Message(dst, null, null);
434         ack.putHeader(name, new UnicastHeader(UnicastHeader.ACK, seqno));
435         if(trace)
436             log.trace(new StringBuffer().append(local_addr).append(" --> ACK(").append(dst).
437                       append(": #").append(seqno).append(')'));
438         passDown(new Event(Event.MSG, ack));
439         num_acks_sent++;
440     }
441 
442 
443 
444 
445 
446 
447     public static class UnicastHeader extends Header implements Streamable {
448         public static final byte DATA=0;
449         public static final byte ACK=1;
450 
451         byte    type=DATA;
452         long    seqno=0;
453 
454         static final long serialized_size=Global.BYTE_SIZE + Global.LONG_SIZE;
455 
456 
457         public UnicastHeader() {} // used for externalization
458 
459         public UnicastHeader(byte type, long seqno) {
460             this.type=type;
461             this.seqno=seqno;
462         }
463 
464         public String toString() {
465             return "[UNICAST: " + type2Str(type) + ", seqno=" + seqno + ']';
466         }
467 
468         public static String type2Str(byte t) {
469             switch(t) {
470                 case DATA: return "DATA";
471                 case ACK: return "ACK";
472                 default: return "<unknown>";
473             }
474         }
475 
476         public final long size() {
477             return serialized_size;
478         }
479 
480 
481         public void writeExternal(ObjectOutput out) throws IOException {
482             out.writeByte(type);
483             out.writeLong(seqno);
484         }
485 
486 
487 
488         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
489             type=in.readByte();
490             seqno=in.readLong();
491         }
492 
493         public void writeTo(DataOutputStream out) throws IOException {
494             out.writeByte(type);
495             out.writeLong(seqno);
496         }
497 
498         public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
499             type=in.readByte();
500             seqno=in.readLong();
501         }
502     }
503 
504     private static final class Entry {
505         AckReceiverWindow  received_msgs=null;  // stores all msgs rcvd by a certain peer in seqno-order
506         AckSenderWindow    sent_msgs=null;      // stores (and retransmits) msgs sent by us to a certain peer
507         long               sent_msgs_seqno=DEFAULT_FIRST_SEQNO;   // seqno for msgs sent by us
508 
509         void reset() {
510             if(sent_msgs != null)
511                 sent_msgs.reset();
512             if(received_msgs != null)
513                 received_msgs.reset();
514         }
515 
516 
517         public String toString() {
518             StringBuffer sb=new StringBuffer();
519             if(sent_msgs != null)
520                 sb.append("sent_msgs=" + sent_msgs + '\n');
521             if(received_msgs != null)
522                 sb.append("received_msgs=" + received_msgs + '\n');
523             return sb.toString();
524         }
525     }
526 
527 
528 }