Source code: org/jgroups/stack/Protocol.java
1 // $Id: Protocol.java,v 1.34 2005/10/27 16:05:04 belaban Exp $
2
3 package org.jgroups.stack;
4
5
6 import org.apache.commons.logging.Log;
7 import org.apache.commons.logging.LogFactory;
8 import org.jgroups.Event;
9 import org.jgroups.util.Queue;
10 import org.jgroups.util.QueueClosedException;
11
12 import java.util.Map;
13 import java.util.Properties;
14 import java.util.Vector;
15
16
17
18
19 class UpHandler extends Thread {
20 private Queue mq=null;
21 private Protocol handler=null;
22 private ProtocolObserver observer=null;
23 protected final Log log=LogFactory.getLog(this.getClass());
24
25
26 public UpHandler(Queue mq, Protocol handler, ProtocolObserver observer) {
27 this.mq=mq;
28 this.handler=handler;
29 this.observer=observer;
30 if(handler != null)
31 setName("UpHandler (" + handler.getName() + ')');
32 else
33 setName("UpHandler");
34 setDaemon(true);
35 }
36
37
38 public void setObserver(ProtocolObserver observer) {
39 this.observer=observer;
40 }
41
42
43 /** Removes events from mq and calls handler.up(evt) */
44 public void run() {
45 Event evt;
46 while(!mq.closed()) {
47 try {
48 evt=(Event)mq.remove();
49 if(evt == null) {
50 if(log.isWarnEnabled()) log.warn("removed null event");
51 continue;
52 }
53
54 if(observer != null) { // call debugger hook (if installed)
55 if(observer.up(evt, mq.size()) == false) { // false means discard event
56 return;
57 }
58 }
59 handler.up(evt);
60 }
61 catch(QueueClosedException queue_closed) {
62 break;
63 }
64 catch(Throwable e) {
65 if(log.isWarnEnabled()) log.warn(getName() + " caught exception", e);
66 }
67 }
68 }
69
70 }
71
72
73 class DownHandler extends Thread {
74 private Queue mq=null;
75 private Protocol handler=null;
76 private ProtocolObserver observer=null;
77 protected final Log log=LogFactory.getLog(this.getClass());
78
79
80
81 public DownHandler(Queue mq, Protocol handler, ProtocolObserver observer) {
82 this.mq=mq;
83 this.handler=handler;
84 this.observer=observer;
85 if(handler != null)
86 setName("DownHandler (" + handler.getName() + ')');
87 else
88 setName("DownHandler");
89 setDaemon(true);
90 }
91
92
93 public void setObserver(ProtocolObserver observer) {
94 this.observer=observer;
95 }
96
97
98 /** Removes events from mq and calls handler.down(evt) */
99 public void run() {
100 Event evt;
101 while(!mq.closed()) {
102 try {
103 evt=(Event)mq.remove();
104 if(evt == null) {
105 if(log.isWarnEnabled()) log.warn("removed null event");
106 continue;
107 }
108
109 if(observer != null) { // call debugger hook (if installed)
110 if(observer.down(evt, mq.size()) == false) { // false means discard event
111 continue;
112 }
113 }
114
115 int type=evt.getType();
116 if(type == Event.START || type == Event.STOP) {
117 if(handler.handleSpecialDownEvent(evt) == false)
118 continue;
119 }
120 handler.down(evt);
121 }
122 catch(QueueClosedException queue_closed) {
123 break;
124 }
125 catch(Throwable e) {
126 if(log.isWarnEnabled()) log.warn(getName() + " caught exception", e);
127 }
128 }
129 }
130
131 }
132
133
134 /**
135 * The Protocol class provides a set of common services for protocol layers. Each layer has to
136 * be a subclass of Protocol and override a number of methods (typically just <code>up()</code>,
137 * <code>Down</code> and <code>getName</code>. Layers are stacked in a certain order to form
138 * a protocol stack. <a href=org.jgroups.Event.html>Events</a> are passed from lower
139 * layers to upper ones and vice versa. E.g. a Message received by the UDP layer at the bottom
140 * will be passed to its higher layer as an Event. That layer will in turn pass the Event to
141 * its layer and so on, until a layer handles the Message and sends a response or discards it,
142 * the former resulting in another Event being passed down the stack.<p>
143 * Each layer has 2 FIFO queues, one for up Events and one for down Events. When an Event is
144 * received by a layer (calling the internal upcall <code>ReceiveUpEvent</code>), it is placed
145 * in the up-queue where it will be retrieved by the up-handler thread which will invoke method
146 * <code>Up</code> of the layer. The same applies for Events traveling down the stack. Handling
147 * of the up-handler and down-handler threads and the 2 FIFO queues is donw by the Protocol
148 * class, subclasses will almost never have to override this behavior.<p>
149 * The important thing to bear in mind is that Events have to passed on between layers in FIFO
150 * order which is guaranteed by the Protocol implementation and must be guranteed by subclasses
151 * implementing their on Event queuing.<p>
152 * <b>Note that each class implementing interface Protocol MUST provide an empty, public
153 * constructor !</b>
154 */
155 public abstract class Protocol {
156 protected final Properties props=new Properties();
157 protected Protocol up_prot=null, down_prot=null;
158 protected ProtocolStack stack=null;
159 protected final Queue up_queue=new Queue();
160 protected final Queue down_queue=new Queue();
161 protected UpHandler up_handler=null;
162 protected int up_thread_prio=-1;
163 protected DownHandler down_handler=null;
164 protected int down_thread_prio=-1;
165 protected ProtocolObserver observer=null; // hook for debugger
166 private final static long THREAD_JOIN_TIMEOUT=1000;
167 protected boolean down_thread=true; // determines whether the down_handler thread should be started
168 protected boolean up_thread=true; // determines whether the up_handler thread should be started
169 protected boolean stats=true; // determines whether to collect statistics (and expose them via JMX)
170 protected final Log log=LogFactory.getLog(this.getClass());
171 protected boolean trace=log.isTraceEnabled();
172 protected boolean warn=log.isWarnEnabled();
173
174
175 /**
176 * Configures the protocol initially. A configuration string consists of name=value
177 * items, separated by a ';' (semicolon), e.g.:<pre>
178 * "loopback=false;unicast_inport=4444"
179 * </pre>
180 */
181 public boolean setProperties(Properties props) {
182 if(props != null)
183 this.props.putAll(props);
184 return true;
185 }
186
187
188 /** Called by Configurator. Removes 2 properties which are used by the Protocol directly and then
189 * calls setProperties(), which might invoke the setProperties() method of the actual protocol instance.
190 */
191 public boolean setPropertiesInternal(Properties props) {
192 String str;
193 this.props.putAll(props);
194
195 str=props.getProperty("down_thread");
196 if(str != null) {
197 down_thread=Boolean.valueOf(str).booleanValue();
198 props.remove("down_thread");
199 }
200
201 str=props.getProperty("down_thread_prio");
202 if(str != null) {
203 down_thread_prio=Integer.parseInt(str);
204 props.remove("down_thread_prio");
205 }
206
207 str=props.getProperty("up_thread");
208 if(str != null) {
209 up_thread=Boolean.valueOf(str).booleanValue();
210 props.remove("up_thread");
211 }
212
213 str=props.getProperty("up_thread_prio");
214 if(str != null) {
215 up_thread_prio=Integer.parseInt(str);
216 props.remove("up_thread_prio");
217 }
218
219 str=props.getProperty("stats");
220 if(str != null) {
221 stats=Boolean.valueOf(str).booleanValue();
222 props.remove("stats");
223 }
224
225 return setProperties(props);
226 }
227
228
229 public Properties getProperties() {
230 return props;
231 }
232
233
234 public boolean isTrace() {
235 return trace;
236 }
237
238 public void setTrace(boolean trace) {
239 this.trace=trace;
240 }
241
242 public boolean isWarn() {
243 return warn;
244 }
245
246 public void setWarn(boolean warn) {
247 this.warn=warn;
248 }
249
250 public boolean upThreadEnabled() {
251 return up_thread;
252 }
253
254 public boolean downThreadEnabled() {
255 return down_thread;
256 }
257
258 public boolean statsEnabled() {
259 return stats;
260 }
261
262 public void enableStats(boolean flag) {
263 stats=flag;
264 }
265
266 public void resetStats() {
267 ;
268 }
269
270 public String printStats() {
271 return null;
272 }
273
274 public Map dumpStats() {
275 return null;
276 }
277
278
279 public void setObserver(ProtocolObserver observer) {
280 this.observer=observer;
281 observer.setProtocol(this);
282 if(up_handler != null)
283 up_handler.setObserver(observer);
284 if(down_handler != null)
285 down_handler.setObserver(observer);
286 }
287
288 /**
289 * Called after instance has been created (null constructor) and before protocol is started.
290 * Properties are already set. Other protocols are not yet connected and events cannot yet be sent.
291 * @exception Exception Thrown if protocol cannot be initialized successfully. This will cause the
292 * ProtocolStack to fail, so the channel constructor will throw an exception
293 */
294 public void init() throws Exception {
295 }
296
297 /**
298 * This method is called on a {@link org.jgroups.Channel#connect(String)}. Starts work.
299 * Protocols are connected and queues are ready to receive events.
300 * Will be called <em>from bottom to top</em>. This call will replace
301 * the <b>START</b> and <b>START_OK</b> events.
302 * @exception Exception Thrown if protocol cannot be started successfully. This will cause the ProtocolStack
303 * to fail, so {@link org.jgroups.Channel#connect(String)} will throw an exception
304 */
305 public void start() throws Exception {
306 }
307
308 /**
309 * This method is called on a {@link org.jgroups.Channel#disconnect()}. Stops work (e.g. by closing multicast socket).
310 * Will be called <em>from top to bottom</em>. This means that at the time of the method invocation the
311 * neighbor protocol below is still working. This method will replace the
312 * <b>STOP</b>, <b>STOP_OK</b>, <b>CLEANUP</b> and <b>CLEANUP_OK</b> events. The ProtocolStack guarantees that
313 * when this method is called all messages in the down queue will have been flushed
314 */
315 public void stop() {
316 }
317
318
319 /**
320 * This method is called on a {@link org.jgroups.Channel#close()}.
321 * Does some cleanup; after the call the VM will terminate
322 */
323 public void destroy() {
324 }
325
326
327 public Queue getUpQueue() {
328 return up_queue;
329 } // used by Debugger (ProtocolView)
330
331 public Queue getDownQueue() {
332 return down_queue;
333 } // used by Debugger (ProtocolView)
334
335
336 /** List of events that are required to be answered by some layer above.
337 @return Vector (of Integers) */
338 public Vector requiredUpServices() {
339 return null;
340 }
341
342 /** List of events that are required to be answered by some layer below.
343 @return Vector (of Integers) */
344 public Vector requiredDownServices() {
345 return null;
346 }
347
348 /** List of events that are provided to layers above (they will be handled when sent down from
349 above).
350 @return Vector (of Integers) */
351 public Vector providedUpServices() {
352 return null;
353 }
354
355 /** List of events that are provided to layers below (they will be handled when sent down from
356 below).
357 @return Vector (of Integers) */
358 public Vector providedDownServices() {
359 return null;
360 }
361
362
363 public abstract String getName(); // all protocol names have to be unique !
364
365 public Protocol getUpProtocol() {
366 return up_prot;
367 }
368
369 public Protocol getDownProtocol() {
370 return down_prot;
371 }
372
373 public void setUpProtocol(Protocol up_prot) {
374 this.up_prot=up_prot;
375 }
376
377 public void setDownProtocol(Protocol down_prot) {
378 this.down_prot=down_prot;
379 }
380
381 public void setProtocolStack(ProtocolStack stack) {
382 this.stack=stack;
383 }
384
385
386 /** Used internally. If overridden, call this method first. Only creates the up_handler thread
387 if down_thread is true */
388 public void startUpHandler() {
389 if(up_thread) {
390 if(up_handler == null) {
391 up_handler=new UpHandler(up_queue, this, observer);
392 if(up_thread_prio >= 0) {
393 try {
394 up_handler.setPriority(up_thread_prio);
395 }
396 catch(Throwable t) {
397 if(log.isErrorEnabled()) log.error("priority " + up_thread_prio +
398 " could not be set for thread", t);
399 }
400 }
401 up_handler.start();
402 }
403 }
404 }
405
406
407 /** Used internally. If overridden, call this method first. Only creates the down_handler thread
408 if down_thread is true */
409 public void startDownHandler() {
410 if(down_thread) {
411 if(down_handler == null) {
412 down_handler=new DownHandler(down_queue, this, observer);
413 if(down_thread_prio >= 0) {
414 try {
415 down_handler.setPriority(down_thread_prio);
416 }
417 catch(Throwable t) {
418 if(log.isErrorEnabled()) log.error("priority " + down_thread_prio +
419 " could not be set for thread", t);
420 }
421 }
422 down_handler.start();
423 }
424 }
425 }
426
427
428 /** Used internally. If overridden, call parent's method first */
429 public void stopInternal() {
430 up_queue.close(false); // this should terminate up_handler thread
431
432 if(up_handler != null && up_handler.isAlive()) {
433 try {
434 up_handler.join(THREAD_JOIN_TIMEOUT);
435 }
436 catch(Exception ex) {
437 }
438 if(up_handler != null && up_handler.isAlive()) {
439 up_handler.interrupt(); // still alive ? let's just kill it without mercy...
440 try {
441 up_handler.join(THREAD_JOIN_TIMEOUT);
442 }
443 catch(Exception ex) {
444 }
445 if(up_handler != null && up_handler.isAlive())
446 if(log.isErrorEnabled()) log.error("up_handler thread for " + getName() +
447 " was interrupted (in order to be terminated), but is still alive");
448 }
449 }
450 up_handler=null;
451
452 down_queue.close(false); // this should terminate down_handler thread
453 if(down_handler != null && down_handler.isAlive()) {
454 try {
455 down_handler.join(THREAD_JOIN_TIMEOUT);
456 }
457 catch(Exception ex) {
458 }
459 if(down_handler != null && down_handler.isAlive()) {
460 down_handler.interrupt(); // still alive ? let's just kill it without mercy...
461 try {
462 down_handler.join(THREAD_JOIN_TIMEOUT);
463 }
464 catch(Exception ex) {
465 }
466 if(down_handler != null && down_handler.isAlive())
467 if(log.isErrorEnabled()) log.error("down_handler thread for " + getName() +
468 " was interrupted (in order to be terminated), but is is still alive");
469 }
470 }
471 down_handler=null;
472 }
473
474
475 /**
476 * Internal method, should not be called by clients. Used by ProtocolStack. I would have
477 * used the 'friends' modifier, but this is available only in C++ ... If the up_handler thread
478 * is not available (down_thread == false), then directly call the up() method: we will run on the
479 * caller's thread (e.g. the protocol layer below us).
480 */
481 protected void receiveUpEvent(Event evt) {
482 if(up_handler == null) {
483 if(observer != null) { // call debugger hook (if installed)
484 if(observer.up(evt, up_queue.size()) == false) { // false means discard event
485 return;
486 }
487 }
488 up(evt);
489 return;
490 }
491 try {
492 up_queue.add(evt);
493 }
494 catch(Exception e) {
495 if(log.isWarnEnabled()) log.warn("exception: " + e);
496 }
497 }
498
499 /**
500 * Internal method, should not be called by clients. Used by ProtocolStack. I would have
501 * used the 'friends' modifier, but this is available only in C++ ... If the down_handler thread
502 * is not available (down_thread == false), then directly call the down() method: we will run on the
503 * caller's thread (e.g. the protocol layer above us).
504 */
505 protected void receiveDownEvent(Event evt) {
506 if(down_handler == null) {
507 if(observer != null) { // call debugger hook (if installed)
508 if(observer.down(evt, down_queue.size()) == false) { // false means discard event
509 return;
510 }
511 }
512 int type=evt.getType();
513 if(type == Event.START || type == Event.STOP) {
514 if(handleSpecialDownEvent(evt) == false)
515 return;
516 }
517 down(evt);
518 return;
519 }
520 try {
521 down_queue.add(evt);
522 }
523 catch(Exception e) {
524 if(log.isWarnEnabled()) log.warn("exception: " + e);
525 }
526 }
527
528 /**
529 * Causes the event to be forwarded to the next layer up in the hierarchy. Typically called
530 * by the implementation of <code>Up</code> (when done).
531 */
532 public void passUp(Event evt) {
533 if(observer != null) { // call debugger hook (if installed)
534 if(observer.passUp(evt) == false) { // false means don't pass up (=discard) event
535 return;
536 }
537 }
538 up_prot.receiveUpEvent(evt);
539 }
540
541 /**
542 * Causes the event to be forwarded to the next layer down in the hierarchy.Typically called
543 * by the implementation of <code>Down</code> (when done).
544 */
545 public void passDown(Event evt) {
546 if(observer != null) { // call debugger hook (if installed)
547 if(observer.passDown(evt) == false) { // false means don't pass down (=discard) event
548 return;
549 }
550 }
551 down_prot.receiveDownEvent(evt);
552 }
553
554
555 /**
556 * An event was received from the layer below. Usually the current layer will want to examine
557 * the event type and - depending on its type - perform some computation
558 * (e.g. removing headers from a MSG event type, or updating the internal membership list
559 * when receiving a VIEW_CHANGE event).
560 * Finally the event is either a) discarded, or b) an event is sent down
561 * the stack using <code>passDown()</code> or c) the event (or another event) is sent up
562 * the stack using <code>passUp()</code>.
563 */
564 public void up(Event evt) {
565 passUp(evt);
566 }
567
568 /**
569 * An event is to be sent down the stack. The layer may want to examine its type and perform
570 * some action on it, depending on the event's type. If the event is a message MSG, then
571 * the layer may need to add a header to it (or do nothing at all) before sending it down
572 * the stack using <code>passDown()</code>. In case of a GET_ADDRESS event (which tries to
573 * retrieve the stack's address from one of the bottom layers), the layer may need to send
574 * a new response event back up the stack using <code>passUp()</code>.
575 */
576 public void down(Event evt) {
577 passDown(evt);
578 }
579
580
581 /** These are special internal events that should not be handled by protocols
582 * @return boolean True: the event should be passed further down the stack. False: the event should
583 * be discarded (not passed down the stack)
584 */
585 protected boolean handleSpecialDownEvent(Event evt) {
586 switch(evt.getType()) {
587 case Event.START:
588 try {
589 start();
590
591 // if we're the transport protocol, reply with a START_OK up the stack
592 if(down_prot == null) {
593 passUp(new Event(Event.START_OK, Boolean.TRUE));
594 return false; // don't pass down the stack
595 }
596 else
597 return true; // pass down the stack
598 }
599 catch(Exception e) {
600 passUp(new Event(Event.START_OK, new Exception("exception caused by " + getName() + ".start(): " + e)));
601 return false;
602 }
603 case Event.STOP:
604 stop();
605 if(down_prot == null) {
606 passUp(new Event(Event.STOP_OK, Boolean.TRUE));
607 return false; // don't pass down the stack
608 }
609 else
610 return true; // pass down the stack
611 default:
612 return true; // pass down by default
613 }
614 }
615 }