Save This Page
Home » jboss-5.0.0.CR1-src » org.jboss.invocation.pooled » server » [javadoc | source]
    1   /*
    2   * JBoss, Home of Professional Open Source
    3   * Copyright 2005, JBoss Inc., and individual contributors as indicated
    4   * by the @authors tag. See the copyright.txt in the distribution for a
    5   * full listing of individual contributors.
    6   *
    7   * This is free software; you can redistribute it and/or modify it
    8   * under the terms of the GNU Lesser General Public License as
    9   * published by the Free Software Foundation; either version 2.1 of
   10   * the License, or (at your option) any later version.
   11   *
   12   * This software is distributed in the hope that it will be useful,
   13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
   14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
   15   * Lesser General Public License for more details.
   16   *
   17   * You should have received a copy of the GNU Lesser General Public
   18   * License along with this software; if not, write to the Free
   19   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
   20   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
   21   */
   22   package org.jboss.invocation.pooled.server;
   23   
   24   import org.jboss.system.Registry;
   25   import javax.management.ObjectName;
   26   import org.jboss.invocation.Invocation;
   27   import org.jboss.invocation.MarshalledInvocation;
   28   import org.jboss.invocation.pooled.interfaces.PooledInvokerProxy;
   29   import org.jboss.invocation.pooled.interfaces.ServerAddress;
   30   import java.util.HashMap;
   31   import org.jboss.invocation.Invoker;
   32   import org.jboss.invocation.InvokerHA;
   33   import org.jboss.invocation.jrmp.interfaces.JRMPInvokerProxyHA;
   34   import org.jboss.ha.framework.interfaces.HARMIResponse;
   35   import org.jboss.ha.framework.server.HATarget;
   36   import org.jboss.ha.framework.interfaces.LoadBalancePolicy;
   37   import org.jboss.ha.framework.interfaces.GenericClusteringException;
   38   import javax.management.InstanceNotFoundException;
   39   import javax.management.ReflectionException;
   40   
   41   import java.util.ArrayList;
   42   
   43   /**
   44    * This invoker pools Threads and client connections to one server socket.
   45    * The purpose is to avoid a bunch of failings of RMI.
   46    * 
   47    * 1. Avoid making a client socket connection with every invocation call.
   48    *    This is very expensive.  Also on windows if too many clients try 
   49    *    to connect at the same time, you get connection refused exceptions.
   50    *    This invoker/proxy combo alleviates this.
   51    *
   52    * 2. Avoid creating a thread per invocation.  The client/server connection
   53    *    is preserved and attached to the same thread.
   54   
   55    * So we have connection pooling on the server and client side, and thread pooling
   56    * on the server side.  Pool, is an LRU pool, so resources should be cleaned up.
   57    * 
   58    *
   59    * @author    <a href="mailto:bill@jboss.org">Bill Burke</a>
   60    * @version $Revision: 69191 $
   61    *
   62    * @jmx:mbean extends="org.jboss.system.ServiceMBean"
   63    */
   64   public class PooledInvokerHA extends PooledInvoker implements InvokerHA
   65   {
   66      protected HashMap beanMap = new HashMap();
   67   
   68      protected void jmxBind()
   69      {
   70         Registry.bind(getServiceName(), this);
   71      }
   72   
   73      // JRMPInvoker.destroyService() does the right thing
   74   
   75      public java.io.Serializable getStub() 
   76      {
   77         ServerAddress sa = new ServerAddress(clientConnectAddress,
   78            clientConnectPort, enableTcpNoDelay, timeout, clientSocketFactory); 
   79         return new PooledInvokerProxy(sa, clientMaxPoolSize);
   80      }
   81   
   82      public void registerBean(ObjectName beanName, HATarget target) throws Exception
   83      {
   84         Integer hash = new Integer(beanName.hashCode());
   85         log.debug("registerBean: "+beanName);
   86         
   87         if (beanMap.containsKey(hash))
   88         {
   89            // FIXME [oleg] In theory this is possible!
   90            log.debug("Trying to register target " + target + " using an existing hashCode. Already registered: " + hash + "=" + beanMap.get(hash));
   91            throw new IllegalStateException("Trying to register target using an existing hashCode.");
   92         }
   93         beanMap.put(hash, target);
   94      }
   95      
   96      public Invoker createProxy(ObjectName beanName, LoadBalancePolicy policy,
   97         String proxyFamilyName) throws Exception
   98      {
   99         Integer hash = new Integer(beanName.hashCode());
  100         HATarget target = (HATarget) beanMap.get(hash);
  101         if (target == null)
  102         {
  103            throw new IllegalStateException("The bean hashCode not found");
  104         }
  105   
  106         String familyName = proxyFamilyName;
  107         if (familyName == null)
  108            familyName= target.getAssociatedPartition().getPartitionName() + "/" + beanName;
  109   
  110         JRMPInvokerProxyHA proxy = new JRMPInvokerProxyHA(target.getReplicants(), 
  111                                                           policy, 
  112                                                           familyName, 
  113                                                           target.getCurrentViewId ());
  114         return proxy;
  115      }
  116   
  117      public void unregisterBean(ObjectName beanName) throws Exception
  118      {
  119         Integer hash = new Integer(beanName.hashCode());
  120         beanMap.remove(hash);
  121      }
  122   
  123      /**
  124       * Invoke a Remote interface method.
  125       */
  126      public Object invoke(Invocation invocation)
  127         throws Exception
  128      {
  129         ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
  130         try
  131         {
  132            // Deserialize the transaction if it is there
  133            invocation.setTransaction(importTPC(((MarshalledInvocation) invocation).getTransactionPropagationContext()));
  134   
  135            // Extract the ObjectName, the rest is still marshalled
  136            ObjectName mbean = (ObjectName) Registry.lookup(invocation.getObjectName());
  137            long clientViewId = ((Long)invocation.getValue("CLUSTER_VIEW_ID")).longValue();
  138   
  139            HATarget target = (HATarget)beanMap.get(invocation.getObjectName());
  140            if (target == null) 
  141            {
  142               // We could throw IllegalStateException but we have a race condition that could occur:
  143               // when we undeploy a bean, the cluster takes some time to converge
  144               // and to recalculate a new viewId and list of replicant for each HATarget.
  145               // Consequently, a client could own an up-to-date list of the replicants
  146               // (before the cluster has converged) and try to perform an invocation
  147               // on this node where the HATarget no more exist, thus receiving a
  148               // wrong exception and no failover is performed with an IllegalStateException
  149               //
  150               throw new GenericClusteringException(GenericClusteringException.COMPLETED_NO, 
  151                                                    "target is not/no more registered on this node");            
  152            }
  153            
  154            if (!target.invocationsAllowed ())
  155               throw new GenericClusteringException(GenericClusteringException.COMPLETED_NO, 
  156                                           "invocations are currently not allowed on this target");            
  157   
  158            // The cl on the thread should be set in another interceptor
  159            Object rtn = getServer().invoke(mbean,
  160                                            "invoke",
  161                                            new Object[] { invocation },
  162                                            Invocation.INVOKE_SIGNATURE);
  163            
  164            HARMIResponse rsp = new HARMIResponse();
  165   
  166            if (clientViewId != target.getCurrentViewId())
  167            {
  168               rsp.newReplicants = new ArrayList(target.getReplicants());
  169               rsp.currentViewId = target.getCurrentViewId();
  170            }
  171            rsp.response = rtn;
  172            return rsp;
  173         }
  174         catch (InstanceNotFoundException e)
  175         {
  176            throw new GenericClusteringException(GenericClusteringException.COMPLETED_NO, e);
  177         }
  178         catch (ReflectionException e)
  179         {
  180            throw new GenericClusteringException(GenericClusteringException.COMPLETED_NO, e);
  181         }
  182         catch (Exception e)
  183         {
  184            org.jboss.mx.util.JMXExceptionDecoder.rethrow(e);
  185   
  186            // the compiler does not know an exception is thrown by the above
  187            throw new org.jboss.util.UnreachableStatementException();
  188         }
  189         finally
  190         {
  191            Thread.currentThread().setContextClassLoader(oldCl);
  192         }      
  193      }
  194   }
  195   // vim:expandtab:tabstop=3:shiftwidth=3

Save This Page
Home » jboss-5.0.0.CR1-src » org.jboss.invocation.pooled » server » [javadoc | source]