【自己读源码】Netty4.X系列(四) Netty中的异步调用

Netty中的异步调用

如果大家观察仔细,会发现我们之前所写的代码都是串行执行的,这是什么意思?就是我们看到代码是什么顺序,最后程序就是按什么顺序执行的。

但是Netty作为一个高性能网络框架,他的调用很多都是异步的,这样,就可以不等上一步做完,继续行进下一步,达到多任务并行的作用。

实现概述

Netty是怎么实现他的异步调用呢,大致总结了下由以下几个核心部分
组成:

  • 异步执行(executor)
  • 异步结果(future and promise)
  • Listener
  • 同步接口

首先,既然是异步调用,肯定要有异步执行,同学们这里肯定想到的是使用线程,没错,他的底层确实也是线程,只不过netty自身封装成了executor,增强了线程的调度。

其次,是要能获取到这次执行的结果,有的同学可能会说使用callable,没错这确实是一种解决方案,但是netty并没有使用这种,而是使用了一种更为巧妙的设计(也就是通过promise对象来传递执行的结果)来完成这种操作,下面我们会详细说明。

最后就是promise对象提供的各种接口,比如Listener:可以监听执行的完成。或者是同步接口:保证异步执行的方法顺序也是同步的。这篇文章中,我们主要就讲这两,三个,其他的各位童鞋可以自己去看源码。

Executor实现

Netty中每个Channel都有一个eventloop对象,实现还蛮复杂的,在这里不是重点,所以我们先实现一个,具有异步调用功能的exector。

自定义executor很简单,只要实现Executor接口就行

public class MyNettyExecutor implements Executor {
private ThreadFactory factory;

public MyNettyExecutor(ThreadFactory factory) {
    this.factory = factory;
}

public void execute(Runnable command) {
    factory.newThread(command).start();
}
}

然后在需要使用的时候,实例化这个类,这里为了增强使用,我们在类内部提供一个静态初始化方法,并提供最简单factory实现。

public static Executor newExecutor(){
        return new MyNettyExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r);
            }
        });
    }

Promise详解

对furture/promise的理解

我对future的认识最开始源于Java的FutureTask框架,简单来说,FutureTask是Future接口的一种实现,Future则是异步执行的结果。
而promise,从接口注释上来看,是一种可修改的Future

/**
 * Special {@link Future} which is writable.
 */

那么现在来看,一个异步结果的程序主要有下面几步

  1. 生成promise对象
  2. 具体调用的地方传入promise参数
  3. 异步调用完成后,设置promise为完成
  4. 返回future对象

其中,第三步是发生在异步调用里的,所以我们看到的顺序其实就是1->2>4,让我们来画一张图。
【自己读源码】Netty4.X系列(四) Netty中的异步调用

这其实可以用一个现实中的例子来讲述。

今天是小明女朋友的生日,小明想给她一个惊喜,于是想到了订一个蛋糕给她,所以小明打电话给蛋糕店预定,店员回复他说:好的,我们知道了,制作好了会通知你的。于是小明就开开心心的打游戏去了。

在上面的例子中,预定蛋糕就是一个异步过程,我只要通知需要做这件事的人(execute),并拿到回复(Future),然后就可以做其他事情了。然后过一段时间打电话询问蛋糕做好没(isDone),如果没做好,那就请他做好的时候通知我(listener)

所以现在我们有了异步执行,还需要什么呢?

  1. Future和Promise的定义接口
  2. Promise实现

然后,我们理一下需要哪些接口

  1. isDone 判断任务是否完成
  2. addListener
  3. trySuccess 设置任务完成并通知所有listener
  4. sync 同步方法,等待任务完成

定义

首先定义接口

/*listener接口,提供complete方法**/
public interface MyFutureListener<F extends MyFuture<?>>  extends EventListener {

    void operationComplete(F future);
}
/*Future接口**/
public interface MyFuture<V> {

    boolean isDone();

    MyFuture<V> sync() throws InterruptedException ;

    MyFuture<V> addListener(MyFutureListener<? extends MyFuture<? super V>> listener);


}
/*promise接口**/
public interface MyPromise<V> extends MyFuture<V>{

    boolean trySuccess();

    @Override
    MyPromise<V> addListener(MyFutureListener<? extends MyFuture<? super V>> listener);


}

isDone

我们假设只有完成和未完成两个状态,Promise内维护着这个状态值(初始为null),那么判断是否完成只需要判断这个值不为空就行了。

private volatile Object result = null;

    @Override
    public boolean isDone() {
        return result != null;
    }

trySucess

那么最简单的success实现就是给这个对象赋值

@Override
    public boolean trySuccess() {
        result = new Object();
        return true;
    }

当然,这里很不严谨,我们后面再说。

Listener接口实现

上面我们定义了listener接口,这里要实现addListener方法

private List<MyFutureListener<? extends MyFuture<?>>> listeners;

@Override
    public MyPromise<Void> addListener(MyFutureListener<? extends MyFuture<? super Void>> listener) {
        synchronized (this) {
            if(listeners == null){
                listeners = new ArrayList<MyFutureListener<? extends MyFuture<?>>>();
                listeners.add(listener);
            }else {
                listeners.add(listener);
            }
        }
        if (isDone()){
            for (MyFutureListener f: listeners
                    ) {
                f.operationComplete(this);
            }
        }
        return this;
    }

然后完善下success方法,成功的时候调用每一个listener的complete方法。

@Override
    public boolean trySuccess() {
        result = new Object();

        for (MyFutureListener f: listeners
             ) {
            f.operationComplete(this);
        }

        return true;
    }

同步接口实现

同步也很简单,就是先判断任务是否完成,没有完成就wait一下。注意,wait之前我们要保持同步,引入synchronized原语。

@Override
    public MyFuture<Void> sync() throws InterruptedException {
        if (isDone()){
            return this;
        }
       
         synchronized (this){
            while (!isDone()) {
            waiters++;
                try {
                    wait();
                }finally {
                    waiters--;
                }
            }
        }
      
        return this;
    }

同理,需要有地方去唤醒它,我们继续完善success方法,最终我们的trySuccess方法如下

private synchronized void checkNotify(){
        if (waiters > 0){
            notifyAll();
        }
    }

 @Override
    public boolean trySuccess() {
        result = new Object();
        checkNotify();
        for (MyFutureListener f: listeners
             ) {
            f.operationComplete(this);
        }

        return true;
    }

Demo

轮子造好了,是时候写个demo测试一下

public class MyExecutorDemo {
    public static void main(String[] args) {

        MyFuture future = asyncHello().addListener((MyFutureListener<MyPromise<Void>>) future1 -> System.out.println("监听到完成"));
        if (future.isDone()){
            System.out.println("异步执行完成");
        }else{
            try {
                future.sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static MyFuture asyncHello(){
        Executor executor = MyNettyExecutor.newExecutor();
        final DefaultPromise promise = new DefaultPromise();
        executor.execute(() -> {
            System.out.println("Hello Async");
            try {
                //模拟一些操作
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            promise.trySuccess();
        });
        return promise;
    }
}

警告

不可用于生产,这个Future/promise的设计仅仅为了说明异步执行和结果,距离netty中的异步框架还缺少很多。

  1. NULL值检查,整个设计中均没有对对象做NULL的检查,容易引起NullPointException。
  2. 异常处理缺失,对可能失败的地方做异常处理(这也是是否能用于生产的合格检验)
  3. 非完全异步,listener的通知没有使用异步
  4. 待补充(以我现在的水平,暂时想不到)

相关推荐