- Server:继承org.apache.hadoop.ipc.Server(Hadoop学习九:Hadoop-hdfs RPC源码 Server)。我们称之为RPC Server。
/** An RPC Server. */ public static class Server extends org.apache.hadoop.ipc.Server { //创建一个RPC server //注意调用父类IP.Server的构造函数时传入的就是Invocation.class public Server(Object instance, Configuration conf, String bindAddress, int port, int numHandlers, boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) throws IOException { super(bindAddress, port, Invocation.class, numHandlers, conf, classNameBase(instance.getClass().getName()), secretManager); } //重写ipc.Server的call方法 public Writable call(Class<?> protocol, Writable param, long receivedTime){ Invocation call = (Invocation)param; if (verbose) log("Call: " + call); Method method = protocol.getMethod(call.getMethodName(), method.setAccessible(true); long startTime = System.currentTimeMillis(); //真正执行远程命令就体现在这里:client的方法最终在server上被执行,就是所谓的rpc Object value = method.invoke(instance, call.getParameters()); return new ObjectWritable(method.getReturnType(), value); } }
- Invocation: 我要在远程server上运行一条命令,也就是一个方法,我们把这个方法的方法名,参数,类型封装成一个Invocation对象。
//每次方法调用都会实例化一个Invocation对象 //将方法的方法名,参数类型,值封装成一个Invocation对象 private static class Invocation implements Writable, Configurable { private String methodName; private Class[] parameterClasses; private Object[] parameters; private Configuration conf; public Invocation(Method method, Object[] parameters) { this.methodName = method.getName(); this.parameterClasses = method.getParameterTypes(); this.parameters = parameters; } }
- Invoker: 继承InvocationHandler,重写了invoke方法,invoke方法里面就是IPC Client向IPC Server发送Invocation。
//java反射 private static class Invoker implements InvocationHandler { private Client.ConnectionId remoteId; private Client client; private boolean isClosed = false; //只有内部RPC.getProx时调用 private Invoker(Class<? extends VersionedProtocol> protocol, InetSocketAddress address, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException { this.remoteId = Client.ConnectionId.getConnectionId(address, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf); this.client = CLIENTS.getClient(conf, factory); } //重写invoke方法 //每次方法调用都将封装成Invocation对象,被发往server端 //返回server端运行此方法的结果 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId);//client向server发生消息 if (logDebug) { long callTime = System.currentTimeMillis() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime); } return value.get(); } }
- ClientCache:缓存IPC Client对象。
- RPC:前两章学习了IPC Client和IPC Server,RPC就是综合了这两者以及java反射机制,为我们提供了许多静态方法,我们只需要调用RPC.*,即可获得代理类,而不需要关心它们是怎么完成的。
public class RPC { private RPC() {} // no public ctor //相比getProxy,waitForProxy肯定能获得一个代理类VersionedProtocol static VersionedProtocol waitForProxy(Class<? extends VersionedProtocol> protocol, long clientVersion, InetSocketAddress addr, Configuration conf, int rpcTimeout, long connTimeout) throws IOException { return getProxy(protocol, clientVersion, addr, conf, rpcTimeout); catch(ConnectException se) { // namenode has not been started LOG.info("Server at " + addr + " not available yet, Zzzzz..."); //你是在卖萌 } } //获得代理类,剩下的只需用代理类执行命令就行了 public static VersionedProtocol getProxy( Class<? extends VersionedProtocol> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException { final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy); //反射获得代理类 VersionedProtocol proxy = (VersionedProtocol)Proxy.newProxyInstance( protocol.getClassLoader(), new Class[]{protocol}, invoker); long serverVersion = proxy.getProtocolVersion(protocol.getName(), clientVersion); //协议检查 if (serverVersion == clientVersion) { return proxy; } } //每次发送一组方法调用到指定server //并没有经过反射机制 //而是直接发送方法名,参数类型,值到指定server;并获取server执行这些方法的结果集 //如果你确实有一组远程命令要执行,并且知道这些命令的的方法名,参数类型,值,并为每个命令指定执行server,那你可以调用此方法 public static Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs, UserGroupInformation ticket, Configuration conf) throws IOException, InterruptedException { //实例化Invocation Invocation[] invocations = new Invocation[params.length]; for (int i = 0; i < params.length; i++) invocations[i] = new Invocation(method, params[i]); Client client = CLIENTS.getClient(conf); try {//发生Invocation Writable[] wrappedValues = client.call(invocations, addrs, method.getDeclaringClass(), ticket, conf); if (method.getReturnType() == Void.TYPE) { return null; } Object[] values = (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length); for (int i = 0; i < values.length; i++) if (wrappedValues[i] != null) values[i] = ((ObjectWritable)wrappedValues[i]).get(); return values; } finally { CLIENTS.stopClient(client); } } //获得RPC Server public static Server getServer(final Object instance, final String bindAddress, final int port, final int numHandlers, final boolean verbose, Configuration conf, SecretManager<? extends TokenIdentifier> secretManager) throws IOException { return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager); } }
Hadoop学习十:Hadoop-Hdfs RPC源码 RPC
一.RPC类图
二.详细描述
相关推荐
Runtimeclass 2020-10-20
Martian 2020-10-13
GimmeS 2020-06-17
范群松 2020-06-11
jerry00 2020-06-11
舍我其谁 2020-04-25
xuedabao 2020-04-10
chvnetcom 2020-02-19
rareli 2020-02-16
newfarhui 2020-01-30
jannal 2020-01-18
aNian 2020-01-18
jannal 2020-01-16
zhangll00 2019-12-28
风之翊 2019-12-29
XiaoqiangNan 2020-01-08
bapinggaitianli 2020-01-01
大步流星 2019-12-24
Ggaomiss 2019-12-14