Hadoop学习十:Hadoop-Hdfs RPC源码 RPC

一.RPC类图

Hadoop学习十:Hadoop-Hdfs RPC源码 RPC

二.详细描述

  1. Server:继承org.apache.hadoop.ipc.Server(Hadoop学习九:Hadoop-hdfs RPC源码 Server)。我们称之为RPC Server。
    /** An RPC Server. */
      public static class Server extends org.apache.hadoop.ipc.Server {
        //创建一个RPC server
        //注意调用父类IP.Server的构造函数时传入的就是Invocation.class
        public Server(Object instance, Configuration conf, String bindAddress,  int port,
                      int numHandlers, boolean verbose, 
                      SecretManager<? extends TokenIdentifier> secretManager) 
            throws IOException {
          super(bindAddress, port, Invocation.class, numHandlers, conf,
              classNameBase(instance.getClass().getName()), secretManager);
        }
    
        //重写ipc.Server的call方法
        public Writable call(Class<?> protocol, Writable param, long receivedTime){
            Invocation call = (Invocation)param;
            if (verbose) log("Call: " + call);
    
            Method method = protocol.getMethod(call.getMethodName(),
            method.setAccessible(true);
            long startTime = System.currentTimeMillis();
            //真正执行远程命令就体现在这里:client的方法最终在server上被执行,就是所谓的rpc
            Object value = method.invoke(instance, call.getParameters());
    
            return new ObjectWritable(method.getReturnType(), value);
        }
      }
  2. Invocation: 我要在远程server上运行一条命令,也就是一个方法,我们把这个方法的方法名,参数,类型封装成一个Invocation对象。
    //每次方法调用都会实例化一个Invocation对象
       //将方法的方法名,参数类型,值封装成一个Invocation对象
      private static class Invocation implements Writable, Configurable {
        private String methodName;
        private Class[] parameterClasses;
        private Object[] parameters;
        private Configuration conf;
    
        public Invocation(Method method, Object[] parameters) {
          this.methodName = method.getName();
          this.parameterClasses = method.getParameterTypes();
          this.parameters = parameters;
        }
      }
  3. Invoker: 继承InvocationHandler,重写了invoke方法,invoke方法里面就是IPC Client向IPC Server发送Invocation。
    //java反射
      private static class Invoker implements InvocationHandler {
        private Client.ConnectionId remoteId;
        private Client client;
        private boolean isClosed = false;
        //只有内部RPC.getProx时调用
        private Invoker(Class<? extends VersionedProtocol> protocol,
            InetSocketAddress address, UserGroupInformation ticket,
            Configuration conf, SocketFactory factory,
            int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException {
          this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
              ticket, rpcTimeout, connectionRetryPolicy, conf);
          this.client = CLIENTS.getClient(conf, factory);
        }
    
        //重写invoke方法
        //每次方法调用都将封装成Invocation对象,被发往server端
        //返回server端运行此方法的结果
        public Object invoke(Object proxy, Method method, Object[] args)
          throws Throwable {
          ObjectWritable value = (ObjectWritable)
            client.call(new Invocation(method, args), remoteId);//client向server发生消息
          if (logDebug) {
            long callTime = System.currentTimeMillis() - startTime;
            LOG.debug("Call: " + method.getName() + " " + callTime);
          }
          return value.get();
        }
      }
  4. ClientCache:缓存IPC Client对象。
  5. RPC:前两章学习了IPC Client和IPC Server,RPC就是综合了这两者以及java反射机制,为我们提供了许多静态方法,我们只需要调用RPC.*,即可获得代理类,而不需要关心它们是怎么完成的。
    public class RPC {
      private RPC() {}                                  // no public ctor
      
      //相比getProxy,waitForProxy肯定能获得一个代理类VersionedProtocol
      static VersionedProtocol waitForProxy(Class<? extends VersionedProtocol> protocol,
                                                   long clientVersion,
                                                   InetSocketAddress addr,
                                                   Configuration conf,
                                                   int rpcTimeout,
                                                   long connTimeout)
                                                   throws IOException { 
            return getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
            catch(ConnectException se) {  // namenode has not been started
            LOG.info("Server at " + addr + " not available yet, Zzzzz..."); //你是在卖萌
        }
      }
    
      //获得代理类,剩下的只需用代理类执行命令就行了
      public static VersionedProtocol getProxy(
          Class<? extends VersionedProtocol> protocol,
          long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
          Configuration conf, SocketFactory factory, int rpcTimeout,
          RetryPolicy connectionRetryPolicy) throws IOException {
    
        final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
            rpcTimeout, connectionRetryPolicy);
        //反射获得代理类
        VersionedProtocol proxy = (VersionedProtocol)Proxy.newProxyInstance(
            protocol.getClassLoader(), new Class[]{protocol}, invoker);
        long serverVersion = proxy.getProtocolVersion(protocol.getName(), clientVersion);
        //协议检查
        if (serverVersion == clientVersion) {
          return proxy;
        }
      }
    
      //每次发送一组方法调用到指定server
      //并没有经过反射机制
      //而是直接发送方法名,参数类型,值到指定server;并获取server执行这些方法的结果集
      //如果你确实有一组远程命令要执行,并且知道这些命令的的方法名,参数类型,值,并为每个命令指定执行server,那你可以调用此方法
      public static Object[] call(Method method, Object[][] params,
                                  InetSocketAddress[] addrs, 
                                  UserGroupInformation ticket, Configuration conf)
        throws IOException, InterruptedException {
        //实例化Invocation
        Invocation[] invocations = new Invocation[params.length];
        for (int i = 0; i < params.length; i++)
          invocations[i] = new Invocation(method, params[i]);
        Client client = CLIENTS.getClient(conf);
        try {//发生Invocation
        Writable[] wrappedValues = 
          client.call(invocations, addrs, method.getDeclaringClass(), ticket, conf);
        
        if (method.getReturnType() == Void.TYPE) {
          return null;
        }
    
        Object[] values =
          (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
        for (int i = 0; i < values.length; i++)
          if (wrappedValues[i] != null)
            values[i] = ((ObjectWritable)wrappedValues[i]).get();
        
        return values;
        } finally {
          CLIENTS.stopClient(client);
        }
      }
     
      //获得RPC Server
      public static Server getServer(final Object instance, final String bindAddress, final int port,
                                     final int numHandlers,
                                     final boolean verbose, Configuration conf,
                                     SecretManager<? extends TokenIdentifier> secretManager) 
        throws IOException {
        return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
      }
      
    }
     

rpc

相关推荐