Save This Page
Home » apache-tomcat-6.0.16-src » org.apache » catalina » tribes » group » [javadoc | source]
    1   /*
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.catalina.tribes.group;
   18   
   19   
   20   import java.io.Serializable;
   21   import java.util.ArrayList;
   22   import java.util.Iterator;
   23   
   24   import org.apache.catalina.tribes.ByteMessage;
   25   import org.apache.catalina.tribes.Channel;
   26   import org.apache.catalina.tribes.ChannelException;
   27   import org.apache.catalina.tribes.ChannelInterceptor;
   28   import org.apache.catalina.tribes.ChannelListener;
   29   import org.apache.catalina.tribes.ChannelMessage;
   30   import org.apache.catalina.tribes.ChannelReceiver;
   31   import org.apache.catalina.tribes.ChannelSender;
   32   import org.apache.catalina.tribes.ErrorHandler;
   33   import org.apache.catalina.tribes.ManagedChannel;
   34   import org.apache.catalina.tribes.Member;
   35   import org.apache.catalina.tribes.MembershipListener;
   36   import org.apache.catalina.tribes.MembershipService;
   37   import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
   38   import org.apache.catalina.tribes.io.ChannelData;
   39   import org.apache.catalina.tribes.io.XByteBuffer;
   40   import org.apache.catalina.tribes.UniqueId;
   41   import org.apache.catalina.tribes.Heartbeat;
   42   import org.apache.catalina.tribes.io.BufferPool;
   43   import org.apache.catalina.tribes.RemoteProcessException;
   44   import org.apache.catalina.tribes.util.Logs;
   45   import org.apache.catalina.tribes.util.Arrays;
   46   
   47   /**
   48    * The default implementation of a Channel.<br>
   49    * The GroupChannel manages the replication channel. It coordinates
   50    * message being sent and received with membership announcements.
   51    * The channel has an chain of interceptors that can modify the message or perform other logic.<br>
   52    * It manages a complete group, both membership and replication.
   53    * @author Filip Hanik
   54    * @version $Revision: 568742 $, $Date: 2007-08-22 22:19:54 +0200 (mer., 22 août 2007) $
   55    */
   56   public class GroupChannel extends ChannelInterceptorBase implements ManagedChannel {
   57       /**
   58        * Flag to determine if the channel manages its own heartbeat
   59        * If set to true, the channel will start a local thread for the heart beat.
   60        */
   61       protected boolean heartbeat = true;
   62       /**
   63        * If <code>heartbeat == true</code> then how often do we want this
   64        * heartbeat to run. default is one minute
   65        */
   66       protected long heartbeatSleeptime = 5*1000;//every 5 seconds
   67   
   68       /**
   69        * Internal heartbeat thread
   70        */
   71       protected HeartbeatThread hbthread = null;
   72   
   73       /**
   74        * The  <code>ChannelCoordinator</code> coordinates the bottom layer components:<br>
   75        * - MembershipService<br>
   76        * - ChannelSender <br>
   77        * - ChannelReceiver<br>
   78        */
   79       protected ChannelCoordinator coordinator = new ChannelCoordinator();
   80   
   81       /**
   82        * The first interceptor in the inteceptor stack.
   83        * The interceptors are chained in a linked list, so we only need a reference to the
   84        * first one
   85        */
   86       protected ChannelInterceptor interceptors = null;
   87   
   88       /**
   89        * A list of membership listeners that subscribe to membership announcements
   90        */
   91       protected ArrayList membershipListeners = new ArrayList();
   92   
   93       /**
   94        * A list of channel listeners that subscribe to incoming messages
   95        */
   96       protected ArrayList channelListeners = new ArrayList();
   97   
   98       /**
   99        * If set to true, the GroupChannel will check to make sure that
  100        */
  101       protected boolean optionCheck = false;
  102   
  103       /**
  104        * Creates a GroupChannel. This constructor will also
  105        * add the first interceptor in the GroupChannel.<br>
  106        * The first interceptor is always the channel itself.
  107        */
  108       public GroupChannel() {
  109           addInterceptor(this);
  110       }
  111   
  112   
  113       /**
  114        * Adds an interceptor to the stack for message processing<br>
  115        * Interceptors are ordered in the way they are added.<br>
  116        * <code>channel.addInterceptor(A);</code><br>
  117        * <code>channel.addInterceptor(C);</code><br>
  118        * <code>channel.addInterceptor(B);</code><br>
  119        * Will result in a interceptor stack like this:<br>
  120        * <code>A -> C -> B</code><br>
  121        * The complete stack will look like this:<br>
  122        * <code>Channel -> A -> C -> B -> ChannelCoordinator</code><br>
  123        * @param interceptor ChannelInterceptorBase
  124        */
  125       public void addInterceptor(ChannelInterceptor interceptor) {
  126           if ( interceptors == null ) {
  127               interceptors = interceptor;
  128               interceptors.setNext(coordinator);
  129               interceptors.setPrevious(null);
  130               coordinator.setPrevious(interceptors);
  131           } else {
  132               ChannelInterceptor last = interceptors;
  133               while ( last.getNext() != coordinator ) {
  134                   last = last.getNext();
  135               }
  136               last.setNext(interceptor);
  137               interceptor.setNext(coordinator);
  138               interceptor.setPrevious(last);
  139               coordinator.setPrevious(interceptor);
  140           }
  141       }
  142   
  143       /**
  144        * Sends a heartbeat through the interceptor stack.<br>
  145        * Invoke this method from the application on a periodic basis if
  146        * you have turned off internal heartbeats <code>channel.setHeartbeat(false)</code>
  147        */
  148       public void heartbeat() {
  149           super.heartbeat();
  150           Iterator i = membershipListeners.iterator();
  151           while ( i.hasNext() ) {
  152               Object o = i.next();
  153               if ( o instanceof Heartbeat ) ((Heartbeat)o).heartbeat();
  154           }
  155           i = channelListeners.iterator();
  156           while ( i.hasNext() ) {
  157               Object o = i.next();
  158               if ( o instanceof Heartbeat ) ((Heartbeat)o).heartbeat();
  159           }
  160   
  161       }
  162   
  163   
  164       /**
  165        * Send a message to the destinations specified
  166        * @param destination Member[] - destination.length > 1
  167        * @param msg Serializable - the message to send
  168        * @param options int - sender options, options can trigger guarantee levels and different interceptors to
  169        * react to the message see class documentation for the <code>Channel</code> object.<br>
  170        * @return UniqueId - the unique Id that was assigned to this message
  171        * @throws ChannelException - if an error occurs processing the message
  172        * @see org.apache.catalina.tribes.Channel
  173        */
  174       public UniqueId send(Member[] destination, Serializable msg, int options) throws ChannelException {
  175           return send(destination,msg,options,null);
  176       }
  177   
  178       /**
  179        *
  180        * @param destination Member[] - destination.length > 1
  181        * @param msg Serializable - the message to send
  182        * @param options int - sender options, options can trigger guarantee levels and different interceptors to
  183        * react to the message see class documentation for the <code>Channel</code> object.<br>
  184        * @param handler - callback object for error handling and completion notification, used when a message is
  185        * sent asynchronously using the <code>Channel.SEND_OPTIONS_ASYNCHRONOUS</code> flag enabled.
  186        * @return UniqueId - the unique Id that was assigned to this message
  187        * @throws ChannelException - if an error occurs processing the message
  188        * @see org.apache.catalina.tribes.Channel
  189        */
  190       public UniqueId send(Member[] destination, Serializable msg, int options, ErrorHandler handler) throws ChannelException {
  191           if ( msg == null ) throw new ChannelException("Cant send a NULL message");
  192           XByteBuffer buffer = null;
  193           try {
  194               if ( destination == null || destination.length == 0) throw new ChannelException("No destination given");
  195               ChannelData data = new ChannelData(true);//generates a unique Id
  196               data.setAddress(getLocalMember(false));
  197               data.setTimestamp(System.currentTimeMillis());
  198               byte[] b = null;
  199               if ( msg instanceof ByteMessage ){
  200                   b = ((ByteMessage)msg).getMessage();
  201                   options = options | SEND_OPTIONS_BYTE_MESSAGE;
  202               } else {
  203                   b = XByteBuffer.serialize(msg);
  204                   options = options & (~SEND_OPTIONS_BYTE_MESSAGE);
  205               }
  206               data.setOptions(options);
  207               //XByteBuffer buffer = new XByteBuffer(b.length+128,false);
  208               buffer = BufferPool.getBufferPool().getBuffer(b.length+128, false);
  209               buffer.append(b,0,b.length);
  210               data.setMessage(buffer);
  211               InterceptorPayload payload = null;
  212               if ( handler != null ) {
  213                   payload = new InterceptorPayload();
  214                   payload.setErrorHandler(handler);
  215               }
  216               getFirstInterceptor().sendMessage(destination, data, payload);
  217               if ( Logs.MESSAGES.isTraceEnabled() ) {
  218                   Logs.MESSAGES.trace("GroupChannel - Sent msg:" + new UniqueId(data.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " to "+Arrays.toNameString(destination));
  219                   Logs.MESSAGES.trace("GroupChannel - Send Message:" + new UniqueId(data.getUniqueId()) + " is " +msg);
  220               }
  221   
  222               return new UniqueId(data.getUniqueId());
  223           }catch ( Exception x ) {
  224               if ( x instanceof ChannelException ) throw (ChannelException)x;
  225               throw new ChannelException(x);
  226           } finally {
  227               if ( buffer != null ) BufferPool.getBufferPool().returnBuffer(buffer);
  228           }
  229       }
  230   
  231   
  232       /**
  233        * Callback from the interceptor stack. <br>
  234        * When a message is received from a remote node, this method will be invoked by
  235        * the previous interceptor.<br>
  236        * This method can also be used to send a message to other components within the same application,
  237        * but its an extreme case, and you're probably better off doing that logic between the applications itself.
  238        * @param msg ChannelMessage
  239        */
  240       public void messageReceived(ChannelMessage msg) {
  241           if ( msg == null ) return;
  242           try {
  243               if ( Logs.MESSAGES.isTraceEnabled() ) {
  244                   Logs.MESSAGES.trace("GroupChannel - Received msg:" + new UniqueId(msg.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " from "+msg.getAddress().getName());
  245               }
  246   
  247               Serializable fwd = null;
  248               if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE ) {
  249                   fwd = new ByteMessage(msg.getMessage().getBytes());
  250               } else {
  251                   try {
  252                       fwd = XByteBuffer.deserialize(msg.getMessage().getBytesDirect(), 0, msg.getMessage().getLength());
  253                   }catch (Exception sx) {
  254                       log.error("Unable to deserialize message:"+msg,sx);
  255                       return;
  256                   }
  257               }
  258               if ( Logs.MESSAGES.isTraceEnabled() ) {
  259                   Logs.MESSAGES.trace("GroupChannel - Receive Message:" + new UniqueId(msg.getUniqueId()) + " is " +fwd);
  260               }
  261   
  262               //get the actual member with the correct alive time
  263               Member source = msg.getAddress();
  264               boolean rx = false;
  265               boolean delivered = false;
  266               for ( int i=0; i<channelListeners.size(); i++ ) {
  267                   ChannelListener channelListener = (ChannelListener)channelListeners.get(i);
  268                   if (channelListener != null && channelListener.accept(fwd, source)) {
  269                       channelListener.messageReceived(fwd, source);
  270                       delivered = true;
  271                       //if the message was accepted by an RPC channel, that channel
  272                       //is responsible for returning the reply, otherwise we send an absence reply
  273                       if ( channelListener instanceof RpcChannel ) rx = true;
  274                   }
  275               }//for
  276               if ((!rx) && (fwd instanceof RpcMessage)) {
  277                   //if we have a message that requires a response,
  278                   //but none was given, send back an immediate one
  279                   sendNoRpcChannelReply((RpcMessage)fwd,source);
  280               }
  281               if ( Logs.MESSAGES.isTraceEnabled() ) {
  282                   Logs.MESSAGES.trace("GroupChannel delivered["+delivered+"] id:"+new UniqueId(msg.getUniqueId()));
  283               }
  284   
  285           } catch ( Exception x ) {
  286               //this could be the channel listener throwing an exception, we should log it 
  287               //as a warning.
  288               if ( log.isWarnEnabled() ) log.warn("Error receiving message:",x);
  289               throw new RemoteProcessException("Exception:"+x.getMessage(),x);
  290           }
  291       }
  292   
  293       /**
  294        * Sends a <code>NoRpcChannelReply</code> message to a member<br>
  295        * This method gets invoked by the channel if a RPC message comes in
  296        * and no channel listener accepts the message. This avoids timeout
  297        * @param msg RpcMessage
  298        * @param destination Member - the destination for the reply
  299        */
  300       protected void sendNoRpcChannelReply(RpcMessage msg, Member destination) {
  301           try {
  302               //avoid circular loop
  303               if ( msg instanceof RpcMessage.NoRpcChannelReply) return;
  304               RpcMessage.NoRpcChannelReply reply = new RpcMessage.NoRpcChannelReply(msg.rpcId,msg.uuid);
  305               send(new Member[]{destination},reply,Channel.SEND_OPTIONS_ASYNCHRONOUS);
  306           } catch ( Exception x ) {
  307               log.error("Unable to find rpc channel, failed to send NoRpcChannelReply.",x);
  308           }
  309       }
  310   
  311       /**
  312        * memberAdded gets invoked by the interceptor below the channel
  313        * and the channel will broadcast it to the membership listeners
  314        * @param member Member - the new member
  315        */
  316       public void memberAdded(Member member) {
  317           //notify upwards
  318           for (int i=0; i<membershipListeners.size(); i++ ) {
  319               MembershipListener membershipListener = (MembershipListener)membershipListeners.get(i);
  320               if (membershipListener != null) membershipListener.memberAdded(member);
  321           }
  322       }
  323   
  324       /**
  325        * memberDisappeared gets invoked by the interceptor below the channel
  326        * and the channel will broadcast it to the membership listeners
  327        * @param member Member - the member that left or crashed
  328        */
  329       public void memberDisappeared(Member member) {
  330           //notify upwards
  331           for (int i=0; i<membershipListeners.size(); i++ ) {
  332               MembershipListener membershipListener = (MembershipListener)membershipListeners.get(i);
  333               if (membershipListener != null) membershipListener.memberDisappeared(member);
  334           }
  335       }
  336   
  337       /**
  338        * Sets up the default implementation interceptor stack
  339        * if no interceptors have been added
  340        * @throws ChannelException
  341        */
  342       protected synchronized void setupDefaultStack() throws ChannelException {
  343   
  344           if ( getFirstInterceptor() != null &&
  345                ((getFirstInterceptor().getNext() instanceof ChannelCoordinator))) {
  346               ChannelInterceptor interceptor = null;
  347               Class clazz = null;
  348               try {
  349                   clazz = Class.forName("org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor",
  350                                         true,GroupChannel.class.getClassLoader());
  351                   clazz.newInstance();
  352               } catch ( Throwable x ) {
  353                   clazz = MessageDispatchInterceptor.class;
  354               }//catch
  355               try {
  356                   interceptor = (ChannelInterceptor) clazz.newInstance();
  357               } catch (Exception x) {
  358                   throw new ChannelException("Unable to add MessageDispatchInterceptor to interceptor chain.",x);
  359               }
  360               this.addInterceptor(interceptor);
  361           }
  362       }
  363   
  364       /**
  365        * Validates the option flags that each interceptor is using and reports
  366        * an error if two interceptor share the same flag.
  367        * @throws ChannelException
  368        */
  369       protected void checkOptionFlags() throws ChannelException {
  370           StringBuffer conflicts = new StringBuffer();
  371           ChannelInterceptor first = interceptors;
  372           while ( first != null ) {
  373               int flag = first.getOptionFlag();
  374               if ( flag != 0 ) {
  375                   ChannelInterceptor next = first.getNext();
  376                   while ( next != null ) {
  377                       int nflag = next.getOptionFlag();
  378                       if (nflag!=0 && (((flag & nflag) == flag ) || ((flag & nflag) == nflag)) ) {
  379                           conflicts.append("[");
  380                           conflicts.append(first.getClass().getName());
  381                           conflicts.append(":");
  382                           conflicts.append(flag);
  383                           conflicts.append(" == ");
  384                           conflicts.append(next.getClass().getName());
  385                           conflicts.append(":");
  386                           conflicts.append(nflag);
  387                           conflicts.append("] ");
  388                       }//end if
  389                       next = next.getNext();
  390                   }//while
  391               }//end if
  392               first = first.getNext();
  393           }//while
  394           if ( conflicts.length() > 0 ) throw new ChannelException("Interceptor option flag conflict: "+conflicts.toString());
  395   
  396       }
  397   
  398       /**
  399        * Starts the channel
  400        * @param svc int - what service to start
  401        * @throws ChannelException
  402        * @see org.apache.catalina.tribes.Channel#start(int)
  403        */
  404       public synchronized void start(int svc) throws ChannelException {
  405           setupDefaultStack();
  406           if (optionCheck) checkOptionFlags();
  407           super.start(svc);
  408           if ( hbthread == null && heartbeat ) {
  409               hbthread = new HeartbeatThread(this,heartbeatSleeptime);
  410               hbthread.start();
  411           }
  412       }
  413   
  414       /**
  415        * Stops the channel
  416        * @param svc int
  417        * @throws ChannelException
  418        * @see org.apache.catalina.tribes.Channel#stop(int)
  419        */
  420       public synchronized void stop(int svc) throws ChannelException {
  421           if (hbthread != null) {
  422               hbthread.stopHeartbeat();
  423               hbthread = null;
  424           }
  425           super.stop(svc);
  426       }
  427   
  428       /**
  429        * Returns the first interceptor of the stack. Useful for traversal.
  430        * @return ChannelInterceptor
  431        */
  432       public ChannelInterceptor getFirstInterceptor() {
  433           if (interceptors != null) return interceptors;
  434           else return coordinator;
  435       }
  436   
  437       /**
  438        * Returns the channel receiver component
  439        * @return ChannelReceiver
  440        */
  441       public ChannelReceiver getChannelReceiver() {
  442           return coordinator.getClusterReceiver();
  443       }
  444   
  445       /**
  446        * Returns the channel sender component
  447        * @return ChannelSender
  448        */
  449       public ChannelSender getChannelSender() {
  450           return coordinator.getClusterSender();
  451       }
  452   
  453       /**
  454        * Returns the membership service component
  455        * @return MembershipService
  456        */
  457       public MembershipService getMembershipService() {
  458           return coordinator.getMembershipService();
  459       }
  460   
  461       /**
  462        * Sets the channel receiver component
  463        * @param clusterReceiver ChannelReceiver
  464        */
  465       public void setChannelReceiver(ChannelReceiver clusterReceiver) {
  466           coordinator.setClusterReceiver(clusterReceiver);
  467       }
  468   
  469       /**
  470        * Sets the channel sender component
  471        * @param clusterSender ChannelSender
  472        */
  473       public void setChannelSender(ChannelSender clusterSender) {
  474           coordinator.setClusterSender(clusterSender);
  475       }
  476   
  477       /**
  478        * Sets the membership component
  479        * @param membershipService MembershipService
  480        */
  481       public void setMembershipService(MembershipService membershipService) {
  482           coordinator.setMembershipService(membershipService);
  483       }
  484   
  485       /**
  486        * Adds a membership listener to the channel.<br>
  487        * Membership listeners are uniquely identified using the equals(Object) method
  488        * @param membershipListener MembershipListener
  489        */
  490       public void addMembershipListener(MembershipListener membershipListener) {
  491           if (!this.membershipListeners.contains(membershipListener) )
  492               this.membershipListeners.add(membershipListener);
  493       }
  494   
  495       /**
  496        * Removes a membership listener from the channel.<br>
  497        * Membership listeners are uniquely identified using the equals(Object) method
  498        * @param membershipListener MembershipListener
  499        */
  500   
  501       public void removeMembershipListener(MembershipListener membershipListener) {
  502           membershipListeners.remove(membershipListener);
  503       }
  504   
  505       /**
  506        * Adds a channel listener to the channel.<br>
  507        * Channel listeners are uniquely identified using the equals(Object) method
  508        * @param channelListener ChannelListener
  509        */
  510       public void addChannelListener(ChannelListener channelListener) {
  511           if (!this.channelListeners.contains(channelListener) ) {
  512               this.channelListeners.add(channelListener);
  513           } else {
  514               throw new IllegalArgumentException("Listener already exists:"+channelListener+"["+channelListener.getClass().getName()+"]");
  515           }
  516       }
  517   
  518       /**
  519        *
  520        * Removes a channel listener from the channel.<br>
  521        * Channel listeners are uniquely identified using the equals(Object) method
  522        * @param channelListener ChannelListener
  523        */
  524       public void removeChannelListener(ChannelListener channelListener) {
  525           channelListeners.remove(channelListener);
  526       }
  527   
  528       /**
  529        * Returns an iterator of all the interceptors in this stack
  530        * @return Iterator
  531        */
  532       public Iterator getInterceptors() {
  533           return new InterceptorIterator(this.getNext(),this.coordinator);
  534       }
  535   
  536       /**
  537        * Enables/disables the option check<br>
  538        * Setting this to true, will make the GroupChannel perform a conflict check
  539        * on the interceptors. If two interceptors are using the same option flag
  540        * and throw an error upon start.
  541        * @param optionCheck boolean
  542        */
  543       public void setOptionCheck(boolean optionCheck) {
  544           this.optionCheck = optionCheck;
  545       }
  546   
  547       /**
  548        * Configure local heartbeat sleep time<br>
  549        * Only used when <code>getHeartbeat()==true</code>
  550        * @param heartbeatSleeptime long - time in milliseconds to sleep between heartbeats
  551        */
  552       public void setHeartbeatSleeptime(long heartbeatSleeptime) {
  553           this.heartbeatSleeptime = heartbeatSleeptime;
  554       }
  555   
  556       /**
  557        * Enables or disables local heartbeat.
  558        * if <code>setHeartbeat(true)</code> is invoked then the channel will start an internal
  559        * thread to invoke <code>Channel.heartbeat()</code> every <code>getHeartbeatSleeptime</code> milliseconds
  560        * @param heartbeat boolean
  561        */
  562       public void setHeartbeat(boolean heartbeat) {
  563           this.heartbeat = heartbeat;
  564       }
  565   
  566       /**
  567        * @see #setOptionCheck(boolean)
  568        * @return boolean
  569        */
  570       public boolean getOptionCheck() {
  571           return optionCheck;
  572       }
  573   
  574       /**
  575        * @see #setHeartbeat(boolean)
  576        * @return boolean
  577        */
  578       public boolean getHeartbeat() {
  579           return heartbeat;
  580       }
  581   
  582       /**
  583        * Returns the sleep time in milliseconds that the internal heartbeat will
  584        * sleep in between invokations of <code>Channel.heartbeat()</code>
  585        * @return long
  586        */
  587       public long getHeartbeatSleeptime() {
  588           return heartbeatSleeptime;
  589       }
  590   
  591       /**
  592        *
  593        * <p>Title: Interceptor Iterator</p>
  594        *
  595        * <p>Description: An iterator to loop through the interceptors in a channel</p>
  596        *
  597        * @version 1.0
  598        */
  599       public static class InterceptorIterator implements Iterator {
  600           private ChannelInterceptor end;
  601           private ChannelInterceptor start;
  602           public InterceptorIterator(ChannelInterceptor start, ChannelInterceptor end) {
  603               this.end = end;
  604               this.start = start;
  605           }
  606   
  607           public boolean hasNext() {
  608               return start!=null && start != end;
  609           }
  610   
  611           public Object next() {
  612               Object result = null;
  613               if ( hasNext() ) {
  614                   result = start;
  615                   start = start.getNext();
  616               }
  617               return result;
  618           }
  619   
  620           public void remove() {
  621               //empty operation
  622           }
  623       }
  624   
  625       /**
  626        *
  627        * <p>Title: Internal heartbeat thread</p>
  628        *
  629        * <p>Description: if <code>Channel.getHeartbeat()==true</code> then a thread of this class
  630        * is created</p>
  631        *
  632        * @version 1.0
  633        */
  634       public static class HeartbeatThread extends Thread {
  635           protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(HeartbeatThread.class);
  636           protected static int counter = 1;
  637           protected static synchronized int inc() {
  638               return counter++;
  639           }
  640   
  641           protected boolean doRun = true;
  642           protected GroupChannel channel;
  643           protected long sleepTime;
  644           public HeartbeatThread(GroupChannel channel, long sleepTime) {
  645               super();
  646               this.setPriority(MIN_PRIORITY);
  647               setName("GroupChannel-Heartbeat-"+inc());
  648               setDaemon(true);
  649               this.channel = channel;
  650               this.sleepTime = sleepTime;
  651           }
  652           public void stopHeartbeat() {
  653               doRun = false;
  654               interrupt();
  655           }
  656   
  657           public void run() {
  658               while (doRun) {
  659                   try {
  660                       Thread.sleep(sleepTime);
  661                       channel.heartbeat();
  662                   } catch ( InterruptedException x ) {
  663                       interrupted();
  664                   } catch ( Exception x ) {
  665                       log.error("Unable to send heartbeat through Tribes interceptor stack. Will try to sleep again.",x);
  666                   }//catch
  667               }//while
  668           }//run
  669       }//HeartbeatThread
  670   
  671   
  672   
  673   }

Save This Page
Home » apache-tomcat-6.0.16-src » org.apache » catalina » tribes » group » [javadoc | source]