HBase Code Reading Notes 1 - PUT - 3 - Submitting the Task, Part 1 (based on 0.94.12)
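I finally have a somewhat clearer picture of how a RegionServer (RS) is located. Time waits for no one, so let's look right away at how step 2 of connection.processBatch submits the task to the server.
As seen earlier, it first creates a Callable<MultiResponse> object. That object's call method in turn creates a ServerCallable<MultiResponse> object and then invokes its withoutRetries method.
This method is simple: it calls connect and then call, and call is where multi gets invoked.
Let's chew through them one at a time, starting with connect. Here server is a member of ServerCallable, of type HRegionInterface.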
// Note [1]: this is ServerCallable's default connect method
public void connect(final boolean reload) throws IOException {
  this.location = connection.getRegionLocation(tableName, row, reload);
  this.server = connection.getHRegionConnection(location.getHostname(),
      location.getPort());
}

// Note [1]: this is the connect method overridden on the ServerCallable in createCallable.
// Step 1 has already fetched the region info for the rowkey, so it is not fetched again here;
// that lookup is a heavyweight operation after all.
public void connect(boolean reload) throws IOException {
  server = connection.getHRegionConnection(loc.getHostname(), loc.getPort());
}
HRegionInterface
public HRegionInterface getHRegionConnection(final String hostname, final int port)
    throws IOException {
  return getHRegionConnection(hostname, port, false);
}

@Override
public HRegionInterface getHRegionConnection(final String hostname, final int port,
    final boolean master) throws IOException {
  return getHRegionConnection(hostname, port, null, master);
}

HRegionInterface getHRegionConnection(final String hostname, final int port,
    final InetSocketAddress isa, final boolean master) throws IOException {
  if (master) getMaster(); // when connecting to the master, go find it; not explored here
  HRegionInterface server;
  String rsName = null;
  if (isa != null) {
    rsName = Addressing.createHostAndPortStr(isa.getHostName(), isa.getPort());
  } else {
    rsName = Addressing.createHostAndPortStr(hostname, port);
  }
  ensureZookeeperTrackers();
  // See if we already have a connection (common case)
  server = this.servers.get(rsName);
  if (server == null) {
    // create a unique lock for this RS (if necessary)
    this.connectionLock.putIfAbsent(rsName, rsName);
    // get the RS lock
    synchronized (this.connectionLock.get(rsName)) {
      // do one more lookup in case we were stalled above
      server = this.servers.get(rsName);
      if (server == null) {
        try {
          // Only create isa when we need to.
          InetSocketAddress address = isa != null ? isa :
              new InetSocketAddress(hostname, port);
          // definitely a cache miss. establish an RPC for this RS
          // Everything above builds the address and checks the cache; this is the core call
          server = HBaseRPC.waitForProxy(this.rpcEngine, serverInterfaceClass,
              HRegionInterface.VERSION, address, this.conf, this.maxRPCAttempts,
              this.rpcTimeout, this.rpcTimeout);
          this.servers.put(Addressing.createHostAndPortStr(
              address.getHostName(), address.getPort()), server);
        } catch (RemoteException e) {
          LOG.warn("RemoteException connecting to RS", e);
          // Throw what the RemoteException was carrying.
          throw e.unwrapRemoteException();
        }
      }
    }
  }
  return server;
}
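This code was already posted in the region-location post. It mainly uses the server name string (host:port) as the key of a hash-map cache: if a connection already exists it is returned, otherwise one is created and cached.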
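The caching idiom in getHRegionConnection (fast lookup, a lock per key, then a second lookup under the lock) can be sketched in isolation. This is a minimal illustration, not HBase code: the class PerKeyCache, the factory parameter, and the host name are made up for the example, and it uses a fresh Object as the lock where HBase uses the rsName string itself.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical helper: caches one value per key and creates it at most once.
final class PerKeyCache<V> {
    private final ConcurrentMap<String, V> values = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<>();

    V get(String key, Function<String, V> factory) {
        V value = values.get(key);                 // common case: already cached
        if (value == null) {
            locks.putIfAbsent(key, new Object());  // one lock object per key
            synchronized (locks.get(key)) {
                value = values.get(key);           // re-check: another thread may have won
                if (value == null) {
                    value = factory.apply(key);    // the expensive creation step
                    values.put(key, value);
                }
            }
        }
        return value;
    }

    public static void main(String[] args) {
        PerKeyCache<String> cache = new PerKeyCache<>();
        AtomicInteger created = new AtomicInteger();
        String first = cache.get("rs1.example.org:60020", k -> "proxy-" + created.incrementAndGet());
        String second = cache.get("rs1.example.org:60020", k -> "proxy-" + created.incrementAndGet());
        // prints "proxy-1 proxy-1 created=1"
        System.out.println(first + " " + second + " created=" + created.get());
    }
}
```

Locking per key rather than on the whole map means a slow connection attempt to one RS does not block callers that need a different RS.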
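HBaseRPC.waitForProxy takes quite a few parameters.
The last five are ordinary parameters and need no comment, so let's look at the first three.
RpcEngine: the comment states it clearly, it is the RPC implementation. Exactly which one is followed up later.
protocol: a subtype of HRegionInterface.
clientVersion: HRegionInterface.VERSION, currently the constant 29. Enough talk, on to the code. At first glance the main body looks almost too simple: it loops, tries to return rpcClient.getProxy, and handles exceptions. Looks like we have to dig further.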
public static <T extends VersionedProtocol> T waitForProxy(RpcEngine rpcClient,
    Class<T> protocol,
    long clientVersion,
    InetSocketAddress addr,
    Configuration conf,
    int maxAttempts, // hbase.client.rpc.maxattempts, default 1; annoyingly it is not even in the config file
    int rpcTimeout,
    long timeout // hbase.rpc.timeout, default 60 seconds; can your clients really tolerate a wait that long?
    ) throws IOException {
  // HBase does limited number of reconnects which is different from hadoop.
  long startTime = System.currentTimeMillis();
  IOException ioe;
  int reconnectAttempts = 0;
  while (true) {
    try {
      return rpcClient.getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
    } catch (SocketTimeoutException te) { // namenode is busy
      LOG.info("Problem connecting to server: " + addr);
      ioe = te;
    } catch (IOException ioex) {
      // We only handle the ConnectException.
      ConnectException ce = null;
      if (ioex instanceof ConnectException) {
        ce = (ConnectException) ioex;
        ioe = ce;
      } else if (ioex.getCause() != null
          && ioex.getCause() instanceof ConnectException) {
        ce = (ConnectException) ioex.getCause();
        ioe = ce;
      } else if (ioex.getMessage().toLowerCase()
          .contains("connection refused")) {
        ce = new ConnectException(ioex.getMessage());
        ioe = ce;
      } else {
        // This is the exception we can't handle.
        ioe = ioex;
      }
      if (ce != null) {
        handleConnectionException(++reconnectAttempts, maxAttempts, protocol,
            addr, ce);
      }
    }
    // check if timed out
    if (System.currentTimeMillis() - timeout >= startTime) {
      throw ioe;
    }
    // wait for retry
    try {
      Thread.sleep(1000);
    } catch (InterruptedException ie) {
      // IGNORE
    }
  }
}
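The shape of that loop (try, classify the failure, give up on either an attempt limit or a deadline, otherwise sleep and retry) can be sketched on its own. The names BoundedRetry, IoSupplier, and callWithRetry are hypothetical. The sketch assumes that exceeding the attempt limit rethrows the ConnectException, which is what handleConnectionException appears to do but is not shown in the listing above. Unlike the original, it restores the interrupt flag and stops instead of ignoring InterruptedException.

```java
import java.io.IOException;
import java.net.ConnectException;

// Hypothetical sketch of a retry loop bounded by both attempts and elapsed time.
final class BoundedRetry {
    interface IoSupplier<T> {
        T get() throws IOException;
    }

    static <T> T callWithRetry(IoSupplier<T> attempt, int maxAttempts,
                               long timeoutMs, long sleepMs) throws IOException {
        long start = System.currentTimeMillis();
        int connectFailures = 0;
        while (true) {
            IOException last;
            try {
                return attempt.get();
            } catch (ConnectException ce) {
                // Connection failures count against the attempt limit (assumed behavior).
                if (++connectFailures >= maxAttempts) {
                    throw ce;
                }
                last = ce;
            } catch (IOException other) {
                // Other I/O failures are only bounded by the deadline below.
                last = other;
            }
            if (System.currentTimeMillis() - start >= timeoutMs) {
                throw last;
            }
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                throw new IOException("Interrupted while waiting to retry", ie);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int[] calls = {0};
        String result = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new ConnectException("connection refused");
            }
            return "connected";
        }, 5, 10_000L, 10L);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

With maxAttempts set to 1, as in the default mentioned above, the first ConnectException ends the loop immediately, so the one-second sleep only matters for timeouts and other I/O errors.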
All right, so rpcClient.getProxy is the real main event. Every time I think I am near the end, my enthusiasm gets doused again.
if (rpcEngine == null) {
  this.rpcEngine = HBaseRPC.getProtocolEngine(conf);
}

public static synchronized RpcEngine getProtocolEngine(Configuration conf) {
  // check for a configured default engine
  Class<?> impl = conf.getClass(RPC_ENGINE_PROP, WritableRpcEngine.class);
  LOG.debug("Using RpcEngine: " + impl.getName());
  RpcEngine engine = (RpcEngine) ReflectionUtils.newInstance(impl, conf);
  return engine;
}
RpcEngine: WritableRpcEngine by default. You can override it by setting hbase.rpc.engine to a fully qualified class name.
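The "configured class name with a built-in default" mechanism can be illustrated with plain JDK reflection. This is only a sketch: it uses java.util.Properties in place of Hadoop's Configuration, and Engine, DefaultEngine, CustomEngine, and the demo.rpc.engine key are all hypothetical names, not HBase ones.

```java
import java.util.Properties;

// Hypothetical sketch: pick an implementation class from configuration, else a default.
final class EngineLoader {
    interface Engine {
        String name();
    }

    static final class DefaultEngine implements Engine {
        public String name() { return "default"; }
    }

    static final class CustomEngine implements Engine {
        public String name() { return "custom"; }
    }

    // Made-up property key for this example.
    static final String ENGINE_PROP = "demo.rpc.engine";

    static Engine load(Properties conf) {
        // Fall back to the default implementation when the key is absent.
        String className = conf.getProperty(ENGINE_PROP, DefaultEngine.class.getName());
        try {
            Class<?> impl = Class.forName(className);
            return (Engine) impl.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate engine " + className, e);
        }
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        System.out.println(load(conf).name());   // prints "default"
        conf.setProperty(ENGINE_PROP, CustomEngine.class.getName());
        System.out.println(load(conf).name());   // prints "custom"
    }
}
```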
public <T extends VersionedProtocol> T getProxy(
    Class<T> protocol, long clientVersion,
    InetSocketAddress addr, Configuration conf, int rpcTimeout)
    throws IOException {
  if (this.client == null) {
    // client is an HBaseClient instance, injected in setConf when the RpcEngine is initialized // MY TODO
    throw new IOException("Client must be initialized by calling setConf(Configuration)");
  }
  // It really does create a proxy
  T proxy = (T) Proxy.newProxyInstance(
      protocol.getClassLoader(), new Class[]{protocol},
      new Invoker(client, protocol, addr, User.getCurrent(), conf,
          HBaseRPC.getRpcTimeout(rpcTimeout)));
  /*
   * TODO: checking protocol version only needs to be done once when we setup a new
   * HBaseClient.Connection. Doing it every time we retrieve a proxy instance is resulting
   * in unnecessary RPC traffic.
   */
  // Check that the server version matches the client version; if not, it is goodbye
  long serverVersion = ((VersionedProtocol) proxy)
      .getProtocolVersion(protocol.getName(), clientVersion);
  if (serverVersion != clientVersion) {
    throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion,
        serverVersion);
  }
  return proxy;
}
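That gives us the proxy class. Its handler implementation is Invoker, a nested class of WritableRpcEngine.
Next up is Invoker. It really is just a proxy, and a very simple one: it records the time, and that is about it. It holds almost no business logic; the request is packaged in the Invocation class and handed to HBaseClient to call.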
public Object invoke(Object proxy, Method method, Object[] args)
    throws Throwable {
  final boolean logDebug = LOG.isDebugEnabled();
  long startTime = 0;
  if (logDebug) {
    startTime = System.currentTimeMillis();
  }
  HbaseObjectWritable value = (HbaseObjectWritable)
      client.call(new Invocation(method, protocol, args), address,
          protocol, ticket, rpcTimeout);
  if (logDebug) {
    // FIGURE HOW TO TURN THIS OFF!
    long callTime = System.currentTimeMillis() - startTime;
    LOG.debug("Call: " + method.getName() + " " + callTime);
  }
  return value.get();
}

public Invocation(Method method,
    Class<? extends VersionedProtocol> declaringClass, Object[] parameters) {
  this.methodName = method.getName();
  this.parameterClasses = method.getParameterTypes();
  this.parameters = parameters;
  if (declaringClass.equals(VersionedProtocol.class)) {
    //VersionedProtocol is exempted from version check.
    clientVersion = 0;
    clientMethodsHash = 0;
  } else {
    try {
      Field versionField = declaringClass.getField("VERSION");
      versionField.setAccessible(true);
      this.clientVersion = versionField.getLong(declaringClass);
    } catch (NoSuchFieldException ex) {
      throw new RuntimeException("The " + declaringClass, ex);
    } catch (IllegalAccessException ex) {
      throw new RuntimeException(ex);
    }
    this.clientMethodsHash = ProtocolSignature.getFingerprint(
        declaringClass.getMethods());
  }
}
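The Invocation class mainly wraps the class to execute, the method, and the method arguments in a Writable-based envelope, relying on HbaseObjectWritable to serialize and deserialize the arguments. That part is long and rather dry, so it deserves a separate post on another day.
Execution now returns to getHRegionConnection: we have obtained and cached a proxy for HRegionInterface, and when a method is invoked on this proxy it actually calls HBaseClient's call method. What exactly call does is a topic for later.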
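The proxy mechanics can be reproduced in a few lines with java.lang.reflect.Proxy. In this minimal sketch, RegionService stands in for HRegionInterface and Transport stands in for HBaseClient; both are hypothetical, and the "RPC" is just a local function that echoes the method name and first argument.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical sketch: every interface call is funneled into one transport method.
final class ProxySketch {
    interface RegionService {          // stand-in for the remote protocol interface
        String multi(String batch);
    }

    interface Transport {              // stand-in for the client that sends the call
        Object call(String methodName, Object[] args);
    }

    static final class Invoker implements InvocationHandler {
        private final Transport transport;

        Invoker(Transport transport) {
            this.transport = transport;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            // The handler holds no business logic: it only forwards name and arguments.
            return transport.call(method.getName(), args);
        }
    }

    static RegionService newProxy(Transport transport) {
        return (RegionService) Proxy.newProxyInstance(
            RegionService.class.getClassLoader(),
            new Class<?>[] {RegionService.class},
            new Invoker(transport));
    }

    public static void main(String[] args) {
        RegionService server = newProxy((name, callArgs) -> name + ":" + callArgs[0]);
        System.out.println(server.multi("put-batch"));   // prints "multi:put-batch"
    }
}
```

The caller only ever sees the interface, which is why the transport stays hidden until a method such as multi is actually invoked.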
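With the connection ready, it is time to invoke the call method. That call method actually invokes server.multi, that is, HRegionInterface's multi method. Now HBaseClient finally takes the stage; it was hidden very deep.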
public Writable call(Writable param, InetSocketAddress addr,
    Class<? extends VersionedProtocol> protocol,
    User ticket, int rpcTimeout)
    throws InterruptedException, IOException {
  Call call = new Call(param); // param here is the Invocation object wrapping class name, method name and arguments
  Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call); // the connection starts here; are you really connecting this time? No tricks, please.
  connection.sendParam(call); // looks like it really did connect, and it sent the invocation out too
  boolean interrupted = false;
  //noinspection SynchronizationOnLocalVariableOrMethodParameter
  synchronized (call) {
    // While the request is not done, the connection is not closed, the thread is not interrupted,
    // and there is no timeout or error: wait.
    while (!call.done) {
      if (connection.shouldCloseConnection.get()) {
        throw new IOException("Unexpected closed connection");
      }
      try {
        call.wait(1000); // wait for the result
      } catch (InterruptedException ignored) {
        // save the fact that we were interrupted
        interrupted = true;
      }
    }
    if (interrupted) {
      // set the interrupt flag now that we are done waiting
      Thread.currentThread().interrupt();
    }
    if (call.error != null) {
      if (call.error instanceof RemoteException) {
        call.error.fillInStackTrace();
        throw call.error;
      }
      // local exception
      throw wrapException(addr, call.error);
    }
    return call.value;
  }
}
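The waiting half of call is a classic monitor hand-off: the caller blocks on the Call object until some other thread marks it done. Below is a minimal sketch of that pattern. PendingCall and its methods are hypothetical names, and the completing side (complete and fail) is an assumption about how a response reader would finish a call, since that code is not shown above.

```java
// Hypothetical sketch of a call object that one thread waits on and another completes.
final class PendingCall {
    private Object value;
    private Exception error;
    private boolean done;

    // Called by the (assumed) response-reader thread when a result arrives.
    synchronized void complete(Object result) {
        this.value = result;
        this.done = true;
        notifyAll();
    }

    // Called by the (assumed) response-reader thread when the call failed.
    synchronized void fail(Exception cause) {
        this.error = cause;
        this.done = true;
        notifyAll();
    }

    // Called by the requesting thread; blocks until complete() or fail() runs.
    synchronized Object await() throws Exception {
        boolean interrupted = false;
        while (!done) {                  // loop guards against spurious wakeups
            try {
                wait(1000);              // wake up periodically, as the original does
            } catch (InterruptedException e) {
                interrupted = true;      // remember it, but keep waiting for the result
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt(); // restore the flag once waiting is over
        }
        if (error != null) {
            throw error;
        }
        return value;
    }

    public static void main(String[] args) throws Exception {
        PendingCall call = new PendingCall();
        new Thread(() -> call.complete("multi-response")).start();
        System.out.println(call.await());   // prints "multi-response"
    }
}
```

Deferring the interrupt until after the loop mirrors the original: the request has already been sent, so the caller still collects the response and only then reports that it was interrupted.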