Dubbo源码分析系列1---Dubbo异步通信

1、client一个线程调用远程接口,生成一个唯一的ID(比如一段随机字符串,UUID等),Dubbo是使用AtomicLong从0开始累计数字。

2、将打包的方法调用信息(如调用的接口名称,方法名称,参数值列表等),和处理结果的回调对象callback,全部封装在一起,组成一个对象object。

3、向专门存放调用信息的全局ConcurrentHashMap里面put(ID, object)。

4、将ID和打包的方法调用信息封装成一对象connRequest,使用IoSession.write(connRequest)异步发送出去。

5、当前线程再使用callback的get()方法试图获取远程返回的结果,在get()内部,则使用synchronized获取回调对象callback的锁, 再先检测是否已经获取到结果,如果没有,然后调用callback的wait()方法,释放callback上的锁,让当前线程处于等待状态。

6、服务端接收到请求并处理后,将结果(此结果中包含了前面的ID,即回传)发送给客户端,客户端socket连接上专门监听消息的线程收到消息,分析结果,取到ID,再从前面的ConcurrentHashMap里面get(ID),从而找到callback,将方法调用结果设置到callback对象里。

7、监听线程接着使用synchronized获取回调对象callback的锁(因为前面调用过wait(),那个线程已释放callback的锁了),再notifyAll(),唤醒前面处于等待状态的线程继续执行(callback的get()方法继续执行就能拿到调用结果了),至此,整个过程结束。

客户端部分源码:

//同步调用远程接口
public Object invokeWithSync(Object appRequest, RequestControl control) throws RemotingException, InterruptedException {
        byte protocol = getProtocol(control);
        if (!TRConstants.isValidProtocol(protocol)) {
            throw new RemotingException("Invalid serialization protocol [" + protocol + "] on invokeWithSync.");
        }
        ResponseFuture future = invokeWithFuture(appRequest, control);
        return future.get();  //获取结果时让当前线程等待,ResponseFuture其实就是前面说的callback
}
public ResponseFuture invokeWithFuture(Object appRequest, RequestControl control) {
         byte protocol = getProtocol(control);
         long timeout = getTimeout(control);
         ConnectionRequest request = new ConnectionRequest(appRequest);
         request.setSerializeProtocol(protocol);
         Callback2FutureAdapter adapter = new Callback2FutureAdapter(request);
         connection.sendRequestWithCallback(request, adapter, timeout);
         return adapter;
}

回调部分源码如下:

Callback2FutureAdapter implements ResponseFuture
public Object get() throws RemotingException, InterruptedException {
    synchronized (this) {  // 旋锁
        while (!isDone) {  // 是否有结果了
            wait(); //没结果是释放锁,让当前线程处于等待状态
        }
    }
    if (errorCode == TRConstants.RESULT_TIMEOUT) {
         throw new TimeoutException("Wait response timeout, request["
         + connectionRequest.getAppRequest() + "].");
    }
    else if (errorCode > 0) {
        throw new RemotingException(errorMsg);
    }
    else {
         return appResp;
    }
}
客户端收到服务端结果后,回调时相关方法,即设置isDone = true并notifyAll()
public void handleResponse(Object _appResponse) {
         appResp = _appResponse; //将远程调用结果设置到callback中来
         setDone();
}
public void onRemotingException(int _errorType, String _errorMsg) {
         errorCode = _errorType;
         errorMsg = _errorMsg;
         setDone();
}
private void setDone() {
         isDone = true;
         synchronized (this) { //获取锁,因为前面wait()已经释放了callback的锁了
             notifyAll(); // 唤醒处于等待的线程
         }
}

Dubbo通信部分源码:

// 用来存放请求和回调的MAP
private final ConcurrentHashMap<Long, Object[]> requestResidents;
 
//发送消息出去
void sendRequestWithCallback(ConnectionRequest connRequest, ResponseCallback callback, long timeoutMs) {
         long requestId = connRequest.getId();
         long waitBegin = System.currentTimeMillis();
         long waitEnd = waitBegin + timeoutMs;
         Object[] queue = new Object[4];
         int idx = 0;
         queue[idx++] = waitEnd;
         queue[idx++] = waitBegin;   //用于记录日志
         queue[idx++] = connRequest; //用于记录日志
         queue[idx++] = callback;
         requestResidents.put(requestId, queue); // 记录响应队列
         write(connRequest);
 
         // 埋点记录等待响应的Map的大小
         StatLog.addStat("TBRemoting-ResponseQueues", "size", requestResidents.size(),
                   1L);
}
public void write(final Object connectionMsg) {
//mina里的IoSession.write()发送消息
         WriteFuture writeFuture = ioSession.write(connectionMsg);
         // 注册FutureListener,当请求发送失败后,能够立即做出响应
         writeFuture.addListener(new MsgWrittenListener(this, connectionMsg));
}
 
/**
* 在得到响应后,删除对应的请求队列,并执行回调
* 调用者:MINA线程
*/
public void putResponse(final ConnectionResponse connResp) {
         final long requestId = connResp.getRequestId();
         Object[] queue = requestResidents.remove(requestId);
         if (null == queue) {
             Object appResp = connResp.getAppResponse();
             String appRespClazz = (null == appResp) ? "null" : appResp.getClass().getName();
             StringBuilder sb = new StringBuilder();
             sb.append("Not found response receiver for requestId=[").append(requestId).append("],");
             sb.append("from [").append(connResp.getHost()).append("],");
             sb.append("response type [").append(appRespClazz).append("].");
             LOGGER.warn(sb.toString());
             return;
         }
         int idx = 0;
         idx++;
         long waitBegin = (Long) queue[idx++];
         ConnectionRequest connRequest = (ConnectionRequest) queue[idx++];
         ResponseCallback callback = (ResponseCallback) queue[idx++];
         // ** 把回调任务交给业务提供的线程池执行 **
         Executor callbackExecutor = callback.getExecutor();
         callbackExecutor.execute(new CallbackExecutorTask(connResp, callback));
 
         long duration = System.currentTimeMillis() - waitBegin; // 实际读响应时间
         logIfResponseError(connResp, duration, connRequest.getAppRequest());
}

以上代码是dubbo老版本的,但是思路却是和新版本完全一样的,在下一篇文章中会重点介绍新版本通信代码。。。。。未完待续。。。。。

相关推荐