1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.hadoop.ipc;
20
21 import java.lang.reflect.Proxy;
22 import java.lang.reflect.Method;
23 import java.lang.reflect.Array;
24 import java.lang.reflect.InvocationHandler;
25 import java.lang.reflect.InvocationTargetException;
26
27 import java.net.ConnectException;
28 import java.net.InetSocketAddress;
29 import java.net.SocketTimeoutException;
30 import java.io;
31
32 import org.apache.commons.logging;
33
34 import org.apache.hadoop.io;
35 import org.apache.hadoop.conf;
36
37 /** A simple RPC mechanism.
38 *
39 * A <i>protocol</i> is a Java interface. All parameters and return types must
40 * be one of:
41 *
42 * <ul> <li>a primitive type, <code>boolean</code>, <code>byte</code>,
43 * <code>char</code>, <code>short</code>, <code>int</code>, <code>long</code>,
44 * <code>float</code>, <code>double</code>, or <code>void</code>; or</li>
45 *
46 * <li>a {@link String}; or</li>
47 *
48 * <li>a {@link Writable}; or</li>
49 *
50 * <li>an array of the above types</li> </ul>
51 *
52 * All methods in the protocol should throw only IOException. No field data of
53 * the protocol instance is transmitted.
54 */
55 public class RPC {
56 private static final Log LOG =
57 LogFactory.getLog("org.apache.hadoop.ipc.RPC");
58
59 private RPC() {} // no public ctor
60
61
62 /** A method invocation, including the method name and its parameters.*/
63 private static class Invocation implements Writable, Configurable {
64 private String methodName;
65 private Class[] parameterClasses;
66 private Object[] parameters;
67 private Configuration conf;
68
69 public Invocation() {}
70
71 public Invocation(Method method, Object[] parameters) {
72 this.methodName = method.getName();
73 this.parameterClasses = method.getParameterTypes();
74 this.parameters = parameters;
75 }
76
77 /** The name of the method invoked. */
78 public String getMethodName() { return methodName; }
79
80 /** The parameter classes. */
81 public Class[] getParameterClasses() { return parameterClasses; }
82
83 /** The parameter instances. */
84 public Object[] getParameters() { return parameters; }
85
86 public void readFields(DataInput in) throws IOException {
87 methodName = UTF8.readString(in);
88 parameters = new Object[in.readInt()];
89 parameterClasses = new Class[parameters.length];
90 ObjectWritable objectWritable = new ObjectWritable();
91 for (int i = 0; i < parameters.length; i++) {
92 parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);
93 parameterClasses[i] = objectWritable.getDeclaredClass();
94 }
95 }
96
97 public void write(DataOutput out) throws IOException {
98 UTF8.writeString(out, methodName);
99 out.writeInt(parameterClasses.length);
100 for (int i = 0; i < parameterClasses.length; i++) {
101 ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
102 conf);
103 }
104 }
105
106 public String toString() {
107 StringBuffer buffer = new StringBuffer();
108 buffer.append(methodName);
109 buffer.append("(");
110 for (int i = 0; i < parameters.length; i++) {
111 if (i != 0)
112 buffer.append(", ");
113 buffer.append(parameters[i]);
114 }
115 buffer.append(")");
116 return buffer.toString();
117 }
118
119 public void setConf(Configuration conf) {
120 this.conf = conf;
121 }
122
123 public Configuration getConf() {
124 return this.conf;
125 }
126
127 }
128
129 private static Client CLIENT;
130
131 private static synchronized Client getClient(Configuration conf) {
132 // Construct & cache client. The configuration is only used for timeout,
133 // and Clients have connection pools. So we can either (a) lose some
134 // connection pooling and leak sockets, or (b) use the same timeout for all
135 // configurations. Since the IPC is usually intended globally, not
136 // per-job, we choose (a).
137 if (CLIENT == null) {
138 CLIENT = new Client(ObjectWritable.class, conf);
139 }
140 return CLIENT;
141 }
142
143 /**
144 * Stop all RPC client connections
145 */
146 public static synchronized void stopClient(){
147 if (CLIENT != null) {
148 CLIENT.stop();
149 CLIENT = null;
150 }
151 }
152
153 private static class Invoker implements InvocationHandler {
154 private InetSocketAddress address;
155 private Client client;
156
157 public Invoker(InetSocketAddress address, Configuration conf) {
158 this.address = address;
159 this.client = getClient(conf);
160 }
161
162 public Object invoke(Object proxy, Method method, Object[] args)
163 throws Throwable {
164 long startTime = System.currentTimeMillis();
165 ObjectWritable value = (ObjectWritable)
166 client.call(new Invocation(method, args), address);
167 long callTime = System.currentTimeMillis() - startTime;
168 LOG.debug("Call: " + method.getName() + " " + callTime);
169 return value.get();
170 }
171 }
172
173 /**
174 * A version mismatch for the RPC protocol.
175 */
176 public static class VersionMismatch extends IOException {
177 private String interfaceName;
178 private long clientVersion;
179 private long serverVersion;
180
181 /**
182 * Create a version mismatch exception
183 * @param interfaceName the name of the protocol mismatch
184 * @param clientVersion the client's version of the protocol
185 * @param serverVersion the server's version of the protocol
186 */
187 public VersionMismatch(String interfaceName, long clientVersion,
188 long serverVersion) {
189 super("Protocol " + interfaceName + " version mismatch. (client = " +
190 clientVersion + ", server = " + serverVersion + ")");
191 this.interfaceName = interfaceName;
192 this.clientVersion = clientVersion;
193 this.serverVersion = serverVersion;
194 }
195
196 /**
197 * Get the interface name
198 * @return the java class name
199 * (eg. org.apache.hadoop.mapred.InterTrackerProtocol)
200 */
201 public String getInterfaceName() {
202 return interfaceName;
203 }
204
205 /**
206 * Get the client's prefered version
207 */
208 public long getClientVersion() {
209 return clientVersion;
210 }
211
212 /**
213 * Get the server's agreed to version.
214 */
215 public long getServerVersion() {
216 return serverVersion;
217 }
218 }
219
220 public static VersionedProtocol waitForProxy(Class protocol,
221 long clientVersion,
222 InetSocketAddress addr,
223 Configuration conf
224 ) throws IOException {
225 while (true) {
226 try {
227 return getProxy(protocol, clientVersion, addr, conf);
228 } catch(ConnectException se) { // namenode has not been started
229 LOG.info("Server at " + addr + " not available yet, Zzzzz...");
230 } catch(SocketTimeoutException te) { // namenode is busy
231 LOG.info("Problem connecting to server: " + addr);
232 }
233 try {
234 Thread.sleep(1000);
235 } catch (InterruptedException ie) {
236 // IGNORE
237 }
238 }
239 }
240 /** Construct a client-side proxy object that implements the named protocol,
241 * talking to a server at the named address. */
242 public static VersionedProtocol getProxy(Class protocol, long clientVersion,
243 InetSocketAddress addr, Configuration conf) throws IOException {
244 VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance(
245 protocol.getClassLoader(),
246 new Class[] { protocol },
247 new Invoker(addr, conf));
248 long serverVersion = proxy.getProtocolVersion(protocol.getName(),
249 clientVersion);
250 if (serverVersion == clientVersion) {
251 return proxy;
252 } else {
253 throw new VersionMismatch(protocol.getName(), clientVersion,
254 serverVersion);
255 }
256 }
257
258 /** Expert: Make multiple, parallel calls to a set of servers. */
259 public static Object[] call(Method method, Object[][] params,
260 InetSocketAddress[] addrs, Configuration conf)
261 throws IOException {
262
263 Invocation[] invocations = new Invocation[params.length];
264 for (int i = 0; i < params.length; i++)
265 invocations[i] = new Invocation(method, params[i]);
266 Writable[] wrappedValues = getClient(conf).call(invocations, addrs);
267
268 if (method.getReturnType() == Void.TYPE) {
269 return null;
270 }
271
272 Object[] values =
273 (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
274 for (int i = 0; i < values.length; i++)
275 if (wrappedValues[i] != null)
276 values[i] = ((ObjectWritable)wrappedValues[i]).get();
277
278 return values;
279 }
280
281 /** Construct a server for a protocol implementation instance listening on a
282 * port and address. */
283 public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf)
284 throws IOException {
285 return getServer(instance, bindAddress, port, 1, false, conf);
286 }
287
288 /** Construct a server for a protocol implementation instance listening on a
289 * port and address. */
290 public static Server getServer(final Object instance, final String bindAddress, final int port,
291 final int numHandlers,
292 final boolean verbose, Configuration conf)
293 throws IOException {
294 return new Server(instance, conf, bindAddress, port, numHandlers, verbose);
295 }
296
297 /** An RPC Server. */
298 public static class Server extends org.apache.hadoop.ipc.Server {
299 private Object instance;
300 private Class<?> implementation;
301 private boolean verbose;
302
303 /** Construct an RPC server.
304 * @param instance the instance whose methods will be called
305 * @param conf the configuration to use
306 * @param bindAddress the address to bind on to listen for connection
307 * @param port the port to listen for connections on
308 */
309 public Server(Object instance, Configuration conf, String bindAddress, int port)
310 throws IOException {
311 this(instance, conf, bindAddress, port, 1, false);
312 }
313
314 /** Construct an RPC server.
315 * @param instance the instance whose methods will be called
316 * @param conf the configuration to use
317 * @param bindAddress the address to bind on to listen for connection
318 * @param port the port to listen for connections on
319 * @param numHandlers the number of method handler threads to run
320 * @param verbose whether each call should be logged
321 */
322 public Server(Object instance, Configuration conf, String bindAddress, int port,
323 int numHandlers, boolean verbose) throws IOException {
324 super(bindAddress, port, Invocation.class, numHandlers, conf);
325 this.instance = instance;
326 this.implementation = instance.getClass();
327 this.verbose = verbose;
328 }
329
330 public Writable call(Writable param) throws IOException {
331 try {
332 Invocation call = (Invocation)param;
333 if (verbose) log("Call: " + call);
334
335 Method method =
336 implementation.getMethod(call.getMethodName(),
337 call.getParameterClasses());
338
339 long startTime = System.currentTimeMillis();
340 Object value = method.invoke(instance, call.getParameters());
341 long callTime = System.currentTimeMillis() - startTime;
342 LOG.debug("Served: " + call.getMethodName() + " " + callTime);
343 if (verbose) log("Return: "+value);
344
345 return new ObjectWritable(method.getReturnType(), value);
346
347 } catch (InvocationTargetException e) {
348 Throwable target = e.getTargetException();
349 if (target instanceof IOException) {
350 throw (IOException)target;
351 } else {
352 IOException ioe = new IOException(target.toString());
353 ioe.setStackTrace(target.getStackTrace());
354 throw ioe;
355 }
356 } catch (Throwable e) {
357 IOException ioe = new IOException(e.toString());
358 ioe.setStackTrace(e.getStackTrace());
359 throw ioe;
360 }
361 }
362 }
363
364 private static void log(String value) {
365 if (value!= null && value.length() > 55)
366 value = value.substring(0, 55)+"...";
367 LOG.info(value);
368 }
369
370 }