Dubbo异步转同步

Dubbo是一款开源的RPC中间件框架,底层数据传输默认使用的Netty,那么请求的处理理论上是异步的,为什么我们在使用的时候是同步的呢?肯定是Dubbo框架,做了异步转同步的处理。

首先我们来梳理下,异步转同步,我们的需求是怎样的?

1、调用方请求远程服务之后,需要等待结果,此刻,请求线程应该阻塞

2、远程服务返回结果后,唤醒请求线程,调用方得到结果

Dubbo异步转同步,核心类是DefaultFuture,核心方法是get(),received(Channel channel, Response response)。

DefaultFuture构造函数:

private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();

   // 每次请求都会生成一个DefaultFuture对象,然后保存到FUTURES中,请求返回结果时,根据id从FUTURES中找到对应的DefaultFuture对象,并删除
    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();

    // AtomicLong从0开始递增,创建Request对象时生成的id
    private final long id;
    private final Channel channel;
    // 请求对象
    private final Request request;
    // 超时的设置
    private final int timeout;
    // 这里使用Lock和Condition实现等待通知机制
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private final long start = System.currentTimeMillis();
    private volatile long sent;
    // 请求的返回结果
    private volatile Response response;
    private volatile ResponseCallback callback;

    public DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // put into waiting map.
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }

get():

public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        // isDone()方法就是判断Response是否有值(即是否有返回结果)
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    // 超时等待
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    // 如果有返回结果了,或者,超时了,就退出循环
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            // 如果是超时了,就抛出异常
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        // 远程服务正常返回结果,则返回给调用方
        return returnFromResponse();
    }

received(Channel channel, Response response):

public static void received(Channel channel, Response response) {
        try {
            // 根据请求id从FUTURES中获取DefaultFuture,并删除
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                        + ", response " + response
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                        + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            // CHANNELS也删除
            CHANNELS.remove(response.getId());
        }
    }
private void doReceived(Response res) {
        lock.lock();
        try {
            response = res;
            if (done != null) {
                // 唤醒阻塞的线程
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }

总结:Dubbo异步转同步的原理,其实就是利用Lock和Condition实现了等待通知机制。请求与返回结果进行匹配,则是通过传递以及接收请求id实现的。