Save This Page
Home » hadoop-0.14.4 » org.apache » hadoop » ipc » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one
    3    * or more contributor license agreements.  See the NOTICE file
    4    * distributed with this work for additional information
    5    * regarding copyright ownership.  The ASF licenses this file
    6    * to you under the Apache License, Version 2.0 (the
    7    * "License"); you may not use this file except in compliance
    8    * with the License.  You may obtain a copy of the License at
    9    *
   10    *     http://www.apache.org/licenses/LICENSE-2.0
   11    *
   12    * Unless required by applicable law or agreed to in writing, software
   13    * distributed under the License is distributed on an "AS IS" BASIS,
   14    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   15    * See the License for the specific language governing permissions and
   16    * limitations under the License.
   17    */
   18   
   19   package org.apache.hadoop.ipc;
   20   
   21   import java.lang.reflect.Proxy;
   22   import java.lang.reflect.Method;
   23   import java.lang.reflect.Array;
   24   import java.lang.reflect.InvocationHandler;
   25   import java.lang.reflect.InvocationTargetException;
   26   
   27   import java.net.ConnectException;
   28   import java.net.InetSocketAddress;
   29   import java.net.SocketTimeoutException;
   30   import java.io;
   31   
   32   import org.apache.commons.logging;
   33   
   34   import org.apache.hadoop.io;
   35   import org.apache.hadoop.conf;
   36   
   37   /** A simple RPC mechanism.
   38    *
   39    * A <i>protocol</i> is a Java interface.  All parameters and return types must
   40    * be one of:
   41    *
   42    * <ul> <li>a primitive type, <code>boolean</code>, <code>byte</code>,
   43    * <code>char</code>, <code>short</code>, <code>int</code>, <code>long</code>,
   44    * <code>float</code>, <code>double</code>, or <code>void</code>; or</li>
   45    *
   46    * <li>a {@link String}; or</li>
   47    *
   48    * <li>a {@link Writable}; or</li>
   49    *
   50    * <li>an array of the above types</li> </ul>
   51    *
   52    * All methods in the protocol should throw only IOException.  No field data of
   53    * the protocol instance is transmitted.
   54    */
   55   public class RPC {
   56     private static final Log LOG =
   57       LogFactory.getLog("org.apache.hadoop.ipc.RPC");
   58   
   59     private RPC() {}                                  // no public ctor
   60   
   61   
   62     /** A method invocation, including the method name and its parameters.*/
   63     private static class Invocation implements Writable, Configurable {
   64       private String methodName;
   65       private Class[] parameterClasses;
   66       private Object[] parameters;
   67       private Configuration conf;
   68   
   69       public Invocation() {}
   70   
   71       public Invocation(Method method, Object[] parameters) {
   72         this.methodName = method.getName();
   73         this.parameterClasses = method.getParameterTypes();
   74         this.parameters = parameters;
   75       }
   76   
   77       /** The name of the method invoked. */
   78       public String getMethodName() { return methodName; }
   79   
   80       /** The parameter classes. */
   81       public Class[] getParameterClasses() { return parameterClasses; }
   82   
   83       /** The parameter instances. */
   84       public Object[] getParameters() { return parameters; }
   85   
   86       public void readFields(DataInput in) throws IOException {
   87         methodName = UTF8.readString(in);
   88         parameters = new Object[in.readInt()];
   89         parameterClasses = new Class[parameters.length];
   90         ObjectWritable objectWritable = new ObjectWritable();
   91         for (int i = 0; i < parameters.length; i++) {
   92           parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);
   93           parameterClasses[i] = objectWritable.getDeclaredClass();
   94         }
   95       }
   96   
   97       public void write(DataOutput out) throws IOException {
   98         UTF8.writeString(out, methodName);
   99         out.writeInt(parameterClasses.length);
  100         for (int i = 0; i < parameterClasses.length; i++) {
  101           ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
  102                                      conf);
  103         }
  104       }
  105   
  106       public String toString() {
  107         StringBuffer buffer = new StringBuffer();
  108         buffer.append(methodName);
  109         buffer.append("(");
  110         for (int i = 0; i < parameters.length; i++) {
  111           if (i != 0)
  112             buffer.append(", ");
  113           buffer.append(parameters[i]);
  114         }
  115         buffer.append(")");
  116         return buffer.toString();
  117       }
  118   
  119       public void setConf(Configuration conf) {
  120         this.conf = conf;
  121       }
  122   
  123       public Configuration getConf() {
  124         return this.conf;
  125       }
  126   
  127     }
  128   
  129     private static Client CLIENT;
  130   
  131     private static synchronized Client getClient(Configuration conf) {
  132       // Construct & cache client.  The configuration is only used for timeout,
  133       // and Clients have connection pools.  So we can either (a) lose some
  134       // connection pooling and leak sockets, or (b) use the same timeout for all
  135       // configurations.  Since the IPC is usually intended globally, not
  136       // per-job, we choose (a).
  137       if (CLIENT == null) {
  138         CLIENT = new Client(ObjectWritable.class, conf);
  139       }
  140       return CLIENT;
  141     }
  142   
  143     /**
  144      * Stop all RPC client connections
  145      */
  146     public static synchronized void stopClient(){
  147       if (CLIENT != null) {
  148         CLIENT.stop();
  149         CLIENT = null;
  150       }
  151     }
  152   
  153     private static class Invoker implements InvocationHandler {
  154       private InetSocketAddress address;
  155       private Client client;
  156   
  157       public Invoker(InetSocketAddress address, Configuration conf) {
  158         this.address = address;
  159         this.client = getClient(conf);
  160       }
  161   
  162       public Object invoke(Object proxy, Method method, Object[] args)
  163         throws Throwable {
  164         long startTime = System.currentTimeMillis();
  165         ObjectWritable value = (ObjectWritable)
  166           client.call(new Invocation(method, args), address);
  167         long callTime = System.currentTimeMillis() - startTime;
  168         LOG.debug("Call: " + method.getName() + " " + callTime);
  169         return value.get();
  170       }
  171     }
  172   
  173     /**
  174      * A version mismatch for the RPC protocol.
  175      */
  176     public static class VersionMismatch extends IOException {
  177       private String interfaceName;
  178       private long clientVersion;
  179       private long serverVersion;
  180       
  181       /**
  182        * Create a version mismatch exception
  183        * @param interfaceName the name of the protocol mismatch
  184        * @param clientVersion the client's version of the protocol
  185        * @param serverVersion the server's version of the protocol
  186        */
  187       public VersionMismatch(String interfaceName, long clientVersion,
  188                              long serverVersion) {
  189         super("Protocol " + interfaceName + " version mismatch. (client = " +
  190               clientVersion + ", server = " + serverVersion + ")");
  191         this.interfaceName = interfaceName;
  192         this.clientVersion = clientVersion;
  193         this.serverVersion = serverVersion;
  194       }
  195       
  196       /**
  197        * Get the interface name
  198        * @return the java class name 
  199        *          (eg. org.apache.hadoop.mapred.InterTrackerProtocol)
  200        */
  201       public String getInterfaceName() {
  202         return interfaceName;
  203       }
  204       
  205       /**
  206        * Get the client's prefered version
  207        */
  208       public long getClientVersion() {
  209         return clientVersion;
  210       }
  211       
  212       /**
  213        * Get the server's agreed to version.
  214        */
  215       public long getServerVersion() {
  216         return serverVersion;
  217       }
  218     }
  219     
  220     public static VersionedProtocol waitForProxy(Class protocol,
  221                                                  long clientVersion,
  222                                                  InetSocketAddress addr,
  223                                                  Configuration conf
  224                                                  ) throws IOException {
  225       while (true) {
  226         try {
  227           return getProxy(protocol, clientVersion, addr, conf);
  228         } catch(ConnectException se) {  // namenode has not been started
  229           LOG.info("Server at " + addr + " not available yet, Zzzzz...");
  230         } catch(SocketTimeoutException te) {  // namenode is busy
  231           LOG.info("Problem connecting to server: " + addr);
  232         }
  233         try {
  234           Thread.sleep(1000);
  235         } catch (InterruptedException ie) {
  236           // IGNORE
  237         }
  238       }
  239     }
  240     /** Construct a client-side proxy object that implements the named protocol,
  241      * talking to a server at the named address. */
  242     public static VersionedProtocol getProxy(Class protocol, long clientVersion,
  243                                              InetSocketAddress addr, Configuration conf) throws IOException {
  244       VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance(
  245                                                                            protocol.getClassLoader(),
  246                                                                            new Class[] { protocol },
  247                                                                            new Invoker(addr, conf));
  248       long serverVersion = proxy.getProtocolVersion(protocol.getName(), 
  249                                                     clientVersion);
  250       if (serverVersion == clientVersion) {
  251         return proxy;
  252       } else {
  253         throw new VersionMismatch(protocol.getName(), clientVersion, 
  254                                   serverVersion);
  255       }
  256     }
  257   
  258     /** Expert: Make multiple, parallel calls to a set of servers. */
  259     public static Object[] call(Method method, Object[][] params,
  260                                 InetSocketAddress[] addrs, Configuration conf)
  261       throws IOException {
  262   
  263       Invocation[] invocations = new Invocation[params.length];
  264       for (int i = 0; i < params.length; i++)
  265         invocations[i] = new Invocation(method, params[i]);
  266       Writable[] wrappedValues = getClient(conf).call(invocations, addrs);
  267       
  268       if (method.getReturnType() == Void.TYPE) {
  269         return null;
  270       }
  271   
  272       Object[] values =
  273         (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
  274       for (int i = 0; i < values.length; i++)
  275         if (wrappedValues[i] != null)
  276           values[i] = ((ObjectWritable)wrappedValues[i]).get();
  277       
  278       return values;
  279     }
  280   
  281     /** Construct a server for a protocol implementation instance listening on a
  282      * port and address. */
  283     public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf) 
  284       throws IOException {
  285       return getServer(instance, bindAddress, port, 1, false, conf);
  286     }
  287   
  288     /** Construct a server for a protocol implementation instance listening on a
  289      * port and address. */
  290     public static Server getServer(final Object instance, final String bindAddress, final int port,
  291                                    final int numHandlers,
  292                                    final boolean verbose, Configuration conf) 
  293       throws IOException {
  294       return new Server(instance, conf, bindAddress, port, numHandlers, verbose);
  295     }
  296   
  297     /** An RPC Server. */
  298     public static class Server extends org.apache.hadoop.ipc.Server {
  299       private Object instance;
  300       private Class<?> implementation;
  301       private boolean verbose;
  302   
  303       /** Construct an RPC server.
  304        * @param instance the instance whose methods will be called
  305        * @param conf the configuration to use
  306        * @param bindAddress the address to bind on to listen for connection
  307        * @param port the port to listen for connections on
  308        */
  309       public Server(Object instance, Configuration conf, String bindAddress, int port) 
  310         throws IOException {
  311         this(instance, conf,  bindAddress, port, 1, false);
  312       }
  313   
  314       /** Construct an RPC server.
  315        * @param instance the instance whose methods will be called
  316        * @param conf the configuration to use
  317        * @param bindAddress the address to bind on to listen for connection
  318        * @param port the port to listen for connections on
  319        * @param numHandlers the number of method handler threads to run
  320        * @param verbose whether each call should be logged
  321        */
  322       public Server(Object instance, Configuration conf, String bindAddress,  int port,
  323                     int numHandlers, boolean verbose) throws IOException {
  324         super(bindAddress, port, Invocation.class, numHandlers, conf);
  325         this.instance = instance;
  326         this.implementation = instance.getClass();
  327         this.verbose = verbose;
  328       }
  329   
  330       public Writable call(Writable param) throws IOException {
  331         try {
  332           Invocation call = (Invocation)param;
  333           if (verbose) log("Call: " + call);
  334           
  335           Method method =
  336             implementation.getMethod(call.getMethodName(),
  337                                      call.getParameterClasses());
  338   
  339           long startTime = System.currentTimeMillis();
  340           Object value = method.invoke(instance, call.getParameters());
  341           long callTime = System.currentTimeMillis() - startTime;
  342           LOG.debug("Served: " + call.getMethodName() + " " + callTime);
  343           if (verbose) log("Return: "+value);
  344   
  345           return new ObjectWritable(method.getReturnType(), value);
  346   
  347         } catch (InvocationTargetException e) {
  348           Throwable target = e.getTargetException();
  349           if (target instanceof IOException) {
  350             throw (IOException)target;
  351           } else {
  352             IOException ioe = new IOException(target.toString());
  353             ioe.setStackTrace(target.getStackTrace());
  354             throw ioe;
  355           }
  356         } catch (Throwable e) {
  357           IOException ioe = new IOException(e.toString());
  358           ioe.setStackTrace(e.getStackTrace());
  359           throw ioe;
  360         }
  361       }
  362     }
  363   
  364     private static void log(String value) {
  365       if (value!= null && value.length() > 55)
  366         value = value.substring(0, 55)+"...";
  367       LOG.info(value);
  368     }
  369     
  370   }

Save This Page
Home » hadoop-0.14.4 » org.apache » hadoop » ipc » [javadoc | source]