Save This Page
Home » hadoop-0.14.4 » org.apache » hadoop » ipc » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one
    3    * or more contributor license agreements.  See the NOTICE file
    4    * distributed with this work for additional information
    5    * regarding copyright ownership.  The ASF licenses this file
    6    * to you under the Apache License, Version 2.0 (the
    7    * "License"); you may not use this file except in compliance
    8    * with the License.  You may obtain a copy of the License at
    9    *
   10    *     http://www.apache.org/licenses/LICENSE-2.0
   11    *
   12    * Unless required by applicable law or agreed to in writing, software
   13    * distributed under the License is distributed on an "AS IS" BASIS,
   14    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   15    * See the License for the specific language governing permissions and
   16    * limitations under the License.
   17    */
   18   
   19   package org.apache.hadoop.ipc;
   20   
   21   import java.net.Socket;
   22   import java.net.InetSocketAddress;
   23   import java.net.SocketTimeoutException;
   24   import java.net.UnknownHostException;
   25   
   26   import java.io.IOException;
   27   import java.io.EOFException;
   28   import java.io.DataInputStream;
   29   import java.io.DataOutputStream;
   30   import java.io.BufferedInputStream;
   31   import java.io.BufferedOutputStream;
   32   import java.io.FilterInputStream;
   33   import java.io.FilterOutputStream;
   34   import java.io.OutputStream;
   35   
   36   import java.util.Hashtable;
   37   import java.util.Iterator;
   38   
   39   import org.apache.commons.logging;
   40   
   41   import org.apache.hadoop.conf.Configuration;
   42   import org.apache.hadoop.conf.Configurable;
   43   import org.apache.hadoop.dfs.FSConstants;
   44   import org.apache.hadoop.io.Writable;
   45   import org.apache.hadoop.io.WritableUtils;
   46   import org.apache.hadoop.io.DataOutputBuffer;
   47   import org.apache.hadoop.util.ReflectionUtils;
   48   import org.apache.hadoop.util.StringUtils;
   49   
   50   /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
   51    * parameter, and return a {@link Writable} as their value.  A service runs on
   52    * a port and is defined by a parameter class and a value class.
   53    * 
   54    * @see Server
   55    */
   56   public class Client {
   57     /** Should the client send the header on the connection? */
   58     private static final boolean SEND_HEADER = true;
   59     private static final byte CURRENT_VERSION = 0;
   60     
   61     public static final Log LOG =
   62       LogFactory.getLog("org.apache.hadoop.ipc.Client");
   63     private Hashtable<InetSocketAddress, Connection> connections =
   64       new Hashtable<InetSocketAddress, Connection>();
   65   
   66     private Class valueClass;                       // class of call values
   67     private int timeout;// timeout for calls
   68     private int counter;                            // counter for call ids
   69     private boolean running = true;                 // true while client runs
   70     private Configuration conf;
   71     private int maxIdleTime; //connections will be culled if it was idle for 
   72                              //maxIdleTime msecs
   73     private int maxRetries; //the max. no. of retries for socket connections
   74   
   75     /** A call waiting for a value. */
   76     private class Call {
   77       int id;                                       // call id
   78       Writable param;                               // parameter
   79       Writable value;                               // value, null if error
   80       String error;                                 // exception, null if value
   81       String errorClass;                            // class of exception
   82       long lastActivity;                            // time of last i/o
   83       boolean done;                                 // true when call is done
   84   
   85       protected Call(Writable param) {
   86         this.param = param;
   87         synchronized (Client.this) {
   88           this.id = counter++;
   89         }
   90         touch();
   91       }
   92   
   93       /** Called by the connection thread when the call is complete and the
   94        * value or error string are available.  Notifies by default.  */
   95       public synchronized void callComplete() {
   96         notify();                                 // notify caller
   97       }
   98   
   99       /** Update lastActivity with the current time. */
  100       public synchronized void touch() {
  101         lastActivity = System.currentTimeMillis();
  102       }
  103   
  104       /** Update lastActivity with the current time. */
  105       public synchronized void setResult(Writable value, 
  106                                          String errorClass,
  107                                          String error) {
  108         this.value = value;
  109         this.error = error;
  110         this.errorClass =errorClass;
  111         this.done = true;
  112       }
  113       
  114     }
  115   
  116     /** Thread that reads responses and notifies callers.  Each connection owns a
  117      * socket connected to a remote address.  Calls are multiplexed through this
  118      * socket: responses may be delivered out of order. */
  119     private class Connection extends Thread {
  120       private InetSocketAddress address;            // address of server
  121       private Socket socket = null;                 // connected socket
  122       private DataInputStream in;                   
  123       private DataOutputStream out;
  124       // currently active calls
  125       private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
  126       private Call readingCall;
  127       private Call writingCall;
  128       private int inUse = 0;
  129       private long lastActivity = 0;
  130       private boolean shouldCloseConnection = false;
  131   
  132       public Connection(InetSocketAddress address) throws IOException {
  133         if (address.isUnresolved()) {
  134           throw new UnknownHostException("unknown host: " + address.getHostName());
  135         }
  136         this.address = address;
  137         this.setName("IPC Client connection to " + address.toString());
  138         this.setDaemon(true);
  139       }
  140   
  141       public synchronized void setupIOstreams() throws IOException {
  142         if (socket != null) {
  143           notify();
  144           return;
  145         }
  146         short failures = 0;
  147         while (true) {
  148           try {
  149             this.socket = new Socket();
  150             this.socket.connect(address, FSConstants.READ_TIMEOUT);
  151             break;
  152           } catch (IOException ie) { //SocketTimeoutException is also caught 
  153             if (failures == maxRetries) {
  154               //reset inUse so that the culler gets a chance to throw this
  155               //connection object out of the table. We don't want to increment
  156               //inUse to infinity (everytime getConnection is called inUse is
  157               //incremented)!
  158               inUse = 0;
  159               // set socket to null so that the next call to setupIOstreams
  160               // can start the process of connect all over again.
  161               socket.close();
  162               socket = null;
  163               throw ie;
  164             }
  165             failures++;
  166             LOG.info("Retrying connect to server: " + address + 
  167                      ". Already tried " + failures + " time(s).");
  168             try { 
  169               Thread.sleep(1000);
  170             } catch (InterruptedException iex){
  171             }
  172           }
  173         }
  174         socket.setSoTimeout(timeout);
  175         this.in = new DataInputStream
  176           (new BufferedInputStream
  177            (new FilterInputStream(socket.getInputStream()) {
  178                public int read(byte[] buf, int off, int len) throws IOException {
  179                  int value = super.read(buf, off, len);
  180                  if (readingCall != null) {
  181                    readingCall.touch();
  182                  }
  183                  return value;
  184                }
  185              }));
  186         this.out = new DataOutputStream
  187           (new BufferedOutputStream
  188            (new FilterOutputStream(socket.getOutputStream()) {
  189                public void write(byte[] buf, int o, int len) throws IOException {
  190                  out.write(buf, o, len);
  191                  if (writingCall != null) {
  192                    writingCall.touch();
  193                  }
  194                }
  195              }));
  196         if (SEND_HEADER) {
  197           out.write(Server.HEADER.array());
  198           out.write(CURRENT_VERSION);
  199         }
  200         notify();
  201       }
  202   
  203       private synchronized boolean waitForWork() {
  204         //wait till someone signals us to start reading RPC response or
  205         //close the connection. If we are idle long enough (blocked in wait),
  206         //the ConnectionCuller thread will wake us up and ask us to close the
  207         //connection. 
  208         //We need to wait when inUse is 0 or socket is null (it may be null if
  209         //the Connection object has been created but the socket connection
  210         //has not been setup yet). We stop waiting if we have been asked to close
  211         //connection
  212         while ((inUse == 0 || socket == null) && !shouldCloseConnection) {
  213           try {
  214             wait();
  215           } catch (InterruptedException e) {}
  216         }
  217         return !shouldCloseConnection;
  218       }
  219   
  220       private synchronized void incrementRef() {
  221         inUse++;
  222       }
  223   
  224       private synchronized void decrementRef() {
  225         lastActivity = System.currentTimeMillis();
  226         inUse--;
  227       }
  228   
  229       public synchronized boolean isIdle() {
  230         //check whether the connection is in use or just created
  231         if (inUse != 0) return false;
  232         long currTime = System.currentTimeMillis();
  233         if (currTime - lastActivity > maxIdleTime)
  234           return true;
  235         return false;
  236       }
  237   
  238       public InetSocketAddress getRemoteAddress() {
  239         return address;
  240       }
  241   
  242       public void setCloseConnection() {
  243         shouldCloseConnection = true;
  244       }
  245   
  246       public void run() {
  247         if (LOG.isDebugEnabled())
  248           LOG.debug(getName() + ": starting");
  249         try {
  250           while (running) {
  251             int id;
  252             //wait here for work - read connection or close connection
  253             if (waitForWork() == false)
  254               break;
  255             try {
  256               id = in.readInt();                    // try to read an id
  257             } catch (SocketTimeoutException e) {
  258               continue;
  259             }
  260   
  261             if (LOG.isDebugEnabled())
  262               LOG.debug(getName() + " got value #" + id);
  263   
  264             Call call = (Call)calls.remove(id);
  265             boolean isError = in.readBoolean();     // read if error
  266             if (isError) {
  267               call.setResult(null, WritableUtils.readString(in),
  268                              WritableUtils.readString(in));
  269             } else {
  270               Writable value = (Writable)ReflectionUtils.newInstance(valueClass, conf);
  271               try {
  272                 readingCall = call;
  273                 value.readFields(in);                 // read value
  274               } finally {
  275                 readingCall = null;
  276               }
  277               call.setResult(value, null, null);
  278             }
  279             call.callComplete();                   // deliver result to caller
  280             //received the response. So decrement the ref count
  281             decrementRef();
  282           }
  283         } catch (EOFException eof) {
  284           // This is what happens when the remote side goes down
  285         } catch (Exception e) {
  286           LOG.info(StringUtils.stringifyException(e));
  287         } finally {
  288           //If there was no exception thrown in this method, then the only
  289           //way we reached here is by breaking out of the while loop (after
  290           //waitForWork). And if we took that route to reach here, we have 
  291           //already removed the connection object in the ConnectionCuller thread.
  292           //We don't want to remove this again as some other thread might have
  293           //actually put a new Connection object in the table in the meantime.
  294           synchronized (connections) {
  295             if (connections.get(address) == this) {
  296               connections.remove(address);
  297             }
  298           }
  299           close();
  300         }
  301       }
  302   
  303       /** Initiates a call by sending the parameter to the remote server.
  304        * Note: this is not called from the Connection thread, but by other
  305        * threads.
  306        */
  307       public void sendParam(Call call) throws IOException {
  308         boolean error = true;
  309         try {
  310           calls.put(call.id, call);
  311           synchronized (out) {
  312             if (LOG.isDebugEnabled())
  313               LOG.debug(getName() + " sending #" + call.id);
  314             try {
  315               writingCall = call;
  316               DataOutputBuffer d = new DataOutputBuffer(); //for serializing the
  317                                                            //data to be written
  318               d.writeInt(call.id);
  319               call.param.write(d);
  320               byte[] data = d.getData();
  321               int dataLength = d.getLength();
  322   
  323               out.writeInt(dataLength);      //first put the data length
  324               out.write(data, 0, dataLength);//write the data
  325               out.flush();
  326             } finally {
  327               writingCall = null;
  328             }
  329           }
  330           error = false;
  331         } finally {
  332           if (error) {
  333             synchronized (connections) {
  334               if (connections.get(address) == this)
  335                 connections.remove(address);
  336             }
  337             close();                                // close on error
  338           }
  339         }
  340       }  
  341   
  342       /** Close the connection. */
  343       public void close() {
  344         //socket may be null if the connection could not be established to the
  345         //server in question, and the culler asked us to close the connection
  346         if (socket == null) return;
  347         try {
  348           socket.close();                           // close socket
  349         } catch (IOException e) {}
  350         if (LOG.isDebugEnabled())
  351           LOG.debug(getName() + ": closing");
  352       }
  353     }
  354   
  355     /** Call implementation used for parallel calls. */
  356     private class ParallelCall extends Call {
  357       private ParallelResults results;
  358       private int index;
  359       
  360       public ParallelCall(Writable param, ParallelResults results, int index) {
  361         super(param);
  362         this.results = results;
  363         this.index = index;
  364       }
  365   
  366       /** Deliver result to result collector. */
  367       public void callComplete() {
  368         results.callComplete(this);
  369       }
  370     }
  371   
  372     /** Result collector for parallel calls. */
  373     private static class ParallelResults {
  374       private Writable[] values;
  375       private int size;
  376       private int count;
  377   
  378       public ParallelResults(int size) {
  379         this.values = new Writable[size];
  380         this.size = size;
  381       }
  382   
  383       /** Collect a result. */
  384       public synchronized void callComplete(ParallelCall call) {
  385         values[call.index] = call.value;            // store the value
  386         count++;                                    // count it
  387         if (count == size)                          // if all values are in
  388           notify();                                 // then notify waiting caller
  389       }
  390     }
  391   
  392     private class ConnectionCuller extends Thread {
  393   
  394       public static final int MIN_SLEEP_TIME = 1000;
  395   
  396       public void run() {
  397   
  398         LOG.debug(getName() + ": starting");
  399   
  400         while (running) {
  401           try {
  402             Thread.sleep(MIN_SLEEP_TIME);
  403           } catch (InterruptedException ie) {}
  404   
  405           synchronized (connections) {
  406             Iterator i = connections.values().iterator();
  407             while (i.hasNext()) {
  408               Connection c = (Connection)i.next();
  409               if (c.isIdle()) { 
  410                 //We don't actually close the socket here (i.e., don't invoke
  411                 //the close() method). We leave that work to the response receiver
  412                 //thread. The reason for that is since we have taken a lock on the
  413                 //connections table object, we don't want to slow down the entire
  414                 //system if we happen to talk to a slow server.
  415                 i.remove();
  416                 synchronized (c) {
  417                   c.setCloseConnection();
  418                   c.notify();
  419                 }
  420               }
  421             }
  422           }
  423         }
  424       }
  425     }
  426   
  427     /** Construct an IPC client whose values are of the given {@link Writable}
  428      * class. */
  429     public Client(Class valueClass, Configuration conf) {
  430       this.valueClass = valueClass;
  431       this.timeout = conf.getInt("ipc.client.timeout", 10000);
  432       this.maxIdleTime = conf.getInt("ipc.client.connection.maxidletime", 1000);
  433       this.maxRetries = conf.getInt("ipc.client.connect.max.retries", 10);
  434       this.conf = conf;
  435   
  436       Thread t = new ConnectionCuller();
  437       t.setDaemon(true);
  438       t.setName(valueClass.getName() + " Connection Culler");
  439       LOG.debug(valueClass.getName() + 
  440                 "Connection culler maxidletime= " + maxIdleTime + "ms");
  441       t.start();
  442     }
  443    
  444     /** Stop all threads related to this client.  No further calls may be made
  445      * using this client. */
  446     public void stop() {
  447       LOG.info("Stopping client");
  448       running = false;
  449     }
  450   
  451     /** Sets the timeout used for network i/o. */
  452     public void setTimeout(int timeout) { this.timeout = timeout; }
  453   
  454     /** Make a call, passing <code>param</code>, to the IPC server running at
  455      * <code>address</code>, returning the value.  Throws exceptions if there are
  456      * network problems or if the remote code threw an exception. */
  457     public Writable call(Writable param, InetSocketAddress address)
  458       throws InterruptedException, IOException {
  459       Connection connection = getConnection(address);
  460       Call call = new Call(param);
  461       synchronized (call) {
  462         connection.sendParam(call);                 // send the parameter
  463         long wait = timeout;
  464         do {
  465           call.wait(wait);                       // wait for the result
  466           wait = timeout - (System.currentTimeMillis() - call.lastActivity);
  467         } while (!call.done && wait > 0);
  468   
  469         if (call.error != null) {
  470           throw new RemoteException(call.errorClass, call.error);
  471         } else if (!call.done) {
  472           throw new SocketTimeoutException("timed out waiting for rpc response");
  473         } else {
  474           return call.value;
  475         }
  476       }
  477     }
  478   
  479     /** Makes a set of calls in parallel.  Each parameter is sent to the
  480      * corresponding address.  When all values are available, or have timed out
  481      * or errored, the collected results are returned in an array.  The array
  482      * contains nulls for calls that timed out or errored.  */
  483     public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
  484       throws IOException {
  485       if (addresses.length == 0) return new Writable[0];
  486   
  487       ParallelResults results = new ParallelResults(params.length);
  488       synchronized (results) {
  489         for (int i = 0; i < params.length; i++) {
  490           ParallelCall call = new ParallelCall(params[i], results, i);
  491           try {
  492             Connection connection = getConnection(addresses[i]);
  493             connection.sendParam(call);             // send each parameter
  494           } catch (IOException e) {
  495             LOG.info("Calling "+addresses[i]+" caught: " + 
  496                      StringUtils.stringifyException(e)); // log errors
  497             results.size--;                         //  wait for one fewer result
  498           }
  499         }
  500         try {
  501           results.wait(timeout);                    // wait for all results
  502         } catch (InterruptedException e) {}
  503   
  504         if (results.count == 0) {
  505           throw new IOException("no responses");
  506         } else {
  507           return results.values;
  508         }
  509       }
  510     }
  511   
  512     /** Get a connection from the pool, or create a new one and add it to the
  513      * pool.  Connections to a given host/port are reused. */
  514     private Connection getConnection(InetSocketAddress address)
  515       throws IOException {
  516       Connection connection;
  517       synchronized (connections) {
  518         connection = (Connection)connections.get(address);
  519         if (connection == null) {
  520           connection = new Connection(address);
  521           connections.put(address, connection);
  522           connection.start();
  523         }
  524         connection.incrementRef();
  525       }
  526       //we don't invoke the method below inside "synchronized (connections)"
  527       //block above. The reason for that is if the server happens to be slow,
  528       //it will take longer to establish a connection and that will slow the
  529       //entire system down.
  530       connection.setupIOstreams();
  531       return connection;
  532     }
  533   
  534   }

Save This Page
Home » hadoop-0.14.4 » org.apache » hadoop » ipc » [javadoc | source]