Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/jgroups/protocols/FRAG.java


1   // $Id: FRAG.java,v 1.28 2005/10/28 07:42:41 belaban Exp $
2   
3   package org.jgroups.protocols;
4   
5   import org.jgroups.Address;
6   import org.jgroups.Event;
7   import org.jgroups.Message;
8   import org.jgroups.View;
9   import org.jgroups.stack.Protocol;
10  import org.jgroups.util.ExposedByteArrayOutputStream;
11  import org.jgroups.util.Util;
12  
13  import java.io.ByteArrayInputStream;
14  import java.io.DataInputStream;
15  import java.io.DataOutputStream;
16  import java.util.*;
17  
18  
19  
20  /**
21   * Fragmentation layer. Fragments messages larger than FRAG_SIZE into smaller packets.
22   * Reassembles fragmented packets into bigger ones. The fragmentation number is prepended
23   * to the messages as a header (and removed at the receiving side).<p>
24   * Each fragment is identified by (a) the sender (part of the message to which the header is appended),
25   * (b) the fragmentation ID (which is unique per FRAG layer (monotonically increasing) and (c) the
26   * fragement ID which ranges from 0 to number_of_fragments-1.<p>
27   * Requirement: lossless delivery (e.g. NAK, ACK). No requirement on ordering. Works for both unicast and
28   * multicast messages.
29   * @author Bela Ban
30   * @author Filip Hanik
31   * @version $Id: FRAG.java,v 1.28 2005/10/28 07:42:41 belaban Exp $
32   */
33  public class FRAG extends Protocol {
34      private int frag_size=8192;  // conservative value
35  
36      /*the fragmentation list contains a fragmentation table per sender
37       *this way it becomes easier to clean up if a sender (member) leaves or crashes
38       */
39      private final FragmentationList     fragment_list=new FragmentationList();
40      private int                         curr_id=1;
41      private final ExposedByteArrayOutputStream bos=new ExposedByteArrayOutputStream(1024);
42      private final Vector                members=new Vector(11);
43      private final static String         name="FRAG";
44  
45      long num_sent_msgs=0;
46      long num_sent_frags=0;
47      long num_received_msgs=0;
48      long num_received_frags=0;
49  
50  
51      public String getName() {
52          return name;
53      }
54  
55      public int getFragSize() {return frag_size;}
56      public void setFragSize(int s) {frag_size=s;}
57      public long getNumberOfSentMessages() {return num_sent_msgs;}
58      public long getNumberOfSentFragments() {return num_sent_frags;}
59      public long getNumberOfReceivedMessages() {return num_received_msgs;}
60      public long getNumberOfReceivedFragments() {return num_received_frags;}
61  
62      /**
63       * Setup the Protocol instance acording to the configuration string
64       */
65      public boolean setProperties(Properties props) {
66          String str;
67          
68          super.setProperties(props);
69          str=props.getProperty("frag_size");
70          if(str != null) {
71              frag_size=Integer.parseInt(str);
72              props.remove("frag_size");
73          }
74  
75          if(props.size() > 0) {
76              log.error("FRAG.setProperties(): the following properties are not recognized: " + props);
77              return false;
78          }
79          return true;
80      }
81  
82      public void resetStats() {
83          super.resetStats();
84          num_sent_msgs=num_sent_frags=num_received_msgs=num_received_frags=0;
85      }
86  
87  
88      /**
89       * Fragment a packet if larger than frag_size (add a header). Otherwise just pass down. Only
90       * add a header if framentation is needed !
91       */
92      public void down(Event evt) {
93          switch(evt.getType()) {
94  
95          case Event.MSG:
96              Message msg=(Message)evt.getArg();
97              long size=msg.size();
98              num_sent_msgs++;
99              if(size > frag_size) {
100                 if(trace) {
101                     StringBuffer sb=new StringBuffer("message size is ");
102                     sb.append(size).append(", will fragment (frag_size=").append(frag_size).append(')');
103                     log.trace(sb.toString());
104                 }
105                 fragment(msg);  // Fragment and pass down
106                 return;
107             }
108             break;
109 
110         case Event.VIEW_CHANGE:
111             //don't do anything if this dude is sending out the view change
112             //we are receiving a view change,
113             //in here we check for the
114             View view=(View)evt.getArg();
115             Vector new_mbrs=view.getMembers(), left_mbrs;
116             Address mbr;
117 
118             left_mbrs=Util.determineLeftMembers(members, new_mbrs);
119             members.clear();
120             members.addAll(new_mbrs);
121 
122             for(int i=0; i < left_mbrs.size(); i++) {
123                 mbr=(Address)left_mbrs.elementAt(i);
124                 //the new view doesn't contain the sender, he must have left,
125                 //hence we will clear all his fragmentation tables
126                 fragment_list.remove(mbr);
127                 if(trace)
128                     log.trace("[VIEW_CHANGE] removed " + mbr + " from fragmentation table");
129             }
130             break;
131 
132         case Event.CONFIG:
133             passDown(evt);
134             if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg());
135             handleConfigEvent((HashMap)evt.getArg());
136             return;
137         }
138 
139         passDown(evt);  // Pass on to the layer below us
140     }
141 
142 
143     /**
144      * If event is a message, if it is fragmented, re-assemble fragments into big message and pass up the stack.
145      */
146     public void up(Event evt) {
147         switch(evt.getType()) {
148 
149         case Event.MSG:
150             Message msg=(Message)evt.getArg();
151             Object obj=msg.getHeader(name);
152             if(obj != null && obj instanceof FragHeader) { // needs to be defragmented
153                 unfragment(msg); // Unfragment and possibly pass up
154                 return;
155             }
156             else {
157                 num_received_msgs++;
158             }
159             break;
160 
161         case Event.CONFIG:
162             passUp(evt);
163             if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg());
164             handleConfigEvent((HashMap)evt.getArg());
165             return;
166         }
167 
168         passUp(evt); // Pass up to the layer above us by default
169     }
170 
171 
172     /**
173      * Send all fragments as separate messages (with same ID !).
174      * Example:
175      * <pre>
176      * Given the generated ID is 2344, number of fragments=3, message {dst,src,buf}
177      * would be fragmented into:
178      * <p/>
179      * [2344,3,0]{dst,src,buf1},
180      * [2344,3,1]{dst,src,buf2} and
181      * [2344,3,2]{dst,src,buf3}
182      * </pre>
183      */
184     private void fragment(Message msg) {
185         DataOutputStream   out=null;
186         byte[]             buffer;
187         byte[]             fragments[];
188         Event              evt;
189         FragHeader         hdr;
190         Message            frag_msg;
191         Address            dest=msg.getDest(), src=msg.getSrc();
192         long               id=curr_id++; // used as seqnos
193         int                num_frags;
194 
195         try {
196             // Write message into a byte buffer and fragment it
197             bos.reset();
198             out=new DataOutputStream(bos);
199             msg.writeTo(out);
200             out.flush();
201             buffer=bos.getRawBuffer();
202             fragments=Util.fragmentBuffer(buffer, frag_size, bos.size());
203             num_frags=fragments.length;
204             num_sent_frags+=num_frags;
205 
206             if(trace) {
207                 StringBuffer sb=new StringBuffer();
208                 sb.append("fragmenting packet to ").append(dest != null ? dest.toString() : "<all members>");
209                 sb.append(" (size=").append(buffer.length).append(") into ").append(num_frags);
210                 sb.append(" fragment(s) [frag_size=").append(frag_size).append(']');
211                 log.trace(sb.toString());
212             }
213 
214             for(int i=0; i < num_frags; i++) {
215                 frag_msg=new Message(dest, src, fragments[i]);
216                 hdr=new FragHeader(id, i, num_frags);
217                 frag_msg.putHeader(name, hdr);
218                 evt=new Event(Event.MSG, frag_msg);
219                 passDown(evt);
220             }
221         }
222         catch(Exception e) {
223             log.error("exception occurred trying to fragment message", e);
224         }
225         finally {
226             Util.closeOutputStream(out);
227         }
228     }
229 
230 
231     /**
232      * 1. Get all the fragment buffers
233      * 2. When all are received -> Assemble them into one big buffer
234      * 3. Read headers and byte buffer from big buffer
235      * 4. Set headers and buffer in msg
236      * 5. Pass msg up the stack
237      */
238     private void unfragment(Message msg) {
239         FragmentationTable   frag_table;
240         Address              sender=msg.getSrc();
241         Message              assembled_msg;
242         FragHeader           hdr=(FragHeader)msg.removeHeader(name);
243         byte[]               m;
244         ByteArrayInputStream bis;
245         DataInputStream      in=null;
246 
247         frag_table=fragment_list.get(sender);
248         if(frag_table == null) {
249             frag_table=new FragmentationTable(sender);
250             try {
251                 fragment_list.add(sender, frag_table);
252             }
253             catch(IllegalArgumentException x) { // the entry has already been added, probably in parallel from another thread
254                 frag_table=fragment_list.get(sender);
255             }
256         }
257         num_received_frags++;
258         m=frag_table.add(hdr.id, hdr.frag_id, hdr.num_frags, msg.getBuffer());
259         if(m != null) {
260             try {
261                 bis=new ByteArrayInputStream(m);
262                 in=new DataInputStream(bis);
263                 assembled_msg=new Message(false);
264                 assembled_msg.readFrom(in);
265                 if(trace) log.trace("assembled_msg is " + assembled_msg);
266                 assembled_msg.setSrc(sender); // needed ? YES, because fragments have a null src !!
267                 num_received_msgs++;
268                 passUp(new Event(Event.MSG, assembled_msg));
269             }
270             catch(Exception e) {
271                 log.error("exception is " + e);
272             }
273             finally {
274                 Util.closeInputStream(in);
275             }
276         }
277     }
278 
279 
280     void handleConfigEvent(HashMap map) {
281         if(map == null) return;
282         if(map.containsKey("frag_size")) {
283             frag_size=((Integer)map.get("frag_size")).intValue();
284             if(log.isDebugEnabled()) log.debug("setting frag_size=" + frag_size);
285         }
286     }
287 
288 
289 
290 
291     /**
292      * A fragmentation list keeps a list of fragmentation tables
293      * sorted by an Address ( the sender ).
294      * This way, if the sender disappears or leaves the group half way
295      * sending the content, we can simply remove this members fragmentation
296      * table and clean up the memory of the receiver.
297      * We do not have to do the same for the sender, since the sender doesn't keep a fragmentation table
298      */
299     static class FragmentationList {
300         /* initialize the hashtable to hold all the fragmentation tables
301          * 11 is the best growth capacity to start with<br/>
302          * HashMap<Address,FragmentationTable>
303          */
304         private final HashMap frag_tables=new HashMap(11);
305 
306 
307         /**
308          * Adds a fragmentation table for this particular sender
309          * If this sender already has a fragmentation table, an IllegalArgumentException
310          * will be thrown.
311          * @param sender - the address of the sender, cannot be null
312          * @param table  - the fragmentation table of this sender, cannot be null
313          * @throws IllegalArgumentException if an entry for this sender already exist
314          */
315         public void add(Address sender, FragmentationTable table) throws IllegalArgumentException {
316             FragmentationTable healthCheck;
317 
318             synchronized(frag_tables) {
319                 healthCheck=(FragmentationTable)frag_tables.get(sender);
320                 if(healthCheck == null) {
321                     frag_tables.put(sender, table);
322                 }
323                 else {
324                     throw new IllegalArgumentException("Sender <" + sender + "> already exists in the fragementation list.");
325                 }
326             }
327         }
328 
329         /**
330          * returns a fragmentation table for this sender
331          * returns null if the sender doesn't have a fragmentation table
332          * @return the fragmentation table for this sender, or null if no table exist
333          */
334         public FragmentationTable get(Address sender) {
335             synchronized(frag_tables) {
336                 return (FragmentationTable)frag_tables.get(sender);
337             }
338         }
339 
340 
341         /**
342          * returns true if this sender already holds a
343          * fragmentation for this sender, false otherwise
344          * @param sender - the sender, cannot be null
345          * @return true if this sender already has a fragmentation table
346          */
347         public boolean containsSender(Address sender) {
348             synchronized(frag_tables) {
349                 return frag_tables.containsKey(sender);
350             }
351         }
352 
353         /**
354          * removes the fragmentation table from the list.
355          * after this operation, the fragementation list will no longer
356          * hold a reference to this sender's fragmentation table
357          * @param sender - the sender who's fragmentation table you wish to remove, cannot be null
358          * @return true if the table was removed, false if the sender doesn't have an entry
359          */
360         public boolean remove(Address sender) {
361             synchronized(frag_tables) {
362                 boolean result=containsSender(sender);
363                 frag_tables.remove(sender);
364                 return result;
365             }
366         }
367 
368         /**
369          * returns a list of all the senders that have fragmentation tables opened.
370          * @return an array of all the senders in the fragmentation list
371          */
372         public Address[] getSenders() {
373             Address[] result;
374             int index=0;
375 
376             synchronized(frag_tables) {
377                 result=new Address[frag_tables.size()];
378                 for(Iterator it=frag_tables.keySet().iterator(); it.hasNext();) {
379                     result[index++]=(Address)it.next();
380                 }
381             }
382             return result;
383         }
384 
385         public String toString() {
386             Map.Entry entry;
387             StringBuffer buf=new StringBuffer("Fragmentation list contains ");
388             synchronized(frag_tables) {
389                 buf.append(frag_tables.size()).append(" tables\n");
390                 for(Iterator it=frag_tables.entrySet().iterator(); it.hasNext();) {
391                     entry=(Map.Entry)it.next();
392                     buf.append(entry.getKey()).append(": " ).append(entry.getValue()).append("\n");
393                 }
394             }
395             return buf.toString();
396         }
397 
398     }
399 
400     /**
401      * Keeps track of the fragments that are received.
402      * Reassembles fragements into entire messages when all fragments have been received.
403      * The fragmentation holds a an array of byte arrays for a unique sender
404      * The first dimension of the array is the order of the fragmentation, in case the arrive out of order
405      */
406     static class FragmentationTable {
407         private final Address sender;
408         /* the hashtable that holds the fragmentation entries for this sender*/
409         private final Hashtable h=new Hashtable(11);  // keys: frag_ids, vals: Entrys
410 
411 
412         FragmentationTable(Address sender) {
413             this.sender=sender;
414         }
415 
416 
417         /**
418          * inner class represents an entry for a message
419          * each entry holds an array of byte arrays sorted
420          * once all the byte buffer entries have been filled
421          * the fragmentation is considered complete.
422          */
423         static class Entry {
424             //the total number of fragment in this message
425             int tot_frags=0;
426             // each fragment is a byte buffer
427             byte[] fragments[]=null;
428             //the number of fragments we have received
429             int number_of_frags_recvd=0;
430             // the message ID
431             long msg_id=-1;
432 
433             /**
434              * Creates a new entry
435              *
436              * @param tot_frags the number of fragments to expect for this message
437              */
438             Entry(long msg_id, int tot_frags) {
439                 this.msg_id=msg_id;
440                 this.tot_frags=tot_frags;
441                 fragments=new byte[tot_frags][];
442                 for(int i=0; i < tot_frags; i++) {
443                     fragments[i]=null;
444                 }
445             }
446 
447             /**
448              * adds on fragmentation buffer to the message
449              *
450              * @param frag_id the number of the fragment being added 0..(tot_num_of_frags - 1)
451              * @param frag    the byte buffer containing the data for this fragmentation, should not be null
452              */
453             public void set(int frag_id, byte[] frag) {
454                 fragments[frag_id]=frag;
455                 number_of_frags_recvd++;
456             }
457 
458             /**
459              * returns true if this fragmentation is complete
460              * ie, all fragmentations have been received for this buffer
461              */
462             public boolean isComplete() {
463                 /*first make the simple check*/
464                 if(number_of_frags_recvd < tot_frags) {
465                     return false;
466                 }
467                 /*then double check just in case*/
468                 for(int i=0; i < fragments.length; i++) {
469                     if(fragments[i] == null)
470                         return false;
471                 }
472                 /*all fragmentations have been received*/
473                 return true;
474             }
475 
476             /**
477              * Assembles all the fragmentations into one buffer
478              * this method does not check if the fragmentation is complete
479              *
480              * @return the complete message in one buffer
481              */
482             public byte[] assembleBuffer() {
483                 return Util.defragmentBuffer(fragments);
484             }
485 
486             /**
487              * debug only
488              */
489             public String toString() {
490                 StringBuffer ret=new StringBuffer();
491                 ret.append("[tot_frags=" + tot_frags + ", number_of_frags_recvd=" + number_of_frags_recvd + ']');
492                 return ret.toString();
493             }
494 
495             public int hashCode() {
496                 return super.hashCode();
497             }
498         }
499 
500 
501         /**
502          * Creates a new entry if not yet present. Adds the fragment.
503          * If all fragements for a given message have been received,
504          * an entire message is reassembled and returned.
505          * Otherwise null is returned.
506          *
507          * @param id        - the message ID, unique for a sender
508          * @param frag_id   the index of this fragmentation (0..tot_frags-1)
509          * @param tot_frags the total number of fragmentations expected
510          * @param fragment  - the byte buffer for this fragment
511          */
512         public synchronized byte[] add(long id, int frag_id, int tot_frags, byte[] fragment) {
513 
514             /*initialize the return value to default not complete */
515             byte[] retval=null;
516 
517             Entry e=(Entry)h.get(new Long(id));
518 
519             if(e == null) {   // Create new entry if not yet present
520                 e=new Entry(id, tot_frags);
521                 h.put(new Long(id), e);
522             }
523 
524             e.set(frag_id, fragment);
525             if(e.isComplete()) {
526                 retval=e.assembleBuffer();
527                 h.remove(new Long(id));
528             }
529 
530             return retval;
531         }
532 
533         public void reset() {
534         }
535 
536         public String toString() {
537             StringBuffer buf=new StringBuffer("Fragmentation Table Sender:").append(sender).append("\n\t");
538             java.util.Enumeration e=this.h.elements();
539             while(e.hasMoreElements()) {
540                 Entry entry=(Entry)e.nextElement();
541                 int count=0;
542                 for(int i=0; i < entry.fragments.length; i++) {
543                     if(entry.fragments[i] != null) {
544                         count++;
545                     }
546                 }
547                 buf.append("Message ID:").append(entry.msg_id).append("\n\t");
548                 buf.append("Total Frags:").append(entry.tot_frags).append("\n\t");
549                 buf.append("Frags Received:").append(count).append("\n\n");
550             }
551             return buf.toString();
552         }
553     }
554 
555 }
556 
557