hadoop_rpc简介
1.RPC简介
1.1 RPC (remote procedure call)远程过程调用,
指的是java进程,即一个java进程调用另一个java进程中对象的方法。
1.2 RPC至少有两个过程。调用方(client),被调用方(server)。
1.3 client主动发起请求,调用指定ip和port的server中的方法,把调用结果返回给client。
1.4 RPC是hadoop构建的基础。
1.5 RPC实际上就是socket通信,只要知道了对方地址和端口,即可实现通信,因此RPC可以实现多进程间交流,更适合于Hadoop集群不同地址(分布式)下的通信
2 模拟rpc机制,自定义代码写法:
2.1 需要一个接口和具体业务类,在接口中定义好需要被远程调用的业务类
(相当于公约,定义好公共规则-要调用的方法,然后客户端调用时直接传递规约中指定好的方法,最终调用到服务的这个方法上)
2.2 需要有RPC客户端和服务端类
2.3 代码如下:
1 接口: import org.apache.hadoop.ipc.VersionedProtocol; public interface MyBizAble extends VersionedProtocol{ public String sayHello(String name); } 2 业务实现类: public class MyBiz implements MyBizAble{ public String sayHello(String name){ return "hello " + name; } @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException { return 0; } } 3 服务端 public class MyServer { public static String bindAddress = "localhost"; public static int port = 1001; public static void main(String[] args) throws IOException { MyBiz myBiz = new MyBiz(); /** Construct an RPC server. * @param instance the instance whose methods will be called * @param conf the configuration to use * @param bindAddress the address to bind on to listen for connection * @param port the port to listen for connections on */ 将服务端实例myBiz 服务端地址 端口交给RPC管理监听,客户端调用服务端对应方法时,会通过RPC 关联到服务端对应方法,服务端处理好后将结果在通过RPC 网络流向RPC客户端 Server server = RPC.getServer(myBiz, bindAddress, port, new Configuration()); server.start(); } } 4 客户端 public class MyClient { public static void main(String[] args) throws IOException { /** * 构建一个客户端代理对象 * 该代理对象实现了命名协议 * 代理对象会与指定地址的服务端通讯 */ MyBizAble proxy = (MyBizAble)RPC.waitForProxy(MyBizAble.class, 1001, new InetSocketAddress(MyServer.bindAddress, MyServer.port), new Configuration()); String result = proxy.sayHello("zm"); System.out.println(result); /* MyBizAble proxy = (MyBizAble)RPC.waitForProxy( MyBizAble.class, MyBizAble.VERSION, new InetSocketAddress(MyServer.ADDRESS, MyServer.PORT), new Configuration()); final String result = proxy.hello("world"); System.out.println("客户端结果:"+result); //关闭网络连接 RPC.stopProxy(proxy);*/ } }
3 hadoop中rpc结构图
4 查看namenode源码,确认是否为rpc通讯的一部分,是rpc的服务端
4.1 是否长时间运行,一直等待客户端请求(socket server端就是这种典型标志)
4.2 看是否含有服务端代表典型标志:
Server server = RPC.getServer(myBiz, bindAddress, port, new Configuration());
4.3 源码跟踪和解释如下:
namenode rpc代码跟踪如下: public class NameNode implements ClientProtocol, DatanodeProtocol, NamenodeProtocol,.... { ....... /** RPC server */ private Server server; ...... public static void main(String argv[]) throws Exception { try { StringUtils.startupShutdownMessage(NameNode.class, argv, LOG); NameNode namenode = createNameNode(argv, null); // 点击进入 if (namenode != null) namenode.join(); } catch (Throwable e) { LOG.error(StringUtils.stringifyException(e)); System.exit(-1); } } 如下: public static NameNode createNameNode(String argv[], Configuration conf) throws IOException { ............... NameNode namenode = new NameNode(conf); // 点击进入 ............... } 如下: /* ................ upgrade and create a snapshot of the current file system state 在 ............... */ public NameNode(Configuration conf) throws IOException { try { initialize(conf);// 点击进入 } catch (IOException e) { this.stop(); throw e; } } 如下: private void initialize(Configuration conf) throws IOException { ............... this.server = RPC.getServer(this, socAddr.getHostName(), // this代表Namenode类 socAddr.getPort(), handlerCount, false, conf, namesystem .getDelegationTokenSecretManager()); ............... } } 源码参考完毕,解释如下: 一般的RPC server端代码在调用时: 第一个参数the instance whose methods will be called 表示要调用的业务类的方法, Server server = RPC.getServer(myBiz, bindAddress, port, new Configuration()); 上面Namenode中,第一个参数this表示我的业务类是Namonode,同时RPC.getServer又出现在了Namenode类,因此Namenode 即表示了业务实现类,又担当了RPC server的功能。 this.server = RPC.getServer(this, socAddr.getHostName(), // this代表Namenode类 socAddr.getPort(), handlerCount, false, conf, namesystem .getDelegationTokenSecretManager()); NameNode 作为业务实现类,需要实现公约,才能被客户端调用到, 看代码 public class NameNode implements ClientProtocol, DatanodeProtocol,NamenodeProtocol, 其中: ClientProtocol, DatanodeProtocol,NamenodeProtocol 就是公约,分别代表: ClientProtocol: 定义用户和namenode打交道的方法的接口公约 DatanodeProtocol:Protocol that a DFS datanode uses to communicate with the NameNode NamenodeProtocol: secondary NameNode uses to communicate with the NameNode
5 通过代码查看client是如何通过rpc机制来调用到namenode节点的
0 客户端上传文件到hdfs为例: private static void putData(FileSystem fileSystem) { try { System.out.println(fileSystem.getClass().getName());//运行后打印结果如下 org.apache.hadoop.hdfs.DistributedFileSystem FSDataOutputStream out = fileSystem.create(new Path(FILE)); // 点击进入 FileInputStream in = new FileInputStream("E:/seq100w.txt"); IOUtils.copyBytes(in, out, 1024, true); } catch (Exception e) { e.printStackTrace(); } } 如下: public FSDataOutputStream create(Path f) throws IOException { return create(f, true); // 点击进入 } 不断点击进入,直到如下: 如下: public abstract FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException; 去org.apache.hadoop.hdfs.DistributedFileSystem中找方法create: public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { statistics.incrementWriteOps(1); return new FSDataOutputStream (dfs.create(getPathName(f), permission, // 点击进入 dfs.create overwrite, true, replication, blockSize, progress, bufferSize), statistics); } // 返回类是一个流, 流就需要一个目的地, 如下: 进入到类DFSClient.java Create a new dfs file with the specified block replication * with write-progress reporting and return an output stream for writing * into the file. // 主要关注两件事: 1 在namenode上创建了一个dfs file 2 创建用于存储用户上传文件数据的文件流 public OutputStream create(String src, FsPermission permission, boolean overwrite, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize ) throws IOException { checkOpen(); if (permission == null) { permission = FsPermission.getDefault(); } FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf)); LOG.debug(src + ": masked=" + masked); final DFSOutputStream result = new DFSOutputStream(src, masked, // 点击进入 overwrite, createParent, replication, blockSize, progress, buffersize, conf.getInt("io.bytes.per.checksum", 512)); beginFileLease(src, result); return result; } 如下: 在类DFSClient.java内,有内部类DFSOutputStream DFSClient.java { public final ClientProtocol namenode; /** * Create a new output stream to the given DataNode.//创建指向特定datanode的输出流 * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long) */ DFSOutputStream(String src, FsPermission masked, boolean overwrite, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize, int bytesPerChecksum) throws IOException { this(src, blockSize, progress, bytesPerChecksum, replication); computePacketChunkSize(writePacketSize, bytesPerChecksum); try { // Make sure the regular create() is done through the old create(). // This is done to ensure that newer clients (post-1.0) can talk to // older clusters (pre-1.0). Older clusters lack the new create() // method accepting createParent as one of the arguments. if (createParent) { namenode.create( // 看这里,这里的namenode是ClientProtocol类型, // 此时才真正是 DFSClient.java通过ClientProtocol来和NamoNode搭上线 // 此时RCP客户端已经调用了公约ClientProtocol的create方法, src, masked, clientName, overwrite, replication, blockSize); } else { namenode.create( src, masked, clientName, overwrite, false, replication, blockSize); } } catch(RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, FileAlreadyExistsException.class, FileNotFoundException.class, NSQuotaExceededException.class, DSQuotaExceededException.class); } streamer.start(); } 而 ClientProtocol namenode的初始化操作在 DFSClient构造函数中, DFSClient(InetSocketAddress nameNodeAddr, ClientProtocol rpcNamenode, Configuration conf, FileSystem.Statistics stats) { ...... 277行 this.namenode = createNamenode(this.rpcNamenode, conf); 进行了初始化操作 } 而DFSClient初始化在类DistributedFileSystem: public void initialize(URI uri, Configuration conf) throws IOException { ... this.dfs = new DFSClient(namenode, conf, statistics); // 100行 ... } }流程图如下: