聊聊reactive streams的parallel flux
序
本文主要研究下reactive streams的flux的parallel运行方式.
目的
在一些涉及IO操作,比如读取文件,访问数据库等,通常建议使用异步线程以parallel模式运行,以提升性能。
实例
@Test public void testParallelRunOn(){ Flux.range(1, 1000) .log() .parallel(8) .runOn(Schedulers.parallel()) //parallel flux .sequential() //必须使用sequential来将这些异步线程的执行结果汇集成一个stream .map(e -> { LOGGER.info("map thread:{},e:{}",Thread.currentThread().getName(),e); return e*10; }) .subscribe(e -> { LOGGER.info("subscribe thread:{},e:{}",Thread.currentThread().getName(),e); }); }
部分输出
2:38:53.949 [main] INFO reactor.Flux.Range.1 - | onNext(13) 22:38:53.949 [parallel-2] INFO com.example.demo.ParallelTest - subscribe thread:parallel-2,e:120 22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(14) 22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - map thread:parallel-5,e:13 22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - subscribe thread:parallel-5,e:130 22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(15) 22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - map thread:parallel-5,e:14 22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - subscribe thread:parallel-5,e:140 22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(16) 22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - map thread:parallel-5,e:15 22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - subscribe thread:parallel-5,e:150 22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(17) 22:38:53.950 [parallel-8] INFO com.example.demo.ParallelTest - map thread:parallel-8,e:16 22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(18) 22:38:53.950 [parallel-8] INFO com.example.demo.ParallelTest - subscribe thread:parallel-8,e:160 22:38:53.950 [parallel-8] INFO com.example.demo.ParallelTest - map thread:parallel-8,e:17 22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(19) 22:38:53.950 [parallel-8] INFO com.example.demo.ParallelTest - subscribe thread:parallel-8,e:170 22:38:53.950 [parallel-8] INFO com.example.demo.ParallelTest - map thread:parallel-8,e:18
小结
- parallel来指定线程池线程个数
- runOn启动parallel flux
- sequential将异步线程池执行结果汇集成一个stream
doc
相关推荐
flx 2020-08-31
ithzhang 2020-06-21
winc 2020-03-03
LaySwift 2019-12-31
第号 2019-11-09
flx 2019-08-25
水龙吟的备忘录 2017-07-12
banana000 2019-07-01
红雪中国 2016-10-08
flx 2019-06-30
凌燕 2019-06-29
banana000 2019-06-29
BrotherWind 2019-06-28
banana000 2019-06-27
LOGGER.info("compose executed");return stringFlux.map(e -> e + "$");LOGGER.info("flatMap executed&qu
flx 2019-06-27
julykobe 2019-06-27
ShaLiWa 2019-06-27
banana000 2019-06-26