1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.hadoop.ipc;
20
21 import java.net.Socket;
22 import java.net.InetSocketAddress;
23 import java.net.SocketTimeoutException;
24 import java.net.UnknownHostException;
25
26 import java.io.IOException;
27 import java.io.EOFException;
28 import java.io.DataInputStream;
29 import java.io.DataOutputStream;
30 import java.io.BufferedInputStream;
31 import java.io.BufferedOutputStream;
32 import java.io.FilterInputStream;
33 import java.io.FilterOutputStream;
34 import java.io.OutputStream;
35
36 import java.util.Hashtable;
37 import java.util.Iterator;
38
39 import org.apache.commons.logging;
40
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.hadoop.conf.Configurable;
43 import org.apache.hadoop.dfs.FSConstants;
44 import org.apache.hadoop.io.Writable;
45 import org.apache.hadoop.io.WritableUtils;
46 import org.apache.hadoop.io.DataOutputBuffer;
47 import org.apache.hadoop.util.ReflectionUtils;
48 import org.apache.hadoop.util.StringUtils;
49
50 /** A client for an IPC service. IPC calls take a single {@link Writable} as a
51 * parameter, and return a {@link Writable} as their value. A service runs on
52 * a port and is defined by a parameter class and a value class.
53 *
54 * @see Server
55 */
56 public class Client {
57 /** Should the client send the header on the connection? */
58 private static final boolean SEND_HEADER = true;
59 private static final byte CURRENT_VERSION = 0;
60
61 public static final Log LOG =
62 LogFactory.getLog("org.apache.hadoop.ipc.Client");
63 private Hashtable<InetSocketAddress, Connection> connections =
64 new Hashtable<InetSocketAddress, Connection>();
65
66 private Class valueClass; // class of call values
67 private int timeout;// timeout for calls
68 private int counter; // counter for call ids
69 private boolean running = true; // true while client runs
70 private Configuration conf;
71 private int maxIdleTime; //connections will be culled if it was idle for
72 //maxIdleTime msecs
73 private int maxRetries; //the max. no. of retries for socket connections
74
75 /** A call waiting for a value. */
76 private class Call {
77 int id; // call id
78 Writable param; // parameter
79 Writable value; // value, null if error
80 String error; // exception, null if value
81 String errorClass; // class of exception
82 long lastActivity; // time of last i/o
83 boolean done; // true when call is done
84
85 protected Call(Writable param) {
86 this.param = param;
87 synchronized (Client.this) {
88 this.id = counter++;
89 }
90 touch();
91 }
92
93 /** Called by the connection thread when the call is complete and the
94 * value or error string are available. Notifies by default. */
95 public synchronized void callComplete() {
96 notify(); // notify caller
97 }
98
99 /** Update lastActivity with the current time. */
100 public synchronized void touch() {
101 lastActivity = System.currentTimeMillis();
102 }
103
104 /** Update lastActivity with the current time. */
105 public synchronized void setResult(Writable value,
106 String errorClass,
107 String error) {
108 this.value = value;
109 this.error = error;
110 this.errorClass =errorClass;
111 this.done = true;
112 }
113
114 }
115
116 /** Thread that reads responses and notifies callers. Each connection owns a
117 * socket connected to a remote address. Calls are multiplexed through this
118 * socket: responses may be delivered out of order. */
119 private class Connection extends Thread {
120 private InetSocketAddress address; // address of server
121 private Socket socket = null; // connected socket
122 private DataInputStream in;
123 private DataOutputStream out;
124 // currently active calls
125 private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
126 private Call readingCall;
127 private Call writingCall;
128 private int inUse = 0;
129 private long lastActivity = 0;
130 private boolean shouldCloseConnection = false;
131
132 public Connection(InetSocketAddress address) throws IOException {
133 if (address.isUnresolved()) {
134 throw new UnknownHostException("unknown host: " + address.getHostName());
135 }
136 this.address = address;
137 this.setName("IPC Client connection to " + address.toString());
138 this.setDaemon(true);
139 }
140
141 public synchronized void setupIOstreams() throws IOException {
142 if (socket != null) {
143 notify();
144 return;
145 }
146 short failures = 0;
147 while (true) {
148 try {
149 this.socket = new Socket();
150 this.socket.connect(address, FSConstants.READ_TIMEOUT);
151 break;
152 } catch (IOException ie) { //SocketTimeoutException is also caught
153 if (failures == maxRetries) {
154 //reset inUse so that the culler gets a chance to throw this
155 //connection object out of the table. We don't want to increment
156 //inUse to infinity (everytime getConnection is called inUse is
157 //incremented)!
158 inUse = 0;
159 // set socket to null so that the next call to setupIOstreams
160 // can start the process of connect all over again.
161 socket.close();
162 socket = null;
163 throw ie;
164 }
165 failures++;
166 LOG.info("Retrying connect to server: " + address +
167 ". Already tried " + failures + " time(s).");
168 try {
169 Thread.sleep(1000);
170 } catch (InterruptedException iex){
171 }
172 }
173 }
174 socket.setSoTimeout(timeout);
175 this.in = new DataInputStream
176 (new BufferedInputStream
177 (new FilterInputStream(socket.getInputStream()) {
178 public int read(byte[] buf, int off, int len) throws IOException {
179 int value = super.read(buf, off, len);
180 if (readingCall != null) {
181 readingCall.touch();
182 }
183 return value;
184 }
185 }));
186 this.out = new DataOutputStream
187 (new BufferedOutputStream
188 (new FilterOutputStream(socket.getOutputStream()) {
189 public void write(byte[] buf, int o, int len) throws IOException {
190 out.write(buf, o, len);
191 if (writingCall != null) {
192 writingCall.touch();
193 }
194 }
195 }));
196 if (SEND_HEADER) {
197 out.write(Server.HEADER.array());
198 out.write(CURRENT_VERSION);
199 }
200 notify();
201 }
202
203 private synchronized boolean waitForWork() {
204 //wait till someone signals us to start reading RPC response or
205 //close the connection. If we are idle long enough (blocked in wait),
206 //the ConnectionCuller thread will wake us up and ask us to close the
207 //connection.
208 //We need to wait when inUse is 0 or socket is null (it may be null if
209 //the Connection object has been created but the socket connection
210 //has not been setup yet). We stop waiting if we have been asked to close
211 //connection
212 while ((inUse == 0 || socket == null) && !shouldCloseConnection) {
213 try {
214 wait();
215 } catch (InterruptedException e) {}
216 }
217 return !shouldCloseConnection;
218 }
219
220 private synchronized void incrementRef() {
221 inUse++;
222 }
223
224 private synchronized void decrementRef() {
225 lastActivity = System.currentTimeMillis();
226 inUse--;
227 }
228
229 public synchronized boolean isIdle() {
230 //check whether the connection is in use or just created
231 if (inUse != 0) return false;
232 long currTime = System.currentTimeMillis();
233 if (currTime - lastActivity > maxIdleTime)
234 return true;
235 return false;
236 }
237
238 public InetSocketAddress getRemoteAddress() {
239 return address;
240 }
241
242 public void setCloseConnection() {
243 shouldCloseConnection = true;
244 }
245
246 public void run() {
247 if (LOG.isDebugEnabled())
248 LOG.debug(getName() + ": starting");
249 try {
250 while (running) {
251 int id;
252 //wait here for work - read connection or close connection
253 if (waitForWork() == false)
254 break;
255 try {
256 id = in.readInt(); // try to read an id
257 } catch (SocketTimeoutException e) {
258 continue;
259 }
260
261 if (LOG.isDebugEnabled())
262 LOG.debug(getName() + " got value #" + id);
263
264 Call call = (Call)calls.remove(id);
265 boolean isError = in.readBoolean(); // read if error
266 if (isError) {
267 call.setResult(null, WritableUtils.readString(in),
268 WritableUtils.readString(in));
269 } else {
270 Writable value = (Writable)ReflectionUtils.newInstance(valueClass, conf);
271 try {
272 readingCall = call;
273 value.readFields(in); // read value
274 } finally {
275 readingCall = null;
276 }
277 call.setResult(value, null, null);
278 }
279 call.callComplete(); // deliver result to caller
280 //received the response. So decrement the ref count
281 decrementRef();
282 }
283 } catch (EOFException eof) {
284 // This is what happens when the remote side goes down
285 } catch (Exception e) {
286 LOG.info(StringUtils.stringifyException(e));
287 } finally {
288 //If there was no exception thrown in this method, then the only
289 //way we reached here is by breaking out of the while loop (after
290 //waitForWork). And if we took that route to reach here, we have
291 //already removed the connection object in the ConnectionCuller thread.
292 //We don't want to remove this again as some other thread might have
293 //actually put a new Connection object in the table in the meantime.
294 synchronized (connections) {
295 if (connections.get(address) == this) {
296 connections.remove(address);
297 }
298 }
299 close();
300 }
301 }
302
303 /** Initiates a call by sending the parameter to the remote server.
304 * Note: this is not called from the Connection thread, but by other
305 * threads.
306 */
307 public void sendParam(Call call) throws IOException {
308 boolean error = true;
309 try {
310 calls.put(call.id, call);
311 synchronized (out) {
312 if (LOG.isDebugEnabled())
313 LOG.debug(getName() + " sending #" + call.id);
314 try {
315 writingCall = call;
316 DataOutputBuffer d = new DataOutputBuffer(); //for serializing the
317 //data to be written
318 d.writeInt(call.id);
319 call.param.write(d);
320 byte[] data = d.getData();
321 int dataLength = d.getLength();
322
323 out.writeInt(dataLength); //first put the data length
324 out.write(data, 0, dataLength);//write the data
325 out.flush();
326 } finally {
327 writingCall = null;
328 }
329 }
330 error = false;
331 } finally {
332 if (error) {
333 synchronized (connections) {
334 if (connections.get(address) == this)
335 connections.remove(address);
336 }
337 close(); // close on error
338 }
339 }
340 }
341
342 /** Close the connection. */
343 public void close() {
344 //socket may be null if the connection could not be established to the
345 //server in question, and the culler asked us to close the connection
346 if (socket == null) return;
347 try {
348 socket.close(); // close socket
349 } catch (IOException e) {}
350 if (LOG.isDebugEnabled())
351 LOG.debug(getName() + ": closing");
352 }
353 }
354
355 /** Call implementation used for parallel calls. */
356 private class ParallelCall extends Call {
357 private ParallelResults results;
358 private int index;
359
360 public ParallelCall(Writable param, ParallelResults results, int index) {
361 super(param);
362 this.results = results;
363 this.index = index;
364 }
365
366 /** Deliver result to result collector. */
367 public void callComplete() {
368 results.callComplete(this);
369 }
370 }
371
372 /** Result collector for parallel calls. */
373 private static class ParallelResults {
374 private Writable[] values;
375 private int size;
376 private int count;
377
378 public ParallelResults(int size) {
379 this.values = new Writable[size];
380 this.size = size;
381 }
382
383 /** Collect a result. */
384 public synchronized void callComplete(ParallelCall call) {
385 values[call.index] = call.value; // store the value
386 count++; // count it
387 if (count == size) // if all values are in
388 notify(); // then notify waiting caller
389 }
390 }
391
392 private class ConnectionCuller extends Thread {
393
394 public static final int MIN_SLEEP_TIME = 1000;
395
396 public void run() {
397
398 LOG.debug(getName() + ": starting");
399
400 while (running) {
401 try {
402 Thread.sleep(MIN_SLEEP_TIME);
403 } catch (InterruptedException ie) {}
404
405 synchronized (connections) {
406 Iterator i = connections.values().iterator();
407 while (i.hasNext()) {
408 Connection c = (Connection)i.next();
409 if (c.isIdle()) {
410 //We don't actually close the socket here (i.e., don't invoke
411 //the close() method). We leave that work to the response receiver
412 //thread. The reason for that is since we have taken a lock on the
413 //connections table object, we don't want to slow down the entire
414 //system if we happen to talk to a slow server.
415 i.remove();
416 synchronized (c) {
417 c.setCloseConnection();
418 c.notify();
419 }
420 }
421 }
422 }
423 }
424 }
425 }
426
427 /** Construct an IPC client whose values are of the given {@link Writable}
428 * class. */
429 public Client(Class valueClass, Configuration conf) {
430 this.valueClass = valueClass;
431 this.timeout = conf.getInt("ipc.client.timeout", 10000);
432 this.maxIdleTime = conf.getInt("ipc.client.connection.maxidletime", 1000);
433 this.maxRetries = conf.getInt("ipc.client.connect.max.retries", 10);
434 this.conf = conf;
435
436 Thread t = new ConnectionCuller();
437 t.setDaemon(true);
438 t.setName(valueClass.getName() + " Connection Culler");
439 LOG.debug(valueClass.getName() +
440 "Connection culler maxidletime= " + maxIdleTime + "ms");
441 t.start();
442 }
443
444 /** Stop all threads related to this client. No further calls may be made
445 * using this client. */
446 public void stop() {
447 LOG.info("Stopping client");
448 running = false;
449 }
450
451 /** Sets the timeout used for network i/o. */
452 public void setTimeout(int timeout) { this.timeout = timeout; }
453
454 /** Make a call, passing <code>param</code>, to the IPC server running at
455 * <code>address</code>, returning the value. Throws exceptions if there are
456 * network problems or if the remote code threw an exception. */
457 public Writable call(Writable param, InetSocketAddress address)
458 throws InterruptedException, IOException {
459 Connection connection = getConnection(address);
460 Call call = new Call(param);
461 synchronized (call) {
462 connection.sendParam(call); // send the parameter
463 long wait = timeout;
464 do {
465 call.wait(wait); // wait for the result
466 wait = timeout - (System.currentTimeMillis() - call.lastActivity);
467 } while (!call.done && wait > 0);
468
469 if (call.error != null) {
470 throw new RemoteException(call.errorClass, call.error);
471 } else if (!call.done) {
472 throw new SocketTimeoutException("timed out waiting for rpc response");
473 } else {
474 return call.value;
475 }
476 }
477 }
478
479 /** Makes a set of calls in parallel. Each parameter is sent to the
480 * corresponding address. When all values are available, or have timed out
481 * or errored, the collected results are returned in an array. The array
482 * contains nulls for calls that timed out or errored. */
483 public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
484 throws IOException {
485 if (addresses.length == 0) return new Writable[0];
486
487 ParallelResults results = new ParallelResults(params.length);
488 synchronized (results) {
489 for (int i = 0; i < params.length; i++) {
490 ParallelCall call = new ParallelCall(params[i], results, i);
491 try {
492 Connection connection = getConnection(addresses[i]);
493 connection.sendParam(call); // send each parameter
494 } catch (IOException e) {
495 LOG.info("Calling "+addresses[i]+" caught: " +
496 StringUtils.stringifyException(e)); // log errors
497 results.size--; // wait for one fewer result
498 }
499 }
500 try {
501 results.wait(timeout); // wait for all results
502 } catch (InterruptedException e) {}
503
504 if (results.count == 0) {
505 throw new IOException("no responses");
506 } else {
507 return results.values;
508 }
509 }
510 }
511
512 /** Get a connection from the pool, or create a new one and add it to the
513 * pool. Connections to a given host/port are reused. */
514 private Connection getConnection(InetSocketAddress address)
515 throws IOException {
516 Connection connection;
517 synchronized (connections) {
518 connection = (Connection)connections.get(address);
519 if (connection == null) {
520 connection = new Connection(address);
521 connections.put(address, connection);
522 connection.start();
523 }
524 connection.incrementRef();
525 }
526 //we don't invoke the method below inside "synchronized (connections)"
527 //block above. The reason for that is if the server happens to be slow,
528 //it will take longer to establish a connection and that will slow the
529 //entire system down.
530 connection.setupIOstreams();
531 return connection;
532 }
533
534 }