【自己读源码】Netty4.X系列(四) Netty中的异步调用
Netty中的异步调用
如果大家观察仔细,会发现我们之前所写的代码都是串行执行的,这是什么意思?就是我们看到代码是什么顺序,最后程序就是按什么顺序执行的。
但是Netty作为一个高性能网络框架,他的调用很多都是异步的,这样,就可以不等上一步做完,继续行进下一步,达到多任务并行的作用。
实现概述
Netty是怎么实现他的异步调用呢,大致总结了下由以下几个核心部分
组成:
- 异步执行(executor)
- 异步结果(future and promise)
- Listener
- 同步接口
首先,既然是异步调用,肯定要有异步执行,同学们这里肯定想到的是使用线程,没错,他的底层确实也是线程,只不过netty自身封装成了executor,增强了线程的调度。
其次,是要能获取到这次执行的结果,有的同学可能会说使用callable,没错这确实是一种解决方案,但是netty并没有使用这种,而是使用了一种更为巧妙的设计(也就是通过promise对象来传递执行的结果)来完成这种操作,下面我们会详细说明。
最后就是promise对象提供的各种接口,比如Listener:可以监听执行的完成。或者是同步接口:保证异步执行的方法顺序也是同步的。这篇文章中,我们主要就讲这两,三个,其他的各位童鞋可以自己去看源码。
Executor实现
Netty中每个Channel都有一个eventloop对象,实现还蛮复杂的,在这里不是重点,所以我们先实现一个,具有异步调用功能的exector。
自定义executor很简单,只要实现Executor接口就行
public class MyNettyExecutor implements Executor { private ThreadFactory factory; public MyNettyExecutor(ThreadFactory factory) { this.factory = factory; } public void execute(Runnable command) { factory.newThread(command).start(); } }
然后在需要使用的时候,实例化这个类,这里为了增强使用,我们在类内部提供一个静态初始化方法,并提供最简单factory实现。
public static Executor newExecutor(){ return new MyNettyExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r); } }); }
Promise详解
对furture/promise的理解
我对future的认识最开始源于Java的FutureTask框架,简单来说,FutureTask是Future接口的一种实现,Future则是异步执行的结果。
而promise,从接口注释上来看,是一种可修改的Future
/** * Special {@link Future} which is writable. */
那么现在来看,一个异步结果的程序主要有下面几步
- 生成promise对象
- 具体调用的地方传入promise参数
- 异步调用完成后,设置promise为完成
- 返回future对象
其中,第三步是发生在异步调用里的,所以我们看到的顺序其实就是1->2>4,让我们来画一张图。
这其实可以用一个现实中的例子来讲述。
今天是小明女朋友的生日,小明想给她一个惊喜,于是想到了订一个蛋糕给她,所以小明打电话给蛋糕店预定,店员回复他说:好的,我们知道了,制作好了会通知你的。于是小明就开开心心的打游戏去了。
在上面的例子中,预定蛋糕就是一个异步过程,我只要通知需要做这件事的人(execute),并拿到回复(Future),然后就可以做其他事情了。然后过一段时间打电话询问蛋糕做好没(isDone),如果没做好,那就请他做好的时候通知我(listener)
所以现在我们有了异步执行,还需要什么呢?
- Future和Promise的定义接口
- Promise实现
然后,我们理一下需要哪些接口
- isDone 判断任务是否完成
- addListener
- trySuccess 设置任务完成并通知所有listener
- sync 同步方法,等待任务完成
定义
首先定义接口
/*listener接口,提供complete方法**/ public interface MyFutureListener<F extends MyFuture<?>> extends EventListener { void operationComplete(F future); } /*Future接口**/ public interface MyFuture<V> { boolean isDone(); MyFuture<V> sync() throws InterruptedException ; MyFuture<V> addListener(MyFutureListener<? extends MyFuture<? super V>> listener); } /*promise接口**/ public interface MyPromise<V> extends MyFuture<V>{ boolean trySuccess(); @Override MyPromise<V> addListener(MyFutureListener<? extends MyFuture<? super V>> listener); }
isDone
我们假设只有完成和未完成两个状态,Promise内维护着这个状态值(初始为null),那么判断是否完成只需要判断这个值不为空就行了。
private volatile Object result = null; @Override public boolean isDone() { return result != null; }
trySucess
那么最简单的success实现就是给这个对象赋值
@Override public boolean trySuccess() { result = new Object(); return true; }
当然,这里很不严谨,我们后面再说。
Listener接口实现
上面我们定义了listener接口,这里要实现addListener方法
private List<MyFutureListener<? extends MyFuture<?>>> listeners; @Override public MyPromise<Void> addListener(MyFutureListener<? extends MyFuture<? super Void>> listener) { synchronized (this) { if(listeners == null){ listeners = new ArrayList<MyFutureListener<? extends MyFuture<?>>>(); listeners.add(listener); }else { listeners.add(listener); } } if (isDone()){ for (MyFutureListener f: listeners ) { f.operationComplete(this); } } return this; }
然后完善下success方法,成功的时候调用每一个listener的complete方法。
@Override public boolean trySuccess() { result = new Object(); for (MyFutureListener f: listeners ) { f.operationComplete(this); } return true; }
同步接口实现
同步也很简单,就是先判断任务是否完成,没有完成就wait一下。注意,wait之前我们要保持同步,引入synchronized原语。
@Override public MyFuture<Void> sync() throws InterruptedException { if (isDone()){ return this; } synchronized (this){ while (!isDone()) { waiters++; try { wait(); }finally { waiters--; } } } return this; }
同理,需要有地方去唤醒它,我们继续完善success方法,最终我们的trySuccess方法如下
private synchronized void checkNotify(){ if (waiters > 0){ notifyAll(); } } @Override public boolean trySuccess() { result = new Object(); checkNotify(); for (MyFutureListener f: listeners ) { f.operationComplete(this); } return true; }
Demo
轮子造好了,是时候写个demo测试一下
public class MyExecutorDemo { public static void main(String[] args) { MyFuture future = asyncHello().addListener((MyFutureListener<MyPromise<Void>>) future1 -> System.out.println("监听到完成")); if (future.isDone()){ System.out.println("异步执行完成"); }else{ try { future.sync(); } catch (InterruptedException e) { e.printStackTrace(); } } } static MyFuture asyncHello(){ Executor executor = MyNettyExecutor.newExecutor(); final DefaultPromise promise = new DefaultPromise(); executor.execute(() -> { System.out.println("Hello Async"); try { //模拟一些操作 Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } promise.trySuccess(); }); return promise; } }
警告
不可用于生产,这个Future/promise的设计仅仅为了说明异步执行和结果,距离netty中的异步框架还缺少很多。
- NULL值检查,整个设计中均没有对对象做NULL的检查,容易引起NullPointException。
- 异常处理缺失,对可能失败的地方做异常处理(这也是是否能用于生产的合格检验)
- 非完全异步,listener的通知没有使用异步
- 待补充(以我现在的水平,暂时想不到)
相关推荐
* 最好使用异步调用,否则可能产生UI阻塞错误。* myTextView.setText; 虽然这里能设定结果,* Called when the activity is first created.