FunDA(3)- 流动数据行操作:FDAPipeLine operations using scalaz-stream-fs2

在上节讨论里我们介绍了数据行流式操作的设想,主要目的是把后台数据库的数据载入前端内存再拆分为强类型的数据行,这样我们可以对每行数据进行使用和处理。形象点描述就是对内存里的一个数据流(data-stream)进行逐行操作。我们在上节用foreach模拟了一个流控来示范数据行的操作处理。在这节我们讨论一下用scalaz-stream-fs2作为数据流管理工具来实现FunDA的数据行流动管理功能。fs2的Stream是一种自然的拖动型(pull-model)数据流。而fs2的Pipe类型则像是管道的阀门(valve),我们可以在Pipe里截获流动中的数据行。我们看看下面的fs2 Stream例子:

1   def log[ROW](prompt: String): Pipe[Task,ROW,ROW] =
2     _.evalMap {row => Task.delay {println(s"$prompt> $row"); row}}
3                                                   //> log: [ROW](prompt: String)fs2.Pipe[fs2.Task,ROW,ROW]
4   Stream.range(1,5).through(log("")).run.unsafeRun//> > 1
5                                                   //| > 2
6                                                   //| > 3
7                                                   //| > 4

函数log是个Pipe类型。我们看到Pipe类型可以截获Stream中的流动元素,在函数log里我们通过evalMap来立即运算了println把当前的元素内容显示出来。所以我们并没有用runLog来收集Stream的元素(runLog也只能在完成所有元素的收集后才能显示结果)。

按照FunDA设计要求:从后台数据库中读取数据、载入内存然后逐行进行处理,那么我们可以用这个Pipe类型来实现数据的逐行处理,包括控制数据流动以及任意插入一些自定义数据元素。下面我们就试试通过定义Pipe类型的不同功能来实现行数据处理:

1   def stopOn3[ROW]: Pipe[Task,ROW,ROW] = in => {
 2     def go: Handle[Task,ROW] => Pull[Task,ROW,Unit] = h => {
 3       h.receive1Option {
 4         case Some((r,h)) => if ( 3 == r) Pull.done
 5                             else Pull.output1(r) >> go(h)
 6         case None => Pull.done
 7       }
 8     }
 9     in.pull(go)
10   }                                               //> stopOn3: [ROW]=> fs2.Pipe[fs2.Task,ROW,ROW]
11   Stream(4,2,9,3,8,1)
12    .through(log("before"))
13    .through(stopOn3)
14    .through(log("after"))
15    .run
16    .unsafeRun                                     //> before> 4
17                                                   //| after> 4
18                                                   //| before> 2
19                                                   //| after> 2
20                                                   //| before> 9
21                                                   //| after> 9
22                                                   //| before> 3

stopOn3是个自定义Pipe。它的功能是截取当前元素、检查当前元素值、如果遇到3则终止数据流。从运算结果看:当before> 3时数据流停止流动(停止向下游发送元素)。虽然成功地实现了它的目的,函数stopOn3的设计者必须对fs2有较深的了解。而对于FunDA的终端用户来说不要说需要掌握fs2的运算机制,就连那些复杂的fs2类型就已经不可接受了。我想了一下:如果我们提供一个像stopOn3这样的Pipe函数、由用户提供有关的功能函数作为传入参数,这样的方式应该有比较大的接收空间。我们先从类型开始:重新模拟一套简明的与fs2类型相对应的FunDA类型:

//数据处理管道
   type FDAPipeLine[ROW] = Stream[Task,ROW]
   //数据作业节点
   type FDAWorkNode[ROW] = Pipe[Task,ROW,ROW]
   //数据管道开关阀门,从此处获得管道内数据
   type FDAValve[ROW] = Handle[Task,ROW]
   //管道连接器
   type FDAPipeJoint[ROW] = Pull[Task,ROW,Unit]

下面是用这些类型向用户提供的帮助函数(helpers):

//库提供:停止数据流动
   def fda_haltFlow = Pull.done                        //> fda_haltFlow: => fs2.Pull[Nothing,Nothing,Nothing]
   //库提供:向下游发送一个ROW
   def fda_sendRow[ROW](row: ROW) = Pull.output1(row)  //> fda_sendRow: [ROW](row: ROW)fs2.Pull[Nothing,ROW,Unit]
   //库提供:处理当前数据。运行用户提供的功能wf
   def fda_doWork[ROW](wf: ROW => FDAPipeJoint[ROW]): FDAWorkNode[ROW] = {
     def go: FDAValve[ROW] => FDAPipeJoint[ROW] = h => {
       h.receive1Option {
         case Some((r,h)) => wf(r) >> go(h)
         case None => fda_haltFlow
       }
     }
     in => in.pull(go)
   }   //> fda_doWork: [ROW](wf: ROW => demo.ws.FDAPipe.FDAPipeJoint[ROW])demo.ws.FDAPipe.FDAWorkNode[ROW]

现在看来貌似一旦用户可以提供一个ROW => FDAPipeJoint[ROW]函数,就可以用fda_doWork函数来运算这个函数了。我们按上面例子的功能要求来设计一个这样的函数:

1  //样板用户提供数据处理功能函数
 2   def breakOn3[ROW]: ROW => FDAPipeJoint[ROW] = row => {
 3      if (3 == row ) fda_haltFlow
 4      else fda_sendRow(row)
 5   }                                               //> breakOn3: [ROW]=> ROW => demo.ws.FDAPipe.FDAPipeJoint[ROW]
 6   //测试运算
 7   Stream(4,2,9,3,8,1)
 8    .through(log("before"))
 9    .through(fda_doWork(breakOn3))
10    .through(log("after"))
11    .run
12    .unsafeRun                                     //> before> 4
13                                                   //| after> 4
14                                                   //| before> 2
15                                                   //| after> 2
16                                                   //| before> 9
17                                                   //| after> 9
18                                                   //| before> 3

成功实现功能。下面是这篇讨论中的示范代码:

1 import fs2._
 2 object FDAPipe {
 3   def log[ROW](prompt: String): Pipe[Task,ROW,ROW] =
 4     _.evalMap {row => Task.delay {println(s"$prompt> $row"); row}}
 5   Stream.range(1,5).through(log("")).run.unsafeRun
 6   def stopOn3[ROW]: Pipe[Task,ROW,ROW] = in => {
 7     def go: Handle[Task,ROW] => Pull[Task,ROW,Unit] = h => {
 8       h.receive1Option {
 9         case Some((r,h)) => if ( 3 == r) Pull.done
10                             else Pull.output1(r) >> go(h)
11         case None => Pull.done
12       }
13     }
14     in.pull(go)
15   }
16   Stream(4,2,9,3,8,1)
17    .through(log("before"))
18    .through(stopOn3)
19    .through(log("after"))
20    .run
21    .unsafeRun
22   //数据处理管道
23   type FDAPipeLine[ROW] = Stream[Task,ROW]
24   //数据作业节点
25   type FDAWorkNode[ROW] = Pipe[Task,ROW,ROW]
26   //数据管道开关阀门,从此处获得管道内数据
27   type FDAValve[ROW] = Handle[Task,ROW]
28   //管道连接器
29   type FDAPipeJoint[ROW] = Pull[Task,ROW,Unit]
30   
31   //库提供:停止数据流动
32   def fda_haltFlow = Pull.done
33   //库提供:向下游发送一个ROW
34   def fda_sendRow[ROW](row: ROW) = Pull.output1(row)
35   //库提供:处理当前数据。运行用户提供的功能wf
36   def fda_doWork[ROW](wf: ROW => FDAPipeJoint[ROW]): FDAWorkNode[ROW] = {
37     def go: FDAValve[ROW] => FDAPipeJoint[ROW] = h => {
38       h.receive1Option {
39         case Some((r,h)) => wf(r) >> go(h)
40         case None => fda_haltFlow
41       }
42     }
43     in => in.pull(go)
44   }
45   //用户提供数据处理功能函数
46   def breakOn3[ROW]: ROW => FDAPipeJoint[ROW] = row => {
47      if (3 == row ) fda_haltFlow
48      else fda_sendRow(row)
49   }
50   //测试运算
51   Stream(4,2,9,3,8,1)
52    .through(log("before"))
53    .through(fda_doWork(breakOn3))
54    .through(log("after"))
55    .run
56    .unsafeRun
57 }

相关推荐