第六章:小朱笔记hadoop之源码分析-ipc分析 第四节:RPC类分析
第六章:小朱笔记hadoop之源码分析-ipc分析
第四节:RPC类分析
RPC类是对Server、Client的具体化。在RPC类中规定,客户程序发出请求调用时,参数类型必须是Invocation;从服务器返回的值类型必须是ObjectWritable。
RPC类是对Server、Client的包装,简化用户的使用。如果一个类需充当服务器,只需通过RPC类的静态方法getServer获得Server实例,然后start。同时此类提供协议接口的实现。如果一个类充当客户端,可以通过getProxy或者waitForProxy获得一个实现了协议接口的proxy object,与服务器端交互。
RPC类中有5个静态内部类,分别为:
写道
Invocation :用于封装方法名和参数,作为数据传输层,相当于VO吧。
ClientCache :用于存储client对象,用socket factory作为hash key,存储结构为hashMap <SocketFactory, Client>。
Invoker :是动态代理中的调用实现类,继承了InvocationHandler.
Server : org.apache.hadoop.ipc.Server的具体实现类,实现了抽象类的call方法,获得传入参数的call实例,再获取method方法,反射调用即可。
VersionMismatch:版本不匹配异常。
ClientCache :用于存储client对象,用socket factory作为hash key,存储结构为hashMap <SocketFactory, Client>。
Invoker :是动态代理中的调用实现类,继承了InvocationHandler.
Server : org.apache.hadoop.ipc.Server的具体实现类,实现了抽象类的call方法,获得传入参数的call实例,再获取method方法,反射调用即可。
VersionMismatch:版本不匹配异常。
(1)Invocation
/** A method invocation, including the method name and its parameters. */ private static class Invocation implements Writable, Configurable { private String methodName; // 方法名 private Class[] parameterClasses; // 参数类型集合 private Object[] parameters; // 参数值 private Configuration conf; // 配置类实例 ...... }
(2)ClientCache
/* Cache a client using its socket factory as the hash key */ static private class ClientCache { // 该内部类定义了一个缓存Map private Map<SocketFactory, Client> clients = new HashMap<SocketFactory, Client>(); /** * Construct & cache an IPC client with the user-provided SocketFactory * if no cached client exists. * 通过客户端org.apache.hadoop.ipc.Client的SocketFactory可以快速取出对应的Client实例 * * @param conf * Configuration * @return an IPC client */ /** * 从缓存Map中取出一个IPC Client实例,如果缓存够中不存在,就创建一个兵加入到缓存Map中 */ private synchronized Client getClient(Configuration conf, SocketFactory factory) { // Construct & cache client. The configuration is only used for // timeout, // and Clients have connection pools. So we can either (a) lose some // connection pooling and leak sockets, or (b) use the same timeout // for all // configurations. Since the IPC is usually intended globally, not // per-job, we choose (a). Client client = clients.get(factory); if (client == null) { client = new Client(ObjectWritable.class, conf, factory); // 通过反射实例化一个ObjectWritable对象,构造Client实例 clients.put(factory, client); } else { client.incCount(); } return client; } /** * Construct & cache an IPC client with the default SocketFactory if no * cached client exists. * * @param conf * Configuration * @return an IPC client */ private synchronized Client getClient(Configuration conf) { return getClient(conf, SocketFactory.getDefault()); } /** * Stop a RPC client connection A RPC client is closed only when its * reference count becomes zero. */ private void stopClient(Client client) { synchronized (this) { client.decCount(); // 该client实例的引用计数减1 if (client.isZeroReference()) { // 如果client实例的引用计数此时为0 clients.remove(client.getSocketFactory()); // 从缓存中删除 } } if (client.isZeroReference()) { // 如果client实例引用计数为0,需要关闭 client.stop(); // 停止所有与该client实例相关的线程 } } }
(3)Invoker
private static class Invoker implements InvocationHandler { private Client.ConnectionId remoteId; private Client client; private boolean isClosed = false; public Invoker(Class<? extends VersionedProtocol> protocol, InetSocketAddress address, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { this.remoteId = Client.ConnectionId.getConnectionId(address, protocol, ticket, rpcTimeout, conf); this.client = CLIENTS.getClient(conf, factory); } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { final boolean logDebug = LOG.isDebugEnabled(); long startTime = 0; if (logDebug) { startTime = System.currentTimeMillis(); } // 构造一个RPC.Invocation实例作为参数传递给调用程序,执行调用,返回值为value ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId); if (logDebug) { long callTime = System.currentTimeMillis() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime); } return value.get(); } /* close the IPC client that's responsible for this invoker's RPCs */ synchronized private void close() { if (!isClosed) { isClosed = true; CLIENTS.stopClient(client); } } }
(4)Server
public Writable call(Class<?> protocol, Writable param, long receivedTime)throws IOException { Invocation call = (Invocation)param; // 通过反射,根据调用方法名和方法参数类型得到Method实例 Method method =protocol.getMethod(call.getMethodName(), call.getParameterClasses()); method.setAccessible(true);// 设置反射的对象在使用时取消Java语言访问检查,提高效率 Object value = method.invoke(instance, call.getParameters());// 执行调用(instance是调用底层方法的对象,第二个参数是方法调用的参数) }