1小时让你掌握响应式编程,并入门Reactor

1小时让你掌握响应式编程,并入门Reactor

 我看同步阻塞

“你知道什么是同步阻塞吗”,当然知道了。“那你怎么看它呢”,这个。。。

在同步阻塞的世界里,代码执行到哪里,数据就跟到哪里。如果数据很慢跟不上来,代码就停在那里等待数据的到来,然后再带着数据一起往下执行。

可以说是,代码执行和数据是结伴而行,不离不弃。执子之手与子偕老。让人老感动了。

如果还不太理解的话,可以认为代码执行其实就是一些行为动作,这些行为动作的目的就是为了获取/操作数据。

例如加法,这里的行为动作就是执行相加,数据就是加数和被加数。操作结果就是得到了另一个数据,即两个数的和。

只是在这个加法里,数据跑的特别快,(CPU的寄存器,能不快吗),我们几乎觉察不到执行动作在等数据的过程。怎么办呢,那就看一个能把它们拉开的例子。

那自然非数据库查询莫属了,既有网络I/O,又有磁盘I/O,肯定会慢一些。

假设我的业务是这样的,代码先去数据库查询一个用户,接着修改用户的密码,然后再更新回数据库,最后代码返回成功。

如果网速和数据库都很慢的话,可能是这样的。代码执行一个查询数据库动作,然后等啊等啊等,等的花都谢了,终于数据库把用户返回过来了,接着,代码飞快的修改了密码,并执行一个更新数据库的动作,然后又是等啊等啊等,等的花又开了,数据库终于回话了,更新成功。然后代码返回成功,全部执行完了。

所以同步阻塞代码的最大特点就是,带着数据上路,数据不到位就阻塞住。

最后来个小小的升华:

  • 所谓同步就是快的等慢的,然后一起往前走,表示的是目的。
  • 所谓阻塞就是想办法让快的停滞不前,等待慢的到来,表示的是手段。

一言以蔽之,同步是目的,阻塞是手段。

我看异步非阻塞

“你知道什么是异步非阻塞吗”,当然知道了,不过我不知道该怎么看它。“哦,恭喜你都会抢答了。。。”。

我们生活在异步的世界,却是最不懂异步的人。

你去饭店吃饭,服务员把你的菜单写好,交给厨房后就去服务别人了。

厨房把饭做好后,通过按铃通知服务员,服务员再把饭送到你的位置上。

服务员是主(或I/O)线程,把任务交给厨房这个工作线程去执行,厨房接到任务的同时还要记住送来该任务的服务员,然后厨房去执行任务,服务员也去忙别的了。

厨房执行完任务后,对当时的那个服务员进行通知,服务员接到通知后,再去执行接下来的内容,如把饭送到客人餐桌。

这是一个非常常见的异步场景,由于其中一方不愿意等待(或时时刻刻关注)另一方,但又不知道对方什么时候能做完,所以只能寄希望于对方做完的时候告诉自己一声,然后自己再进行后续的工作。

这就是我们常说的异步回调(或通知)。

早上项目经理开完会,给大家分好任务,并把测试用例代码也给了大家,说谁做完了跑一边测试用例,通了就可以了。然后就散会,各自忙去了。

下午5点你做完了,开始跑测试用例,很幸运,一次性全部通过。你的任务就算完成了,接下来就可以干自己想干的事情,比如看“编程新说”公众号。

项目经理是主(或I/O)线程,把任务交给各个开发人员这些工作线程,并给每个人一段逻辑代码,告诉他们在自己的任务完成后再执行这一段逻辑代码。

开发人员完成任务后,接着执行逻辑代码,执行完逻辑代码后,就算已经结束了。不再需要告知项目经理一声。

这也是一个常见的异步场景,一方给另一方安排好任务后,再给它一段逻辑代码,接着彼此就分道扬镳。之后的日子里,你走你的阳关道,我过我的独木桥,井水不犯河水,老死不相往来。

这段逻辑代码通常是由一个Runnable接口传入,且是在任务完成时执行,就暂且称它为的“完成执行”吧。

所以异步非阻塞代码的最大特点就是,我给你分配任务,你完事给我回复,咱俩互相不耽误。

最后来个小小的升华:

  • 所谓异步就是你走你的,我走我的,大家各自往前走,表示的是一种事实形态。
  • 所谓非阻塞就是快的快走,慢的慢走,一刻都不为你停留,表示的是一种直观现象。

一言以蔽之,异步是形态,非阻塞是现象。

异步非阻塞它本身并没有什么明显的可圈可点的特征,注意我说的是它“本身”。因为我们整个世界都是按照异步非阻塞模式在运行。

上厕所的时候玩手机,等车的时候玩手机,上班的时候玩手机,等饭的时候玩手机,回家以后玩手机,睡觉做梦玩手机。第二天还是这样的。哈哈。一个人就没有被阻塞住的时候。

不可否认,我们生活的社会又很复杂,主要是因为人和人之间的沟通、交流和协调有时并非一件容易之事。

同理,异步非阻塞“本身”并不难,难就难在怎么实现它。毕竟让一群听不懂人话的二货线程们互相沟通协调更非一件易事。

我看响应式

所谓响应式就是外界发生了变化,你要做出反应。所以响应式编程就是围绕着变化来构建的。

如何收集到原始变化,如何把这个变化告知相关处理者,处理者如何做出反应,做出反应的过程其实就是引发了新的变化,这个新的变化又该如何被收集,又该如何告知下一个处理者,如此往复,直至全部结束。

可以说整个自然界都是响应式的,因为它们都会对外界的变化或自身的变化产生反应。

先说人类,冷的时候加衣,饿的时候吃饭,病的时候去医院。看到绿色放松,看到蓝色镇定,看到红色易激动。

再说动植物,向日葵围绕太阳转叫趋光性,植物的根系朝水多的地方生长叫趋水性,鸽子可以磁场辨别方向,鲸鱼、海归都可以利用磁场记住自己走过的路。

所以响应式“本身”是一个很简单的模型,你给我一个变化,我做出一个反应。

动植物都有一套完善的感觉器官,能够感受到外界变化。同时他们又有超高的智商或完善的一套生物系统能够对这种变化作出反应。这是数万年甚至数千万年进化的结果,是基因决定的,所以看起来很自然。

再来看看编程界的响应式,也是这两个问题,一是如何知道外界的变化,二是如何对这种变化作出反应。

代码可是没有生命的,那就只能简单粗暴了。如何知道变化,那就让别人告诉你呗。如何做出反应,那就执行一段逻辑代码呗。

别人告诉你就等于异步回调/通知,执行的这段逻辑代码,可以是外界传入的,也可以是自己本身的一个方法。

现在明白了吧,异步非阻塞就是响应式。

最后来个小小升华:

所谓响应式就是一个概念,或是一种编程模式,它并不是一个知识,也不是一个技术。但它需要用到一个技术,那就是实现异步非阻塞的技术。

我看Reactor

在传统的编码中,会将逻辑处理代码写成方法,需要的数据由方法参数传入,处理过的数据由方法的返回值返回。

执行时以main方法为入口点启动,按照一定的顺序执行这些方法,数据依次流入流出每个方法,当所有的方法执行完时,数据也处理完了,就结束了。

整个过程是以逻辑代码的执行为主线,数据只是一个必须的参与者而已,因为代码要处理数据,如果数据不到位,代码就停下来不执行,等待数据的到来。

这就是典型的同步阻塞式的执行过程,非常简单,易于理解,而且代码也很好写。

到目前为止,我们提到的都是响应式的理论,那应该怎样去实现它呢,一时间还真没有头绪。

响应式是异步非阻塞,和同步阻塞应该是相对的。那我们不妨就拿响应式往同步阻塞上套一下,看看能得到什么有价值的发现。

响应式关注两点,变化和反应,而且是变化在前,反应在后。同步阻塞也关注两点,执行逻辑和数据,而且是执行逻辑在前,数据在后。

那就开始建立对应关系。因为“反应”是一系列行为动作,所以应该和“执行逻辑”对应。那“变化”只能和“数据”对应,其实这是对的,“数据”由不可用到可用,本身就是发生了一个“变化”。

这个对应关系建立的很完美,但是逻辑顺序却完全冲突。响应式是由变化主导反应,这很好理解,我都没有变化,你无须做出反应。同步阻塞是由执行逻辑主导数据,这也很好理解,我代码都没执行呢,根本不需要数据。

可见,它们的对应关系非常完美,但主导顺序完全相反,这就是一个非常非常有价值的发现。

因为我们只需把同步阻塞倒过来,就是实现响应式的大致方向。这样的推理貌似是对的,但实际当中是这样的吗?嗯,是这样的。

现在请大家和我一起扭转思维。原来以逻辑代码执行作为主线,数据作为参与者。现在以数据作为主线,逻辑代码执行作为参与者。说的再白一些,原来是数据传递到逻辑代码里,现在是逻辑代码传递到数据里。

有人也许会问,逻辑代码怎么传递?哈哈,Lambda表达式呀,函数式编程呀。

想象一下,有一个长长的管子,里面的水一直在流。

如果你想让水变成橙色的,只需在管子上开个口,加装一个可以持续投放橙色染料的装置,结果流经它的水都变成橙色的了。

如果你想让橙色的水变甜的话,只需在后面的管子上开个口,加装一个可以持续投放白糖的装置,结果流经它的水都变成甜的了。

同理,可以在后面继续加装投放柠檬酸的装置,让水变酸,在后面继续加装压入二氧化碳的装置,让水带气泡。

最后发现,自来水经过多道工序处理后变成了芬达。

如果把水流看作是数据流,把投放装置看作是逻辑代码,就变成了,数据先流入第一个逻辑代码,处理后再流入第二个逻辑代码,依次流下去直至结束。

这就是以数据作为主线,逻辑代码只是参与者,同时它也是Reactor实现响应式编程的原理,Spring官方使用的响应式类库就是Reactor。

其中,“以数据为主线”和“在变化时通知处理者”这两个功能Reactor库都已经实现了,我们需要做的就是“对变化做出反应”,即插入逻辑代码。

Reactor入门

在Reactor中,有两个非常重要的类,就是Mono和Flux,它们都是数据源,在它们内部都已经实现了“以数据为主线”和“在变化时通知处理者”这两个功能,而且还提供了方法让我们来插入逻辑代码用于“对变化做出反应”。

Mono表示0个或1个数据,Flux表示0到多个数据。先从简单的Mono开始。

设计一个简单的示例,首先创建一个数据源,只包含一个数据10,第一个处理就是加1,第二个处理就是奇偶性过滤,第三个处理就是把这个数据消费掉,然后就结束了。

为了清楚地看出来主线程执行的是哪些代码,工作线程执行的是哪些代码,特意打印了很多信息。

  1. public static void main(String[] args) { 
  2.  displayCurrTime(1); 
  3.  displayCurrThreadId(1); 
  4.  //创建一个数据源 
  5.  Mono.just(10) 
  6.  //延迟5秒再发射数据 
  7.  .delayElement(Duration.ofSeconds(5)) 
  8.  //在数据上执行一个转换 
  9.  .map(n -> { 
  10.  displayCurrTime(2); 
  11.  displayCurrThreadId(2); 
  12.  displayValue(n); 
  13.  delaySeconds(2); 
  14.  return n + 1; 
  15.  }) 
  16.  //在数据上执行一个过滤 
  17.  .filter(n -> { 
  18.  displayCurrTime(3); 
  19.  displayCurrThreadId(3); 
  20.  displayValue(n); 
  21.  delaySeconds(3); 
  22.  return n % 2 == 0; 
  23.  }) 
  24.  //如果数据没了就用默认值 
  25.  .defaultIfEmpty(9) 
  26.  //订阅一个消费者把数据消费了 
  27.  .subscribe(n -> { 
  28.  displayCurrTime(4); 
  29.  displayCurrThreadId(4); 
  30.  displayValue(n); 
  31.  delaySeconds(2); 
  32.  System.out.println(n + " consumed, worker Thread over, exit."); 
  33.  }); 
  34.  displayCurrTime(5); 
  35.  displayCurrThreadId(5); 
  36.  pause(); 
  37. //显示当前时间 
  38. static void displayCurrTime(int point) { 
  39.  System.out.println(point + " : " + LocalTime.now()); 
  40. //显示当前线程Id 
  41. static void displayCurrThreadId(int point) { 
  42.  System.out.println(point + " : " + Thread.currentThread().getId()); 
  43. //显示当前的数值 
  44. static void displayValue(int n) { 
  45.  System.out.println("input : " + n); 
  46. //延迟若干秒 
  47. static void delaySeconds(int seconds) { 
  48.  try { 
  49.  TimeUnit.SECONDS.sleep(seconds); 
  50.  } catch (InterruptedException e) { 
  51.  e.printStackTrace(); 
  52.  } 
  53. //主线程暂停 
  54. static void pause() { 
  55.  try { 
  56.  System.out.println("main Thread over, paused."); 
  57.  System.in.read(); 
  58.  } catch (IOException e) { 
  59.  e.printStackTrace(); 
  60.  } 

以下是输出结果:

  1. 1 : 15:00:39.809 
  2. 1 : 1 
  3. 5 : 15:00:40.158 
  4. 5 : 1 
  5. main Thread over, paused. 
  6. 2 : 15:00:45.158 
  7. 2 : 9 
  8. input : 10 
  9. 3 : 15:00:47.160 
  10. 3 : 9 
  11. input : 11 
  12. 4 : 15:00:50.162 
  13. 4 : 9 
  14. input : 9 
  15. 9 consumed, worker Thread over, exit. 

可以看到不到1秒钟时间主线程就执行完了。然后5秒后数据从数据源发射出来进入第一步处理,2秒后进入第二步处理,3秒后进入第三步处理,数据被消费掉,就结束了。其中主线程Id是1,工作线程Id是9。

这段代码其实是建立了一个数据通道,在通道的指定位置上插入处理逻辑,等待数据到来。

主线程执行的是建立通道的代码,主线程很快执行完,通道就建好了。此时只是一个空的通道,根本就没有数据。

在数据到来时,由工作线程执行每个节点的逻辑代码来处理数据,然后把数据传入下一个节点,如此反复直至结束。

相关推荐