Save This Page
Home » jboss-5.0.0.CR1-src » org.jboss.ha.framework » server » [javadoc | source]
    1   /*
    2     * JBoss, Home of Professional Open Source
    3     * Copyright 2005, JBoss Inc., and individual contributors as indicated
    4     * by the @authors tag. See the copyright.txt in the distribution for a
    5     * full listing of individual contributors.
    6     *
    7     * This is free software; you can redistribute it and/or modify it
    8     * under the terms of the GNU Lesser General Public License as
    9     * published by the Free Software Foundation; either version 2.1 of
   10     * the License, or (at your option) any later version.
   11     *
   12     * This software is distributed in the hope that it will be useful,
   13     * but WITHOUT ANY WARRANTY; without even the implied warranty of
   14     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
   15     * Lesser General Public License for more details.
   16     *
   17     * You should have received a copy of the GNU Lesser General Public
   18     * License along with this software; if not, write to the Free
   19     * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
   20     * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
   21     */
   22   package org.jboss.ha.framework.server;
   23   
   24   import java.io.ByteArrayInputStream;
   25   import java.io.ByteArrayOutputStream;
   26   import java.io.IOException;
   27   import java.io.InputStream;
   28   import java.io.OutputStream;
   29   import java.io.Serializable;
   30   import java.lang.ref.WeakReference;
   31   import java.net.InetAddress;
   32   import java.text.SimpleDateFormat;
   33   import java.util.ArrayList;
   34   import java.util.Date;
   35   import java.util.HashMap;
   36   import java.util.Map;
   37   import java.util.Vector;
   38   import java.util.concurrent.ConcurrentHashMap;
   39   import java.util.concurrent.CountDownLatch;
   40   
   41   import javax.naming.Context;
   42   import javax.naming.InitialContext;
   43   import javax.naming.Name;
   44   import javax.naming.NameNotFoundException;
   45   import javax.naming.Reference;
   46   import javax.naming.StringRefAddr;
   47   
   48   import org.jboss.cache.Cache;
   49   import org.jboss.cache.CacheManager;
   50   import org.jboss.ha.framework.interfaces.ClusterNode;
   51   import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
   52   import org.jboss.ha.framework.interfaces.DistributedState;
   53   import org.jboss.ha.framework.interfaces.HAPartition;
   54   import org.jboss.invocation.MarshalledValueInputStream;
   55   import org.jboss.invocation.MarshalledValueOutputStream;
   56   import org.jboss.logging.Logger;
   57   import org.jboss.naming.NonSerializableFactory;
   58   import org.jboss.system.ServiceMBeanSupport;
   59   import org.jboss.system.server.ServerConfigUtil;
   60   import org.jboss.util.threadpool.ThreadPool;
   61   import org.jgroups.Address;
   62   import org.jgroups.Channel;
   63   import org.jgroups.ChannelFactory;
   64   import org.jgroups.ExtendedMembershipListener;
   65   import org.jgroups.ExtendedMessageListener;
   66   import org.jgroups.MembershipListener;
   67   import org.jgroups.MergeView;
   68   import org.jgroups.Message;
   69   import org.jgroups.MessageListener;
   70   import org.jgroups.Version;
   71   import org.jgroups.View;
   72   import org.jgroups.blocks.GroupRequest;
   73   import org.jgroups.blocks.MethodCall;
   74   import org.jgroups.blocks.RpcDispatcher;
   75   import org.jgroups.stack.IpAddress;
   76   import org.jgroups.util.Rsp;
   77   import org.jgroups.util.RspList;
   78   
   79   /**
   80    * {@link HAPartition} implementation based on a
   81    * <a href="http://www.jgroups.com/">JGroups</a> <code>RpcDispatcher</code>
   82    * and a multiplexed <code>JChannel</code>.
   83    *
   84    * @author <a href="mailto:sacha.labourey@cogito-info.ch">Sacha Labourey</a>.
   85    * @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>.
   86    * @author Scott.Stark@jboss.org
   87    * @author brian.stansberry@jboss.com
   88    * @version $Revision: 74643 $
   89    */
   90   public class ClusterPartition
   91      extends ServiceMBeanSupport
   92      implements ExtendedMembershipListener, HAPartition,
   93                 AsynchEventHandler.AsynchEventProcessor,
   94                 ClusterPartitionMBean
   95   {
   96      public static final String DEFAULT_CACHE_CONFIG = "ha-partition";
   97      
   98      private static final byte EOF_VALUE   = -1;
   99      private static final byte NULL_VALUE   = 0;
  100      private static final byte SERIALIZABLE_VALUE = 1;
  101      // TODO add Streamable support
  102      // private static final byte STREAMABLE_VALUE = 2;
  103      
  104      /**
  105       * Returned when an RPC call arrives for a service that isn't registered.
  106       */
  107      private static class NoHandlerForRPC implements Serializable
  108      {
  109         static final long serialVersionUID = -1263095408483622838L;
  110      }
  111      
  112      private static class StateStreamEnd implements Serializable
  113      {
  114         /** The serialVersionUID */
  115         private static final long serialVersionUID = -3705345735451504946L;
  116      }
  117      
  118      /**
  119       * Used internally when an RPC call requires a custom classloader for unmarshalling
  120       */
  121      private static class HAServiceResponse implements Serializable
  122      {
  123         private static final long serialVersionUID = -6485594652749906437L;
  124         private final String serviceName;
  125         private final byte[] payload;
  126              
  127         public HAServiceResponse(String serviceName, byte[] payload)
  128         {
  129            this.serviceName = serviceName;
  130            this.payload = payload;
  131         }
  132              
  133         public String getServiceName()
  134         {
  135            return this.serviceName;
  136         }
  137              
  138         public byte[] getPayload()
  139         {
  140            return this.payload;
  141         }
  142      }
  143      
  144      private class ChannelConnectTask implements Runnable
  145      {
  146         private final CountDownLatch latch;
  147         
  148         private ChannelConnectTask(CountDownLatch latch)
  149         {
  150            this.latch = latch;
  151         }
  152         
  153         public void run()
  154         {
  155            try
  156            {
  157               ClusterPartition.this.channel.connect(ClusterPartition.this.getPartitionName());
  158            }
  159            catch (Exception e)
  160            {
  161               synchronized (ClusterPartition.this.channelLock)
  162               {
  163                  ClusterPartition.this.connectException = e;
  164               }
  165            }
  166            finally
  167            {
  168               this.latch.countDown();
  169            }
  170         }
  171      }
  172   
  173      // Constants -----------------------------------------------------
  174   
  175      // final MethodLookup method_lookup_clos = new MethodLookupClos();
  176   
  177      // Attributes ----------------------------------------------------
  178   
  179      private   CacheManager cacheManager;
  180      private   String cacheConfigName = DEFAULT_CACHE_CONFIG;
  181      private   Cache cache;
  182      private   ChannelFactory channelFactory;
  183      private   String stackName;
  184      private   String partitionName = ServerConfigUtil.getDefaultPartitionName();
  185      private   boolean deadlock_detection = false;
  186      private   InetAddress nodeAddress = null;
  187      private   long state_transfer_timeout=60000;
  188      private   long method_call_timeout=60000;
  189      
  190      /** Thread pool used to asynchronously start our channel */
  191      private   ThreadPool threadPool;
  192      
  193      protected Map<String, Object> rpcHandlers = new ConcurrentHashMap<String, Object>();
  194      protected Map<String, HAPartitionStateTransfer> stateHandlers = new HashMap<String, HAPartitionStateTransfer>();
  195      /** Do we send any membership change notifications synchronously? */
  196      protected boolean allowSyncListeners = false;
  197      /** The HAMembershipListener and HAMembershipExtendedListeners */
  198      protected ArrayList<HAMembershipListener> synchListeners = new ArrayList<HAMembershipListener>();
  199      /** The asynch HAMembershipListener and HAMembershipExtendedListeners */
  200      protected ArrayList<HAMembershipListener> asynchListeners = new ArrayList<HAMembershipListener>();
  201      /** The handler used to send membership change notifications asynchronously */
  202      protected AsynchEventHandler asynchHandler;
  203      /** The current cluster partition members */
  204      protected Vector<ClusterNode> members = null;
  205      protected Vector<Address> jgmembers = null;
  206      protected Map<String, WeakReference<ClassLoader>> clmap =
  207                                             new ConcurrentHashMap<String, WeakReference<ClassLoader>>();
  208   
  209      public Vector<String> history = new Vector<String>();
  210   
  211      /** The partition members other than this node */
  212      protected Vector<ClusterNode> otherMembers = null;
  213      protected Vector<Address> jgotherMembers = null;
  214      /** the local JG IP Address */
  215      protected Address localJGAddress = null;
  216      /** The cluster transport protocol address string */
  217      protected String nodeName;
  218      /** me as a ClusterNode */
  219      protected ClusterNode me = null;
  220      /** The JGroups partition channel */
  221      protected Channel channel;
  222      /** The cluster replicant manager */
  223      protected DistributedReplicantManagerImpl replicantManager;
  224      /** The DistributedState service we manage */
  225      protected DistributedStateImpl distributedState;
  226      /** The cluster instance log category */
  227      protected Logger log;
  228      protected Logger clusterLifeCycleLog;
  229      /** The current cluster view id */
  230      protected long currentViewId = -1;
  231      /** Whether to bind the partition into JNDI */
  232      protected boolean bindIntoJndi = true;
  233      
  234      private final ThreadGate flushBlockGate = new ThreadGate();
  235      
  236      private RpcDispatcher dispatcher = null;
  237   
  238      /**
  239       * True if serviceState was initialized during start-up.
  240       */
  241      protected boolean isStateSet = false;
  242   
  243      /**
  244       * An exception occurring while fetching serviceState.
  245       */
  246      protected Exception setStateException;
  247      /**
  248       * An exception occurring during channel connect.
  249       */
  250      protected Exception connectException;
  251      private final Object channelLock = new Object();
  252      private final MessageListenerAdapter messageListener = new MessageListenerAdapter();
  253   
  254      // Static --------------------------------------------------------
  255      
  256      private Channel createChannel()
  257      {
  258         ChannelFactory factory = this.getChannelFactory();
  259         if (factory == null)
  260         {
  261            throw new IllegalStateException("HAPartitionConfig has no JChannelFactory");
  262         }
  263         String stack = this.getChannelStackName();
  264         if (stack == null)
  265         {
  266            throw new IllegalStateException("HAPartitionConfig has no multiplexer stack");
  267         }
  268         try
  269         {
  270            return factory.createMultiplexerChannel(stack, this.getPartitionName());
  271         }
  272         catch (RuntimeException e)
  273         {
  274            throw e;
  275         }
  276         catch (Exception e)
  277         {
  278            throw new RuntimeException("Failure creating multiplexed Channel", e);
  279         }
  280      }
  281   
  282       // Constructors --------------------------------------------------
  283      
  284      public ClusterPartition()
  285      {
  286         this.logHistory("Partition object created");
  287      }
  288   
  289      // ------------------------------------------------------------ ServiceMBean
  290      
  291      // ----------------------------------------------------------------- Service
  292      
  293      protected void createService() throws Exception
  294      {
  295         if (this.replicantManager == null)
  296         {
  297            throw new IllegalStateException("DistributedReplicantManager property must be set before creating ClusterPartition service");
  298         }
  299   
  300         this.setupLoggers(this.getPartitionName());
  301         
  302         this.replicantManager.createService();
  303         
  304         if (this.distributedState != null)
  305         {
  306            this.distributedState.createService();
  307         }
  308         
  309         // Create the asynchronous handler for view changes
  310         this.asynchHandler = new AsynchEventHandler(this, "AsynchViewChangeHandler");
  311         
  312         this.log.debug("done initializing partition");
  313      }
  314      
  315      protected void startService() throws Exception
  316      {
  317         this.logHistory ("Starting partition");
  318         
  319         this.cache = this.cacheManager.getCache(this.cacheConfigName, true);
  320         this.channelFactory = this.cache.getConfiguration().getRuntimeConfig().getMuxChannelFactory();
  321         this.stackName = this.cache.getConfiguration().getMultiplexerStack();
  322         
  323         if (this.channel == null || !this.channel.isOpen())
  324         {
  325            this.log.debug("Creating Channel for partition " + this.getPartitionName() +
  326                  " using stack " + this.getChannelStackName());
  327      
  328            this.channel = this.createChannel();
  329            
  330            this.channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
  331            this.channel.setOpt(Channel.AUTO_GETSTATE, Boolean.TRUE);
  332         }
  333         
  334         this.log.info("Initializing partition " + this.getPartitionName());
  335         this.logHistory ("Initializing partition " + this.getPartitionName());
  336         
  337         this.dispatcher = new RpcHandler(this.channel, null, null, new Object(), this.getDeadlockDetection());
  338         
  339         // Subscribe to events generated by the channel
  340         this.log.debug("setMembershipListener");
  341         this.dispatcher.setMembershipListener(this);
  342         this.log.debug("setMessageListener");
  343         this.dispatcher.setMessageListener(this.messageListener);
  344         this.dispatcher.setRequestMarshaller(new RequestMarshallerImpl());
  345         this.dispatcher.setResponseMarshaller(new ResponseMarshallerImpl());
  346         
  347         // Clear any old connectException
  348         this.connectException = null;
  349         CountDownLatch connectLatch = new CountDownLatch(1);
  350         
  351         if (this.threadPool == null)
  352         {
  353            this.channel.connect(this.getPartitionName());
  354            connectLatch.countDown();
  355         }
  356         else
  357         {
  358            // Do the channel connect in another thread while this
  359            // thread starts the cache and does that channel connect
  360            ChannelConnectTask task = new ChannelConnectTask(connectLatch);
  361            this.threadPool.run(task);
  362         }
  363         
  364         this.cache.start();
  365         
  366         try
  367         {
  368            // This will block waiting for any async channel connect above
  369            connectLatch.await();
  370            
  371            if (this.connectException != null)
  372            {
  373               throw this.connectException;
  374            }
  375            
  376            this.log.debug("Get current members");
  377            this.waitForView();
  378            
  379            // get current JG group properties
  380            this.log.debug("get nodeName");
  381            this.localJGAddress = this.channel.getLocalAddress();
  382            this.me = new ClusterNodeImpl((IpAddress) this.localJGAddress);
  383            this.nodeName = this.me.getName();
  384   
  385            this.verifyNodeIsUnique();
  386   
  387            this.fetchState();
  388            
  389            this.replicantManager.startService();
  390            
  391            if (this.distributedState != null)
  392            {
  393               this.distributedState.setClusteredCache(this.getClusteredCache());
  394               this.distributedState.startService();
  395            }
  396            
  397            // Start the asynch listener handler thread
  398            this.asynchHandler.start();
  399            
  400            // Register with the service locator
  401            HAPartitionLocator.getHAPartitionLocator().registerHAPartition(this);
  402            
  403            // Bind ourself in the public JNDI space if configured to do so
  404            if (this.bindIntoJndi)
  405            {
  406               Context ctx = new InitialContext();
  407               this.bind(HAPartitionLocator.getStandardJndiBinding(this.getPartitionName()),
  408                         this, ClusterPartition.class, ctx);
  409               this.log.debug("Bound in JNDI under /HAPartition/" + this.getPartitionName());
  410            }
  411         }
  412         catch (Throwable t)
  413         {
  414            this.log.debug("Caught exception after channel connected; closing channel -- " + t.getLocalizedMessage());
  415            this.channel.close();
  416            this.channel = null;
  417            throw (t instanceof Exception) ? (Exception) t : new RuntimeException(t);
  418         }
  419         
  420      }
  421   
  422      protected void stopService() throws Exception
  423      {
  424         this.logHistory ("Stopping partition");
  425         this.log.info("Stopping partition " + this.getPartitionName());
  426   
  427         try
  428         {
  429            this.asynchHandler.stop();
  430         }
  431         catch( Exception e)
  432         {
  433            this.log.warn("Failed to stop asynchHandler", e);
  434         }
  435         
  436         if (this.distributedState != null)
  437         {
  438            this.distributedState.stopService();
  439         }
  440   
  441         this.replicantManager.stopService();
  442         
  443         try
  444         {
  445            this.cacheManager.releaseCache(this.cacheConfigName);
  446         }
  447         catch (Exception e)
  448         {
  449            this.log.error("cache release failed", e);
  450         }
  451         
  452   //    NR 200505 : [JBCLUSTER-38] replace channel.close() by a disconnect and
  453   //    add the destroyPartition() step
  454         try
  455         {
  456            if (this.channel != null && this.channel.isConnected())
  457            {
  458               this.channel.disconnect();
  459            }
  460         }
  461         catch (Exception e)
  462         {
  463            this.log.error("channel disconnection failed", e);
  464         }
  465   
  466         if (this.bindIntoJndi)
  467         {
  468            String boundName = HAPartitionLocator.getStandardJndiBinding(this.getPartitionName());
  469            InitialContext ctx = null;
  470            try
  471            {
  472               // the following statement fails when the server is being shut down (07/19/2007)
  473               ctx = new InitialContext();
  474               ctx.unbind(boundName);
  475            }
  476            catch (Exception e) {
  477               this.log.error("partition unbind operation failed", e);
  478            }
  479            finally
  480            {
  481               if (ctx != null)
  482               {
  483                  ctx.close();
  484               }
  485            }
  486            NonSerializableFactory.unbind(boundName);
  487         }
  488         
  489         HAPartitionLocator.getHAPartitionLocator().deregisterHAPartition(this);
  490   
  491         this.log.info("Partition " + this.getPartitionName() + " stopped.");
  492      }
  493      
  494      protected void destroyService()  throws Exception
  495      {
  496         this.log.debug("Destroying HAPartition: " + this.getPartitionName());
  497         
  498         if (this.distributedState != null)
  499         {
  500            this.distributedState.destroyService();
  501         }
  502   
  503         this.replicantManager.destroyService();
  504   
  505         try
  506         {
  507            if (this.channel != null && this.channel.isOpen())
  508            {
  509               this.channel.close();
  510            }
  511         }
  512         catch (Exception e)
  513         {
  514            this.log.error("Closing channel failed", e);
  515         }
  516   
  517         this.log.info("Partition " + this.getPartitionName() + " destroyed.");
  518      }
  519      
  520      // ---------------------------------------------------------- State Transfer
  521   
  522   
  523      protected void fetchState() throws Exception
  524      {
  525         this.log.info("Fetching serviceState (will wait for " + this.getStateTransferTimeout() +
  526               " milliseconds):");
  527         long start, stop;
  528         this.isStateSet = false;
  529         start = System.currentTimeMillis();
  530         boolean rc = this.channel.getState(null, this.getStateTransferTimeout());
  531         if (rc)
  532         {
  533            synchronized (this.channelLock)
  534            {
  535               while (!this.isStateSet)
  536               {
  537                  if (this.setStateException != null)
  538                  {
  539                     throw this.setStateException;
  540                  }
  541   
  542                  try
  543                  {
  544                     this.channelLock.wait();
  545                  }
  546                  catch (InterruptedException iex)
  547                  {
  548                  }
  549               }
  550            }
  551            stop = System.currentTimeMillis();
  552            this.log.info("serviceState was retrieved successfully (in " + (stop - start) + " milliseconds)");
  553         }
  554         else
  555         {
  556            // No one provided us with serviceState.
  557            // We need to find out if we are the coordinator, so we must
  558            // block until viewAccepted() is called at least once
  559   
  560            synchronized (this.members)
  561            {
  562               while (this.members.size() == 0)
  563               {
  564                  this.log.debug("waiting on viewAccepted()");
  565                  try
  566                  {
  567                     this.members.wait();
  568                  }
  569                  catch (InterruptedException iex)
  570                  {
  571                  }
  572               }
  573            }
  574   
  575            if (this.isCurrentNodeCoordinator())
  576            {
  577               this.log.info("State could not be retrieved (we are the first member in group)");
  578            }
  579            else
  580            {
  581               throw new IllegalStateException("Initial serviceState transfer failed: " +
  582                  "Channel.getState() returned false");
  583            }
  584         }
  585      }
  586   
  587      private void getStateInternal(OutputStream stream) throws IOException
  588      {
  589         MarshalledValueOutputStream mvos = null; // don't create until we know we need it
  590         
  591         for (Map.Entry<String, HAPartitionStateTransfer> entry: this.stateHandlers.entrySet())
  592         {
  593            HAPartitionStateTransfer subscriber = entry.getValue();
  594            this.log.debug("getState for " + entry.getKey());
  595            Object state = subscriber.getCurrentState();
  596            if (state != null)
  597            {
  598               if (mvos == null)
  599               {
  600                  // This is our first write, so need to write the header first
  601                  stream.write(SERIALIZABLE_VALUE);
  602                  
  603                  mvos = new MarshalledValueOutputStream(stream);
  604               }
  605               
  606               mvos.writeObject(entry.getKey());
  607               mvos.writeObject(state);
  608            }
  609         }
  610         
  611         if (mvos == null)
  612         {
  613            // We never wrote any serviceState, so write the NULL header
  614            stream.write(NULL_VALUE);
  615         }
  616         else
  617         {
  618            mvos.writeObject(new StateStreamEnd());
  619            mvos.flush();
  620            mvos.close();
  621         }
  622         
  623      }
  624      
  625      private void setStateInternal(InputStream stream) throws IOException, ClassNotFoundException
  626      {
  627         byte type = (byte) stream.read();
  628         
  629         if (type == EOF_VALUE)
  630         {
  631            this.log.debug("serviceState stream is empty");
  632            return;
  633         }
  634         else if (type == NULL_VALUE)
  635         {
  636            this.log.debug("serviceState is null");
  637            return;
  638         }
  639         
  640         long used_mem_before, used_mem_after;
  641         Runtime rt=Runtime.getRuntime();
  642         used_mem_before=rt.totalMemory() - rt.freeMemory();
  643         
  644         MarshalledValueInputStream mvis = new MarshalledValueInputStream(stream);
  645         
  646         while (true)
  647         {
  648            Object obj = mvis.readObject();
  649            if (obj instanceof StateStreamEnd)
  650            {
  651               break;
  652            }
  653            
  654            String key = (String) obj;
  655            this.log.debug("setState for " + key);
  656            Object someState = mvis.readObject();
  657            HAPartitionStateTransfer subscriber = this.stateHandlers.get(key);
  658            if (subscriber != null)
  659            {
  660               try
  661               {
  662                  subscriber.setCurrentState((Serializable)someState);
  663               }
  664               catch (Exception e)
  665               {
  666                  // Don't let issues with one subscriber affect others
  667                  // unless it is DRM, which is really an internal function
  668                  // of the HAPartition
  669                  // FIXME remove this once DRM is JBC-based
  670                  if (DistributedReplicantManagerImpl.SERVICE_NAME.equals(key))
  671                  {
  672                     if (e instanceof RuntimeException)
  673                     {
  674                        throw (RuntimeException) e;
  675                     }
  676   
  677                     throw new RuntimeException(e);
  678                  }
  679   
  680                  this.log.error("Caught exception setting serviceState to " + subscriber, e);
  681               }
  682            }
  683            else
  684            {
  685               this.log.debug("There is no stateHandler for: " + key);
  686            }
  687         }
  688         
  689         try
  690         {
  691            stream.close();
  692         }
  693         catch(Exception e)
  694         {
  695            this.log.error("Caught exception closing serviceState stream", e);
  696         }
  697   
  698         used_mem_after=rt.totalMemory() - rt.freeMemory();
  699         this.log.debug("received serviceState; expanded memory by " +
  700               (used_mem_after - used_mem_before) + " bytes (used memory before: " + used_mem_before +
  701               ", used memory after: " + used_mem_after + ")");
  702      }
  703   
  704      private void recordSetStateFailure(Throwable t)
  705      {
  706         this.log.error("failed setting serviceState", t);
  707         if (t instanceof Exception)
  708         {
  709            this.setStateException = (Exception) t;
  710         }
  711         else
  712         {
  713            this.setStateException = new Exception(t);
  714         }
  715      }
  716   
  717      private void notifyChannelLock()
  718      {
  719         synchronized (this.channelLock)
  720         {
  721            this.channelLock.notifyAll();
  722         }
  723      }
  724      
  725      // org.jgroups.MembershipListener implementation ----------------------------------------------
  726      
  727      public void suspect(org.jgroups.Address suspected_mbr)
  728      {
  729         this.logHistory ("Node suspected: " + (suspected_mbr==null?"null":suspected_mbr.toString()));
  730         if (this.isCurrentNodeCoordinator ())
  731         {
  732            this.clusterLifeCycleLog.info ("Suspected member: " + suspected_mbr);
  733         }
  734         else
  735         {
  736            this.log.info("Suspected member: " + suspected_mbr);
  737         }
  738      }
  739   
  740      public void block()
  741      {
  742          this.flushBlockGate.close();
  743          this.log.debug("Block processed at " + this.me);
  744      }
  745      
  746      public void unblock()
  747      {
  748          this.flushBlockGate.open();
  749          this.log.debug("Unblock processed at " + this.me);
  750      }
  751      
  752      /** Notification of a cluster view change. This is done from the JG protocol
  753       * handler thread and we must be careful to not unduly block this thread.
  754       * Because of this there are two types of listeners, synchronous and
  755       * asynchronous. The synchronous listeners are messaged with the view change
  756       * event using the calling thread while the asynchronous listeners are
  757       * messaged using a separate thread.
  758       *
  759       * @param newView
  760       */
  761      public void viewAccepted(View newView)
  762      {
  763         try
  764         {
  765            // we update the view id
  766            this.currentViewId = newView.getVid().getId();
  767   
  768            // Keep a list of other members only for "exclude-self" RPC calls
  769            this.jgotherMembers = (Vector)newView.getMembers().clone();
  770            this.jgotherMembers.remove (this.channel.getLocalAddress());
  771            this.otherMembers = this.translateAddresses (this.jgotherMembers); // TRANSLATE!
  772            Vector<ClusterNode> translatedNewView = this.translateAddresses ((Vector)newView.getMembers().clone());
  773            this.logHistory ("New view: " + translatedNewView + " with viewId: " + this.currentViewId +
  774                        " (old view: " + this.members + " )");
  775   
  776   
  777            // Save the previous view and make a copy of the new view
  778            Vector<ClusterNode> oldMembers = this.members;
  779   
  780            Vector<Address> newjgMembers = (Vector)newView.getMembers().clone();
  781            Vector<ClusterNode> newMembers = this.translateAddresses(newjgMembers); // TRANSLATE
  782            this.members = newMembers;
  783            this.jgmembers = newjgMembers;
  784            
  785            if (oldMembers == null)
  786            {
  787               // Initial viewAccepted
  788               this.log.debug("ViewAccepted: initial members set for partition " + this.getPartitionName() + ": " +
  789                        this.currentViewId + " (" + this.members + ")");
  790               
  791               this.log.info("Number of cluster members: " + this.members.size());
  792               for(int m = 0; m > this.members.size(); m ++)
  793               {
  794                  Object node = this.members.get(m);
  795                  this.log.debug(node);
  796               }
  797               this.log.info ("Other members: " + this.otherMembers.size ());
  798               
  799               // Wake up the deployer thread blocking in waitForView
  800               this.notifyChannelLock();
  801               return;
  802            }
  803            
  804            int difference = newMembers.size() - oldMembers.size();
  805            
  806            if (this.isCurrentNodeCoordinator ())
  807            {
  808               this.clusterLifeCycleLog.info ("New cluster view for partition " + this.getPartitionName() + " (id: " +
  809                                         this.currentViewId + ", delta: " + difference + ") : " + this.members);
  810            }
  811            else
  812            {
  813               this.log.info("New cluster view for partition " + this.getPartitionName() + ": " +
  814                        this.currentViewId + " (" + this.members + " delta: " + difference + ")");
  815            }
  816   
  817            // Build a ViewChangeEvent for the asynch listeners
  818            ViewChangeEvent event = new ViewChangeEvent();
  819            event.viewId = this.currentViewId;
  820            event.allMembers = translatedNewView;
  821            event.deadMembers = this.getDeadMembers(oldMembers, event.allMembers);
  822            event.newMembers = this.getNewMembers(oldMembers, event.allMembers);
  823            event.originatingGroups = null;
  824            // if the new view occurs because of a merge, we first inform listeners of the merge
  825            if(newView instanceof MergeView)
  826            {
  827               MergeView mergeView = (MergeView) newView;
  828               event.originatingGroups = mergeView.getSubgroups();
  829            }
  830   
  831            this.log.debug("membership changed from " + oldMembers.size() + " to " + event.allMembers.size());
  832            // Put the view change to the asynch queue
  833            this.asynchHandler.queueEvent(event);
  834   
  835            // Broadcast the new view to the synchronous view change listeners
  836            if (this.allowSyncListeners)
  837            {
  838               this.notifyListeners(this.synchListeners, event.viewId, event.allMembers,
  839                     event.deadMembers, event.newMembers, event.originatingGroups);
  840            }
  841         }
  842         catch (Exception ex)
  843         {
  844            this.log.error("ViewAccepted failed", ex);
  845         }
  846      }
  847   
  848      private void waitForView() throws Exception
  849      {
  850         synchronized (this.channelLock)
  851         {
  852            if (this.members == null)
  853            {
  854               if (this.connectException != null)
  855               {
  856                  throw this.connectException;
  857               }
  858               
  859               try
  860               {
  861                  this.channelLock.wait(this.getMethodCallTimeout());
  862               }
  863               catch (InterruptedException iex)
  864               {
  865               }
  866               
  867               if (this.connectException != null)
  868               {
  869                  throw this.connectException;
  870               }
  871               
  872               if (this.members == null)
  873               {
  874                  throw new IllegalStateException("No view received from Channel");
  875               }
  876            }
  877         }
  878      }
  879   
  880      // HAPartition implementation ----------------------------------------------
  881      
  882      public String getNodeName()
  883      {
  884         return this.nodeName;
  885      }
  886      
  887      public String getPartitionName()
  888      {
  889         return this.partitionName;
  890      }
  891   
  892      public void setPartitionName(String newName)
  893      {
  894         this.partitionName = newName;
  895      }
  896      
  897      public DistributedReplicantManager getDistributedReplicantManager()
  898      {
  899         return this.replicantManager;
  900      }
  901      
  902      public DistributedState getDistributedStateService()
  903      {
  904         return this.distributedState;
  905      }
  906   
  907      public long getCurrentViewId()
  908      {
  909         return this.currentViewId;
  910      }
  911      
  912      public Vector<String> getCurrentView()
  913      {
  914         Vector<String> result = new Vector<String>(this.members.size());
  915         for (ClusterNode member: this.members)
  916         {
  917            result.add(member.getName());
  918         }
  919         return result;
  920      }
  921   
  922      public ClusterNode[] getClusterNodes ()
  923      {
  924         synchronized (this.members)
  925         {
  926            return this.members.toArray(new ClusterNode[this.members.size()]);
  927         }
  928      }
  929   
  930      public ClusterNode getClusterNode ()
  931      {
  932         return this.me;
  933      }
  934   
  935      public boolean isCurrentNodeCoordinator ()
  936      {
  937         if(this.members == null || this.members.size() == 0 || this.me == null)
  938         {
  939            return false;
  940         }
  941        return this.members.elementAt (0).equals (this.me);
  942      }
  943   
  944      // ***************************
  945      // ***************************
  946      // RPC multicast communication
  947      // ***************************
  948      // ***************************
  949      //
  950      public void registerRPCHandler(String objName, Object subscriber)
  951      {
  952         this.rpcHandlers.put(objName, subscriber);
  953      }
  954      
  955      public void registerRPCHandler(String objName, Object subscriber, ClassLoader classloader)
  956      {
  957         this.registerRPCHandler(objName, subscriber);
  958         this.clmap.put(objName, new WeakReference<ClassLoader>(classloader));
  959      }
  960      
  961      public void unregisterRPCHandler(String objName, Object subscriber)
  962      {
  963         this.rpcHandlers.remove(objName);
  964         this.clmap.remove(objName);
  965      }
  966   
  967      /**
  968       * This function is an abstraction of RpcDispatcher.
  969       */
  970      public ArrayList callMethodOnCluster(String objName, String methodName,
  971         Object[] args, Class[] types, boolean excludeSelf) throws Exception
  972      {
  973         return this.callMethodOnCluster(objName, methodName, args, types, excludeSelf, this.getMethodCallTimeout());
  974      }
  975   
  976   
  977      public ArrayList callMethodOnCluster(String objName, String methodName,
  978          Object[] args, Class[] types, boolean excludeSelf, long methodTimeout) throws Exception
  979      {
  980         RspList rsp = null;
  981         boolean trace = this.log.isTraceEnabled();
  982   
  983         MethodCall m = new MethodCall(objName + "." + methodName, args, types);
  984         
  985         if(this.channel.flushSupported())
  986         {
  987        	 this.flushBlockGate.await(this.getStateTransferTimeout());
  988         }
  989         if (excludeSelf)
  990         {
  991            if( trace )
  992            {
  993               this.log.trace("callMethodOnCluster(true), objName="+objName
  994                  +", methodName="+methodName+", members="+this.jgotherMembers);
  995            }
  996            rsp = this.dispatcher.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_ALL, methodTimeout);
  997         }
  998         else
  999         {
 1000            if( trace )
 1001            {
 1002               this.log.trace("callMethodOnCluster(false), objName="+objName
 1003                  +", methodName="+methodName+", members="+this.members);
 1004            }
 1005            rsp = this.dispatcher.callRemoteMethods(null, m, GroupRequest.GET_ALL, methodTimeout);
 1006         }
 1007   
 1008         return this.processResponseList(rsp, trace);
 1009       }
 1010   
 1011      /**
 1012       * Calls method on Cluster coordinator node only.  The cluster coordinator node is the first node to join the
 1013       * cluster.
 1014       * and is replaced
 1015       * @param objName
 1016       * @param methodName
 1017       * @param args
 1018       * @param types
 1019       * @param excludeSelf
 1020       * @return an array of responses from remote nodes
 1021       * @throws Exception
 1022       */
 1023      public ArrayList callMethodOnCoordinatorNode(String objName, String methodName,
 1024             Object[] args, Class[] types,boolean excludeSelf) throws Exception
 1025      {
 1026         return this.callMethodOnCoordinatorNode(objName,methodName,args,types,excludeSelf, this.getMethodCallTimeout());
 1027      }
 1028   
 1029      /**
 1030       * Calls method on Cluster coordinator node only.  The cluster coordinator node is the first node to join the
 1031       * cluster.
 1032       * and is replaced
 1033       * @param objName
 1034       * @param methodName
 1035       * @param args
 1036       * @param types
 1037       * @param excludeSelf
 1038       * @param methodTimeout
 1039       * @return an array of responses from remote nodes
 1040       * @throws Exception
 1041       */
 1042      public ArrayList callMethodOnCoordinatorNode(String objName, String methodName,
 1043             Object[] args, Class[] types,boolean excludeSelf, long methodTimeout) throws Exception
 1044      {
 1045         boolean trace = this.log.isTraceEnabled();
 1046   
 1047         MethodCall m = new MethodCall(objName + "." + methodName, args, types);
 1048         
 1049         if( trace )
 1050         {
 1051            this.log.trace("callMethodOnCoordinatorNode(false), objName="+objName
 1052               +", methodName="+methodName);
 1053         }
 1054   
 1055         // the first cluster view member is the coordinator
 1056         Vector<Address> coordinatorOnly = new Vector<Address>();
 1057         // If we are the coordinator, only call ourself if 'excludeSelf' is false
 1058         if (false == this.isCurrentNodeCoordinator () ||
 1059             false == excludeSelf)
 1060         {
 1061            coordinatorOnly.addElement(this.jgmembers.elementAt(0));
 1062         }
 1063         
 1064         RspList rsp = this.dispatcher.callRemoteMethods(coordinatorOnly, m, GroupRequest.GET_ALL, methodTimeout);
 1065   
 1066         return this.processResponseList(rsp, trace);
 1067      }
 1068   
 1069       /**
 1070        * Calls method synchrounously on target node only.
 1071        * @param serviceName Name of the target service name on which calls are de-multiplexed
 1072        * @param methodName name of the Java method to be called on remote services
 1073        * @param args array of Java Object representing the set of parameters to be
 1074        * given to the remote method
 1075        * @param types The types of the parameters
 1076        * node of the partition or only on remote nodes
 1077        * @param targetNode is the target of the call
 1078        * @return the value returned by the target method
 1079        * @throws Exception Throws if a communication exception occurs
 1080        */
 1081      public Object callMethodOnNode(String serviceName, String methodName,
 1082              Object[] args, Class[] types, long methodTimeout, ClusterNode targetNode) throws Throwable
 1083       {
 1084          if (!(targetNode instanceof ClusterNodeImpl))
 1085         {
 1086            throw new IllegalArgumentException("targetNode " + targetNode + " is not an instance of " +
 1087                                             ClusterNodeImpl.class + " -- only targetNodes provided by this HAPartition should be used");
 1088         }
 1089          boolean trace = this.log.isTraceEnabled();
 1090          
 1091          MethodCall m = new MethodCall(serviceName + "." + methodName, args, types);
 1092   
 1093          if( trace )
 1094          {
 1095             this.log.trace("callMethodOnNode( objName="+serviceName
 1096                +", methodName="+methodName);
 1097          }
 1098          Object rc = this.dispatcher.callRemoteMethod(((ClusterNodeImpl)targetNode).getOriginalJGAddress(), m, GroupRequest.GET_FIRST, methodTimeout);
 1099          if (rc != null)
 1100          {
 1101             Object item = rc;
 1102             if (item instanceof Rsp)
 1103             {
 1104                Rsp response = (Rsp) item;
 1105                // Only include received responses
 1106                boolean wasReceived = response.wasReceived();
 1107                if( wasReceived == true )
 1108                {
 1109                   item = response.getValue();
 1110                   if (!(item instanceof NoHandlerForRPC))
 1111                  {
 1112                     rc = item;
 1113                  }
 1114                   }
 1115                   else if( trace )
 1116                  {
 1117                     this.log.trace("Ignoring non-received response: "+response);
 1118                  }
 1119                }
 1120                else
 1121                {
 1122                   if (!(item instanceof NoHandlerForRPC))
 1123                  {
 1124                     rc = item;
 1125                  }
 1126                  else if( trace )
 1127                  {
 1128                     this.log.trace("Ignoring NoHandlerForRPC");
 1129                  }
 1130                }
 1131             }
 1132          return rc;
 1133        }
 1134   
 1135   
 1136      /**
 1137        * Calls method on target node only.
 1138        * @param serviceName Name of the target service name on which calls are de-multiplexed
 1139        * @param methodName name of the Java method to be called on remote services
 1140        * @param args array of Java Object representing the set of parameters to be
 1141        * given to the remote method
 1142        * @param types The types of the parameters
 1143        * node of the partition or only on remote nodes
 1144        * @param targetNode is the target of the call
 1145        * @return none
 1146        * @throws Exception Throws if a communication exception occurs
 1147        */
 1148      public void callAsyncMethodOnNode(String serviceName, String methodName,
 1149              Object[] args, Class[] types, long methodTimeout, ClusterNode targetNode) throws Throwable
 1150      {
 1151         if (!(targetNode instanceof ClusterNodeImpl))
 1152         {
 1153            throw new IllegalArgumentException("targetNode " + targetNode + " is not an instance of " +
 1154                                            ClusterNodeImpl.class + " -- only targetNodes provided by this HAPartition should be used");
 1155         }
 1156          boolean trace = this.log.isTraceEnabled();
 1157   
 1158          MethodCall m = new MethodCall(serviceName + "." + methodName, args, types);
 1159   
 1160          if( trace )
 1161          {
 1162             this.log.trace("callAsyncMethodOnNode( objName="+serviceName
 1163                +", methodName="+methodName);
 1164          }
 1165          this.dispatcher.callRemoteMethod(((ClusterNodeImpl)targetNode).getOriginalJGAddress(), m, GroupRequest.GET_NONE, methodTimeout);
 1166      }
 1167   
 1168      private ArrayList processResponseList(RspList rsp, boolean trace)
 1169      {
 1170         ArrayList rtn = new ArrayList();
 1171         if (rsp != null)
 1172         {
 1173            for (Object item : rsp.values())
 1174            {
 1175               if (item instanceof Rsp)
 1176               {
 1177                  Rsp response = (Rsp) item;
 1178                  // Only include received responses
 1179                  boolean wasReceived = response.wasReceived();
 1180                  if( wasReceived == true )
 1181                  {
 1182                     item = response.getValue();
 1183                     if (!(item instanceof NoHandlerForRPC))
 1184                     {
 1185                        rtn.add(item);
 1186                     }
 1187                  }
 1188                  else if( trace )
 1189                  {
 1190                     this.log.trace("Ignoring non-received response: "+response);
 1191                  }
 1192               }
 1193               else
 1194               {
 1195                  if (!(item instanceof NoHandlerForRPC))
 1196                  {
 1197                     rtn.add(item);
 1198                  }
 1199                  else if( trace )
 1200                  {
 1201                     this.log.trace("Ignoring NoHandlerForRPC");
 1202                  }
 1203               }
 1204            }
 1205            
 1206         }
 1207         return rtn;
 1208      }
 1209   
 1210      /**
 1211       * This function is an abstraction of RpcDispatcher for asynchronous messages
 1212       */
 1213      public void callAsynchMethodOnCluster(String objName, String methodName,
 1214         Object[] args, Class[] types, boolean excludeSelf) throws Exception
 1215      {
 1216         boolean trace = this.log.isTraceEnabled();
 1217   
 1218         MethodCall m = new MethodCall(objName + "." + methodName, args, types);
 1219   
 1220         if(this.channel.flushSupported())
 1221         {
 1222        	 this.flushBlockGate.await(this.getStateTransferTimeout());
 1223         }
 1224         if (excludeSelf)
 1225         {
 1226            if( trace )
 1227            {
 1228               this.log.trace("callAsynchMethodOnCluster(true), objName="+objName
 1229                  +", methodName="+methodName+", members="+this.jgotherMembers);
 1230            }
 1231            this.dispatcher.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_NONE, this.getMethodCallTimeout());
 1232         }
 1233         else
 1234         {
 1235            if( trace )
 1236            {
 1237               this.log.trace("callAsynchMethodOnCluster(false), objName="+objName
 1238                  +", methodName="+methodName+", members="+this.members);
 1239            }
 1240            this.dispatcher.callRemoteMethods(null, m, GroupRequest.GET_NONE, this.getMethodCallTimeout());
 1241         }
 1242      }
 1243      
 1244      // *************************
 1245      // *************************
 1246      // State transfer management
 1247      // *************************
 1248      // *************************
 1249      //
 1250      public void subscribeToStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber)
 1251      {
 1252         this.stateHandlers.put(objectName, subscriber);
 1253      }
 1254      
 1255      public void unsubscribeFromStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber)
 1256      {
 1257         this.stateHandlers.remove(objectName);
 1258      }
 1259      
 1260      // *************************
 1261      // *************************
 1262      // Group Membership listeners
 1263      // *************************
 1264      // *************************
 1265      //
 1266      public void registerMembershipListener(HAMembershipListener listener)
 1267      {
 1268         boolean isAsynch = (this.allowSyncListeners == false)
 1269               || (listener instanceof AsynchHAMembershipListener)
 1270               || (listener instanceof AsynchHAMembershipExtendedListener);
 1271         if( isAsynch ) {
 1272            synchronized(this.asynchListeners) {
 1273               this.asynchListeners.add(listener);
 1274            }
 1275         }
 1276         else  {
 1277            synchronized(this.synchListeners) {
 1278               this.synchListeners.add(listener);
 1279            }
 1280         }
 1281      }
 1282      
 1283      public void unregisterMembershipListener(HAMembershipListener listener)
 1284      {
 1285         boolean isAsynch = (this.allowSyncListeners == false)
 1286               || (listener instanceof AsynchHAMembershipListener)
 1287               || (listener instanceof AsynchHAMembershipExtendedListener);
 1288         if( isAsynch ) {
 1289            synchronized(this.asynchListeners) {
 1290               this.asynchListeners.remove(listener);
 1291            }
 1292         }
 1293         else  {
 1294            synchronized(this.synchListeners) {
 1295               this.synchListeners.remove(listener);
 1296            }
 1297         }
 1298      }
 1299      
 1300      public boolean getAllowSynchronousMembershipNotifications()
 1301      {
 1302         return this.allowSyncListeners;
 1303      }
 1304   
 1305      public void setAllowSynchronousMembershipNotifications(boolean allowSync)
 1306      {
 1307         this.allowSyncListeners = allowSync;
 1308      }
 1309      
 1310      // AsynchEventHandler.AsynchEventProcessor -----------------------
 1311   
 1312      public void processEvent(Object event)
 1313      {
 1314         ViewChangeEvent vce = (ViewChangeEvent) event;
 1315         this.notifyListeners(this.asynchListeners, vce.viewId, vce.allMembers,
 1316               vce.deadMembers, vce.newMembers, vce.originatingGroups);
 1317         
 1318      }
 1319      
 1320      
 1321      // Public ------------------------------------------------------------------
 1322      
 1323      public void setDistributedStateImpl(DistributedStateImpl distributedState)
 1324      {
 1325         this.distributedState = distributedState;
 1326      }
 1327      
 1328      public void setDistributedReplicantManagerImpl(DistributedReplicantManagerImpl drm)
 1329      {
 1330         if (this.replicantManager != null  && !(this.replicantManager == drm))
 1331         {
 1332            throw new IllegalStateException("DistributedReplicantManager already set");
 1333         }
 1334   
 1335         this.replicantManager = drm;
 1336         if (this.replicantManager != null)
 1337         {
 1338            this.replicantManager.setHAPartition(this);
 1339         }
 1340      }
 1341      
 1342      
 1343      // Protected -----------------------------------------------------
 1344   
 1345      protected void verifyNodeIsUnique () throws IllegalStateException
 1346      {
 1347         ClusterNodeImpl matched = null;
 1348         for (ClusterNode member : this.getClusterNodes())
 1349         {
 1350            if (member.equals(this.me))
 1351            {
 1352               if (matched == null)
 1353               {
 1354                  // We of course are in the view, so we expect one match
 1355                  // Just track that we've had one
 1356                  matched = (ClusterNodeImpl) member;
 1357               }
 1358               else
 1359               {
 1360                  // Two nodes in view match us; try to figure out which one isn't us
 1361                  ClusterNodeImpl other = matched;
 1362                  if (other.getOriginalJGAddress().equals(((ClusterNodeImpl)this.me).getOriginalJGAddress()))
 1363                  {
 1364                     other = (ClusterNodeImpl) member;
 1365                  }
 1366                  throw new IllegalStateException("Found member " + other +
 1367                        " in current view that duplicates us (" + this.me + "). This" +
 1368                        " node cannot join partition until duplicate member has" +
 1369                        " been removed");
 1370               }
 1371            }
 1372         }
 1373      }
 1374   
 1375      /**
 1376       * Helper method that binds the partition in the JNDI tree.
 1377       * @param jndiName Name under which the object must be bound
 1378       * @param who Object to bind in JNDI
 1379       * @param classType Class type under which should appear the bound object
 1380       * @param ctx Naming context under which we bind the object
 1381       * @throws Exception Thrown if a naming exception occurs during binding
 1382       */
 1383      protected void bind(String jndiName, Object who, Class classType, Context ctx) throws Exception
 1384      {
 1385         // Ah ! This service isn't serializable, so we use a helper class
 1386         //
 1387         NonSerializableFactory.bind(jndiName, who);
 1388         Name n = ctx.getNameParser("").parse(jndiName);
 1389         while (n.size () > 1)
 1390         {
 1391            String ctxName = n.get (0);
 1392            try
 1393            {
 1394               ctx = (Context)ctx.lookup (ctxName);
 1395            }
 1396            catch (NameNotFoundException e)
 1397            {
 1398               this.log.debug ("creating Subcontext " + ctxName);
 1399               ctx = ctx.createSubcontext (ctxName);
 1400            }
 1401            n = n.getSuffix (1);
 1402         }
 1403   
 1404         // The helper class NonSerializableFactory uses address type nns, we go on to
 1405         // use the helper class to bind the service object in JNDI
 1406         //
 1407         StringRefAddr addr = new StringRefAddr("nns", jndiName);
 1408         Reference ref = new Reference(classType.getName (), addr, NonSerializableFactory.class.getName (), null);
 1409         ctx.rebind (n.get (0), ref);
 1410      }
 1411      
 1412      /**
 1413       * Helper method that returns a vector of dead members from two input vectors: new and old vectors of two views.
 1414       * Dead members are old - new members.
 1415       * @param oldMembers Vector of old members
 1416       * @param newMembers Vector of new members
 1417       * @return Vector of members that have died between the two views, can be empty.
 1418       */
 1419      protected Vector<ClusterNode> getDeadMembers(Vector<ClusterNode> oldMembers, Vector<ClusterNode> newMembers)
 1420      {
 1421         if(oldMembers == null)
 1422         {
 1423            oldMembers=new Vector<ClusterNode>();
 1424         }
 1425         if(newMembers == null)
 1426         {
 1427            newMembers=new Vector<ClusterNode>();
 1428         }
 1429         Vector<ClusterNode> dead=(Vector)oldMembers.clone();
 1430         dead.removeAll(newMembers);
 1431         this.log.debug("dead members: " + dead);
 1432         return dead;
 1433      }
 1434      
 1435      /**
 1436       * Helper method that returns a vector of new members from two input vectors: new and old vectors of two views.
 1437       * @param oldMembers Vector of old members
 1438       * @param allMembers Vector of new members
 1439       * @return Vector of members that have joined the partition between the two views
 1440       */
 1441      protected Vector<ClusterNode> getNewMembers(Vector<ClusterNode> oldMembers, Vector<ClusterNode> allMembers)
 1442      {
 1443         if(oldMembers == null)
 1444         {
 1445            oldMembers=new Vector<ClusterNode>();
 1446         }
 1447         if(allMembers == null)
 1448         {
 1449            allMembers=new Vector<ClusterNode>();
 1450         }
 1451         Vector<ClusterNode> newMembers=(Vector)allMembers.clone();
 1452         newMembers.removeAll(oldMembers);
 1453         return newMembers;
 1454      }
 1455   
 1456      protected void notifyListeners(ArrayList<HAMembershipListener> theListeners, long viewID,
 1457         Vector<ClusterNode> allMembers, Vector<ClusterNode> deadMembers, Vector<ClusterNode> newMembers,
 1458         Vector<View> originatingGroups)
 1459      {
 1460         this.log.debug("Begin notifyListeners, viewID: "+viewID);
 1461         synchronized(theListeners)
 1462         {
 1463            // JBAS-3619 -- don't hold synch lock while notifying
 1464            theListeners = (ArrayList) theListeners.clone();
 1465         }
 1466         
 1467         for (int i = 0; i < theListeners.size(); i++)
 1468         {
 1469            HAMembershipListener aListener = null;
 1470            try
 1471            {
 1472               aListener = theListeners.get(i);
 1473               if(originatingGroups != null && (aListener instanceof HAMembershipExtendedListener))
 1474               {
 1475                  HAMembershipExtendedListener exListener = (HAMembershipExtendedListener) aListener;
 1476                  exListener.membershipChangedDuringMerge (deadMembers, newMembers,
 1477                     allMembers, originatingGroups);
 1478               }
 1479               else
 1480               {
 1481                  aListener.membershipChanged(deadMembers, newMembers, allMembers);
 1482               }
 1483            }
 1484            catch (Throwable e)
 1485            {
 1486               // a problem in a listener should not prevent other members to receive the new view
 1487               this.log.warn("HAMembershipListener callback failure: "+aListener, e);
 1488            }
 1489         }
 1490         
 1491         this.log.debug("End notifyListeners, viewID: "+viewID);
 1492      }
 1493      
 1494      /*
 1495       * Allows caller to specify whether the partition instance should be bound into JNDI.  Default value is true.
 1496       * This method must be called before the partition is started as the binding occurs during startup.
 1497       * 
 1498       * @param bind  Whether to bind the partition into JNDI.
 1499       */
 1500      public void setBindIntoJndi(boolean bind)
 1501      {
 1502          this.bindIntoJndi = bind;
 1503      }
 1504      
 1505      /*
 1506       * Allows caller to determine whether the partition instance has been bound into JNDI.
 1507       * 
 1508       * @return true if the partition has been bound into JNDI.
 1509       */
 1510      public boolean getBindIntoJndi()
 1511      {
 1512          return this.bindIntoJndi;
 1513      }
 1514   
 1515      public ThreadPool getThreadPool()
 1516      {
 1517         return this.threadPool;
 1518      }
 1519   
 1520      public void setThreadPool(ThreadPool threadPool)
 1521      {
 1522         this.threadPool = threadPool;
 1523      }
 1524   
 1525      protected Vector<ClusterNode> translateAddresses(Vector<Address> addresses)
 1526      {
 1527         if (addresses == null)
 1528         {
 1529            return null;
 1530         }
 1531   
 1532         Vector<ClusterNode> result = new Vector<ClusterNode>(addresses.size());
 1533         for (Address address: addresses)
 1534         {
 1535            result.add(new ClusterNodeImpl((IpAddress) address));
 1536         }
 1537   
 1538         return result;
 1539      }
 1540   
 1541      public void logHistory (String message)
 1542      {
 1543         try
 1544         {
 1545            this.history.add(new SimpleDateFormat().format (new Date()) + " : " + message);
 1546         }
 1547         catch (Exception ignored){}
 1548      }
 1549   
 1550      // --------------------------------------------------- ClusterPartitionMBean
 1551      
 1552      public String showHistory ()
 1553      {
 1554         StringBuffer buff = new StringBuffer();
 1555         Vector<String> data = new Vector<String>(this.history);
 1556         for (java.util.Iterator<String> row = data.iterator(); row.hasNext();)
 1557         {
 1558            String info = row.next();
 1559            buff.append(info).append("\n");
 1560         }
 1561         return buff.toString();
 1562      }
 1563   
 1564      public String showHistoryAsXML ()
 1565      {
 1566         StringBuffer buff = new StringBuffer();
 1567         buff.append("<events>\n");
 1568         Vector<String> data = new Vector<String>(this.history);
 1569         for (java.util.Iterator<String> row = data.iterator(); row.hasNext();)
 1570         {
 1571            buff.append("   <event>\n      ");
 1572            String info = row.next();
 1573            buff.append(info);
 1574            buff.append("\n   </event>\n");
 1575         }
 1576         buff.append("</events>\n");
 1577         return buff.toString();
 1578      }
 1579      
 1580      public Cache getClusteredCache()
 1581      {
 1582         return this.cache;
 1583      }
 1584      
 1585      public boolean getDeadlockDetection()
 1586      {
 1587         return this.deadlock_detection;
 1588      }
 1589   
 1590      public void setDeadlockDetection(boolean doit)
 1591      {
 1592         this.deadlock_detection = doit;
 1593      }
 1594   
 1595      public HAPartition getHAPartition()
 1596      {
 1597         return this;
 1598      }
 1599   
 1600      public String getJGroupsVersion()
 1601      {
 1602         return Version.description + "( " + Version.cvs + ")";
 1603      }
 1604   
 1605      public ChannelFactory getChannelFactory()
 1606      {
 1607         return this.channelFactory;
 1608      }
 1609   
 1610      public CacheManager getCacheManager()
 1611      {
 1612         return this.cacheManager;
 1613      }
 1614   
 1615      public void setCacheManager(CacheManager cacheManager)
 1616      {
 1617         this.cacheManager = cacheManager;
 1618      }
 1619   
 1620      public String getCacheConfigName()
 1621      {
 1622         return this.cacheConfigName;
 1623      }
 1624   
 1625      public void setCacheConfigName(String cacheConfigName)
 1626      {
 1627         this.cacheConfigName = cacheConfigName;
 1628      }
 1629   
 1630      public String getChannelStackName()
 1631      {
 1632         return this.stackName;
 1633      }
 1634   
 1635      public InetAddress getNodeAddress()
 1636      {
 1637         return this.nodeAddress;
 1638      }
 1639   
 1640      public void setNodeAddress(InetAddress address)
 1641      {
 1642         this.nodeAddress = address;
 1643      }
 1644   
 1645      public long getStateTransferTimeout() {
 1646         return this.state_transfer_timeout;
 1647      }
 1648   
 1649      public void setStateTransferTimeout(long timeout)
 1650      {
 1651         this.state_transfer_timeout = timeout;
 1652      }
 1653   
 1654      public long getMethodCallTimeout() {
 1655         return this.method_call_timeout;
 1656      }
 1657   
 1658      public void setMethodCallTimeout(long timeout)
 1659      {
 1660         this.method_call_timeout = timeout;
 1661      }
 1662   
 1663      // Protected --------------------------------------------------------------
 1664         
 1665      /**
 1666       * Creates an object from a byte buffer
 1667       */
 1668      protected Object objectFromByteBufferInternal (byte[] buffer) throws Exception
 1669      {
 1670         if(buffer == null)
 1671         {
 1672            return null;
 1673         }
 1674   
 1675         ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
 1676         MarshalledValueInputStream mvis = new MarshalledValueInputStream(bais);
 1677         return mvis.readObject();
 1678      }
 1679      
 1680      /**
 1681       * Serializes an object into a byte buffer.
 1682       * The object has to implement interface Serializable or Externalizable
 1683       */
 1684      protected byte[] objectToByteBufferInternal (Object obj) throws Exception
 1685      {
 1686         ByteArrayOutputStream baos = new ByteArrayOutputStream();
 1687         MarshalledValueOutputStream mvos = new MarshalledValueOutputStream(baos);
 1688         mvos.writeObject(obj);
 1689         mvos.flush();
 1690         return baos.toByteArray();
 1691      }
 1692      
 1693      /**
 1694       * Creates a response object from a byte buffer - optimized for response marshalling
 1695       */
 1696      protected Object objectFromByteBufferResponseInternal (byte[] buffer) throws Exception
 1697      {
 1698         if(buffer == null)
 1699         {
 1700            return null;
 1701         }
 1702   
 1703         if (buffer[0] == NULL_VALUE)
 1704         {
 1705            return null;
 1706         }
 1707   
 1708         ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
 1709         // read past the null/serializable byte
 1710         bais.read();
 1711         MarshalledValueInputStream mvis = new MarshalledValueInputStream(bais);
 1712         return mvis.readObject();
 1713      }
 1714      
 1715      /**
 1716       * Serializes a response object into a byte buffer, optimized for response marshalling.
 1717       * The object has to implement interface Serializable or Externalizable
 1718       */
 1719      protected byte[] objectToByteBufferResponseInternal (Object obj) throws Exception
 1720      {
 1721         if (obj == null)
 1722         {
 1723            return new byte[]{NULL_VALUE};
 1724         }
 1725   
 1726         ByteArrayOutputStream baos = new ByteArrayOutputStream();
 1727         // write a marker to stream to distinguish from null value stream
 1728         baos.write(SERIALIZABLE_VALUE);
 1729         MarshalledValueOutputStream mvos = new MarshalledValueOutputStream(baos);
 1730         mvos.writeObject(obj);
 1731         mvos.flush();
 1732         return baos.toByteArray();
 1733      }
 1734      
 1735      // Private -------------------------------------------------------
 1736      
 1737      // Inner classes -------------------------------------------------
 1738   
 1739      private class MessageListenerAdapter
 1740            implements ExtendedMessageListener
 1741      {
 1742         
 1743         public void getState(OutputStream stream)
 1744         {
 1745            ClusterPartition.this.logHistory ("getState called on partition");
 1746            
 1747            ClusterPartition.this.log.debug("getState called.");
 1748            try
 1749            {
 1750               ClusterPartition.this.getStateInternal(stream);
 1751            }
 1752            catch (Exception ex)
 1753            {
 1754               ClusterPartition.this.log.error("getState failed", ex);
 1755            }
 1756            
 1757         }
 1758         
 1759         public void getState(String state_id, OutputStream ostream)
 1760         {
 1761            throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
 1762         }
 1763   
 1764         public byte[] getState(String state_id)
 1765         {
 1766            throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
 1767         }
 1768         
 1769         public void setState(InputStream stream)
 1770         {
 1771            ClusterPartition.this.logHistory ("setState called on partition");
 1772            try
 1773            {
 1774               if (stream == null)
 1775               {
 1776                  ClusterPartition.this.log.debug("transferred serviceState is null (may be first member in cluster)");
 1777               }
 1778               else
 1779               {
 1780                  ClusterPartition.this.setStateInternal(stream);
 1781               }
 1782               
 1783               ClusterPartition.this.isStateSet = true;
 1784            }
 1785            catch (Throwable t)
 1786            {
 1787               ClusterPartition.this.recordSetStateFailure(t);
 1788            }
 1789            finally
 1790            {
 1791               // Notify waiting thread that serviceState has been set.
 1792               ClusterPartition.this.notifyChannelLock();
 1793            }
 1794         }
 1795   
 1796         public byte[] getState()
 1797         {
 1798            ClusterPartition.this.logHistory ("getState called on partition");
 1799            
 1800            ClusterPartition.this.log.debug("getState called.");
 1801            try
 1802            {
 1803               ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
 1804               ClusterPartition.this.getStateInternal(baos);
 1805               return baos.toByteArray();
 1806            }
 1807            catch (Exception ex)
 1808            {
 1809               ClusterPartition.this.log.error("getState failed", ex);
 1810            }
 1811            return null; // This will cause the receiver to get a "false" on the channel.getState() call
 1812         }
 1813   
 1814         public void setState(String state_id, byte[] state)
 1815         {
 1816            throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
 1817         }
 1818   
 1819         public void setState(String state_id, InputStream istream)
 1820         {
 1821            throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
 1822         }
 1823   
 1824         public void receive(org.jgroups.Message msg)
 1825         { /* complete */}
 1826         
 1827         public void setState(byte[] obj)
 1828         {
 1829            ClusterPartition.this.logHistory ("setState called on partition");
 1830            try
 1831            {
 1832               if (obj == null)
 1833               {
 1834                  ClusterPartition.this.log.debug("transferred serviceState is null (may be first member in cluster)");
 1835               }
 1836               else
 1837               {
 1838                  ByteArrayInputStream bais = new ByteArrayInputStream(obj);
 1839                  ClusterPartition.this.setStateInternal(bais);
 1840                  bais.close();
 1841               }
 1842               
 1843               ClusterPartition.this.isStateSet = true;
 1844            }
 1845            catch (Throwable t)
 1846            {
 1847               ClusterPartition.this.recordSetStateFailure(t);
 1848            }
 1849            finally
 1850            {
 1851               // Notify waiting thread that serviceState has been set.
 1852               ClusterPartition.this.notifyChannelLock();
 1853            }
 1854         }
 1855         
 1856      }
 1857   
 1858      /**
 1859       * A simple data class containing the view change event needed to
 1860       * notify the HAMembershipListeners
 1861       */
 1862      private static class ViewChangeEvent
 1863      {
 1864         long viewId;
 1865         Vector<ClusterNode> deadMembers;
 1866         Vector<ClusterNode> newMembers;
 1867         Vector<ClusterNode> allMembers;
 1868         Vector<View> originatingGroups;
 1869      }
 1870      
 1871      private class RequestMarshallerImpl implements org.jgroups.blocks.RpcDispatcher.Marshaller
 1872      {
 1873   
 1874         public Object objectFromByteBuffer(byte[] buf) throws Exception
 1875         {
 1876            return ClusterPartition.this.objectFromByteBufferInternal(buf);
 1877         }
 1878   
 1879         public byte[] objectToByteBuffer(Object obj) throws Exception
 1880         {
 1881            // wrap MethodCall in Object[service_name, byte[]] so that service name is available during demarshalling
 1882            if (obj instanceof MethodCall)
 1883            {
 1884               String name = ((MethodCall)obj).getName();
 1885               int idx = name.lastIndexOf('.');
 1886               String serviceName = name.substring(0, idx);
 1887               return ClusterPartition.this.objectToByteBufferInternal(new Object[]{serviceName, ClusterPartition.this.objectToByteBufferInternal(obj)});
 1888            }
 1889   
 1890            return ClusterPartition.this.objectToByteBufferInternal(obj);
 1891         }
 1892      }
 1893      
 1894      private class ResponseMarshallerImpl implements org.jgroups.blocks.RpcDispatcher.Marshaller
 1895      {
 1896         
 1897         public Object objectFromByteBuffer(byte[] buf) throws Exception
 1898         {
 1899            boolean trace = ClusterPartition.this.log.isTraceEnabled();
 1900            Object retval = ClusterPartition.this.objectFromByteBufferResponseInternal(buf);
 1901            // HAServiceResponse is only received when a scoped classloader is required for unmarshalling
 1902            if (!(retval instanceof HAServiceResponse))
 1903            {
 1904               return retval;
 1905            }
 1906             
 1907            String serviceName = ((HAServiceResponse)retval).getServiceName();
 1908            byte[] payload = ((HAServiceResponse)retval).getPayload();
 1909   
 1910            ClassLoader previousCL = null;
 1911            boolean overrideCL = false;
 1912            try
 1913            {
 1914               WeakReference<ClassLoader> weak = ClusterPartition.this.clmap.get(serviceName);
 1915               if (weak != null) // this should always be true since we only use HAServiceResponse when classloader is specified
 1916               {
 1917                  previousCL = Thread.currentThread().getContextClassLoader();
 1918                  ClassLoader loader = weak.get();
 1919                  if( trace )
 1920                  {
 1921                     ClusterPartition.this.log.trace("overriding response Thread ContextClassLoader for service " + serviceName);
 1922                  }
 1923                  overrideCL = true;
 1924                  Thread.currentThread().setContextClassLoader(loader);
 1925               }
 1926               retval = ClusterPartition.this.objectFromByteBufferResponseInternal(payload);
 1927      
 1928               return retval;
 1929            }
 1930            finally
 1931            {
 1932               if (overrideCL == true)
 1933               {
 1934                  ClusterPartition.this.log.trace("resetting response classloader");
 1935                  Thread.currentThread().setContextClassLoader(previousCL);
 1936               }
 1937            }
 1938         }
 1939   
 1940         public byte[] objectToByteBuffer(Object obj) throws Exception
 1941         {
 1942            return ClusterPartition.this.objectToByteBufferResponseInternal(obj);
 1943         }
 1944      }
 1945      
 1946      /**
 1947       * Overrides RpcDispatcher.Handle so that we can dispatch to many
 1948       * different objects.
 1949       */
 1950      private class RpcHandler extends RpcDispatcher
 1951      {
 1952         private RpcHandler(Channel channel, MessageListener l, MembershipListener l2, Object server_obj,
 1953               boolean deadlock_detection)
 1954         {
 1955            super(channel, l, l2, server_obj, deadlock_detection);
 1956         }
 1957         
 1958         /**
 1959          * Analyze the MethodCall contained in <code>req</code> to find the
 1960          * registered service object to invoke against, and then execute it
 1961          * against *that* object and return result.
 1962          *
 1963          * This overrides RpcDispatcher.Handle so that we can dispatch to many different objects.
 1964          * @param req The org.jgroups. representation of the method invocation
 1965          * @return The serializable return value from the invocation
 1966          */
 1967         public Object handle(Message req)
 1968         {
 1969            Object body = null;
 1970            Object retval = null;
 1971            Object handler = null;
 1972            boolean trace = this.log.isTraceEnabled();
 1973            boolean overrideCL = false;
 1974            ClassLoader previousCL = null;
 1975            String service = null;
 1976            byte[] request_bytes = null;
 1977            
 1978            if( trace )
 1979            {
 1980               this.log.trace("Partition " + ClusterPartition.this.getPartitionName() + " received msg");
 1981            }
 1982            if(req == null || req.getBuffer() == null)
 1983            {
 1984               this.log.warn("Partition " + ClusterPartition.this.getPartitionName() + " message or message buffer is null!");
 1985               return null;
 1986            }
 1987            
 1988            try
 1989            {
 1990               Object wrapper = ClusterPartition.this.objectFromByteBufferInternal(req.getBuffer());
 1991               if(wrapper == null || !(wrapper instanceof Object[]))
 1992               {
 1993                  this.log.warn("Partition " + ClusterPartition.this.getPartitionName() + " message wrapper does not contain Object[] object!");
 1994                  return null;
 1995               }
 1996   
 1997               // wrapper should be Object[]{service_name, byte[]}
 1998               Object[] temp = (Object[])wrapper;
 1999               service = (String)temp[0];
 2000               request_bytes = (byte[])temp[1];
 2001   
 2002               // see if this node has registered to handle this service
 2003               handler = ClusterPartition.this.rpcHandlers.get(service);
 2004               if (handler == null)
 2005               {
 2006                  if( trace )
 2007                  {
 2008                     this.log.trace("Partition " + ClusterPartition.this.getPartitionName() + " no rpc handler registered under service " + service);
 2009                  }
 2010                  return new NoHandlerForRPC();
 2011               }
 2012            }
 2013            catch(Exception e)
 2014            {
 2015               this.log.warn("Partition " + ClusterPartition.this.getPartitionName() + " failed unserializing message buffer (msg=" + req + ")", e);
 2016               return null;
 2017            }
 2018            
 2019            try
 2020            {
 2021               // If client registered the service with a classloader, override the thread classloader here
 2022               WeakReference<ClassLoader> weak = ClusterPartition.this.clmap.get(service);
 2023               if (weak != null)
 2024               {
 2025                  if( trace )
 2026                  {
 2027                     this.log.trace("overriding Thread ContextClassLoader for RPC service " + service);
 2028                  }
 2029                  previousCL = Thread.currentThread().getContextClassLoader();
 2030                  ClassLoader loader = weak.get();
 2031                  overrideCL = true;
 2032                  Thread.currentThread().setContextClassLoader(loader);
 2033               }
 2034               body = ClusterPartition.this.objectFromByteBufferInternal(request_bytes);
 2035            }
 2036            catch (Exception e)
 2037            {
 2038               this.log.warn("Partition " + ClusterPartition.this.getPartitionName() + " failed extracting message body from request bytes", e);
 2039               return null;
 2040            }
 2041            finally
 2042            {
 2043               if (overrideCL)
 2044               {
 2045                  this.log.trace("resetting Thread ContextClassLoader");
 2046                  Thread.currentThread().setContextClassLoader(previousCL);
 2047               }
 2048            }
 2049            
 2050            if(body == null || !(body instanceof MethodCall))
 2051            {
 2052               this.log.warn("Partition " + ClusterPartition.this.getPartitionName() + " message does not contain a MethodCall object!");
 2053               return null;
 2054            }
 2055            
 2056            // get method call information
 2057            MethodCall method_call = (MethodCall)body;
 2058            String methodName = method_call.getName();
 2059            
 2060            if( trace )
 2061            {
 2062               this.log.trace("full methodName: " + methodName);
 2063            }
 2064            
 2065            int idx = methodName.lastIndexOf('.');
 2066            String handlerName = methodName.substring(0, idx);
 2067            String newMethodName = methodName.substring(idx + 1);
 2068            if( trace )
 2069            {
 2070               this.log.trace("handlerName: " + handlerName + " methodName: " + newMethodName);
 2071               this.log.trace("Handle: " + methodName);
 2072            }
 2073            
 2074            // prepare method call
 2075            method_call.setName(newMethodName);
 2076   
 2077            /* Invoke it and just return any exception with trace level logging of
 2078            the exception. The exception semantics of a group rpc call are weak as
 2079            the return value may be a normal return value or the exception thrown.
 2080            */
 2081            try
 2082            {
 2083               retval = method_call.invoke(handler);
 2084               if (overrideCL)
 2085               {
 2086                  // wrap the response so that the service name can be accessed during unmarshalling of the response
 2087                  byte[] retbytes = ClusterPartition.this.objectToByteBufferResponseInternal(retval);
 2088                  retval = new HAServiceResponse(handlerName, retbytes);
 2089               }
 2090               if( trace )
 2091               {
 2092                  this.log.trace("rpc call return value: " + retval);
 2093               }
 2094            }
 2095            catch (Throwable t)
 2096            {
 2097               if( trace )
 2098               {
 2099                  this.log.trace("Partition " + ClusterPartition.this.getPartitionName() + " rpc call threw exception", t);
 2100               }
 2101               retval = t;
 2102            }
 2103   
 2104            return retval;
 2105         }
 2106         
 2107      }
 2108      /**
 2109       * Copyright (c) 2005 Brian Goetz and Tim Peierls
 2110       * Released under the Creative Commons Attribution License
 2111       * (http://creativecommons.org/licenses/by/2.5)
 2112       * Official home: http://www.jcip.net
 2113       * 
 2114       * ThreadGate <p/> Recloseable gate using wait and notifyAll
 2115       * 
 2116       * @author Brian Goetz and Tim Peierls
 2117       */
 2118   
 2119      private static class ThreadGate {
 2120          // CONDITION-PREDICATE: opened-since(n) (isOpen || generation>n)
 2121          private boolean isOpen;
 2122   
 2123          private int generation;
 2124   
 2125          public synchronized void close()
 2126          {
 2127              this.isOpen = false;
 2128          }
 2129   
 2130          public synchronized void open()
 2131          {
 2132              ++this.generation;
 2133              this.isOpen = true;
 2134              this.notifyAll();
 2135          }
 2136   
 2137          // BLOCKS-UNTIL: opened-since(generation on entry)
 2138          public synchronized void await() throws InterruptedException
 2139          {
 2140              int arrivalGeneration = this.generation;
 2141              while(!this.isOpen && arrivalGeneration == this.generation)
 2142            {
 2143               this.wait();
 2144            }
 2145          }
 2146          
 2147          // BLOCKS-UNTIL: opened-since(generation on entry)
 2148          public synchronized void await(long timeout) throws InterruptedException
 2149          {
 2150              int arrivalGeneration = this.generation;
 2151              while(!this.isOpen && arrivalGeneration == this.generation)
 2152            {
 2153               this.wait(timeout);
 2154            }
 2155          }
 2156      }
 2157      
 2158      private void setupLoggers(String partitionName)
 2159      {
 2160         if (partitionName == null)
 2161         {
 2162            this.log = Logger.getLogger(HAPartition.class.getName());
 2163            this.clusterLifeCycleLog = Logger.getLogger(HAPartition.class.getName() + ".lifecycle");
 2164         }
 2165         else
 2166         {
 2167            this.log = Logger.getLogger(HAPartition.class.getName() + "." + partitionName);
 2168            this.clusterLifeCycleLog = Logger.getLogger(HAPartition.class.getName() + ".lifecycle." + partitionName);
 2169         }
 2170      }
 2171      
 2172   }

Save This Page
Home » jboss-5.0.0.CR1-src » org.jboss.ha.framework » server » [javadoc | source]