Source code: org/jgroups/protocols/UNICAST.java
1 // $Id: UNICAST.java,v 1.45 2005/10/28 13:47:52 belaban Exp $
2
3 package org.jgroups.protocols;
4
5 import org.jgroups.*;
6 import org.jgroups.stack.AckReceiverWindow;
7 import org.jgroups.stack.AckSenderWindow;
8 import org.jgroups.stack.Protocol;
9 import org.jgroups.util.BoundedList;
10 import org.jgroups.util.Streamable;
11 import org.jgroups.util.TimeScheduler;
12 import org.jgroups.util.Util;
13
14 import java.io.*;
15 import java.util.*;
16
17
18 /**
19 * Reliable unicast layer. Uses acknowledgement scheme similar to TCP to provide lossless transmission
20 * of unicast messages (for reliable multicast see NAKACK layer). When a message is sent to a peer for
21 * the first time, we add the pair <peer_addr, Entry> to the hashtable (peer address is the key). All
22 * messages sent to that peer will be added to hashtable.peer_addr.sent_msgs. When we receive a
23 * message from a peer for the first time, another entry will be created and added to the hashtable
24 * (unless already existing). Msgs will then be added to hashtable.peer_addr.received_msgs.<p> This
25 * layer is used to reliably transmit point-to-point messages, that is, either messages sent to a
26 * single receiver (vs. messages multicast to a group) or for example replies to a multicast message. The
27 * sender uses an <code>AckSenderWindow</code> which retransmits messages for which it hasn't received
28 * an ACK, the receiver uses <code>AckReceiverWindow</code> which keeps track of the lowest seqno
29 * received so far, and keeps messages in order.<p>
30 * Messages in both AckSenderWindows and AckReceiverWindows will be removed. A message will be removed from
31 * AckSenderWindow when an ACK has been received for it and messages will be removed from AckReceiverWindow
32 * whenever a message is received: the new message is added and then we try to remove as many messages as
33 * possible (until we stop at a gap, or there are no more messages).
34 * @author Bela Ban
35 */
36 public class UNICAST extends Protocol implements AckSenderWindow.RetransmitCommand {
37 private final Vector members=new Vector(11);
38 private final HashMap connections=new HashMap(11); // Object (sender or receiver) -- Entries
39 private long[] timeout={400,800,1600,3200}; // for AckSenderWindow: max time to wait for missing acks
40 private Address local_addr=null;
41 private TimeScheduler timer=null; // used for retransmissions (passed to AckSenderWindow)
42
43 // if UNICAST is used without GMS, don't consult the membership on retransmit() if use_gms=false
44 // default is true
45 private boolean use_gms=true;
46
47 /** A list of members who left, used to determine when to prevent sending messages to left mbrs */
48 private final BoundedList previous_members=new BoundedList(50);
49
50 private final static String name="UNICAST";
51 private static final long DEFAULT_FIRST_SEQNO=1;
52
53 private long num_msgs_sent=0, num_msgs_received=0, num_bytes_sent=0, num_bytes_received=0;
54 private long num_acks_sent=0, num_acks_received=0, num_xmit_requests_received=0;
55
56
57 /** All protocol names have to be unique ! */
58 public String getName() {return name;}
59
60 public String getLocalAddress() {return local_addr != null? local_addr.toString() : "null";}
61 public String getMembers() {return members != null? members.toString() : "[]";}
62 public String printConnections() {
63 StringBuffer sb=new StringBuffer();
64 Map.Entry entry;
65 for(Iterator it=connections.entrySet().iterator(); it.hasNext();) {
66 entry=(Map.Entry)it.next();
67 sb.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n");
68 }
69 return sb.toString();
70 }
71
72
73 public long getNumMessagesSent() {
74 return num_msgs_sent;
75 }
76
77 public long getNumMessagesReceived() {
78 return num_msgs_received;
79 }
80
81 public long getNumBytesSent() {
82 return num_bytes_sent;
83 }
84
85 public long getNumBytesReceived() {
86 return num_bytes_received;
87 }
88
89 public long getNumAcksSent() {
90 return num_acks_sent;
91 }
92
93 public long getNumAcksReceived() {
94 return num_acks_received;
95 }
96
97 public long getNumberOfRetransmitRequestsReceived() {
98 return num_xmit_requests_received;
99 }
100
101 /** The number of messages in all Entry.sent_msgs tables (haven't received an ACK yet) */
102 public int getNumberOfUnackedMessages() {
103 int num=0;
104 Entry entry;
105 synchronized(connections) {
106 for(Iterator it=connections.values().iterator(); it.hasNext();) {
107 entry=(Entry)it.next();
108 if(entry.sent_msgs != null)
109 num+=entry.sent_msgs.size();
110 }
111 }
112 return num;
113 }
114
115 public void resetStats() {
116 num_msgs_sent=num_msgs_received=num_bytes_sent=num_bytes_received=num_acks_sent=num_acks_received=0;
117 num_xmit_requests_received=0;
118 }
119
120 public Map dumpStats() {
121 Map m=new HashMap();
122 m.put("num_msgs_sent", new Long(num_msgs_sent));
123 m.put("num_msgs_received", new Long(num_msgs_received));
124 m.put("num_bytes_sent", new Long(num_bytes_sent));
125 m.put("num_bytes_received", new Long(num_bytes_received));
126 m.put("num_acks_sent", new Long(num_acks_sent));
127 m.put("num_acks_received", new Long(num_acks_received));
128 m.put("num_xmit_requests_received", new Long(num_xmit_requests_received));
129 return m;
130 }
131
132 public boolean setProperties(Properties props) {
133 String str;
134 long[] tmp;
135
136 super.setProperties(props);
137 str=props.getProperty("timeout");
138 if(str != null) {
139 tmp=Util.parseCommaDelimitedLongs(str);
140 if(tmp != null && tmp.length > 0)
141 timeout=tmp;
142 props.remove("timeout");
143 }
144
145 str=props.getProperty("window_size");
146 if(str != null) {
147 props.remove("window_size");
148 log.error("window_size is deprecated and will be ignored");
149 }
150
151 str=props.getProperty("min_threshold");
152 if(str != null) {
153 props.remove("min_threshold");
154 log.error("min_threshold is deprecated and will be ignored");
155 }
156
157 str=props.getProperty("use_gms");
158 if(str != null) {
159 use_gms=Boolean.valueOf(str).booleanValue();
160 props.remove("use_gms");
161 }
162
163 if(props.size() > 0) {
164 log.error("UNICAST.setProperties(): these properties are not recognized: " + props);
165
166 return false;
167 }
168 return true;
169 }
170
171 public void start() throws Exception {
172 timer=stack != null ? stack.timer : null;
173 if(timer == null)
174 throw new Exception("timer is null");
175 }
176
177 public void stop() {
178 removeAllConnections();
179 }
180
181
182 public void up(Event evt) {
183 Message msg;
184 Address dst, src;
185 UnicastHeader hdr;
186
187 switch(evt.getType()) {
188
189 case Event.MSG:
190 msg=(Message)evt.getArg();
191 dst=msg.getDest();
192
193 if(dst == null || dst.isMulticastAddress()) // only handle unicast messages
194 break; // pass up
195
196 // changed from removeHeader(): we cannot remove the header because if we do loopback=true at the
197 // transport level, we will not have the header on retransmit ! (bela Aug 22 2006)
198 hdr=(UnicastHeader)msg.getHeader(name);
199 if(hdr == null) break;
200 src=msg.getSrc();
201 switch(hdr.type) {
202 case UnicastHeader.DATA: // received regular message
203 handleDataReceived(src, hdr.seqno, msg);
204 sendAck(src, hdr.seqno); // only send an ACK if added to the received_msgs table (bela Aug 2006)
205 return; // we pass the deliverable message up in handleDataReceived()
206 case UnicastHeader.ACK: // received ACK for previously sent message
207 handleAckReceived(src, hdr.seqno);
208 break;
209 default:
210 log.error("UnicastHeader type " + hdr.type + " not known !");
211 break;
212 }
213 return;
214
215 case Event.SET_LOCAL_ADDRESS:
216 local_addr=(Address)evt.getArg();
217 break;
218 }
219
220 passUp(evt); // Pass up to the layer above us
221 }
222
223
224
225
226
227 public void down(Event evt) {
228 switch (evt.getType()) {
229
230 case Event.MSG: // Add UnicastHeader, add to AckSenderWindow and pass down
231 Message msg = (Message) evt.getArg();
232 Object dst = msg.getDest();
233
234 /* only handle unicast messages */
235 if (dst == null || ((Address) dst).isMulticastAddress()) {
236 break;
237 }
238
239 if(previous_members.contains(dst)) {
240 if(log.isTraceEnabled())
241 log.trace("discarding message to " + dst + " as this member left the group");
242 return;
243 }
244
245 Entry entry;
246 synchronized(connections) {
247 entry=(Entry)connections.get(dst);
248 if(entry == null) {
249 entry=new Entry();
250 connections.put(dst, entry);
251 }
252 }
253
254 synchronized(entry) { // threads will only sync if they access the same entry
255 long seqno=entry.sent_msgs_seqno;
256 UnicastHeader hdr=new UnicastHeader(UnicastHeader.DATA, seqno);
257 if(entry.sent_msgs == null) { // first msg to peer 'dst'
258 entry.sent_msgs=new AckSenderWindow(this, timeout, timer); // use the protocol stack's timer
259 }
260 msg.putHeader(name, hdr);
261 if(trace)
262 log.trace(new StringBuffer().append(local_addr).append(" --> DATA(").append(dst).append(": #").
263 append(seqno));
264
265 entry.sent_msgs_seqno++;
266 Message tmp=Global.copy? msg.copy() : msg;
267
268 try {
269 passDown(new Event(Event.MSG, tmp));
270 num_msgs_sent++;
271 num_bytes_sent+=msg.getLength();
272 }
273 finally {
274 entry.sent_msgs.add(seqno, tmp); // add *including* UnicastHeader, adds to retransmitter
275 }
276 }
277 msg=null;
278 return; // AckSenderWindow will send message for us
279
280 case Event.VIEW_CHANGE: // remove connections to peers that are not members anymore !
281 Vector new_members=((View)evt.getArg()).getMembers();
282 Vector left_members;
283 synchronized(members) {
284 left_members=Util.determineLeftMembers(members, new_members);
285 members.removeAllElements();
286 if(new_members != null)
287 members.addAll(new_members);
288 }
289
290 // Remove all connections for members that left between the current view and the new view
291 // See DESIGN for details
292 boolean rc;
293 if(use_gms && left_members.size() > 0) {
294 Object mbr;
295 for(int i=0; i < left_members.size(); i++) {
296 mbr=left_members.elementAt(i);
297 rc=removeConnection(mbr);
298 if(rc && trace)
299 log.trace("removed " + mbr + " from connection table, member(s) " + left_members + " left");
300 }
301 }
302 break;
303 }
304
305 passDown(evt); // Pass on to the layer below us
306 }
307
308
309 /** Removes and resets from connection table (which is already locked). Returns true if member was found, otherwise false */
310 private boolean removeConnection(Object mbr) {
311 Entry entry;
312
313 synchronized(connections) {
314 entry=(Entry)connections.remove(mbr);
315 if(!previous_members.contains(mbr))
316 previous_members.add(mbr);
317 }
318 if(entry != null) {
319 entry.reset();
320 return true;
321 }
322 else
323 return false;
324 }
325
326
327 private void removeAllConnections() {
328 Entry entry;
329
330 synchronized(connections) {
331 for(Iterator it=connections.values().iterator(); it.hasNext();) {
332 entry=(Entry)it.next();
333 entry.reset();
334 }
335 connections.clear();
336 }
337 }
338
339
340
341 /** Called by AckSenderWindow to resend messages for which no ACK has been received yet */
342 public void retransmit(long seqno, Message msg) {
343 Object dst=msg.getDest();
344
345 // bela Dec 23 2002:
346 // this will remove a member on a MERGE request, e.g. A and B merge: when A sends the unicast
347 // request to B and there's a retransmit(), B will be removed !
348
349 // if(use_gms && !members.contains(dst) && !prev_members.contains(dst)) {
350 //
351 // if(warn) log.warn("UNICAST.retransmit()", "seqno=" + seqno + ": dest " + dst +
352 // " is not member any longer; removing entry !");
353
354 // synchronized(connections) {
355 // removeConnection(dst);
356 // }
357 // return;
358 // }
359
360 if(trace)
361 log.trace("[" + local_addr + "] --> XMIT(" + dst + ": #" + seqno + ')');
362
363 if(Global.copy)
364 passDown(new Event(Event.MSG, msg.copy()));
365 else
366 passDown(new Event(Event.MSG, msg));
367 num_xmit_requests_received++;
368 }
369
370
371
372
373
374 /**
375 * Check whether the hashtable contains an entry e for <code>sender</code> (create if not). If
376 * e.received_msgs is null and <code>first</code> is true: create a new AckReceiverWindow(seqno) and
377 * add message. Set e.received_msgs to the new window. Else just add the message.
378 */
379 private void handleDataReceived(Object sender, long seqno, Message msg) {
380 Entry entry;
381
382 if(trace)
383 log.trace(new StringBuffer().append(local_addr).append(" <-- DATA(").append(sender).append(": #").append(seqno));
384
385 if(previous_members.contains(sender)) {
386 if(log.isTraceEnabled())
387 log.trace("discarding message from " + sender + " as this member left the group");
388 return;
389 }
390
391 synchronized(connections) {
392 entry=(Entry)connections.get(sender);
393 if(entry == null) {
394 entry=new Entry();
395 connections.put(sender, entry);
396 }
397 if(entry.received_msgs == null)
398 entry.received_msgs=new AckReceiverWindow(DEFAULT_FIRST_SEQNO);
399 }
400
401 entry.received_msgs.add(seqno, msg); // entry.received_msgs is guaranteed to be non-null if we get here
402 num_msgs_received++;
403 num_bytes_received+=msg.getLength();
404
405 // Try to remove (from the AckReceiverWindow) as many messages as possible as pass them up
406 Message m;
407 while((m=entry.received_msgs.remove()) != null)
408 passUp(new Event(Event.MSG, m));
409 }
410
411
412 /** Add the ACK to hashtable.sender.sent_msgs */
413 private void handleAckReceived(Object sender, long seqno) {
414 Entry entry;
415 AckSenderWindow win;
416
417 if(trace)
418 log.trace(new StringBuffer().append(local_addr).append(" <-- ACK(").append(sender).
419 append(": #").append(seqno).append(')'));
420 synchronized(connections) {
421 entry=(Entry)connections.get(sender);
422 }
423 if(entry == null || entry.sent_msgs == null)
424 return;
425 win=entry.sent_msgs;
426 win.ack(seqno); // removes message from retransmission
427 num_acks_received++;
428 }
429
430
431
432 private void sendAck(Address dst, long seqno) {
433 Message ack=new Message(dst, null, null);
434 ack.putHeader(name, new UnicastHeader(UnicastHeader.ACK, seqno));
435 if(trace)
436 log.trace(new StringBuffer().append(local_addr).append(" --> ACK(").append(dst).
437 append(": #").append(seqno).append(')'));
438 passDown(new Event(Event.MSG, ack));
439 num_acks_sent++;
440 }
441
442
443
444
445
446
447 public static class UnicastHeader extends Header implements Streamable {
448 public static final byte DATA=0;
449 public static final byte ACK=1;
450
451 byte type=DATA;
452 long seqno=0;
453
454 static final long serialized_size=Global.BYTE_SIZE + Global.LONG_SIZE;
455
456
457 public UnicastHeader() {} // used for externalization
458
459 public UnicastHeader(byte type, long seqno) {
460 this.type=type;
461 this.seqno=seqno;
462 }
463
464 public String toString() {
465 return "[UNICAST: " + type2Str(type) + ", seqno=" + seqno + ']';
466 }
467
468 public static String type2Str(byte t) {
469 switch(t) {
470 case DATA: return "DATA";
471 case ACK: return "ACK";
472 default: return "<unknown>";
473 }
474 }
475
476 public final long size() {
477 return serialized_size;
478 }
479
480
481 public void writeExternal(ObjectOutput out) throws IOException {
482 out.writeByte(type);
483 out.writeLong(seqno);
484 }
485
486
487
488 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
489 type=in.readByte();
490 seqno=in.readLong();
491 }
492
493 public void writeTo(DataOutputStream out) throws IOException {
494 out.writeByte(type);
495 out.writeLong(seqno);
496 }
497
498 public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
499 type=in.readByte();
500 seqno=in.readLong();
501 }
502 }
503
504 private static final class Entry {
505 AckReceiverWindow received_msgs=null; // stores all msgs rcvd by a certain peer in seqno-order
506 AckSenderWindow sent_msgs=null; // stores (and retransmits) msgs sent by us to a certain peer
507 long sent_msgs_seqno=DEFAULT_FIRST_SEQNO; // seqno for msgs sent by us
508
509 void reset() {
510 if(sent_msgs != null)
511 sent_msgs.reset();
512 if(received_msgs != null)
513 received_msgs.reset();
514 }
515
516
517 public String toString() {
518 StringBuffer sb=new StringBuffer();
519 if(sent_msgs != null)
520 sb.append("sent_msgs=" + sent_msgs + '\n');
521 if(received_msgs != null)
522 sb.append("received_msgs=" + received_msgs + '\n');
523 return sb.toString();
524 }
525 }
526
527
528 }