Source code: org/jgroups/JChannel.java
1 // $Id: JChannel.java,v 1.44 2005/11/08 13:57:08 belaban Exp $
2
3 package org.jgroups;
4
5 import org.apache.commons.logging.Log;
6 import org.apache.commons.logging.LogFactory;
7 import org.jgroups.conf.ConfiguratorFactory;
8 import org.jgroups.conf.ProtocolStackConfigurator;
9 import org.jgroups.stack.ProtocolStack;
10 import org.jgroups.stack.StateTransferInfo;
11 import org.jgroups.util.*;
12 import org.w3c.dom.Element;
13
14 import java.io.File;
15 import java.io.Serializable;
16 import java.net.URL;
17 import java.util.HashMap;
18 import java.util.Map;
19 import java.util.Vector;
20
21 /**
22 * JChannel is a pure Java implementation of Channel.
23 * When a JChannel object is instantiated it automatically sets up the
24 * protocol stack.
25 * <p>
26 * <B>Properties</B>
27 * <P>
28 * Properties are used to configure a channel, and are accepted in
29 * several forms; the String form is described here.
30 * A property string consists of a number of properties separated by
31 * colons. For example:
32 * <p>
33 * <pre>"<prop1>(arg1=val1):<prop2>(arg1=val1;arg2=val2):<prop3>:<propn>"</pre>
34 * <p>
35 * Each property relates directly to a protocol layer, which is
36 * implemented as a Java class. When a protocol stack is to be created
37 * based on the above property string, the first property becomes the
38 * bottom-most layer, the second one will be placed on the first, etc.:
39 * the stack is created from the bottom to the top, as the string is
40 * parsed from left to right. Each property has to be the name of a
41 * Java class that resides in the
42 * {@link org.jgroups.protocols} package.
43 * <p>
44 * Note that only the base name has to be given, not the fully specified
45 * class name (e.g., UDP instead of org.jgroups.protocols.UDP).
46 * <p>
47 * Each layer may have 0 or more arguments, which are specified as a
48 * list of name/value pairs in parentheses directly after the property.
49 * In the example above, the first protocol layer has 1 argument,
50 * the second 2, the third none. When a layer is created, these
51 * properties (if there are any) will be set in a layer by invoking
52 * the layer's setProperties() method
53 * <p>
54 * As an example the property string below instructs JGroups to create
55 * a JChannel with protocols UDP, PING, FD and GMS:<p>
56 * <pre>"UDP(mcast_addr=228.10.9.8;mcast_port=5678):PING:FD:GMS"</pre>
57 * <p>
58 * The UDP protocol layer is at the bottom of the stack, and it
59 * should use mcast address 228.10.9.8 and port 5678 rather than
60 * the default IP multicast address and port. The other protocols
61 * (PING, FD and GMS) are given no arguments in this example.
62 * Property UDP refers to a class {@link org.jgroups.protocols.UDP},
63 * which is subsequently loaded and an instance of which is created as protocol layer.
64 * If any of these classes are not found, an exception will be thrown and
65 * the construction of the stack will be aborted.
66 *
67 * @author Bela Ban
68 * @author Filip Hanik
69 * @version $Revision: 1.44 $
70 */
71 public class JChannel extends Channel {
72
73 /**
74 * The default protocol stack used by the default constructor.
75 */
76 public static final String DEFAULT_PROTOCOL_STACK=
77 "UDP(mcast_addr=228.1.2.3;mcast_port=45566;ip_ttl=32):" +
78 "PING(timeout=3000;num_initial_members=6):" +
79 "FD(timeout=3000):" +
80 "VERIFY_SUSPECT(timeout=1500):" +
81 "pbcast.NAKACK(gc_lag=10;retransmit_timeout=600,1200,2400,4800):" +
82 "UNICAST(timeout=600,1200,2400,4800):" +
83 "pbcast.STABLE(desired_avg_gossip=10000):" +
84 "FRAG:" +
85 "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
86 "shun=true;print_local_addr=true)";
87
88 static final String FORCE_PROPS="force.properties";
89
90 /* the protocol stack configuration string */
91 private String props=null;
92
93 /*the address of this JChannel instance*/
94 private Address local_addr=null;
95 /*the channel (also known as group) name*/
96 private String channel_name=null; // group name
97 /*the latest view of the group membership*/
98 private View my_view=null;
99 /*the queue that is used to receive messages (events) from the protocol stack*/
100 private final Queue mq=new Queue();
101 /*the protocol stack, used to send and receive messages from the protocol stack*/
102 private ProtocolStack prot_stack=null;
103
104 /** Thread responsible for closing a channel and potentially reconnecting to it (e.g., when shunned). */
105 protected CloserThread closer=null;
106
107 /** To wait until a local address has been assigned */
108 private final Promise local_addr_promise=new Promise();
109
110 /** To wait until we have connected successfully */
111 private final Promise connect_promise=new Promise();
112
113 /** To wait until we have been disconnected from the channel */
114 private final Promise disconnect_promise=new Promise();
115
116 private final Promise state_promise=new Promise();
117
118 private final Object suspend_mutex=new Object();
119 private boolean suspended=false;
120
121 /** wait until we have a non-null local_addr */
122 private long LOCAL_ADDR_TIMEOUT=30000; //=Long.parseLong(System.getProperty("local_addr.timeout", "30000"));
123 /*if the state is fetched automatically, this is the default timeout, 5 secs*/
124 private static final long GET_STATE_DEFAULT_TIMEOUT=5000;
125 /*flag to indicate whether to receive views from the protocol stack*/
126 private boolean receive_views=true;
127 /*flag to indicate whether to receive suspect messages*/
128 private boolean receive_suspects=true;
129 /*flag to indicate whether to receive blocks, if this is set to true, receive_views is set to true*/
130 private boolean receive_blocks=false;
131 /*flag to indicate whether to receive local messages
132 *if this is set to false, the JChannel will not receive messages sent by itself*/
133 private boolean receive_local_msgs=true;
134 /*flag to indicate whether to receive a state message or not*/
135 private boolean receive_get_states=false;
136 /*flag to indicate whether the channel will reconnect (reopen) when the exit message is received*/
137 private boolean auto_reconnect=false;
138 /*flag to indicate whether the state is supposed to be retrieved after the channel is reconnected
139 *setting this to true, automatically forces auto_reconnect to true*/
140 private boolean auto_getstate=false;
141 /*channel connected flag*/
142 private boolean connected=false;
143
144 /** block send()/down() if true (unlocked by UNBLOCK_SEND event) */
145 private final CondVar block_sending=new CondVar("block_sending", Boolean.FALSE);
146
147 /*channel closed flag*/
148 private boolean closed=false; // close() has been called, channel is unusable
149
150 /** True if a state transfer protocol is available, false otherwise */
151 private boolean state_transfer_supported=false; // set by CONFIG event from STATE_TRANSFER protocol
152
153 /** Used to maintain additional data across channel disconnects/reconnects. This is a kludge and will be removed
154 * as soon as JGroups supports logical addresses
155 */
156 private byte[] additional_data=null;
157
158 protected final Log log=LogFactory.getLog(getClass());
159
160 /** Collect statistics */
161 protected boolean stats=true;
162
163 protected long sent_msgs=0, received_msgs=0, sent_bytes=0, received_bytes=0;
164
165
166
/**
 * Creates a <code>JChannel</code> whose protocol stack is described by
 * {@link #DEFAULT_PROTOCOL_STACK}.
 *
 * @throws ChannelException if the protocol stack cannot be configured or
 *                          set up.
 */
public JChannel() throws ChannelException {
    this(DEFAULT_PROTOCOL_STACK);
}
177
/**
 * Creates a <code>JChannel</code> from a protocol stack configuration
 * stored in a file.
 *
 * @param properties a file containing a JGroups XML protocol stack
 *                   configuration.
 *
 * @throws ChannelException if the configuration cannot be read or the
 *                          protocol stack cannot be set up.
 */
public JChannel(File properties) throws ChannelException {
    this(ConfiguratorFactory.getStackConfigurator(properties));
}
191
/**
 * Creates a <code>JChannel</code> from a protocol stack configuration
 * held in an XML element.
 *
 * @param properties an XML element containing a JGroups XML protocol
 *                   stack configuration.
 *
 * @throws ChannelException if the configuration cannot be processed or the
 *                          protocol stack cannot be set up.
 */
public JChannel(Element properties) throws ChannelException {
    this(ConfiguratorFactory.getStackConfigurator(properties));
}
205
/**
 * Creates a <code>JChannel</code> from a protocol stack configuration
 * located at a URL.
 *
 * @param properties a URL pointing to a JGroups XML protocol stack
 *                   configuration.
 *
 * @throws ChannelException if the configuration cannot be loaded or the
 *                          protocol stack cannot be set up.
 */
public JChannel(URL properties) throws ChannelException {
    this(ConfiguratorFactory.getStackConfigurator(properties));
}
219
/**
 * Creates a <code>JChannel</code> from a string describing the protocol
 * stack configuration.
 *
 * @param properties one of: an old style property string, the name of a
 *                   system resource containing a JGroups XML
 *                   configuration, a URL spec pointing to a JGroups XML
 *                   configuration, or the name of a file that contains a
 *                   JGroups XML configuration.
 *
 * @throws ChannelException if the configuration cannot be resolved or the
 *                          protocol stack cannot be set up.
 */
public JChannel(String properties) throws ChannelException {
    this(ConfiguratorFactory.getStackConfigurator(properties));
}
236
237 /**
238 * Constructs a <code>JChannel</code> instance with the protocol stack
239 * configuration contained by the protocol stack configurator parameter.
240 * <p>
241 * All of the public constructors of this class eventually delegate to this
242 * method.
243 *
244 * @param configurator a protocol stack configurator containing a JGroups
245 * protocol stack configuration.
246 *
247 * @throws ChannelException if problems occur during the initialization of
248 * the protocol stack.
249 */
250 protected JChannel(ProtocolStackConfigurator configurator) throws ChannelException {
251 props = configurator.getProtocolStackString();
252
253 /*create the new protocol stack*/
254 prot_stack=new ProtocolStack(this, props);
255
256 /* Setup protocol stack (create layers, queues between them */
257 try {
258 prot_stack.setup();
259 }
260 catch(Throwable e) {
261 throw new ChannelException("unable to setup the protocol stack", e);
262 }
263 }
264
265 /**
266 * Creates a new JChannel with the protocol stack as defined in the properties
267 * parameter. an example of this parameter is<BR>
268 * "UDP:PING:FD:STABLE:NAKACK:UNICAST:FRAG:FLUSH:GMS:VIEW_ENFORCER:STATE_TRANSFER:QUEUE"<BR>
269 * Other examples can be found in the ./conf directory<BR>
270 * @param properties the protocol stack setup; if null, the default protocol stack will be used.
271 * The properties can also be a java.net.URL object or a string that is a URL spec.
272 * The JChannel will validate any URL object and String object to see if they are a URL.
273 * In case of the parameter being a url, the JChannel will try to load the xml from there.
274 * In case properties is a org.w3c.dom.Element, the ConfiguratorFactory will parse the
275 * DOM tree with the element as its root element.
276 * @deprecated Use the constructors with specific parameter types instead.
277 */
278 public JChannel(Object properties) throws ChannelException {
279 if (properties == null) {
280 properties = DEFAULT_PROTOCOL_STACK;
281 }
282
283 try {
284 ProtocolStackConfigurator c=ConfiguratorFactory.getStackConfigurator(properties);
285 props=c.getProtocolStackString();
286 }
287 catch(Exception x) {
288 throw new ChannelException("unable to load protocol stack", x);
289 }
290
291 /*create the new protocol stack*/
292 prot_stack=new ProtocolStack(this, props);
293
294 /* Setup protocol stack (create layers, queues between them */
295 try {
296 prot_stack.setup();
297 }
298 catch(Throwable e) {
299 throw new ChannelException("failed to setup protocol stack", e);
300 }
301 }
302
303
304 /**
305 * Returns the protocol stack.
306 * Currently used by Debugger.
307 * Specific to JChannel, therefore
308 * not visible in Channel
309 */
310 public ProtocolStack getProtocolStack() {
311 return prot_stack;
312 }
313
314 protected Log getLog() {
315 return log;
316 }
317
318 /**
319 * returns the protocol stack configuration in string format.
320 * an example of this property is<BR>
321 * "UDP:PING:FD:STABLE:NAKACK:UNICAST:FRAG:FLUSH:GMS:VIEW_ENFORCER:STATE_TRANSFER:QUEUE"
322 */
323 public String getProperties() {
324 return props;
325 }
326
327 public boolean statsEnabled() {
328 return stats;
329 }
330
331 public void enableStats(boolean stats) {
332 this.stats=stats;
333 }
334
335 public void resetStats() {
336 sent_msgs=received_msgs=sent_bytes=received_bytes=0;
337 }
338
339 public long getSentMessages() {return sent_msgs;}
340 public long getSentBytes() {return sent_bytes;}
341 public long getReceivedMessages() {return received_msgs;}
342 public long getReceivedBytes() {return received_bytes;}
343 public int getNumberOfTasksInTimer() {return prot_stack != null ? prot_stack.timer.size() : -1;}
344
345 public String dumpTimerQueue() {
346 return prot_stack != null? prot_stack.dumpTimerQueue() : "<n/a";
347 }
348
349 /**
350 * Returns a pretty-printed form of all the protocols. If include_properties is set,
351 * the properties for each protocol will also be printed.
352 */
353 public String printProtocolSpec(boolean include_properties) {
354 return prot_stack != null ? prot_stack.printProtocolSpec(include_properties) : null;
355 }
356
357
358 /**
359 * Connects the channel to a group.
360 * If the channel is already connected, an error message will be printed to the error log.
361 * If the channel is closed a ChannelClosed exception will be thrown.
362 * This method starts the protocol stack by calling ProtocolStack.start,
363 * then it sends an Event.CONNECT event down the stack and waits to receive a CONNECT_OK event.
364 * Once the CONNECT_OK event arrives from the protocol stack, any channel listeners are notified
365 * and the channel is considered connected.
366 *
367 * @param channel_name A <code>String</code> denoting the group name. Cannot be null.
368 * @exception ChannelException The protocol stack cannot be started
369 * @exception ChannelClosedException The channel is closed and therefore cannot be used any longer.
370 * A new channel has to be created first.
371 */
372 public synchronized void connect(String channel_name) throws ChannelException, ChannelClosedException {
373 /*make sure the channel is not closed*/
374 checkClosed();
375
376 /*if we already are connected, then ignore this*/
377 if(connected) {
378 if(log.isErrorEnabled()) log.error("already connected to " + channel_name);
379 return;
380 }
381
382 /*make sure we have a valid channel name*/
383 if(channel_name == null) {
384 if(log.isInfoEnabled()) log.info("channel_name is null, assuming unicast channel");
385 }
386 else
387 this.channel_name=channel_name;
388
389 try {
390 prot_stack.startStack(); // calls start() in all protocols, from top to bottom
391 }
392 catch(Throwable e) {
393 throw new ChannelException("failed to start protocol stack", e);
394 }
395
396 /* try to get LOCAL_ADDR_TIMEOUT. Catch SecurityException if called in an untrusted environment (e.g. using JNLP) */
397 try {
398 LOCAL_ADDR_TIMEOUT=Long.parseLong(System.getProperty("local_addr.timeout","30000"));
399 }
400 catch (SecurityException e1) {
401 /* Use the default value specified above*/
402 }
403
404 /* Wait LOCAL_ADDR_TIMEOUT milliseconds for local_addr to have a non-null value (set by SET_LOCAL_ADDRESS) */
405 local_addr=(Address)local_addr_promise.getResult(LOCAL_ADDR_TIMEOUT);
406 if(local_addr == null) {
407 log.fatal("local_addr is null; cannot connect");
408 throw new ChannelException("local_addr is null");
409 }
410
411
412 /*create a temporary view, assume this channel is the only member and
413 *is the coordinator*/
414 Vector t=new Vector(1);
415 t.addElement(local_addr);
416 my_view=new View(local_addr, 0, t); // create a dummy view
417
418 // only connect if we are not a unicast channel
419 if(channel_name != null) {
420 connect_promise.reset();
421 Event connect_event=new Event(Event.CONNECT, channel_name);
422 down(connect_event);
423 connect_promise.getResult(); // waits forever until connected (or channel is closed)
424 }
425
426 /*notify any channel listeners*/
427 connected=true;
428 notifyChannelConnected(this);
429 }
430
431
432 /**
433 * Disconnects the channel if it is connected. If the channel is closed, this operation is ignored<BR>
434 * Otherwise the following actions happen in the listed order<BR>
435 * <ol>
436 * <li> The JChannel sends a DISCONNECT event down the protocol stack<BR>
437 * <li> Blocks until the channel to receives a DISCONNECT_OK event<BR>
438 * <li> Sends a STOP_QUEING event down the stack<BR>
439 * <li> Stops the protocol stack by calling ProtocolStack.stop()<BR>
440 * <li> Notifies the listener, if the listener is available<BR>
441 * </ol>
442 */
443 public synchronized void disconnect() {
444 if(closed) return;
445
446 resume();
447
448 if(connected) {
449
450 if(channel_name != null) {
451
452 /* Send down a DISCONNECT event. The DISCONNECT event travels down to the GMS, where a
453 * DISCONNECT_OK response is generated and sent up the stack. JChannel blocks until a
454 * DISCONNECT_OK has been received, or until timeout has elapsed.
455 */
456 Event disconnect_event=new Event(Event.DISCONNECT, local_addr);
457 disconnect_promise.reset();
458 down(disconnect_event); // DISCONNECT is handled by each layer
459 disconnect_promise.getResult(); // wait for DISCONNECT_OK
460 }
461
462 // Just in case we use the QUEUE protocol and it is still blocked...
463 down(new Event(Event.STOP_QUEUEING));
464
465 connected=false;
466 try {
467 prot_stack.stopStack(); // calls stop() in all protocols, from top to bottom
468 }
469 catch(Exception e) {
470 if(log.isErrorEnabled()) log.error("exception: " + e);
471 }
472 notifyChannelDisconnected(this);
473 init(); // sets local_addr=null; changed March 18 2003 (bela) -- prevented successful rejoining
474 }
475 }
476
477
478 /**
479 * Destroys the channel.
480 * After this method has been called, the channel us unusable.<BR>
481 * This operation will disconnect the channel and close the channel receive queue immediately<BR>
482 */
483 public synchronized void close() {
484 _close(true, true); // by default disconnect before closing channel and close mq
485 }
486
487
488 /** Shuts down the channel without disconnecting */
489 public synchronized void shutdown() {
490 _close(false, true); // by default disconnect before closing channel and close mq
491 }
492
493 /**
494 * Opens the channel.
495 * This does the following actions:
496 * <ol>
497 * <li> Resets the receiver queue by calling Queue.reset
498 * <li> Sets up the protocol stack by calling ProtocolStack.setup
499 * <li> Sets the closed flag to false
500 * </ol>
501 */
502 public synchronized void open() throws ChannelException {
503 if(!closed)
504 throw new ChannelException("channel is already open");
505
506 try {
507 mq.reset();
508
509 // new stack is created on open() - bela June 12 2003
510 prot_stack=new ProtocolStack(this, props);
511 prot_stack.setup();
512 closed=false;
513 }
514 catch(Exception e) {
515 throw new ChannelException("failed to open channel" , e);
516 }
517 }
518
519 /**
520 * returns true if the Open operation has been called successfully
521 */
522 public boolean isOpen() {
523 return !closed;
524 }
525
526
527 /**
528 * returns true if the Connect operation has been called successfully
529 */
530 public boolean isConnected() {
531 return connected;
532 }
533
534 public int getNumMessages() {
535 return mq != null? mq.size() : -1;
536 }
537
538
539 public String dumpQueue() {
540 return Util.dumpQueue(mq);
541 }
542
543 /**
544 * Returns a map of statistics of the various protocols and of the channel itself.
545 * @return Map<String,Map>. A map where the keys are the protocols ("channel" pseudo key is
546 * used for the channel itself") and the values are property maps.
547 */
548 public Map dumpStats() {
549 Map retval=prot_stack.dumpStats();
550 if(retval != null) {
551 Map tmp=dumpChannelStats();
552 if(tmp != null)
553 retval.put("channel", tmp);
554 }
555 return retval;
556 }
557
558 private Map dumpChannelStats() {
559 Map retval=new HashMap();
560 retval.put("sent_msgs", new Long(sent_msgs));
561 retval.put("sent_bytes", new Long(sent_bytes));
562 retval.put("received_msgs", new Long(received_msgs));
563 retval.put("received_bytes", new Long(received_bytes));
564 return retval;
565 }
566
567
568 /**
569 * Sends a message through the protocol stack.
570 * Implements the Transport interface.
571 *
572 * @param msg the message to be sent through the protocol stack,
573 * the destination of the message is specified inside the message itself
574 * @exception ChannelNotConnectedException
575 * @exception ChannelClosedException
576 */
577 public void send(Message msg) throws ChannelNotConnectedException, ChannelClosedException {
578 checkClosed();
579 checkNotConnected();
580 if(stats) {
581 sent_msgs++;
582 sent_bytes+=msg.getLength();
583 }
584 down(new Event(Event.MSG, msg));
585 }
586
587
588 /**
589 * creates a new message with the destination address, and the source address
590 * and the object as the message value
591 * @param dst - the destination address of the message, null for all members
592 * @param src - the source address of the message
593 * @param obj - the value of the message
594 * @exception ChannelNotConnectedException
595 * @exception ChannelClosedException
596 * @see JChannel#send
597 */
598 public void send(Address dst, Address src, Serializable obj) throws ChannelNotConnectedException, ChannelClosedException {
599 send(new Message(dst, src, obj));
600 }
601
602
603 /**
604 * Blocking receive method.
605 * This method returns the object that was first received by this JChannel and that has not been
606 * received before. After the object is received, it is removed from the receive queue.<BR>
607 * If you only want to inspect the object received without removing it from the queue call
608 * JChannel.peek<BR>
609 * If no messages are in the receive queue, this method blocks until a message is added or the operation times out<BR>
610 * By specifying a timeout of 0, the operation blocks forever, or until a message has been received.
611 * @param timeout the number of milliseconds to wait if the receive queue is empty. 0 means wait forever
612 * @exception TimeoutException if a timeout occured prior to a new message was received
613 * @exception ChannelNotConnectedException
614 * @exception ChannelClosedException
615 * @see JChannel#peek
616 */
617 public Object receive(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException {
618 Object retval=null;
619 Event evt;
620
621 checkClosed();
622 checkNotConnected();
623
624 try {
625 evt=(timeout <= 0) ? (Event)mq.remove() : (Event)mq.remove(timeout);
626 retval=getEvent(evt);
627 evt=null;
628 if(stats) {
629 if(retval != null && retval instanceof Message) {
630 received_msgs++;
631 received_bytes+=((Message)retval).getLength();
632 }
633 }
634 return retval;
635 }
636 catch(QueueClosedException queue_closed) {
637 throw new ChannelClosedException();
638 }
639 catch(TimeoutException t) {
640 throw t;
641 }
642 catch(Exception e) {
643 if(log.isErrorEnabled()) log.error("exception: " + e);
644 return null;
645 }
646 }
647
648
649 /**
650 * Just peeks at the next message, view or block. Does <em>not</em> install
651 * new view if view is received<BR>
652 * Does the same thing as JChannel.receive but doesn't remove the object from the
653 * receiver queue
654 */
655 public Object peek(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException {
656 Object retval=null;
657 Event evt;
658
659 checkClosed();
660 checkNotConnected();
661
662 try {
663 evt=(timeout <= 0) ? (Event)mq.peek() : (Event)mq.peek(timeout);
664 retval=getEvent(evt);
665 evt=null;
666 return retval;
667 }
668 catch(QueueClosedException queue_closed) {
669 if(log.isErrorEnabled()) log.error("exception: " + queue_closed);
670 return null;
671 }
672 catch(TimeoutException t) {
673 return null;
674 }
675 catch(Exception e) {
676 if(log.isErrorEnabled()) log.error("exception: " + e);
677 return null;
678 }
679 }
680
681
682
683
684 /**
685 * Returns the current view.
686 * <BR>
687 * If the channel is not connected or if it is closed it will return null.
688 * <BR>
689 * @return returns the current group view, or null if the channel is closed or disconnected
690 */
691 public View getView() {
692 return closed || !connected ? null : my_view;
693 }
694
695
696 /**
697 * returns the local address of the channel
698 * returns null if the channel is closed
699 */
700 public Address getLocalAddress() {
701 return closed ? null : local_addr;
702 }
703
704
705 /**
706 * returns the name of the channel
707 * if the channel is not connected or if it is closed it will return null
708 */
709 public String getChannelName() {
710 return closed ? null : !connected ? null : channel_name;
711 }
712
713
714 /**
715 * Sets a channel option. The options can be one of the following:
716 * <UL>
717 * <LI> Channel.BLOCK
718 * <LI> Channel.VIEW
719 * <LI> Channel.SUSPECT
720 * <LI> Channel.LOCAL
721 * <LI> Channel.GET_STATE_EVENTS
722 * <LI> Channel.AUTO_RECONNECT
723 * <LI> Channel.AUTO_GETSTATE
724 * </UL>
725 * <P>
726 * There are certain dependencies between the options that you can set,
727 * I will try to describe them here.
728 * <P>
729 * Option: Channel.VIEW option<BR>
730 * Value: java.lang.Boolean<BR>
731 * Result: set to true the JChannel will receive VIEW change events<BR>
732 *<BR>
733 * Option: Channel.SUSPECT<BR>
734 * Value: java.lang.Boolean<BR>
735 * Result: set to true the JChannel will receive SUSPECT events<BR>
736 *<BR>
737 * Option: Channel.BLOCK<BR>
738 * Value: java.lang.Boolean<BR>
739 * Result: set to true will set setOpt(VIEW, true) and the JChannel will receive BLOCKS and VIEW events<BR>
740 *<BR>
741 * Option: GET_STATE_EVENTS<BR>
742 * Value: java.lang.Boolean<BR>
743 * Result: set to true the JChannel will receive state events<BR>
744 *<BR>
745 * Option: LOCAL<BR>
746 * Value: java.lang.Boolean<BR>
747 * Result: set to true the JChannel will receive messages that it self sent out.<BR>
748 *<BR>
749 * Option: AUTO_RECONNECT<BR>
750 * Value: java.lang.Boolean<BR>
751 * Result: set to true and the JChannel will try to reconnect when it is being closed<BR>
752 *<BR>
753 * Option: AUTO_GETSTATE<BR>
754 * Value: java.lang.Boolean<BR>
755 * Result: set to true, the AUTO_RECONNECT will be set to true and the JChannel will try to get the state after a close and reconnect happens<BR>
756 * <BR>
757 *
758 * @param option the parameter option Channel.VIEW, Channel.SUSPECT, etc
759 * @param value the value to set for this option
760 *
761 */
762 public void setOpt(int option, Object value) {
763 if(closed) {
764 if(log.isWarnEnabled()) log.warn("channel is closed; option not set !");
765 return;
766 }
767
768 switch(option) {
769 case VIEW:
770 if(value instanceof Boolean)
771 receive_views=((Boolean)value).booleanValue();
772 else
773 if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
774 " (" + value + "): value has to be Boolean");
775 break;
776 case SUSPECT:
777 if(value instanceof Boolean)
778 receive_suspects=((Boolean)value).booleanValue();
779 else
780 if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
781 " (" + value + "): value has to be Boolean");
782 break;
783 case BLOCK:
784 if(value instanceof Boolean)
785 receive_blocks=((Boolean)value).booleanValue();
786 else
787 if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
788 " (" + value + "): value has to be Boolean");
789 if(receive_blocks)
790 receive_views=true;
791 break;
792
793 case GET_STATE_EVENTS:
794 if(value instanceof Boolean)
795 receive_get_states=((Boolean)value).booleanValue();
796 else
797 if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
798 " (" + value + "): value has to be Boolean");
799 break;
800
801
802 case LOCAL:
803 if(value instanceof Boolean)
804 receive_local_msgs=((Boolean)value).booleanValue();
805 else
806 if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
807 " (" + value + "): value has to be Boolean");
808 break;
809
810 case AUTO_RECONNECT:
811 if(value instanceof Boolean)
812 auto_reconnect=((Boolean)value).booleanValue();
813 else
814 if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
815 " (" + value + "): value has to be Boolean");
816 break;
817
818 case AUTO_GETSTATE:
819 if(value instanceof Boolean) {
820 auto_getstate=((Boolean)value).booleanValue();
821 if(auto_getstate)
822 auto_reconnect=true;
823 }
824 else
825 if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
826 " (" + value + "): value has to be Boolean");
827 break;
828
829 default:
830 if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) + " not known");
831 break;
832 }
833 }
834
835
836 /**
837 * returns the value of an option.
838 * @param option the option you want to see the value for
839 * @return the object value, in most cases java.lang.Boolean
840 * @see JChannel#setOpt
841 */
842 public Object getOpt(int option) {
843 switch(option) {
844 case VIEW:
845 // return Boolean.valueOf(receive_views);
846 return receive_views ? Boolean.TRUE : Boolean.FALSE;
847 case BLOCK:
848 // return Boolean.valueOf(receive_blocks);
849 return receive_blocks ? Boolean.TRUE : Boolean.FALSE;
850 case SUSPECT:
851 // return Boolean.valueOf(receive_suspects);
852 return receive_suspects ? Boolean.TRUE : Boolean.FALSE;
853 case GET_STATE_EVENTS:
854 // return Boolean.valueOf(receive_get_states);
855 return receive_get_states ? Boolean.TRUE : Boolean.FALSE;
856 case LOCAL:
857 // return Boolean.valueOf(receive_local_msgs);
858 return receive_local_msgs ? Boolean.TRUE : Boolean.FALSE;
859 default:
860 if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) + " not known");
861 return null;
862 }
863 }
864
865
866 /**
867 * Called to acknowledge a block() (callback in <code>MembershipListener</code> or
868 * <code>BlockEvent</code> received from call to <code>receive()</code>).
869 * After sending blockOk(), no messages should be sent until a new view has been received.
870 * Calling this method on a closed channel has no effect.
871 */
872 public void blockOk() {
873 down(new Event(Event.BLOCK_OK));
874 down(new Event(Event.START_QUEUEING));
875 }
876
877
878 /**
879 * Retrieves the current group state. Sends GET_STATE event down to STATE_TRANSFER layer.
880 * Blocks until STATE_TRANSFER sends up a GET_STATE_OK event or until <code>timeout</code>
881 * milliseconds have elapsed. The argument of GET_STATE_OK should be a single object.
882 * @param target - the target member to receive the state from. if null, state is retrieved from coordinator
883 * @param timeout - the number of milliseconds to wait for the operation to complete successfully
884 * @return true of the state was received, false if the operation timed out
885 */
886 public boolean getState(Address target, long timeout) throws ChannelNotConnectedException, ChannelClosedException {
887 StateTransferInfo info=new StateTransferInfo(StateTransferInfo.GET_FROM_SINGLE, target);
888 info.timeout=timeout;
889 return _getState(new Event(Event.GET_STATE, info), timeout);
890 }
891
892
893 /**
894 * Retrieves the current group state. Sends GET_STATE event down to STATE_TRANSFER layer.
895 * Blocks until STATE_TRANSFER sends up a GET_STATE_OK event or until <code>timeout</code>
896 * milliseconds have elapsed. The argument of GET_STATE_OK should be a vector of objects.
897 * @param targets - the target members to receive the state from ( an Address list )
898 * @param timeout - the number of milliseconds to wait for the operation to complete successfully
899 * @return true of the state was received, false if the operation timed out
900 */
901 public boolean getAllStates(Vector targets, long timeout) throws ChannelNotConnectedException, ChannelClosedException {
902 StateTransferInfo info=new StateTransferInfo(StateTransferInfo.GET_FROM_MANY, targets);
903 return _getState(new Event(Event.GET_STATE, info), timeout);
904 }
905
906
907 /**
908 * Called by the application is response to receiving a <code>getState()</code> object when
909 * calling <code>receive()</code>.
910 * When the application receives a getState() message on the receive() method,
911 * it should call returnState() to reply with the state of the application
912 * @param state The state of the application as a byte buffer
913 * (to send over the network).
914 */
915 public void returnState(byte[] state) {
916 down(new Event(Event.GET_APPLSTATE_OK, state));
917 }
918
919
920
921
922
923 /**
924 * Callback method <BR>
925 * Called by the ProtocolStack when a message is received.
926 * It will be added to the message queue from which subsequent
927 * <code>Receive</code>s will dequeue it.
928 * @param evt the event carrying the message from the protocol stack
929 */
930 public void up(Event evt) {
931 int type=evt.getType();
932 Message msg;
933
934
935 switch(type) {
936
937 case Event.MSG:
938 msg=(Message)evt.getArg();
939 if(!receive_local_msgs) { // discard local messages (sent by myself to me)
940 if(local_addr != null && msg.getSrc() != null)
941 if(local_addr.equals(msg.getSrc()))
942 return;
943 }
944 break;
945
946 case Event.VIEW_CHANGE:
947 my_view=(View)evt.getArg();
948
949 // crude solution to bug #775120: if we get our first view *before* the CONNECT_OK,
950 // we simply set the state to connected
951 if(connected == false) {
952 connected=true;
953 connect_promise.setResult(Boolean.TRUE);
954 }
955
956 // unblock queueing of messages due to previous BLOCK event:
957 down(new Event(Event.STOP_QUEUEING));
958 if(!receive_views) // discard if client has not set receving views to on
959 return;
960 //if(connected == false)
961 // my_view=(View)evt.getArg();
962 break;
963
964 case Event.SUSPECT:
965 if(!receive_suspects)
966 return;
967 break;
968
969 case Event.GET_APPLSTATE: // return the application's state
970 if(!receive_get_states) { // if not set to handle state transfers, send null state
971 down(new Event(Event.GET_APPLSTATE_OK, null));
972 return;
973 }
974 break;
975
976 case Event.CONFIG:
977 HashMap config=(HashMap)evt.getArg();
978 if(config != null && config.containsKey("state_transfer"))
979 state_transfer_supported=((Boolean)config.get("state_transfer")).booleanValue();
980 break;
981
982 case Event.BLOCK:
983 // If BLOCK is received by application, then we trust the application to not send
984 // any more messages until a VIEW_CHANGE is received. Otherwise (BLOCKs are disabled),
985 // we queue any messages sent until the next VIEW_CHANGE (they will be sent in the
986 // next view)
987
988 if(!receive_blocks) { // discard if client has not set 'receiving blocks' to 'on'
989 down(new Event(Event.BLOCK_OK));
990 down(new Event(Event.START_QUEUEING));
991 return;
992 }
993 break;
994
995 case Event.CONNECT_OK:
996 connect_promise.setResult(Boolean.TRUE);
997 break;
998
999 case Event.DISCONNECT_OK:
1000 disconnect_promise.setResult(Boolean.TRUE);
1001 break;
1002
1003 case Event.GET_STATE_OK:
1004 Object state=evt.getArg();
1005 state_promise.setResult(state);
1006 if(up_handler != null) {
1007 up_handler.up(evt);
1008 return;
1009 }
1010 if(state != null) {
1011 if(receiver != null) {
1012 receiver.setState((byte[])state);
1013 }
1014 else {
1015 try {mq.add(new Event(Event.STATE_RECEIVED, state));} catch(Exception e) {}
1016 }
1017 }
1018 break;
1019
1020 case Event.SET_LOCAL_ADDRESS:
1021 local_addr_promise.setResult(evt.getArg());
1022 break;
1023
1024 case Event.EXIT:
1025 handleExit(evt);
1026 return; // no need to pass event up; already done in handleExit()
1027
1028 case Event.BLOCK_SEND: // emitted by FLOW_CONTROL
1029 if(log.isInfoEnabled()) log.info("received BLOCK_SEND");
1030 block_sending.set(Boolean.TRUE);
1031 break;
1032
1033 case Event.UNBLOCK_SEND: // emitted by FLOW_CONTROL
1034 if(log.isInfoEnabled()) log.info("received UNBLOCK_SEND");
1035 block_sending.set(Boolean.FALSE);
1036 break;
1037
1038 default:
1039 break;
1040 }
1041
1042
1043 // If UpHandler is installed, pass all events to it and return (UpHandler is e.g. a building block)
1044 if(up_handler != null) {
1045 up_handler.up(evt);
1046 return;
1047 }
1048
1049 switch(type) {
1050 case Event.MSG:
1051 if(receiver != null) {
1052 receiver.receive((Message)evt.getArg());
1053 return;
1054 }
1055 break;
1056 case Event.VIEW_CHANGE:
1057 if(receiver != null) {
1058 receiver.viewAccepted((View)evt.getArg());
1059 return;
1060 }
1061 break;
1062 case Event.SUSPECT:
1063 if(receiver != null) {
1064 receiver.suspect((Address)evt.getArg());
1065 return;
1066 }
1067 break;
1068 case Event.GET_APPLSTATE:
1069 if(receiver != null) {
1070 byte[] tmp_state=receiver.getState();
1071 returnState(tmp_state);
1072 return;
1073 }
1074 break;
1075 case Event.BLOCK:
1076 if(receiver != null) {
1077 receiver.block();
1078 return;
1079 }
1080 break;
1081 default:
1082 break;
1083 }
1084
1085 if(type == Event.MSG || type == Event.VIEW_CHANGE || type == Event.SUSPECT ||
1086 type == Event.GET_APPLSTATE || type == Event.BLOCK) {
1087 try {
1088 mq.add(evt);
1089 }
1090 catch(Exception e) {
1091 if(log.isErrorEnabled()) log.error("exception: " + e);
1092 }
1093 }
1094 }
1095
1096
1097 /**
1098 * Sends a message through the protocol stack if the stack is available
1099 * @param evt the message to send down, encapsulated in an event
1100 */
1101 public void down(Event evt) {
1102 if(evt == null) return;
1103
1104 if(suspended) {
1105 synchronized(suspend_mutex) {
1106 while(suspended) {
1107 try {
1108 suspend_mutex.wait();
1109 }
1110 catch(InterruptedException e) {
1111 }
1112 }
1113 }
1114 }
1115
1116 int type=evt.getType();
1117
1118 // only block for messages; all other events are passed through
1119 // we use double-checked locking; it is okay to 'lose' one or more messages because block_sending changes
1120 // to true after an initial false value
1121 if(type == Event.MSG && block_sending.get().equals(Boolean.TRUE)) {
1122 if(log.isTraceEnabled()) log.trace("down() blocks because block_sending == true");
1123 block_sending.waitUntil(Boolean.FALSE);
1124 }
1125
1126 // handle setting of additional data (kludge, will be removed soon)
1127 if(type == Event.CONFIG) {
1128 try {
1129 Map m=(Map)evt.getArg();
1130 if(m != null && m.containsKey("additional_data")) {
1131 additional_data=(byte[])m.get("additional_data");
1132 }
1133 }
1134 catch(Throwable t) {
1135 if(log.isErrorEnabled()) log.error("CONFIG event did not contain a hashmap: " + t);
1136 }
1137 }
1138
1139 if(prot_stack != null)
1140 prot_stack.down(evt);
1141 else
1142 if(log.isErrorEnabled()) log.error("no protocol stack available");
1143 }
1144
1145 /** Send() blocks from now on, until resume() is called */
1146 public void suspend() {
1147 synchronized(suspend_mutex) {
1148 suspended=true;
1149 }
1150 }
1151
1152 /** Send() unblocks */
1153 public void resume() {
1154 synchronized(suspend_mutex) {
1155 suspended=false;
1156 suspend_mutex.notifyAll();
1157 }
1158 }
1159
1160 public boolean isSuspended() {
1161 return suspended;
1162 }
1163
1164
1165 public String toString(boolean details) {
1166 StringBuffer sb=new StringBuffer();
1167 sb.append("local_addr=").append(local_addr).append('\n');
1168 sb.append("channel_name=").append(channel_name).append('\n');
1169 sb.append("my_view=").append(my_view).append('\n');
1170 sb.append("connected=").append(connected).append('\n');
1171 sb.append("closed=").append(closed).append('\n');
1172 if(mq != null)
1173 sb.append("incoming queue size=").append(mq.size()).append('\n');
1174 if(details) {
1175 sb.append("block_sending=").append(block_sending).append('\n');
1176 sb.append("receive_views=").append(receive_views).append('\n');
1177 sb.append("receive_suspects=").append(receive_suspects).append('\n');
1178 sb.append("receive_blocks=").append(receive_blocks).append('\n');
1179 sb.append("receive_local_msgs=").append(receive_local_msgs).append('\n');
1180 sb.append("receive_get_states=").append(receive_get_states).append('\n');
1181 sb.append("auto_reconnect=").append(auto_reconnect).append('\n');
1182 sb.append("auto_getstate=").append(auto_getstate).append('\n');
1183 sb.append("state_transfer_supported=").append(state_transfer_supported).append('\n');
1184 sb.append("props=").append(props).append('\n');
1185 }
1186
1187 return sb.toString();
1188 }
1189
1190
1191 /* ----------------------------------- Private Methods ------------------------------------- */
1192
1193
1194 /**
1195 * Initializes all variables. Used after <tt>close()</tt> or <tt>disconnect()</tt>,
1196 * to be ready for new <tt>connect()</tt>
1197 */
1198 private void init() {
1199 local_addr=null;
1200 channel_name=null;
1201 my_view=null;
1202
1203 // changed by Bela Sept 25 2003
1204 //if(mq != null && mq.closed())
1205 // mq.reset();
1206
1207 connect_promise.reset();
1208 disconnect_promise.reset();
1209 connected=false;
1210 block_sending.set(Boolean.FALSE);
1211 }
1212
1213
1214 /**
1215 * health check.<BR>
1216 * throws a ChannelNotConnected exception if the channel is not connected
1217 */
1218 private final void checkNotConnected() throws ChannelNotConnectedException {
1219 if(!connected)
1220 throw new ChannelNotConnectedException();
1221 }
1222
1223 /**
1224 * health check<BR>
1225 * throws a ChannelClosed exception if the channel is closed
1226 */
1227 private final void checkClosed() throws ChannelClosedException {
1228 if(closed)
1229 throw new ChannelClosedException();
1230 }
1231
1232
1233
1234 /**
1235 * returns the value of the event<BR>
1236 * These objects will be returned<BR>
1237 * <PRE>
1238 * <B>Event Type - Return Type</B>
1239 * Event.MSG - returns a Message object
1240 * Event.VIEW_CHANGE - returns a View object
1241 * Event.SUSPECT - returns a SuspectEvent object
1242 * Event.BLOCK - returns a new BlockEvent object
1243 * Event.GET_APPLSTATE - returns a GetStateEvent object
1244 * Event.STATE_RECEIVED- returns a SetStateEvent object
1245 * Event.Exit - returns an ExitEvent object
1246 * All other - return the actual Event object
1247 * </PRE>
1248 * @param evt - the event of which you want to extract the value
1249 * @return the event value if it matches the select list,
1250 * returns null if the event is null
1251 * returns the event itself if a match (See above) can not be made of the event type
1252 */
1253 static Object getEvent(Event evt) {
1254 if(evt == null)
1255 return null; // correct ?
1256
1257 switch(evt.getType()) {
1258 case Event.MSG:
1259 return evt.getArg();
1260 case Event.VIEW_CHANGE:
1261 return evt.getArg();
1262 case Event.SUSPECT:
1263 return new SuspectEvent(evt.getArg());
1264 case Event.BLOCK:
1265 return new BlockEvent();
1266 case Event.GET_APPLSTATE:
1267 return new GetStateEvent(evt.getArg());
1268 case Event.STATE_RECEIVED:
1269 return new SetStateEvent((byte[])evt.getArg());
1270 case Event.EXIT:
1271 return new ExitEvent();
1272 default:
1273 return evt;
1274 }
1275 }
1276
1277
1278 /**
1279 * Receives the state from the group and modifies the JChannel.state object<br>
1280 * This method initializes the local state variable to null, and then sends the state
1281 * event down the stack. It waits for a GET_STATE_OK event to bounce back
1282 * @param evt the get state event, has to be of type Event.GET_STATE
1283 * @param timeout the number of milliseconds to wait for the GET_STATE_OK response
1284 * @return true of the state was received, false if the operation timed out
1285 */
1286 boolean _getState(Event evt, long timeout) throws ChannelNotConnectedException, ChannelClosedException {
1287 checkClosed();
1288 checkNotConnected();
1289 if(!state_transfer_supported) {
1290 log.error("fetching s