Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/jgroups/JChannel.java


1   // $Id: JChannel.java,v 1.44 2005/11/08 13:57:08 belaban Exp $
2   
3   package org.jgroups;
4   
5   import org.apache.commons.logging.Log;
6   import org.apache.commons.logging.LogFactory;
7   import org.jgroups.conf.ConfiguratorFactory;
8   import org.jgroups.conf.ProtocolStackConfigurator;
9   import org.jgroups.stack.ProtocolStack;
10  import org.jgroups.stack.StateTransferInfo;
11  import org.jgroups.util.*;
12  import org.w3c.dom.Element;
13  
14  import java.io.File;
15  import java.io.Serializable;
16  import java.net.URL;
17  import java.util.HashMap;
18  import java.util.Map;
19  import java.util.Vector;
20  
21  /**
22   * JChannel is a pure Java implementation of Channel.
23   * When a JChannel object is instantiated it automatically sets up the
24   * protocol stack.
25   * <p>
26   * <B>Properties</B>
27   * <P>
28   * Properties are used to configure a channel, and are accepted in
29   * several forms; the String form is described here.
30   * A property string consists of a number of properties separated by
31   * colons.  For example:
32   * <p>
33   * <pre>"&lt;prop1&gt;(arg1=val1):&lt;prop2&gt;(arg1=val1;arg2=val2):&lt;prop3&gt;:&lt;propn&gt;"</pre>
34   * <p>
35   * Each property relates directly to a protocol layer, which is
36   * implemented as a Java class. When a protocol stack is to be created
37   * based on the above property string, the first property becomes the
38   * bottom-most layer, the second one will be placed on the first, etc.:
39   * the stack is created from the bottom to the top, as the string is
40   * parsed from left to right. Each property has to be the name of a
41   * Java class that resides in the
42   * {@link org.jgroups.protocols} package.
43   * <p>
44   * Note that only the base name has to be given, not the fully specified
45   * class name (e.g., UDP instead of org.jgroups.protocols.UDP).
46   * <p>
47   * Each layer may have 0 or more arguments, which are specified as a
48   * list of name/value pairs in parentheses directly after the property.
49   * In the example above, the first protocol layer has 1 argument,
50   * the second 2, the third none. When a layer is created, these
51   * properties (if there are any) will be set in a layer by invoking
52   * the layer's setProperties() method
53   * <p>
54   * As an example the property string below instructs JGroups to create
55   * a JChannel with protocols UDP, PING, FD and GMS:<p>
56   * <pre>"UDP(mcast_addr=228.10.9.8;mcast_port=5678):PING:FD:GMS"</pre>
57   * <p>
58   * The UDP protocol layer is at the bottom of the stack, and it
59   * should use mcast address 228.10.9.8 and port 5678 rather than
60   * the default IP multicast address and port. The only other argument
61   * instructs FD to output debug information while executing.
62   * Property UDP refers to a class {@link org.jgroups.protocols.UDP},
63   * which is subsequently loaded and an instance of which is created as protocol layer.
64   * If any of these classes are not found, an exception will be thrown and
65   * the construction of the stack will be aborted.
66   *
67   * @author Bela Ban
68   * @author Filip Hanik
69   * @version $Revision: 1.44 $
70   */
71  public class JChannel extends Channel {
72  
73      /**
74       * The default protocol stack used by the default constructor.
75       */
76      public static final String DEFAULT_PROTOCOL_STACK=
77              "UDP(mcast_addr=228.1.2.3;mcast_port=45566;ip_ttl=32):" +
78              "PING(timeout=3000;num_initial_members=6):" +
79              "FD(timeout=3000):" +
80              "VERIFY_SUSPECT(timeout=1500):" +
81              "pbcast.NAKACK(gc_lag=10;retransmit_timeout=600,1200,2400,4800):" +
82              "UNICAST(timeout=600,1200,2400,4800):" +
83              "pbcast.STABLE(desired_avg_gossip=10000):" +
84              "FRAG:" +
85              "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
86              "shun=true;print_local_addr=true)";
87  
88      static final String FORCE_PROPS="force.properties";
89  
90      /* the protocol stack configuration string */
91      private String props=null;
92  
93      /*the address of this JChannel instance*/
94      private Address local_addr=null;
95      /*the channel (also known as group) name*/
96      private String channel_name=null;  // group name
97      /*the latest view of the group membership*/
98      private View my_view=null;
99      /*the queue that is used to receive messages (events) from the protocol stack*/
100     private final Queue mq=new Queue();
101     /*the protocol stack, used to send and receive messages from the protocol stack*/
102     private ProtocolStack prot_stack=null;
103 
104     /** Thread responsible for closing a channel and potentially reconnecting to it (e.g., when shunned). */
105     protected CloserThread closer=null;
106 
107     /** To wait until a local address has been assigned */
108     private final Promise local_addr_promise=new Promise();
109 
110     /** To wait until we have connected successfully */
111     private final Promise connect_promise=new Promise();
112 
113     /** To wait until we have been disconnected from the channel */
114     private final Promise disconnect_promise=new Promise();
115 
116     private final Promise state_promise=new Promise();
117 
118     private final Object suspend_mutex=new Object();
119     private boolean suspended=false;
120 
121     /** wait until we have a non-null local_addr */
122     private long LOCAL_ADDR_TIMEOUT=30000; //=Long.parseLong(System.getProperty("local_addr.timeout", "30000"));
123     /*if the state is fetched automatically, this is the default timeout, 5 secs*/
124     private static final long GET_STATE_DEFAULT_TIMEOUT=5000;
125     /*flag to indicate whether to receive views from the protocol stack*/
126     private boolean receive_views=true;
127     /*flag to indicate whether to receive suspect messages*/
128     private boolean receive_suspects=true;
129     /*flag to indicate whether to receive blocks, if this is set to true, receive_views is set to true*/
130     private boolean receive_blocks=false;
131     /*flag to indicate whether to receive local messages
132      *if this is set to false, the JChannel will not receive messages sent by itself*/
133     private boolean receive_local_msgs=true;
134     /*flag to indicate whether to receive a state message or not*/
135     private boolean receive_get_states=false;
136     /*flag to indicate whether the channel will reconnect (reopen) when the exit message is received*/
137     private boolean auto_reconnect=false;
138     /*flag to indicate whether the state is supposed to be retrieved after the channel is reconnected
139      *setting this to true, automatically forces auto_reconnect to true*/
140     private boolean auto_getstate=false;
141     /*channel connected flag*/
142     private boolean connected=false;
143 
144     /** block send()/down() if true (unlocked by UNBLOCK_SEND event) */
145     private final CondVar block_sending=new CondVar("block_sending", Boolean.FALSE);
146 
147     /*channel closed flag*/
148     private boolean closed=false;      // close() has been called, channel is unusable
149 
150     /** True if a state transfer protocol is available, false otherwise */
151     private boolean state_transfer_supported=false; // set by CONFIG event from STATE_TRANSFER protocol
152 
153     /** Used to maintain additional data across channel disconnects/reconnects. This is a kludge and will be removed
154      * as soon as JGroups supports logical addresses
155      */
156     private byte[] additional_data=null;
157 
158     protected final Log log=LogFactory.getLog(getClass());
159 
160     /** Collect statistics */
161     protected boolean stats=true;
162 
163     protected long sent_msgs=0, received_msgs=0, sent_bytes=0, received_bytes=0;
164 
165 
166 
167     /**
168      * Constructs a <code>JChannel</code> instance with the protocol stack
169      * specified by the <code>DEFAULT_PROTOCOL_STACK</code> member.
170      *
171      * @throws ChannelException if problems occur during the initialization of
172      *                          the protocol stack.
173      */
174     public JChannel() throws ChannelException {
175         this(DEFAULT_PROTOCOL_STACK);
176     }
177 
178     /**
179      * Constructs a <code>JChannel</code> instance with the protocol stack
180      * configuration contained by the specified file.
181      *
182      * @param properties a file containing a JGroups XML protocol stack
183      *                   configuration.
184      *
185      * @throws ChannelException if problems occur during the configuration or
186      *                          initialization of the protocol stack.
187      */
188     public JChannel(File properties) throws ChannelException {
189         this(ConfiguratorFactory.getStackConfigurator(properties));
190     }
191 
192     /**
193      * Constructs a <code>JChannel</code> instance with the protocol stack
194      * configuration contained by the specified XML element.
195      *
196      * @param properties a XML element containing a JGroups XML protocol stack
197      *                   configuration.
198      *
199      * @throws ChannelException if problems occur during the configuration or
200      *                          initialization of the protocol stack.
201      */
202     public JChannel(Element properties) throws ChannelException {
203         this(ConfiguratorFactory.getStackConfigurator(properties));
204     }
205 
206     /**
207      * Constructs a <code>JChannel</code> instance with the protocol stack
208      * configuration indicated by the specified URL.
209      *
210      * @param properties a URL pointing to a JGroups XML protocol stack
211      *                   configuration.
212      *
213      * @throws ChannelException if problems occur during the configuration or
214      *                          initialization of the protocol stack.
215      */
216     public JChannel(URL properties) throws ChannelException {
217         this(ConfiguratorFactory.getStackConfigurator(properties));
218     }
219 
220     /**
221      * Constructs a <code>JChannel</code> instance with the protocol stack
222      * configuration based upon the specified properties parameter.
223      *
224      * @param properties an old style property string, a string representing a
225      *                   system resource containing a JGroups XML configuration,
226      *                   a string representing a URL pointing to a JGroups XML
227      *                   XML configuration, or a string representing a file name
228      *                   that contains a JGroups XML configuration.
229      *
230      * @throws ChannelException if problems occur during the configuration and
231      *                          initialization of the protocol stack.
232      */
233     public JChannel(String properties) throws ChannelException {
234         this(ConfiguratorFactory.getStackConfigurator(properties));
235     }
236 
237     /**
238      * Constructs a <code>JChannel</code> instance with the protocol stack
239      * configuration contained by the protocol stack configurator parameter.
240      * <p>
241      * All of the public constructors of this class eventually delegate to this
242      * method.
243      *
244      * @param configurator a protocol stack configurator containing a JGroups
245      *                     protocol stack configuration.
246      *
247      * @throws ChannelException if problems occur during the initialization of
248      *                          the protocol stack.
249      */
250     protected JChannel(ProtocolStackConfigurator configurator) throws ChannelException {
251         props = configurator.getProtocolStackString();
252 
253         /*create the new protocol stack*/
254         prot_stack=new ProtocolStack(this, props);
255 
256         /* Setup protocol stack (create layers, queues between them */
257         try {
258             prot_stack.setup();
259         }
260         catch(Throwable e) {
261             throw new ChannelException("unable to setup the protocol stack", e);
262         }
263     }
264 
265     /**
266      * Creates a new JChannel with the protocol stack as defined in the properties
267      * parameter. an example of this parameter is<BR>
268      * "UDP:PING:FD:STABLE:NAKACK:UNICAST:FRAG:FLUSH:GMS:VIEW_ENFORCER:STATE_TRANSFER:QUEUE"<BR>
269      * Other examples can be found in the ./conf directory<BR>
270      * @param properties the protocol stack setup; if null, the default protocol stack will be used.
271      *            The properties can also be a java.net.URL object or a string that is a URL spec.
272      *                   The JChannel will validate any URL object and String object to see if they are a URL.
273      *                   In case of the parameter being a url, the JChannel will try to load the xml from there.
274      *                   In case properties is a org.w3c.dom.Element, the ConfiguratorFactory will parse the
275      *                   DOM tree with the element as its root element.
276      * @deprecated Use the constructors with specific parameter types instead.
277      */
278     public JChannel(Object properties) throws ChannelException {
279         if (properties == null) {
280             properties = DEFAULT_PROTOCOL_STACK;
281         }
282 
283         try {
284             ProtocolStackConfigurator c=ConfiguratorFactory.getStackConfigurator(properties);
285             props=c.getProtocolStackString();
286         }
287         catch(Exception x) {
288             throw new ChannelException("unable to load protocol stack", x);
289         }
290 
291         /*create the new protocol stack*/
292         prot_stack=new ProtocolStack(this, props);
293 
294         /* Setup protocol stack (create layers, queues between them */
295         try {
296             prot_stack.setup();
297         }
298         catch(Throwable e) {
299             throw new ChannelException("failed to setup protocol stack", e);
300         }
301     }
302 
303 
304     /**
305      * Returns the protocol stack.
306      * Currently used by Debugger.
307      * Specific to JChannel, therefore
308      * not visible in Channel
309      */
310     public ProtocolStack getProtocolStack() {
311         return prot_stack;
312     }
313 
314     protected Log getLog() {
315         return log;
316     }
317 
318     /**
319      * returns the protocol stack configuration in string format.
320      * an example of this property is<BR>
321      * "UDP:PING:FD:STABLE:NAKACK:UNICAST:FRAG:FLUSH:GMS:VIEW_ENFORCER:STATE_TRANSFER:QUEUE"
322      */
323     public String getProperties() {
324         return props;
325     }
326 
327     public boolean statsEnabled() {
328         return stats;
329     }
330 
331     public void enableStats(boolean stats) {
332         this.stats=stats;
333     }
334 
335     public void resetStats() {
336         sent_msgs=received_msgs=sent_bytes=received_bytes=0;
337     }
338 
339     public long getSentMessages() {return sent_msgs;}
340     public long getSentBytes() {return sent_bytes;}
341     public long getReceivedMessages() {return received_msgs;}
342     public long getReceivedBytes() {return received_bytes;}
343     public int  getNumberOfTasksInTimer() {return prot_stack != null ? prot_stack.timer.size() : -1;}
344 
345     public String dumpTimerQueue() {
346         return prot_stack != null? prot_stack.dumpTimerQueue() : "<n/a";
347     }
348 
349     /**
350      * Returns a pretty-printed form of all the protocols. If include_properties is set,
351      * the properties for each protocol will also be printed.
352      */
353     public String printProtocolSpec(boolean include_properties) {
354         return prot_stack != null ? prot_stack.printProtocolSpec(include_properties) : null;
355     }
356 
357 
358     /**
359      * Connects the channel to a group.
360      * If the channel is already connected, an error message will be printed to the error log.
361      * If the channel is closed a ChannelClosed exception will be thrown.
362      * This method starts the protocol stack by calling ProtocolStack.start,
363      * then it sends an Event.CONNECT event down the stack and waits to receive a CONNECT_OK event.
364      * Once the CONNECT_OK event arrives from the protocol stack, any channel listeners are notified
365      * and the channel is considered connected.
366      *
367      * @param channel_name A <code>String</code> denoting the group name. Cannot be null.
368      * @exception ChannelException The protocol stack cannot be started
369      * @exception ChannelClosedException The channel is closed and therefore cannot be used any longer.
370      *                                   A new channel has to be created first.
371      */
372     public synchronized void connect(String channel_name) throws ChannelException, ChannelClosedException {
373         /*make sure the channel is not closed*/
374         checkClosed();
375 
376         /*if we already are connected, then ignore this*/
377         if(connected) {
378             if(log.isErrorEnabled()) log.error("already connected to " + channel_name);
379             return;
380         }
381 
382         /*make sure we have a valid channel name*/
383         if(channel_name == null) {
384             if(log.isInfoEnabled()) log.info("channel_name is null, assuming unicast channel");
385         }
386         else
387             this.channel_name=channel_name;
388 
389         try {
390             prot_stack.startStack(); // calls start() in all protocols, from top to bottom
391         }
392         catch(Throwable e) {
393             throw new ChannelException("failed to start protocol stack", e);
394         }
395 
396         /* try to get LOCAL_ADDR_TIMEOUT. Catch SecurityException if called in an untrusted environment (e.g. using JNLP) */
397         try {
398             LOCAL_ADDR_TIMEOUT=Long.parseLong(System.getProperty("local_addr.timeout","30000"));
399         }
400         catch (SecurityException e1) {
401             /* Use the default value specified above*/
402         }
403 
404     /* Wait LOCAL_ADDR_TIMEOUT milliseconds for local_addr to have a non-null value (set by SET_LOCAL_ADDRESS) */
405         local_addr=(Address)local_addr_promise.getResult(LOCAL_ADDR_TIMEOUT);
406         if(local_addr == null) {
407             log.fatal("local_addr is null; cannot connect");
408             throw new ChannelException("local_addr is null");
409         }
410 
411 
412         /*create a temporary view, assume this channel is the only member and
413          *is the coordinator*/
414         Vector t=new Vector(1);
415         t.addElement(local_addr);
416         my_view=new View(local_addr, 0, t);  // create a dummy view
417 
418         // only connect if we are not a unicast channel
419         if(channel_name != null) {
420             connect_promise.reset();
421             Event connect_event=new Event(Event.CONNECT, channel_name);
422             down(connect_event);
423             connect_promise.getResult();  // waits forever until connected (or channel is closed)
424         }
425 
426         /*notify any channel listeners*/
427         connected=true;
428         notifyChannelConnected(this);
429     }
430 
431 
432     /**
433      * Disconnects the channel if it is connected. If the channel is closed, this operation is ignored<BR>
434      * Otherwise the following actions happen in the listed order<BR>
435      * <ol>
436      * <li> The JChannel sends a DISCONNECT event down the protocol stack<BR>
437      * <li> Blocks until the channel to receives a DISCONNECT_OK event<BR>
438      * <li> Sends a STOP_QUEING event down the stack<BR>
439      * <li> Stops the protocol stack by calling ProtocolStack.stop()<BR>
440      * <li> Notifies the listener, if the listener is available<BR>
441      * </ol>
442      */
443     public synchronized void disconnect() {
444         if(closed) return;
445 
446         resume();
447 
448         if(connected) {
449 
450             if(channel_name != null) {
451 
452                 /* Send down a DISCONNECT event. The DISCONNECT event travels down to the GMS, where a
453                 *  DISCONNECT_OK response is generated and sent up the stack. JChannel blocks until a
454                 *  DISCONNECT_OK has been received, or until timeout has elapsed.
455                 */
456                 Event disconnect_event=new Event(Event.DISCONNECT, local_addr);
457                 disconnect_promise.reset();
458                 down(disconnect_event);   // DISCONNECT is handled by each layer
459                 disconnect_promise.getResult(); // wait for DISCONNECT_OK
460             }
461 
462             // Just in case we use the QUEUE protocol and it is still blocked...
463             down(new Event(Event.STOP_QUEUEING));
464 
465             connected=false;
466             try {
467                 prot_stack.stopStack(); // calls stop() in all protocols, from top to bottom
468             }
469             catch(Exception e) {
470                 if(log.isErrorEnabled()) log.error("exception: " + e);
471             }
472             notifyChannelDisconnected(this);
473             init(); // sets local_addr=null; changed March 18 2003 (bela) -- prevented successful rejoining
474         }
475     }
476 
477 
478     /**
479      * Destroys the channel.
480      * After this method has been called, the channel us unusable.<BR>
481      * This operation will disconnect the channel and close the channel receive queue immediately<BR>
482      */
483     public synchronized void close() {
484         _close(true, true); // by default disconnect before closing channel and close mq
485     }
486 
487 
488     /** Shuts down the channel without disconnecting */
489     public synchronized void shutdown() {
490         _close(false, true); // by default disconnect before closing channel and close mq
491     }
492 
493     /**
494      * Opens the channel.
495      * This does the following actions:
496      * <ol>
497      * <li> Resets the receiver queue by calling Queue.reset
498      * <li> Sets up the protocol stack by calling ProtocolStack.setup
499      * <li> Sets the closed flag to false
500      * </ol>
501      */
502     public synchronized void open() throws ChannelException {
503         if(!closed)
504             throw new ChannelException("channel is already open");
505 
506         try {
507             mq.reset();
508 
509             // new stack is created on open() - bela June 12 2003
510             prot_stack=new ProtocolStack(this, props);
511             prot_stack.setup();
512             closed=false;
513         }
514         catch(Exception e) {
515             throw new ChannelException("failed to open channel" , e);
516         }
517     }
518 
519     /**
520      * returns true if the Open operation has been called successfully
521      */
522     public boolean isOpen() {
523         return !closed;
524     }
525 
526 
527     /**
528      * returns true if the Connect operation has been called successfully
529      */
530     public boolean isConnected() {
531         return connected;
532     }
533 
534     public int getNumMessages() {
535         return mq != null? mq.size() : -1;
536     }
537 
538 
539     public String dumpQueue() {
540         return Util.dumpQueue(mq);
541     }
542 
543     /**
544      * Returns a map of statistics of the various protocols and of the channel itself.
545      * @return Map<String,Map>. A map where the keys are the protocols ("channel" pseudo key is
546      * used for the channel itself") and the values are property maps.
547      */
548     public Map dumpStats() {
549         Map retval=prot_stack.dumpStats();
550         if(retval != null) {
551             Map tmp=dumpChannelStats();
552             if(tmp != null)
553                 retval.put("channel", tmp);
554         }
555         return retval;
556     }
557 
558     private Map dumpChannelStats() {
559         Map retval=new HashMap();
560         retval.put("sent_msgs", new Long(sent_msgs));
561         retval.put("sent_bytes", new Long(sent_bytes));
562         retval.put("received_msgs", new Long(received_msgs));
563         retval.put("received_bytes", new Long(received_bytes));
564         return retval;
565     }
566 
567 
568     /**
569      * Sends a message through the protocol stack.
570      * Implements the Transport interface.
571      * 
572      * @param msg the message to be sent through the protocol stack,
573      *        the destination of the message is specified inside the message itself
574      * @exception ChannelNotConnectedException
575      * @exception ChannelClosedException
576      */
577     public void send(Message msg) throws ChannelNotConnectedException, ChannelClosedException {
578         checkClosed();
579         checkNotConnected();
580         if(stats) {
581             sent_msgs++;
582             sent_bytes+=msg.getLength();
583         }
584         down(new Event(Event.MSG, msg));
585     }
586 
587 
588     /**
589      * creates a new message with the destination address, and the source address
590      * and the object as the message value
591      * @param dst - the destination address of the message, null for all members
592      * @param src - the source address of the message
593      * @param obj - the value of the message
594      * @exception ChannelNotConnectedException
595      * @exception ChannelClosedException
596      * @see JChannel#send
597      */
598     public void send(Address dst, Address src, Serializable obj) throws ChannelNotConnectedException, ChannelClosedException {
599         send(new Message(dst, src, obj));
600     }
601 
602 
603     /**
604      * Blocking receive method.
605      * This method returns the object that was first received by this JChannel and that has not been
606      * received before. After the object is received, it is removed from the receive queue.<BR>
607      * If you only want to inspect the object received without removing it from the queue call
608      * JChannel.peek<BR>
609      * If no messages are in the receive queue, this method blocks until a message is added or the operation times out<BR>
610      * By specifying a timeout of 0, the operation blocks forever, or until a message has been received.
611      * @param timeout the number of milliseconds to wait if the receive queue is empty. 0 means wait forever
612      * @exception TimeoutException if a timeout occured prior to a new message was received
613      * @exception ChannelNotConnectedException
614      * @exception ChannelClosedException
615      * @see JChannel#peek
616      */
617     public Object receive(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException {
618         Object retval=null;
619         Event evt;
620 
621         checkClosed();
622         checkNotConnected();
623 
624         try {
625             evt=(timeout <= 0) ? (Event)mq.remove() : (Event)mq.remove(timeout);
626             retval=getEvent(evt);
627             evt=null;
628             if(stats) {
629                 if(retval != null && retval instanceof Message) {
630                     received_msgs++;
631                     received_bytes+=((Message)retval).getLength();
632                 }
633             }
634             return retval;
635         }
636         catch(QueueClosedException queue_closed) {
637             throw new ChannelClosedException();
638         }
639         catch(TimeoutException t) {
640             throw t;
641         }
642         catch(Exception e) {
643             if(log.isErrorEnabled()) log.error("exception: " + e);
644             return null;
645         }
646     }
647 
648 
649     /**
650      * Just peeks at the next message, view or block. Does <em>not</em> install
651      * new view if view is received<BR>
652      * Does the same thing as JChannel.receive but doesn't remove the object from the
653      * receiver queue
654      */
655     public Object peek(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException {
656         Object retval=null;
657         Event evt;
658 
659         checkClosed();
660         checkNotConnected();
661 
662         try {
663             evt=(timeout <= 0) ? (Event)mq.peek() : (Event)mq.peek(timeout);
664             retval=getEvent(evt);
665             evt=null;
666             return retval;
667         }
668         catch(QueueClosedException queue_closed) {
669             if(log.isErrorEnabled()) log.error("exception: " + queue_closed);
670             return null;
671         }
672         catch(TimeoutException t) {
673             return null;
674         }
675         catch(Exception e) {
676             if(log.isErrorEnabled()) log.error("exception: " + e);
677             return null;
678         }
679     }
680 
681 
682 
683 
684     /**
685      * Returns the current view.
686      * <BR>
687      * If the channel is not connected or if it is closed it will return null.
688      * <BR>
689      * @return returns the current group view, or null if the channel is closed or disconnected
690      */
691     public View getView() {
692         return closed || !connected ? null : my_view;
693     }
694 
695 
696     /**
697      * returns the local address of the channel
698      * returns null if the channel is closed
699      */
700     public Address getLocalAddress() {
701         return closed ? null : local_addr;
702     }
703 
704 
705     /**
706      * returns the name of the channel
707      * if the channel is not connected or if it is closed it will return null
708      */
709     public String getChannelName() {
710         return closed ? null : !connected ? null : channel_name;
711     }
712 
713 
714     /**
715      * Sets a channel option.  The options can be one of the following:
716      * <UL>
717      * <LI>    Channel.BLOCK
718      * <LI>    Channel.VIEW
719      * <LI>    Channel.SUSPECT
720      * <LI>    Channel.LOCAL
721      * <LI>    Channel.GET_STATE_EVENTS
722      * <LI>    Channel.AUTO_RECONNECT
723      * <LI>    Channel.AUTO_GETSTATE
724      * </UL>
725      * <P>
726      * There are certain dependencies between the options that you can set,
727      * I will try to describe them here.
728      * <P>
729      * Option: Channel.VIEW option<BR>
730      * Value:  java.lang.Boolean<BR>
731      * Result: set to true the JChannel will receive VIEW change events<BR>
732      *<BR>
733      * Option: Channel.SUSPECT<BR>
734      * Value:  java.lang.Boolean<BR>
735      * Result: set to true the JChannel will receive SUSPECT events<BR>
736      *<BR>
737      * Option: Channel.BLOCK<BR>
738      * Value:  java.lang.Boolean<BR>
739      * Result: set to true will set setOpt(VIEW, true) and the JChannel will receive BLOCKS and VIEW events<BR>
740      *<BR>
741      * Option: GET_STATE_EVENTS<BR>
742      * Value:  java.lang.Boolean<BR>
743      * Result: set to true the JChannel will receive state events<BR>
744      *<BR>
745      * Option: LOCAL<BR>
746      * Value:  java.lang.Boolean<BR>
747      * Result: set to true the JChannel will receive messages that it self sent out.<BR>
748      *<BR>
749      * Option: AUTO_RECONNECT<BR>
750      * Value:  java.lang.Boolean<BR>
751      * Result: set to true and the JChannel will try to reconnect when it is being closed<BR>
752      *<BR>
753      * Option: AUTO_GETSTATE<BR>
754      * Value:  java.lang.Boolean<BR>
755      * Result: set to true, the AUTO_RECONNECT will be set to true and the JChannel will try to get the state after a close and reconnect happens<BR>
756      * <BR>
757      *
758      * @param option the parameter option Channel.VIEW, Channel.SUSPECT, etc
759      * @param value the value to set for this option
760      *
761      */
762     public void setOpt(int option, Object value) {
763         if(closed) {
764             if(log.isWarnEnabled()) log.warn("channel is closed; option not set !");
765             return;
766         }
767 
768         switch(option) {
769             case VIEW:
770                 if(value instanceof Boolean)
771                     receive_views=((Boolean)value).booleanValue();
772                 else
773                     if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
774                                                      " (" + value + "): value has to be Boolean");
775                 break;
776             case SUSPECT:
777                 if(value instanceof Boolean)
778                     receive_suspects=((Boolean)value).booleanValue();
779                 else
780                     if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
781                                                      " (" + value + "): value has to be Boolean");
782                 break;
783             case BLOCK:
784                 if(value instanceof Boolean)
785                     receive_blocks=((Boolean)value).booleanValue();
786                 else
787                     if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
788                                                      " (" + value + "): value has to be Boolean");
789                 if(receive_blocks)
790                     receive_views=true;
791                 break;
792 
793             case GET_STATE_EVENTS:
794                 if(value instanceof Boolean)
795                     receive_get_states=((Boolean)value).booleanValue();
796                 else
797                     if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
798                                                      " (" + value + "): value has to be Boolean");
799                 break;
800 
801 
802             case LOCAL:
803                 if(value instanceof Boolean)
804                     receive_local_msgs=((Boolean)value).booleanValue();
805                 else
806                     if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
807                                                      " (" + value + "): value has to be Boolean");
808                 break;
809 
810             case AUTO_RECONNECT:
811                 if(value instanceof Boolean)
812                     auto_reconnect=((Boolean)value).booleanValue();
813                 else
814                     if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
815                                                      " (" + value + "): value has to be Boolean");
816                 break;
817 
818             case AUTO_GETSTATE:
819                 if(value instanceof Boolean) {
820                     auto_getstate=((Boolean)value).booleanValue();
821                     if(auto_getstate)
822                         auto_reconnect=true;
823                 }
824                 else
825                     if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
826                                                      " (" + value + "): value has to be Boolean");
827                 break;
828 
829             default:
830                 if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) + " not known");
831                 break;
832         }
833     }
834 
835 
836     /**
837      * returns the value of an option.
838      * @param option the option you want to see the value for
839      * @return the object value, in most cases java.lang.Boolean
840      * @see JChannel#setOpt
841      */
842     public Object getOpt(int option) {
843         switch(option) {
844             case VIEW:
845 //                return Boolean.valueOf(receive_views);
846               return receive_views ? Boolean.TRUE : Boolean.FALSE;
847             case BLOCK:
848 //                return Boolean.valueOf(receive_blocks);
849               return receive_blocks ? Boolean.TRUE : Boolean.FALSE;
850             case SUSPECT:
851 //                return Boolean.valueOf(receive_suspects);
852               return receive_suspects ? Boolean.TRUE : Boolean.FALSE;
853             case GET_STATE_EVENTS:
854 //                return Boolean.valueOf(receive_get_states);
855               return receive_get_states ? Boolean.TRUE : Boolean.FALSE;
856             case LOCAL:
857 //                return Boolean.valueOf(receive_local_msgs);
858               return receive_local_msgs ? Boolean.TRUE : Boolean.FALSE;
859             default:
860                 if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) + " not known");
861                 return null;
862         }
863     }
864 
865 
866     /**
867      * Called to acknowledge a block() (callback in <code>MembershipListener</code> or
868      * <code>BlockEvent</code> received from call to <code>receive()</code>).
869      * After sending blockOk(), no messages should be sent until a new view has been received.
870      * Calling this method on a closed channel has no effect.
871      */
872     public void blockOk() {
873         down(new Event(Event.BLOCK_OK));
874         down(new Event(Event.START_QUEUEING));
875     }
876 
877 
878     /**
879      * Retrieves the current group state. Sends GET_STATE event down to STATE_TRANSFER layer.
880      * Blocks until STATE_TRANSFER sends up a GET_STATE_OK event or until <code>timeout</code>
881      * milliseconds have elapsed. The argument of GET_STATE_OK should be a single object.
882      * @param target - the target member to receive the state from. if null, state is retrieved from coordinator
883      * @param timeout - the number of milliseconds to wait for the operation to complete successfully
884      * @return true of the state was received, false if the operation timed out
885      */
886     public boolean getState(Address target, long timeout) throws ChannelNotConnectedException, ChannelClosedException {
887         StateTransferInfo info=new StateTransferInfo(StateTransferInfo.GET_FROM_SINGLE, target);
888         info.timeout=timeout;
889         return _getState(new Event(Event.GET_STATE, info), timeout);
890     }
891 
892 
893     /**
894      * Retrieves the current group state. Sends GET_STATE event down to STATE_TRANSFER layer.
895      * Blocks until STATE_TRANSFER sends up a GET_STATE_OK event or until <code>timeout</code>
896      * milliseconds have elapsed. The argument of GET_STATE_OK should be a vector of objects.
897      * @param targets - the target members to receive the state from ( an Address list )
898      * @param timeout - the number of milliseconds to wait for the operation to complete successfully
899      * @return true of the state was received, false if the operation timed out
900      */
901     public boolean getAllStates(Vector targets, long timeout) throws ChannelNotConnectedException, ChannelClosedException {
902         StateTransferInfo info=new StateTransferInfo(StateTransferInfo.GET_FROM_MANY, targets);
903         return _getState(new Event(Event.GET_STATE, info), timeout);
904     }
905 
906 
907     /**
908      * Called by the application is response to receiving a <code>getState()</code> object when
909      * calling <code>receive()</code>.
910      * When the application receives a getState() message on the receive() method,
911      * it should call returnState() to reply with the state of the application
912      * @param state The state of the application as a byte buffer
913      *              (to send over the network).
914      */
915     public void returnState(byte[] state) {
916         down(new Event(Event.GET_APPLSTATE_OK, state));
917     }
918 
919 
920 
921 
922 
923     /**
924      * Callback method <BR>
925      * Called by the ProtocolStack when a message is received.
926      * It will be added to the message queue from which subsequent
927      * <code>Receive</code>s will dequeue it.
928      * @param evt the event carrying the message from the protocol stack
929      */
930     public void up(Event evt) {
931         int type=evt.getType();
932         Message msg;
933 
934 
935         switch(type) {
936 
937         case Event.MSG:
938             msg=(Message)evt.getArg();
939             if(!receive_local_msgs) {  // discard local messages (sent by myself to me)
940                 if(local_addr != null && msg.getSrc() != null)
941                     if(local_addr.equals(msg.getSrc()))
942                         return;
943             }
944             break;
945 
946         case Event.VIEW_CHANGE:
947             my_view=(View)evt.getArg();
948 
949             // crude solution to bug #775120: if we get our first view *before* the CONNECT_OK,
950             // we simply set the state to connected
951             if(connected == false) {
952                 connected=true;
953                 connect_promise.setResult(Boolean.TRUE);
954             }
955 
956             // unblock queueing of messages due to previous BLOCK event:
957             down(new Event(Event.STOP_QUEUEING));
958             if(!receive_views)  // discard if client has not set receving views to on
959                 return;
960             //if(connected == false)
961             //  my_view=(View)evt.getArg();
962             break;
963 
964         case Event.SUSPECT:
965             if(!receive_suspects)
966                 return;
967             break;
968 
969         case Event.GET_APPLSTATE:  // return the application's state
970             if(!receive_get_states) {  // if not set to handle state transfers, send null state
971                 down(new Event(Event.GET_APPLSTATE_OK, null));
972                 return;
973             }
974             break;
975 
976         case Event.CONFIG:
977             HashMap config=(HashMap)evt.getArg();
978             if(config != null && config.containsKey("state_transfer"))
979                 state_transfer_supported=((Boolean)config.get("state_transfer")).booleanValue();
980             break;
981 
982         case Event.BLOCK:
983             // If BLOCK is received by application, then we trust the application to not send
984             // any more messages until a VIEW_CHANGE is received. Otherwise (BLOCKs are disabled),
985             // we queue any messages sent until the next VIEW_CHANGE (they will be sent in the
986             // next view)
987 
988             if(!receive_blocks) {  // discard if client has not set 'receiving blocks' to 'on'
989                 down(new Event(Event.BLOCK_OK));
990                 down(new Event(Event.START_QUEUEING));
991                 return;
992             }
993             break;
994 
995         case Event.CONNECT_OK:
996             connect_promise.setResult(Boolean.TRUE);
997             break;
998 
999         case Event.DISCONNECT_OK:
1000            disconnect_promise.setResult(Boolean.TRUE);
1001            break;
1002
1003        case Event.GET_STATE_OK:
1004            Object state=evt.getArg();
1005            state_promise.setResult(state);
1006            if(up_handler != null) {
1007                up_handler.up(evt);
1008                return;
1009            }
1010            if(state != null) {
1011                if(receiver != null) {
1012                    receiver.setState((byte[])state);
1013                }
1014                else {
1015                    try {mq.add(new Event(Event.STATE_RECEIVED, state));} catch(Exception e) {}
1016                }
1017            }
1018            break;
1019
1020        case Event.SET_LOCAL_ADDRESS:
1021            local_addr_promise.setResult(evt.getArg());
1022            break;
1023
1024        case Event.EXIT:
1025            handleExit(evt);
1026            return;  // no need to pass event up; already done in handleExit()
1027
1028        case Event.BLOCK_SEND: // emitted by FLOW_CONTROL
1029            if(log.isInfoEnabled()) log.info("received BLOCK_SEND");
1030            block_sending.set(Boolean.TRUE);
1031            break;
1032
1033        case Event.UNBLOCK_SEND:  // emitted by FLOW_CONTROL
1034            if(log.isInfoEnabled()) log.info("received UNBLOCK_SEND");
1035            block_sending.set(Boolean.FALSE);
1036            break;
1037
1038        default:
1039            break;
1040        }
1041
1042
1043        // If UpHandler is installed, pass all events to it and return (UpHandler is e.g. a building block)
1044        if(up_handler != null) {
1045            up_handler.up(evt);
1046            return;
1047        }
1048
1049        switch(type) {
1050            case Event.MSG:
1051                if(receiver != null) {
1052                    receiver.receive((Message)evt.getArg());
1053                    return;
1054                }
1055                break;
1056            case Event.VIEW_CHANGE:
1057                if(receiver != null) {
1058                    receiver.viewAccepted((View)evt.getArg());
1059                    return;
1060                }
1061                break;
1062            case Event.SUSPECT:
1063                if(receiver != null) {
1064                    receiver.suspect((Address)evt.getArg());
1065                    return;
1066                }
1067                break;
1068            case Event.GET_APPLSTATE:
1069                if(receiver != null) {
1070                    byte[] tmp_state=receiver.getState();
1071                    returnState(tmp_state);
1072                    return;
1073                }
1074                break;
1075            case Event.BLOCK:
1076                if(receiver != null) {
1077                    receiver.block();
1078                    return;
1079                }
1080                break;
1081            default:
1082                break;
1083        }
1084
1085        if(type == Event.MSG || type == Event.VIEW_CHANGE || type == Event.SUSPECT ||
1086                type == Event.GET_APPLSTATE || type == Event.BLOCK) {
1087            try {
1088                mq.add(evt);
1089            }
1090            catch(Exception e) {
1091                if(log.isErrorEnabled()) log.error("exception: " + e);
1092            }
1093        }
1094    }
1095
1096
1097    /**
1098     * Sends a message through the protocol stack if the stack is available
1099     * @param evt the message to send down, encapsulated in an event
1100     */
1101    public void down(Event evt) {
1102        if(evt == null) return;
1103
1104        if(suspended) {
1105            synchronized(suspend_mutex) {
1106                while(suspended) {
1107                    try {
1108                        suspend_mutex.wait();
1109                    }
1110                    catch(InterruptedException e) {
1111                    }
1112                }
1113            }
1114        }
1115
1116        int type=evt.getType();
1117
1118        // only block for messages; all other events are passed through
1119        // we use double-checked locking; it is okay to 'lose' one or more messages because block_sending changes
1120        // to true after an initial false value
1121        if(type == Event.MSG && block_sending.get().equals(Boolean.TRUE)) {
1122            if(log.isTraceEnabled()) log.trace("down() blocks because block_sending == true");
1123            block_sending.waitUntil(Boolean.FALSE);
1124        }
1125
1126        // handle setting of additional data (kludge, will be removed soon)
1127        if(type == Event.CONFIG) {
1128            try {
1129                Map m=(Map)evt.getArg();
1130                if(m != null && m.containsKey("additional_data")) {
1131                    additional_data=(byte[])m.get("additional_data");
1132                }
1133            }
1134            catch(Throwable t) {
1135                if(log.isErrorEnabled()) log.error("CONFIG event did not contain a hashmap: " + t);
1136            }
1137        }
1138
1139        if(prot_stack != null)
1140            prot_stack.down(evt);
1141        else
1142            if(log.isErrorEnabled()) log.error("no protocol stack available");
1143    }
1144
1145    /** Send() blocks from now on, until resume() is called */
1146    public void suspend() {
1147        synchronized(suspend_mutex) {
1148            suspended=true;
1149        }
1150    }
1151
1152    /** Send() unblocks */
1153    public void resume() {
1154        synchronized(suspend_mutex) {
1155            suspended=false;
1156            suspend_mutex.notifyAll();
1157        }
1158    }
1159
1160    public boolean isSuspended() {
1161        return suspended;
1162    }
1163
1164
1165    public String toString(boolean details) {
1166        StringBuffer sb=new StringBuffer();
1167        sb.append("local_addr=").append(local_addr).append('\n');
1168        sb.append("channel_name=").append(channel_name).append('\n');
1169        sb.append("my_view=").append(my_view).append('\n');
1170        sb.append("connected=").append(connected).append('\n');
1171        sb.append("closed=").append(closed).append('\n');
1172        if(mq != null)
1173            sb.append("incoming queue size=").append(mq.size()).append('\n');
1174        if(details) {
1175            sb.append("block_sending=").append(block_sending).append('\n');
1176            sb.append("receive_views=").append(receive_views).append('\n');
1177            sb.append("receive_suspects=").append(receive_suspects).append('\n');
1178            sb.append("receive_blocks=").append(receive_blocks).append('\n');
1179            sb.append("receive_local_msgs=").append(receive_local_msgs).append('\n');
1180            sb.append("receive_get_states=").append(receive_get_states).append('\n');
1181            sb.append("auto_reconnect=").append(auto_reconnect).append('\n');
1182            sb.append("auto_getstate=").append(auto_getstate).append('\n');
1183            sb.append("state_transfer_supported=").append(state_transfer_supported).append('\n');
1184            sb.append("props=").append(props).append('\n');
1185        }
1186
1187        return sb.toString();
1188    }
1189
1190
1191    /* ----------------------------------- Private Methods ------------------------------------- */
1192
1193
1194    /**
1195     * Initializes all variables. Used after <tt>close()</tt> or <tt>disconnect()</tt>,
1196     * to be ready for new <tt>connect()</tt>
1197     */
1198    private void init() {
1199        local_addr=null;
1200        channel_name=null;
1201        my_view=null;
1202
1203        // changed by Bela Sept 25 2003
1204        //if(mq != null && mq.closed())
1205          //  mq.reset();
1206
1207        connect_promise.reset();
1208        disconnect_promise.reset();
1209        connected=false;
1210        block_sending.set(Boolean.FALSE);
1211    }
1212
1213
1214    /**
1215     * health check.<BR>
1216     * throws a ChannelNotConnected exception if the channel is not connected
1217     */
1218    private final void checkNotConnected() throws ChannelNotConnectedException {
1219        if(!connected)
1220            throw new ChannelNotConnectedException();
1221    }
1222
1223    /**
1224     * health check<BR>
1225     * throws a ChannelClosed exception if the channel is closed
1226     */
1227    private final void checkClosed() throws ChannelClosedException {
1228        if(closed)
1229            throw new ChannelClosedException();
1230    }
1231
1232
1233
1234    /**
1235     * returns the value of the event<BR>
1236     * These objects will be returned<BR>
1237     * <PRE>
1238     * <B>Event Type    - Return Type</B>
1239     * Event.MSG           - returns a Message object
1240     * Event.VIEW_CHANGE   - returns a View object
1241     * Event.SUSPECT       - returns a SuspectEvent object
1242     * Event.BLOCK         - returns a new BlockEvent object
1243     * Event.GET_APPLSTATE - returns a GetStateEvent object
1244     * Event.STATE_RECEIVED- returns a SetStateEvent object
1245     * Event.Exit          - returns an ExitEvent object
1246     * All other           - return the actual Event object
1247     * </PRE>
1248     * @param   evt - the event of which you want to extract the value
1249     * @return the event value if it matches the select list,
1250     *         returns null if the event is null
1251     *         returns the event itself if a match (See above) can not be made of the event type
1252     */
1253    static Object getEvent(Event evt) {
1254        if(evt == null)
1255            return null; // correct ?
1256
1257        switch(evt.getType()) {
1258            case Event.MSG:
1259                return evt.getArg();
1260            case Event.VIEW_CHANGE:
1261                return evt.getArg();
1262            case Event.SUSPECT:
1263                return new SuspectEvent(evt.getArg());
1264            case Event.BLOCK:
1265                return new BlockEvent();
1266            case Event.GET_APPLSTATE:
1267                return new GetStateEvent(evt.getArg());
1268            case Event.STATE_RECEIVED:
1269                return new SetStateEvent((byte[])evt.getArg());
1270            case Event.EXIT:
1271                return new ExitEvent();
1272            default:
1273                return evt;
1274        }
1275    }
1276
1277
1278    /**
1279     * Receives the state from the group and modifies the JChannel.state object<br>
1280     * This method initializes the local state variable to null, and then sends the state
1281     * event down the stack. It waits for a GET_STATE_OK event to bounce back
1282     * @param evt the get state event, has to be of type Event.GET_STATE
1283     * @param timeout the number of milliseconds to wait for the GET_STATE_OK response
1284     * @return true of the state was received, false if the operation timed out
1285     */
1286    boolean _getState(Event evt, long timeout) throws ChannelNotConnectedException, ChannelClosedException {
1287        checkClosed();
1288        checkNotConnected();
1289        if(!state_transfer_supported) {
1290            log.error("fetching s