Source code: org/jgroups/protocols/FRAG.java
1 // $Id: FRAG.java,v 1.28 2005/10/28 07:42:41 belaban Exp $
2
3 package org.jgroups.protocols;
4
5 import org.jgroups.Address;
6 import org.jgroups.Event;
7 import org.jgroups.Message;
8 import org.jgroups.View;
9 import org.jgroups.stack.Protocol;
10 import org.jgroups.util.ExposedByteArrayOutputStream;
11 import org.jgroups.util.Util;
12
13 import java.io.ByteArrayInputStream;
14 import java.io.DataInputStream;
15 import java.io.DataOutputStream;
16 import java.util.*;
17
18
19
20 /**
21 * Fragmentation layer. Fragments messages larger than FRAG_SIZE into smaller packets.
22 * Reassembles fragmented packets into bigger ones. The fragmentation number is prepended
23 * to the messages as a header (and removed at the receiving side).<p>
24 * Each fragment is identified by (a) the sender (part of the message to which the header is appended),
25 * (b) the fragmentation ID (which is unique per FRAG layer (monotonically increasing) and (c) the
26 * fragement ID which ranges from 0 to number_of_fragments-1.<p>
27 * Requirement: lossless delivery (e.g. NAK, ACK). No requirement on ordering. Works for both unicast and
28 * multicast messages.
29 * @author Bela Ban
30 * @author Filip Hanik
31 * @version $Id: FRAG.java,v 1.28 2005/10/28 07:42:41 belaban Exp $
32 */
33 public class FRAG extends Protocol {
34 private int frag_size=8192; // conservative value
35
36 /*the fragmentation list contains a fragmentation table per sender
37 *this way it becomes easier to clean up if a sender (member) leaves or crashes
38 */
39 private final FragmentationList fragment_list=new FragmentationList();
40 private int curr_id=1;
41 private final ExposedByteArrayOutputStream bos=new ExposedByteArrayOutputStream(1024);
42 private final Vector members=new Vector(11);
43 private final static String name="FRAG";
44
45 long num_sent_msgs=0;
46 long num_sent_frags=0;
47 long num_received_msgs=0;
48 long num_received_frags=0;
49
50
51 public String getName() {
52 return name;
53 }
54
55 public int getFragSize() {return frag_size;}
56 public void setFragSize(int s) {frag_size=s;}
57 public long getNumberOfSentMessages() {return num_sent_msgs;}
58 public long getNumberOfSentFragments() {return num_sent_frags;}
59 public long getNumberOfReceivedMessages() {return num_received_msgs;}
60 public long getNumberOfReceivedFragments() {return num_received_frags;}
61
62 /**
63 * Setup the Protocol instance acording to the configuration string
64 */
65 public boolean setProperties(Properties props) {
66 String str;
67
68 super.setProperties(props);
69 str=props.getProperty("frag_size");
70 if(str != null) {
71 frag_size=Integer.parseInt(str);
72 props.remove("frag_size");
73 }
74
75 if(props.size() > 0) {
76 log.error("FRAG.setProperties(): the following properties are not recognized: " + props);
77 return false;
78 }
79 return true;
80 }
81
82 public void resetStats() {
83 super.resetStats();
84 num_sent_msgs=num_sent_frags=num_received_msgs=num_received_frags=0;
85 }
86
87
88 /**
89 * Fragment a packet if larger than frag_size (add a header). Otherwise just pass down. Only
90 * add a header if framentation is needed !
91 */
92 public void down(Event evt) {
93 switch(evt.getType()) {
94
95 case Event.MSG:
96 Message msg=(Message)evt.getArg();
97 long size=msg.size();
98 num_sent_msgs++;
99 if(size > frag_size) {
100 if(trace) {
101 StringBuffer sb=new StringBuffer("message size is ");
102 sb.append(size).append(", will fragment (frag_size=").append(frag_size).append(')');
103 log.trace(sb.toString());
104 }
105 fragment(msg); // Fragment and pass down
106 return;
107 }
108 break;
109
110 case Event.VIEW_CHANGE:
111 //don't do anything if this dude is sending out the view change
112 //we are receiving a view change,
113 //in here we check for the
114 View view=(View)evt.getArg();
115 Vector new_mbrs=view.getMembers(), left_mbrs;
116 Address mbr;
117
118 left_mbrs=Util.determineLeftMembers(members, new_mbrs);
119 members.clear();
120 members.addAll(new_mbrs);
121
122 for(int i=0; i < left_mbrs.size(); i++) {
123 mbr=(Address)left_mbrs.elementAt(i);
124 //the new view doesn't contain the sender, he must have left,
125 //hence we will clear all his fragmentation tables
126 fragment_list.remove(mbr);
127 if(trace)
128 log.trace("[VIEW_CHANGE] removed " + mbr + " from fragmentation table");
129 }
130 break;
131
132 case Event.CONFIG:
133 passDown(evt);
134 if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg());
135 handleConfigEvent((HashMap)evt.getArg());
136 return;
137 }
138
139 passDown(evt); // Pass on to the layer below us
140 }
141
142
143 /**
144 * If event is a message, if it is fragmented, re-assemble fragments into big message and pass up the stack.
145 */
146 public void up(Event evt) {
147 switch(evt.getType()) {
148
149 case Event.MSG:
150 Message msg=(Message)evt.getArg();
151 Object obj=msg.getHeader(name);
152 if(obj != null && obj instanceof FragHeader) { // needs to be defragmented
153 unfragment(msg); // Unfragment and possibly pass up
154 return;
155 }
156 else {
157 num_received_msgs++;
158 }
159 break;
160
161 case Event.CONFIG:
162 passUp(evt);
163 if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg());
164 handleConfigEvent((HashMap)evt.getArg());
165 return;
166 }
167
168 passUp(evt); // Pass up to the layer above us by default
169 }
170
171
172 /**
173 * Send all fragments as separate messages (with same ID !).
174 * Example:
175 * <pre>
176 * Given the generated ID is 2344, number of fragments=3, message {dst,src,buf}
177 * would be fragmented into:
178 * <p/>
179 * [2344,3,0]{dst,src,buf1},
180 * [2344,3,1]{dst,src,buf2} and
181 * [2344,3,2]{dst,src,buf3}
182 * </pre>
183 */
184 private void fragment(Message msg) {
185 DataOutputStream out=null;
186 byte[] buffer;
187 byte[] fragments[];
188 Event evt;
189 FragHeader hdr;
190 Message frag_msg;
191 Address dest=msg.getDest(), src=msg.getSrc();
192 long id=curr_id++; // used as seqnos
193 int num_frags;
194
195 try {
196 // Write message into a byte buffer and fragment it
197 bos.reset();
198 out=new DataOutputStream(bos);
199 msg.writeTo(out);
200 out.flush();
201 buffer=bos.getRawBuffer();
202 fragments=Util.fragmentBuffer(buffer, frag_size, bos.size());
203 num_frags=fragments.length;
204 num_sent_frags+=num_frags;
205
206 if(trace) {
207 StringBuffer sb=new StringBuffer();
208 sb.append("fragmenting packet to ").append(dest != null ? dest.toString() : "<all members>");
209 sb.append(" (size=").append(buffer.length).append(") into ").append(num_frags);
210 sb.append(" fragment(s) [frag_size=").append(frag_size).append(']');
211 log.trace(sb.toString());
212 }
213
214 for(int i=0; i < num_frags; i++) {
215 frag_msg=new Message(dest, src, fragments[i]);
216 hdr=new FragHeader(id, i, num_frags);
217 frag_msg.putHeader(name, hdr);
218 evt=new Event(Event.MSG, frag_msg);
219 passDown(evt);
220 }
221 }
222 catch(Exception e) {
223 log.error("exception occurred trying to fragment message", e);
224 }
225 finally {
226 Util.closeOutputStream(out);
227 }
228 }
229
230
231 /**
232 * 1. Get all the fragment buffers
233 * 2. When all are received -> Assemble them into one big buffer
234 * 3. Read headers and byte buffer from big buffer
235 * 4. Set headers and buffer in msg
236 * 5. Pass msg up the stack
237 */
238 private void unfragment(Message msg) {
239 FragmentationTable frag_table;
240 Address sender=msg.getSrc();
241 Message assembled_msg;
242 FragHeader hdr=(FragHeader)msg.removeHeader(name);
243 byte[] m;
244 ByteArrayInputStream bis;
245 DataInputStream in=null;
246
247 frag_table=fragment_list.get(sender);
248 if(frag_table == null) {
249 frag_table=new FragmentationTable(sender);
250 try {
251 fragment_list.add(sender, frag_table);
252 }
253 catch(IllegalArgumentException x) { // the entry has already been added, probably in parallel from another thread
254 frag_table=fragment_list.get(sender);
255 }
256 }
257 num_received_frags++;
258 m=frag_table.add(hdr.id, hdr.frag_id, hdr.num_frags, msg.getBuffer());
259 if(m != null) {
260 try {
261 bis=new ByteArrayInputStream(m);
262 in=new DataInputStream(bis);
263 assembled_msg=new Message(false);
264 assembled_msg.readFrom(in);
265 if(trace) log.trace("assembled_msg is " + assembled_msg);
266 assembled_msg.setSrc(sender); // needed ? YES, because fragments have a null src !!
267 num_received_msgs++;
268 passUp(new Event(Event.MSG, assembled_msg));
269 }
270 catch(Exception e) {
271 log.error("exception is " + e);
272 }
273 finally {
274 Util.closeInputStream(in);
275 }
276 }
277 }
278
279
280 void handleConfigEvent(HashMap map) {
281 if(map == null) return;
282 if(map.containsKey("frag_size")) {
283 frag_size=((Integer)map.get("frag_size")).intValue();
284 if(log.isDebugEnabled()) log.debug("setting frag_size=" + frag_size);
285 }
286 }
287
288
289
290
291 /**
292 * A fragmentation list keeps a list of fragmentation tables
293 * sorted by an Address ( the sender ).
294 * This way, if the sender disappears or leaves the group half way
295 * sending the content, we can simply remove this members fragmentation
296 * table and clean up the memory of the receiver.
297 * We do not have to do the same for the sender, since the sender doesn't keep a fragmentation table
298 */
299 static class FragmentationList {
300 /* initialize the hashtable to hold all the fragmentation tables
301 * 11 is the best growth capacity to start with<br/>
302 * HashMap<Address,FragmentationTable>
303 */
304 private final HashMap frag_tables=new HashMap(11);
305
306
307 /**
308 * Adds a fragmentation table for this particular sender
309 * If this sender already has a fragmentation table, an IllegalArgumentException
310 * will be thrown.
311 * @param sender - the address of the sender, cannot be null
312 * @param table - the fragmentation table of this sender, cannot be null
313 * @throws IllegalArgumentException if an entry for this sender already exist
314 */
315 public void add(Address sender, FragmentationTable table) throws IllegalArgumentException {
316 FragmentationTable healthCheck;
317
318 synchronized(frag_tables) {
319 healthCheck=(FragmentationTable)frag_tables.get(sender);
320 if(healthCheck == null) {
321 frag_tables.put(sender, table);
322 }
323 else {
324 throw new IllegalArgumentException("Sender <" + sender + "> already exists in the fragementation list.");
325 }
326 }
327 }
328
329 /**
330 * returns a fragmentation table for this sender
331 * returns null if the sender doesn't have a fragmentation table
332 * @return the fragmentation table for this sender, or null if no table exist
333 */
334 public FragmentationTable get(Address sender) {
335 synchronized(frag_tables) {
336 return (FragmentationTable)frag_tables.get(sender);
337 }
338 }
339
340
341 /**
342 * returns true if this sender already holds a
343 * fragmentation for this sender, false otherwise
344 * @param sender - the sender, cannot be null
345 * @return true if this sender already has a fragmentation table
346 */
347 public boolean containsSender(Address sender) {
348 synchronized(frag_tables) {
349 return frag_tables.containsKey(sender);
350 }
351 }
352
353 /**
354 * removes the fragmentation table from the list.
355 * after this operation, the fragementation list will no longer
356 * hold a reference to this sender's fragmentation table
357 * @param sender - the sender who's fragmentation table you wish to remove, cannot be null
358 * @return true if the table was removed, false if the sender doesn't have an entry
359 */
360 public boolean remove(Address sender) {
361 synchronized(frag_tables) {
362 boolean result=containsSender(sender);
363 frag_tables.remove(sender);
364 return result;
365 }
366 }
367
368 /**
369 * returns a list of all the senders that have fragmentation tables opened.
370 * @return an array of all the senders in the fragmentation list
371 */
372 public Address[] getSenders() {
373 Address[] result;
374 int index=0;
375
376 synchronized(frag_tables) {
377 result=new Address[frag_tables.size()];
378 for(Iterator it=frag_tables.keySet().iterator(); it.hasNext();) {
379 result[index++]=(Address)it.next();
380 }
381 }
382 return result;
383 }
384
385 public String toString() {
386 Map.Entry entry;
387 StringBuffer buf=new StringBuffer("Fragmentation list contains ");
388 synchronized(frag_tables) {
389 buf.append(frag_tables.size()).append(" tables\n");
390 for(Iterator it=frag_tables.entrySet().iterator(); it.hasNext();) {
391 entry=(Map.Entry)it.next();
392 buf.append(entry.getKey()).append(": " ).append(entry.getValue()).append("\n");
393 }
394 }
395 return buf.toString();
396 }
397
398 }
399
400 /**
401 * Keeps track of the fragments that are received.
402 * Reassembles fragements into entire messages when all fragments have been received.
403 * The fragmentation holds a an array of byte arrays for a unique sender
404 * The first dimension of the array is the order of the fragmentation, in case the arrive out of order
405 */
406 static class FragmentationTable {
407 private final Address sender;
408 /* the hashtable that holds the fragmentation entries for this sender*/
409 private final Hashtable h=new Hashtable(11); // keys: frag_ids, vals: Entrys
410
411
412 FragmentationTable(Address sender) {
413 this.sender=sender;
414 }
415
416
417 /**
418 * inner class represents an entry for a message
419 * each entry holds an array of byte arrays sorted
420 * once all the byte buffer entries have been filled
421 * the fragmentation is considered complete.
422 */
423 static class Entry {
424 //the total number of fragment in this message
425 int tot_frags=0;
426 // each fragment is a byte buffer
427 byte[] fragments[]=null;
428 //the number of fragments we have received
429 int number_of_frags_recvd=0;
430 // the message ID
431 long msg_id=-1;
432
433 /**
434 * Creates a new entry
435 *
436 * @param tot_frags the number of fragments to expect for this message
437 */
438 Entry(long msg_id, int tot_frags) {
439 this.msg_id=msg_id;
440 this.tot_frags=tot_frags;
441 fragments=new byte[tot_frags][];
442 for(int i=0; i < tot_frags; i++) {
443 fragments[i]=null;
444 }
445 }
446
447 /**
448 * adds on fragmentation buffer to the message
449 *
450 * @param frag_id the number of the fragment being added 0..(tot_num_of_frags - 1)
451 * @param frag the byte buffer containing the data for this fragmentation, should not be null
452 */
453 public void set(int frag_id, byte[] frag) {
454 fragments[frag_id]=frag;
455 number_of_frags_recvd++;
456 }
457
458 /**
459 * returns true if this fragmentation is complete
460 * ie, all fragmentations have been received for this buffer
461 */
462 public boolean isComplete() {
463 /*first make the simple check*/
464 if(number_of_frags_recvd < tot_frags) {
465 return false;
466 }
467 /*then double check just in case*/
468 for(int i=0; i < fragments.length; i++) {
469 if(fragments[i] == null)
470 return false;
471 }
472 /*all fragmentations have been received*/
473 return true;
474 }
475
476 /**
477 * Assembles all the fragmentations into one buffer
478 * this method does not check if the fragmentation is complete
479 *
480 * @return the complete message in one buffer
481 */
482 public byte[] assembleBuffer() {
483 return Util.defragmentBuffer(fragments);
484 }
485
486 /**
487 * debug only
488 */
489 public String toString() {
490 StringBuffer ret=new StringBuffer();
491 ret.append("[tot_frags=" + tot_frags + ", number_of_frags_recvd=" + number_of_frags_recvd + ']');
492 return ret.toString();
493 }
494
495 public int hashCode() {
496 return super.hashCode();
497 }
498 }
499
500
501 /**
502 * Creates a new entry if not yet present. Adds the fragment.
503 * If all fragements for a given message have been received,
504 * an entire message is reassembled and returned.
505 * Otherwise null is returned.
506 *
507 * @param id - the message ID, unique for a sender
508 * @param frag_id the index of this fragmentation (0..tot_frags-1)
509 * @param tot_frags the total number of fragmentations expected
510 * @param fragment - the byte buffer for this fragment
511 */
512 public synchronized byte[] add(long id, int frag_id, int tot_frags, byte[] fragment) {
513
514 /*initialize the return value to default not complete */
515 byte[] retval=null;
516
517 Entry e=(Entry)h.get(new Long(id));
518
519 if(e == null) { // Create new entry if not yet present
520 e=new Entry(id, tot_frags);
521 h.put(new Long(id), e);
522 }
523
524 e.set(frag_id, fragment);
525 if(e.isComplete()) {
526 retval=e.assembleBuffer();
527 h.remove(new Long(id));
528 }
529
530 return retval;
531 }
532
533 public void reset() {
534 }
535
536 public String toString() {
537 StringBuffer buf=new StringBuffer("Fragmentation Table Sender:").append(sender).append("\n\t");
538 java.util.Enumeration e=this.h.elements();
539 while(e.hasMoreElements()) {
540 Entry entry=(Entry)e.nextElement();
541 int count=0;
542 for(int i=0; i < entry.fragments.length; i++) {
543 if(entry.fragments[i] != null) {
544 count++;
545 }
546 }
547 buf.append("Message ID:").append(entry.msg_id).append("\n\t");
548 buf.append("Total Frags:").append(entry.tot_frags).append("\n\t");
549 buf.append("Frags Received:").append(count).append("\n\n");
550 }
551 return buf.toString();
552 }
553 }
554
555 }
556
557