Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/jgroups/stack/Protocol.java


1   // $Id: Protocol.java,v 1.34 2005/10/27 16:05:04 belaban Exp $
2   
3   package org.jgroups.stack;
4   
5   
6   import org.apache.commons.logging.Log;
7   import org.apache.commons.logging.LogFactory;
8   import org.jgroups.Event;
9   import org.jgroups.util.Queue;
10  import org.jgroups.util.QueueClosedException;
11  
12  import java.util.Map;
13  import java.util.Properties;
14  import java.util.Vector;
15  
16  
17  
18  
19  class UpHandler extends Thread {
20      private Queue mq=null;
21      private Protocol handler=null;
22      private ProtocolObserver observer=null;
23      protected final Log  log=LogFactory.getLog(this.getClass());
24  
25  
26      public UpHandler(Queue mq, Protocol handler, ProtocolObserver observer) {
27          this.mq=mq;
28          this.handler=handler;
29          this.observer=observer;
30          if(handler != null)
31              setName("UpHandler (" + handler.getName() + ')');
32          else
33              setName("UpHandler");
34          setDaemon(true);
35      }
36  
37  
38      public void setObserver(ProtocolObserver observer) {
39          this.observer=observer;
40      }
41  
42  
43      /** Removes events from mq and calls handler.up(evt) */
44      public void run() {
45          Event evt;
46          while(!mq.closed()) {
47              try {
48                  evt=(Event)mq.remove();
49                  if(evt == null) {
50                      if(log.isWarnEnabled()) log.warn("removed null event");
51                      continue;
52                  }
53  
54                  if(observer != null) {                          // call debugger hook (if installed)
55                      if(observer.up(evt, mq.size()) == false) {  // false means discard event
56                          return;
57                      }
58                  }
59                  handler.up(evt);
60              }
61              catch(QueueClosedException queue_closed) {
62                  break;
63              }
64              catch(Throwable e) {
65                  if(log.isWarnEnabled()) log.warn(getName() + " caught exception", e);
66              }
67          }
68      }
69  
70  }
71  
72  
73  class DownHandler extends Thread {
74      private Queue mq=null;
75      private Protocol handler=null;
76      private ProtocolObserver observer=null;
77      protected final Log  log=LogFactory.getLog(this.getClass());
78  
79  
80  
81      public DownHandler(Queue mq, Protocol handler, ProtocolObserver observer) {
82          this.mq=mq;
83          this.handler=handler;
84          this.observer=observer;
85          if(handler != null)
86              setName("DownHandler (" + handler.getName() + ')');
87          else
88              setName("DownHandler");
89          setDaemon(true);
90      }
91  
92  
93      public void setObserver(ProtocolObserver observer) {
94          this.observer=observer;
95      }
96  
97  
98      /** Removes events from mq and calls handler.down(evt) */
99      public void run() {
100         Event evt;
101         while(!mq.closed()) {
102             try {
103                 evt=(Event)mq.remove();
104                 if(evt == null) {
105                     if(log.isWarnEnabled()) log.warn("removed null event");
106                     continue;
107                 }
108 
109                 if(observer != null) {                            // call debugger hook (if installed)
110                     if(observer.down(evt, mq.size()) == false) {  // false means discard event
111                         continue;
112                     }
113                 }
114 
115                 int type=evt.getType();
116                 if(type == Event.START || type == Event.STOP) {
117                     if(handler.handleSpecialDownEvent(evt) == false)
118                         continue;
119                 }
120                 handler.down(evt);
121             }
122             catch(QueueClosedException queue_closed) {
123                 break;
124             }
125             catch(Throwable e) {
126                 if(log.isWarnEnabled()) log.warn(getName() + " caught exception", e);
127             }
128         }
129     }
130 
131 }
132 
133 
134 /**
135  * The Protocol class provides a set of common services for protocol layers. Each layer has to
136  * be a subclass of Protocol and override a number of methods (typically just <code>up()</code>,
137  * <code>down()</code> and <code>getName()</code>). Layers are stacked in a certain order to form
138  * a protocol stack. <a href=org.jgroups.Event.html>Events</a> are passed from lower
139  * layers to upper ones and vice versa. E.g. a Message received by the UDP layer at the bottom
140  * will be passed to its higher layer as an Event. That layer will in turn pass the Event to
141  * its higher layer and so on, until a layer handles the Message and sends a response or discards it,
142  * the former resulting in another Event being passed down the stack.<p>
143  * Each layer has 2 FIFO queues, one for up Events and one for down Events. When an Event is
144  * received by a layer (calling the internal upcall <code>receiveUpEvent()</code>), it is placed
145  * in the up-queue where it will be retrieved by the up-handler thread which will invoke method
146  * <code>up()</code> of the layer. The same applies for Events traveling down the stack. Handling
147  * of the up-handler and down-handler threads and the 2 FIFO queues is done by the Protocol
148  * class, subclasses will almost never have to override this behavior.<p>
149  * The important thing to bear in mind is that Events have to be passed on between layers in FIFO
150  * order which is guaranteed by the Protocol implementation and must be guaranteed by subclasses
151  * implementing their own Event queuing.<p>
152  * <b>Note that each class implementing interface Protocol MUST provide an empty, public
153  * constructor !</b>
154  */
155 public abstract class Protocol {
156     protected final Properties props=new Properties();  // configuration merged in by setProperties() / setPropertiesInternal()
157     protected Protocol         up_prot=null, down_prot=null;  // neighbor layers; targets of passUp() / passDown()
158     protected ProtocolStack    stack=null;  // assigned through setProtocolStack()
159     protected final Queue      up_queue=new Queue();  // events waiting for the up_handler thread
160     protected final Queue      down_queue=new Queue();  // events waiting for the down_handler thread
161     protected UpHandler        up_handler=null;  // created by startUpHandler(); while null, receiveUpEvent() calls up() directly
162     protected int              up_thread_prio=-1;  // priority for up_handler; only applied when >= 0
163     protected DownHandler      down_handler=null;  // created by startDownHandler(); while null, receiveDownEvent() calls down() directly
164     protected int              down_thread_prio=-1;  // priority for down_handler; only applied when >= 0
165     protected ProtocolObserver observer=null; // hook for debugger
166     private final static long  THREAD_JOIN_TIMEOUT=1000;  // timeout passed to Thread.join() in stopInternal(), i.e. milliseconds
167     protected boolean          down_thread=true;  // determines whether the down_handler thread should be started
168     protected boolean          up_thread=true;    // determines whether the up_handler thread should be started
169     protected boolean          stats=true;  // determines whether to collect statistics (and expose them via JMX)
170     protected final Log        log=LogFactory.getLog(this.getClass());  // logger obtained for the runtime class of this instance
171     protected boolean          trace=log.isTraceEnabled();  // snapshot taken at construction; changed via setTrace()
172     protected boolean          warn=log.isWarnEnabled();  // snapshot taken at construction; changed via setWarn()
173 
174 
175     /**
176      * Configures the protocol initially. A configuration string consists of name=value
177      * items, separated by a ';' (semicolon), e.g.:<pre>
178      * "loopback=false;unicast_inport=4444"
179      * </pre>
180      */
181     public boolean setProperties(Properties props) {
182         if(props != null)
183             this.props.putAll(props);
184         return true;
185     }
186 
187 
188     /** Called by Configurator. Removes 2 properties which are used by the Protocol directly and then
189      *  calls setProperties(), which might invoke the setProperties() method of the actual protocol instance.
190      */
191     public boolean setPropertiesInternal(Properties props) {
192         String str;
193         this.props.putAll(props);
194 
195         str=props.getProperty("down_thread");
196         if(str != null) {
197             down_thread=Boolean.valueOf(str).booleanValue();
198             props.remove("down_thread");
199         }
200 
201         str=props.getProperty("down_thread_prio");
202         if(str != null) {
203             down_thread_prio=Integer.parseInt(str);
204             props.remove("down_thread_prio");
205         }
206 
207         str=props.getProperty("up_thread");
208         if(str != null) {
209             up_thread=Boolean.valueOf(str).booleanValue();
210             props.remove("up_thread");
211         }
212 
213         str=props.getProperty("up_thread_prio");
214         if(str != null) {
215             up_thread_prio=Integer.parseInt(str);
216             props.remove("up_thread_prio");
217         }
218 
219         str=props.getProperty("stats");
220         if(str != null) {
221             stats=Boolean.valueOf(str).booleanValue();
222             props.remove("stats");
223         }
224 
225         return setProperties(props);
226     }
227 
228 
229     public Properties getProperties() {
230         return props;
231     }
232 
233 
234     public boolean isTrace() {
235         return trace;
236     }
237 
238     public void setTrace(boolean trace) {
239         this.trace=trace;
240     }
241 
242     public boolean isWarn() {
243         return warn;
244     }
245 
246     public void setWarn(boolean warn) {
247         this.warn=warn;
248     }
249 
250     public boolean upThreadEnabled() {
251         return up_thread;
252     }
253 
254     public boolean downThreadEnabled() {
255         return down_thread;
256     }
257 
258     public boolean statsEnabled() {
259         return stats;
260     }
261 
262     public void enableStats(boolean flag) {
263         stats=flag;
264     }
265 
266     public void resetStats() {
267         ;
268     }
269 
270     public String printStats() {
271         return null;
272     }
273 
274     public Map dumpStats() {
275         return null;
276     }
277 
278 
279     public void setObserver(ProtocolObserver observer) {
280         this.observer=observer;
281         observer.setProtocol(this);
282         if(up_handler != null)
283             up_handler.setObserver(observer);
284         if(down_handler != null)
285             down_handler.setObserver(observer);
286     }
287 
288     /**
289      * Called after instance has been created (null constructor) and before protocol is started.
290      * Properties are already set. Other protocols are not yet connected and events cannot yet be sent.
291      * @exception Exception Thrown if protocol cannot be initialized successfully. This will cause the
292      *                      ProtocolStack to fail, so the channel constructor will throw an exception
293      */
294     public void init() throws Exception {
295     }
296 
297     /**
298      * This method is called on a {@link org.jgroups.Channel#connect(String)}. Starts work.
299      * Protocols are connected and queues are ready to receive events.
300      * Will be called <em>from bottom to top</em>. This call will replace
301      * the <b>START</b> and <b>START_OK</b> events.
302      * @exception Exception Thrown if protocol cannot be started successfully. This will cause the ProtocolStack
303      *                      to fail, so {@link org.jgroups.Channel#connect(String)} will throw an exception
304      */
305     public void start() throws Exception {
306     }
307 
308     /**
309      * This method is called on a {@link org.jgroups.Channel#disconnect()}. Stops work (e.g. by closing multicast socket).
310      * Will be called <em>from top to bottom</em>. This means that at the time of the method invocation the
311      * neighbor protocol below is still working. This method will replace the
312      * <b>STOP</b>, <b>STOP_OK</b>, <b>CLEANUP</b> and <b>CLEANUP_OK</b> events. The ProtocolStack guarantees that
313      * when this method is called all messages in the down queue will have been flushed
314      */
315     public void stop() {
316     }
317 
318 
319     /**
320      * This method is called on a {@link org.jgroups.Channel#close()}.
321      * Does some cleanup; after the call the VM will terminate
322      */
323     public void destroy() {
324     }
325 
326 
327     public Queue getUpQueue() {
328         return up_queue;
329     }    // used by Debugger (ProtocolView)
330 
331     public Queue getDownQueue() {
332         return down_queue;
333     }  // used by Debugger (ProtocolView)
334 
335 
336     /** List of events that are required to be answered by some layer above.
337      @return Vector (of Integers) */
338     public Vector requiredUpServices() {
339         return null;
340     }
341 
342     /** List of events that are required to be answered by some layer below.
343      @return Vector (of Integers) */
344     public Vector requiredDownServices() {
345         return null;
346     }
347 
348     /** List of events that are provided to layers above (they will be handled when sent down from
349      above).
350      @return Vector (of Integers) */
351     public Vector providedUpServices() {
352         return null;
353     }
354 
355     /** List of events that are provided to layers below (they will be handled when sent down from
356      below).
357      @return Vector (of Integers) */
358     public Vector providedDownServices() {
359         return null;
360     }
361 
362 
363     public abstract String getName();   // all protocol names have to be unique !
364 
365     public Protocol getUpProtocol() {
366         return up_prot;
367     }
368 
369     public Protocol getDownProtocol() {
370         return down_prot;
371     }
372 
373     public void setUpProtocol(Protocol up_prot) {
374         this.up_prot=up_prot;
375     }
376 
377     public void setDownProtocol(Protocol down_prot) {
378         this.down_prot=down_prot;
379     }
380 
381     public void setProtocolStack(ProtocolStack stack) {
382         this.stack=stack;
383     }
384 
385 
386     /** Used internally. If overridden, call this method first. Only creates the up_handler thread
387      if down_thread is true */
388     public void startUpHandler() {
389         if(up_thread) {
390             if(up_handler == null) {
391                 up_handler=new UpHandler(up_queue, this, observer);
392                 if(up_thread_prio >= 0) {
393                     try {
394                         up_handler.setPriority(up_thread_prio);
395                     }
396                     catch(Throwable t) {
397                         if(log.isErrorEnabled()) log.error("priority " + up_thread_prio +
398                                 " could not be set for thread", t);
399                     }
400                 }
401                 up_handler.start();
402             }
403         }
404     }
405 
406 
407     /** Used internally. If overridden, call this method first. Only creates the down_handler thread
408      if down_thread is true */
409     public void startDownHandler() {
410         if(down_thread) {
411             if(down_handler == null) {
412                 down_handler=new DownHandler(down_queue, this, observer);
413                 if(down_thread_prio >= 0) {
414                     try {
415                         down_handler.setPriority(down_thread_prio);
416                     }
417                     catch(Throwable t) {
418                         if(log.isErrorEnabled()) log.error("priority " + down_thread_prio +
419                                 " could not be set for thread", t);
420                     }
421                 }
422                 down_handler.start();
423             }
424         }
425     }
426 
427 
428     /** Used internally. If overridden, call parent's method first */
429     public void stopInternal() {
430         up_queue.close(false);  // this should terminate up_handler thread
431 
432         if(up_handler != null && up_handler.isAlive()) {
433             try {
434                 up_handler.join(THREAD_JOIN_TIMEOUT);
435             }
436             catch(Exception ex) {
437             }
438             if(up_handler != null && up_handler.isAlive()) {
439                 up_handler.interrupt();  // still alive ? let's just kill it without mercy...
440                 try {
441                     up_handler.join(THREAD_JOIN_TIMEOUT);
442                 }
443                 catch(Exception ex) {
444                 }
445                 if(up_handler != null && up_handler.isAlive())
446                     if(log.isErrorEnabled()) log.error("up_handler thread for " + getName() +
447                                                            " was interrupted (in order to be terminated), but is still alive");
448             }
449         }
450         up_handler=null;
451 
452         down_queue.close(false); // this should terminate down_handler thread
453         if(down_handler != null && down_handler.isAlive()) {
454             try {
455                 down_handler.join(THREAD_JOIN_TIMEOUT);
456             }
457             catch(Exception ex) {
458             }
459             if(down_handler != null && down_handler.isAlive()) {
460                 down_handler.interrupt(); // still alive ? let's just kill it without mercy...
461                 try {
462                     down_handler.join(THREAD_JOIN_TIMEOUT);
463                 }
464                 catch(Exception ex) {
465                 }
466                 if(down_handler != null && down_handler.isAlive())
467                     if(log.isErrorEnabled()) log.error("down_handler thread for " + getName() +
468                                                            " was interrupted (in order to be terminated), but is is still alive");
469             }
470         }
471         down_handler=null;
472     }
473 
474 
475     /**
476      * Internal method, should not be called by clients. Used by ProtocolStack. I would have
477      * used the 'friends' modifier, but this is available only in C++ ... If the up_handler thread
478      * is not available (down_thread == false), then directly call the up() method: we will run on the
479      * caller's thread (e.g. the protocol layer below us).
480      */
481     protected void receiveUpEvent(Event evt) {
482         if(up_handler == null) {
483             if(observer != null) {                               // call debugger hook (if installed)
484                 if(observer.up(evt, up_queue.size()) == false) {  // false means discard event
485                     return;
486                 }
487             }
488             up(evt);
489             return;
490         }
491         try {
492             up_queue.add(evt);
493         }
494         catch(Exception e) {
495             if(log.isWarnEnabled()) log.warn("exception: " + e);
496         }
497     }
498 
499     /**
500      * Internal method, should not be called by clients. Used by ProtocolStack. I would have
501      * used the 'friends' modifier, but this is available only in C++ ... If the down_handler thread
502      * is not available (down_thread == false), then directly call the down() method: we will run on the
503      * caller's thread (e.g. the protocol layer above us).
504      */
505     protected void receiveDownEvent(Event evt) {
506         if(down_handler == null) {
507             if(observer != null) {                                    // call debugger hook (if installed)
508                 if(observer.down(evt, down_queue.size()) == false) {  // false means discard event
509                     return;
510                 }
511             }
512             int type=evt.getType();
513             if(type == Event.START || type == Event.STOP) {
514                 if(handleSpecialDownEvent(evt) == false)
515                     return;
516             }
517             down(evt);
518             return;
519         }
520         try {
521             down_queue.add(evt);
522         }
523         catch(Exception e) {
524             if(log.isWarnEnabled()) log.warn("exception: " + e);
525         }
526     }
527 
528     /**
529      * Causes the event to be forwarded to the next layer up in the hierarchy. Typically called
530      * by the implementation of <code>Up</code> (when done).
531      */
532     public void passUp(Event evt) {
533         if(observer != null) {                   // call debugger hook (if installed)
534             if(observer.passUp(evt) == false) {  // false means don't pass up (=discard) event
535                 return;
536             }
537         }
538         up_prot.receiveUpEvent(evt);
539     }
540 
541     /**
542      * Causes the event to be forwarded to the next layer down in the hierarchy.Typically called
543      * by the implementation of <code>Down</code> (when done).
544      */
545     public void passDown(Event evt) {
546         if(observer != null) {                     // call debugger hook (if installed)
547             if(observer.passDown(evt) == false) {  // false means don't pass down (=discard) event
548                 return;
549             }
550         }
551         down_prot.receiveDownEvent(evt);
552     }
553 
554 
555     /**
556      * An event was received from the layer below. Usually the current layer will want to examine
557      * the event type and - depending on its type - perform some computation
558      * (e.g. removing headers from a MSG event type, or updating the internal membership list
559      * when receiving a VIEW_CHANGE event).
560      * Finally the event is either a) discarded, or b) an event is sent down
561      * the stack using <code>passDown()</code> or c) the event (or another event) is sent up
562      * the stack using <code>passUp()</code>.
563      */
564     public void up(Event evt) {
565         passUp(evt);
566     }
567 
568     /**
569      * An event is to be sent down the stack. The layer may want to examine its type and perform
570      * some action on it, depending on the event's type. If the event is a message MSG, then
571      * the layer may need to add a header to it (or do nothing at all) before sending it down
572      * the stack using <code>passDown()</code>. In case of a GET_ADDRESS event (which tries to
573      * retrieve the stack's address from one of the bottom layers), the layer may need to send
574      * a new response event back up the stack using <code>passUp()</code>.
575      */
576     public void down(Event evt) {
577         passDown(evt);
578     }
579 
580 
581     /**  These are special internal events that should not be handled by protocols
582      * @return boolean True: the event should be passed further down the stack. False: the event should
583      * be discarded (not passed down the stack)
584      */
585     protected boolean handleSpecialDownEvent(Event evt) {
586         switch(evt.getType()) {
587             case Event.START:
588                 try {
589                     start();
590 
591                     // if we're the transport protocol, reply with a START_OK up the stack
592                     if(down_prot == null) {
593                         passUp(new Event(Event.START_OK, Boolean.TRUE));
594                         return false; // don't pass down the stack
595                     }
596                     else
597                         return true; // pass down the stack
598                 }
599                 catch(Exception e) {
600                     passUp(new Event(Event.START_OK, new Exception("exception caused by " + getName() + ".start(): " + e)));
601                     return false;
602                 }
603             case Event.STOP:
604                 stop();
605                 if(down_prot == null) {
606                     passUp(new Event(Event.STOP_OK, Boolean.TRUE));
607                     return false; // don't pass down the stack
608                 }
609                 else
610                     return true; // pass down the stack
611             default:
612                 return true; // pass down by default
613         }
614     }
615 }