Dubbo异步转同步
Dubbo是一款开源的RPC中间件框架,底层数据传输默认使用的Netty,那么请求的处理理论上是异步的,为什么我们在使用的时候是同步的呢?肯定是Dubbo框架,做了异步转同步的处理。
首先我们来梳理下,异步转同步,我们的需求是怎样的?
1、调用方请求远程服务之后,需要等待结果,此刻,请求线程应该阻塞
2、远程服务返回结果后,唤醒请求线程,调用方得到结果
Dubbo异步转同步,核心类是DefaultFuture,核心方法是get(),received(Channel channel, Response response)。
DefaultFuture构造函数:
private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>(); // 每次请求都会生成一个DefaultFuture对象,然后保存到FUTURES中,请求返回结果时,根据id从FUTURES中找到对应的DefaultFuture对象,并删除 private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>(); // AtomicLong从0开始递增,创建Request对象时生成的id private final long id; private final Channel channel; // 请求对象 private final Request request; // 超时的设置 private final int timeout; // 这里使用Lock和Condition实现等待通知机制 private final Lock lock = new ReentrantLock(); private final Condition done = lock.newCondition(); private final long start = System.currentTimeMillis(); private volatile long sent; // 请求的返回结果 private volatile Response response; private volatile ResponseCallback callback; public DefaultFuture(Channel channel, Request request, int timeout) { this.channel = channel; this.request = request; this.id = request.getId(); this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // put into waiting map. FUTURES.put(id, this); CHANNELS.put(id, channel); }
get():
public Object get(int timeout) throws RemotingException { if (timeout <= 0) { timeout = Constants.DEFAULT_TIMEOUT; } // isDone()方法就是判断Response是否有值(即是否有返回结果) if (!isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { while (!isDone()) { // 超时等待 done.await(timeout, TimeUnit.MILLISECONDS); // 如果有返回结果了,或者,超时了,就退出循环 if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } // 如果是超时了,就抛出异常 if (!isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } } // 远程服务正常返回结果,则返回给调用方 return returnFromResponse(); }
received(Channel channel, Response response):
public static void received(Channel channel, Response response) { try { // 根据请求id从FUTURES中获取DefaultFuture,并删除 DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress())); } } finally { // CHANNELS也删除 CHANNELS.remove(response.getId()); } }
private void doReceived(Response res) { lock.lock(); try { response = res; if (done != null) { // 唤醒阻塞的线程 done.signal(); } } finally { lock.unlock(); } if (callback != null) { invokeCallback(callback); } }
总结:Dubbo异步转同步的原理,其实就是利用Lock和Condition实现了等待通知机制。请求与返回结果进行匹配,则是通过传递以及接收请求id实现的。