ScalaPB(5):用akka-stream实现reactive-gRPC
在前面几篇讨论里我们介绍了scala-gRPC的基本功能和使用方法,我们基本确定了选择gRPC作为一种有效的内部系统集成工具,主要因为下面gRPC支持的几种服务模式:
、Unary-Call:独立的一对client-request/server-response,是我们常用的http交互模式 、Server-Streaming:client发出一个request后从server端接收一串多个response 、Client-Streaming:client向server发送一串多个request后从server接收一个response 、Bidirectional-Streaming:由client首先发送request启动连接,然后在这个连接上两端可以不断交互信息。
很明显,gRPC支持双向的streaming。那么如果能把gRPC中ListenableFuture和StreamObserver这两种类型转成akka-stream的基本类型应该就能够实现所谓的reactive-gRPC了。如果我们能用akka-stream编程方式实现gRPC服务调用的话,可能会遭遇下面的场景:在服务端我们只需要实现一种akka-stream的Flow把进来的request转化成出去的response,如下:
// Unary case Flow[Request].map(computeResponse) // Server streaming Flow[Request].flatMapConcat(computeResponses) // Client streaming Flow[Request].fold(defaultResponse)(computeResponse) // Bidirectional streaming Flow[Request].flatMapConcat(computeResponses)
当然,这是个akka-stream Flow,我们可以在这个Flow里调用任何akka-stream提供的功能,如:
Flow[Request] .throttle(1, 10.millis, 1, ThrottleMode.Shaping) .map(computeResponse)
在客户端我们可以直接经客户端stub调用Flow,如下:
Source .single(request) .via(stub.doSomething) .runForeach(println)
刚好,beyond-the-lines gRPCAkkaStream开源项目提供这么一种gRPC StreamObserver到aka-stream Flow转换桥梁。下面是gRPCAkkaStream的使用示范。先从Unary-Call开始:下面是.proto文件的IDL服务描述:
syntax = "proto3"; package learn.grpc.akka.stream.services; message NumPair { int32 num1 = ; int32 num2 = ; } message Num { int32 num = ; } message SumResult { int32 result = ; } service SumNumbers { rpc SumPair(NumPair) returns (SumResult) {} }
我们看看编译后自动产生的SumGrpcAkkaStream.scala文件中一些相关类型和函数:
服务界面描述:
trait SumNumbers extends AbstractService { override def serviceCompanion = SumNumbers def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed] }
我们看到服务函数sumPair是一个akka-stream Fow[NumPair,SumResult,NotUsed]。下面是具体实现SumNumbers.sumPair代码:
class gRPCAkkaStreamService extends SumGrpcAkkaStream.SumNumbers { val logger: Logger = Logger.getLogger(classOf[gRPCAkkaStreamService].getName) override def sumPair: Flow[NumPair, SumResult, NotUsed] = { logger.info(s"*** calling sumPair ... ***") Flow[NumPair].map { case NumPair(a,b) => { logger.info(s"serving ${a} + ${b} = ???") SumResult(a + b) } } }
产生的客户端stub源代码如下:
class SumNumbersStub( channel: Channel, options: CallOptions = CallOptions.DEFAULT ) extends AbstractStub[SumNumbersStub](channel, options) with SumNumbers { override def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed] = Flow[learn.grpc.akka.stream.services.sum.NumPair].flatMapConcat(request => Source.fromFuture( Grpc.guavaFuture2ScalaFuture( ClientCalls.futureUnaryCall(channel.newCall(METHOD_SUM_PAIR, options), request) ) ) ) def stub(channel: Channel): SumNumbersStub = new SumNumbersStub(channel)
我们可以通过stub来调用sumPair方法,如下:
val channel = ManagedChannelBuilder .forAddress(host,port) .usePlaintext(true) .build() val stub = SumGrpcAkkaStream.stub(channel) def addPair(num1: Int, num2: Int): Source[String,NotUsed] = { logger.info(s"Requesting to add $num1, $num2") Source .single(NumPair(num1,num2)) .via(stub.sumPair) .map(r => s"the result: ${r.result}") }
下面是Unary-Call的具体调用方式:
object UnaryCallClient extends App { implicit val system = ActorSystem("UnaryClient") implicit val mat = ActorMaterializer.create(system) val client = new gRPCAkkaStreamClient("localhost", 50051) client.addPair(29,33).runForeach(println) scala.io.StdIn.readLine() mat.shutdown() system.terminate() }
在Server-Streaming中一个request返回的是stream of responses。IDL的描述如下:
service SumNumbers { rpc SumPair(NumPair) returns (SumResult) {} rpc GenIncsFrom(Num) returns (stream Num) {} }
编译后自动产生的service trait如下:
trait SumNumbers extends AbstractService { override def serviceCompanion = SumNumbers def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed] def genIncsFrom: Flow[learn.grpc.akka.stream.services.sum.Num, learn.grpc.akka.stream.services.sum.Num, NotUsed] }
这个服务函数genIncsFrom是Flow[Num,Num,NotUsed],它的具体实现如下:
class gRPCAkkaStreamService extends SumGrpcAkkaStream.SumNumbers { val logger: Logger = Logger.getLogger(classOf[gRPCAkkaStreamService].getName) override def genIncsFrom: Flow[Num, Num, NotUsed] = { logger.info("*** calling genIncsFrom") Flow[Num].mapConcat { n => ( to n.num).map {m => logger.info(s"genIncFrom producing num: ${m}") Num(m) } } } }
因为输出response是一个stream,可以用mapConcat展平Seq来产生一个。在客户方调用服务函数genIncsFrom的方式如下:
def genIncNumbers(len: Int): Source[Int,NotUsed] = { logger.info(s"Requesting to produce ${len} inc numbers") Source .single(Num(len)) .via(stub.genIncsFrom) .map(n => n.num) }
我们还是用runForeach来运算这个Source:
object ServerStreamingClient extends App { implicit val system = ActorSystem("ServerStreamingClient") implicit val mat = ActorMaterializer.create(system) val client = new gRPCAkkaStreamClient("localhost", ) client.genIncNumbers().runForeach(println) scala.io.StdIn.readLine() mat.shutdown() system.terminate() }
再来看看Client-Streaming是如何通过reactive-stream实现的。IDL服务描述如下:
service SumNumbers { rpc SumPair(NumPair) returns (SumResult) {} rpc GenIncsFrom(Num) returns (stream Num) {} rpc SumStreamNums(stream Num) returns (SumResult) {} }
自动产生的service接口如下:
trait SumNumbers extends AbstractService { override def serviceCompanion = SumNumbers def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed] def genIncsFrom: Flow[learn.grpc.akka.stream.services.sum.Num, learn.grpc.akka.stream.services.sum.Num, NotUsed] def sumStreamNums: Flow[learn.grpc.akka.stream.services.sum.Num, learn.grpc.akka.stream.services.sum.SumResult, NotUsed] }
sumStreamNums Flow实现如下:
override def sumStreamNums: Flow[Num, SumResult, NotUsed] = { logger.info("*** calling sumStreamNums") Flow[Num].fold(SumResult()) { case (a, b) => logger.info(s"receiving operand ${b.num}") SumResult(b.num + a.result) } }
request是一个stream,可以用aggregation来汇总成一个response。在客户端调用stub.sumStreamNums:
def sumManyNumbers(nums: Seq[Int]): Source[String,NotUsed] = { logger.info(s"Requesting to sum up ${nums}") Source(nums.map(Num(_)).to[collection.immutable.Iterable]) .via(stub.sumStreamNums) .map(r => s"the result: ${r.result}") } object ClientStreamingClient extends App { implicit val system = ActorSystem("ClientStreamingClient") implicit val mat = ActorMaterializer.create(system) val client = new gRPCAkkaStreamClient("localhost", 50051) client.sumManyNumbers(Seq(12,4,8,19)).runForeach(println) scala.io.StdIn.readLine() mat.shutdown() system.terminate() }
最后我们示范一下BiDirectional-Streaming。先用IDL定义一个流输入输出的服务函数keepAdding:
service SumNumbers { rpc SumPair(NumPair) returns (SumResult) {} rpc GenIncsFrom(Num) returns (stream Num) {} rpc SumStreamNums(stream Num) returns (SumResult) {} rpc KeepAdding(stream Num) returns (stream SumResult) {} }
这个函数的实现代码:
override def keepAdding: Flow[Num, SumResult, NotUsed] = { Flow[Num].scan(SumResult()) { case (a,b) => logger.info(s"receiving operand ${b.num}") SumResult(b.num + a.result) } }
这个服务函数的作用是把一串输入数字逐个相加并输出当前结果。我们可以用scan来实现这样的功能。下面是客户端调用服务的示范代码:
def ContSum(nums: Seq[Int]): Source[String,NotUsed] = { logger.info(s"Requesting to sum up ${nums}") Source(nums.map(Num(_)).to[collection.immutable.Iterable]) .throttle(1, 500.millis, 1, ThrottleMode.shaping) .map { n => logger.info(s"Sending number: $n") n } .via(stub.keepAdding) .map(r => s"current sum = ${r.result}") }
用下面这段代码运算:
object BiDiStreamingClient extends App { implicit val system = ActorSystem("BiDiStreamingClient") implicit val mat = ActorMaterializer.create(system) val client = new gRPCAkkaStreamClient("localhost", 50051) client.ContSum(Seq(12,4,8,19)).runForeach(println) scala.io.StdIn.readLine() mat.shutdown() system.terminate() }
好,下面是本次讨论涉及的所有源代码:
project/scalapb.sbt
addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18") resolvers += Resolver.bintrayRepo("beyondthelines", "maven") libraryDependencies ++= Seq( "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1", "beyondthelines" %% "grpcakkastreamgenerator" % "0.0.5" )
build.sbt
import scalapb.compiler.Version.scalapbVersion import scalapb.compiler.Version.grpcJavaVersion name := "gRPCAkkaStreamDemo" version := "0.1" scalaVersion := "2.12.6" resolvers += Resolver.bintrayRepo("beyondthelines", "maven") libraryDependencies ++= Seq( "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf", "io.grpc" % "grpc-netty" % grpcJavaVersion, "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion, "io.monix" %% "monix" % "2.3.0", // for GRPC Akkastream "beyondthelines" %% "grpcakkastreamruntime" % "0.0.5" ) PB.targets in Compile := Seq( scalapb.gen() -> (sourceManaged in Compile).value, // generate the akka stream files grpc.akkastreams.generators.GrpcAkkaStreamGenerator() -> (sourceManaged in Compile).value )
src/main/protobuf/sum.proto
syntax = "proto3"; package learn.grpc.akka.stream.services; message NumPair { int32 num1 = ; int32 num2 = ; } message Num { int32 num = ; } message SumResult { int32 result = ; } service SumNumbers { rpc SumPair(NumPair) returns (SumResult) {} rpc GenIncsFrom(Num) returns (stream Num) {} rpc SumStreamNums(stream Num) returns (SumResult) {} rpc KeepAdding(stream Num) returns (stream SumResult) {} }
src/main/scala/gRPCAkkaStreamService.scala
package learn.grpc.akka.stream.services.impl import akka.NotUsed import akka.stream.scaladsl.Flow import learn.grpc.akka.stream.services.sum._ import java.util.logging.Logger class gRPCAkkaStreamService extends SumGrpcAkkaStream.SumNumbers { val logger: Logger = Logger.getLogger(classOf[gRPCAkkaStreamService].getName) override def sumPair: Flow[NumPair, SumResult, NotUsed] = { logger.info(s"*** calling sumPair ... ***") Flow[NumPair].map { case NumPair(a, b) => { logger.info(s"serving ${a} + ${b} = ???") SumResult(a + b) } } } override def genIncsFrom: Flow[Num, Num, NotUsed] = { logger.info("*** calling genIncsFrom ... ***") Flow[Num].mapConcat { n => ( to n.num).map { m => logger.info(s"genIncFrom producing num: ${m}") Num(m) } } } override def sumStreamNums: Flow[Num, SumResult, NotUsed] = { logger.info("*** calling sumStreamNums ... ***") Flow[Num].fold(SumResult()) { case (a, b) => logger.info(s"receiving operand ${b.num}") SumResult(b.num + a.result) } } override def keepAdding: Flow[Num, SumResult, NotUsed] = { Flow[Num].scan(SumResult()) { case (a,b) => logger.info(s"receiving operand ${b.num}") SumResult(b.num + a.result) } } }
src/main/scala/gRPCAkkaStreamServer.scala
package learn.grpc.akka.stream.server import java.util.logging.Logger import akka.actor.ActorSystem import akka.stream.ActorMaterializer import io.grpc.Server import learn.grpc.akka.stream.services.impl.gRPCAkkaStreamService import io.grpc.ServerBuilder import learn.grpc.akka.stream.services.sum._ class gRPCServer(server: Server) { val logger: Logger = Logger.getLogger(classOf[gRPCServer].getName) def start(): Unit = { server.start() logger.info(s"Server started, listening on ${server.getPort}") sys.addShutdownHook { // Use stderr here since the logger may has been reset by its JVM shutdown hook. System.err.println("*** shutting down gRPC server since JVM is shutting down") stop() System.err.println("*** server shut down") } () } def stop(): Unit = { server.shutdown() } /** * Await termination on the main thread since the grpc library uses daemon threads. */ def blockUntilShutdown(): Unit = { server.awaitTermination() } } object DemoServer extends App { implicit val system = ActorSystem("UnaryServer") implicit val mat = ActorMaterializer.create(system) val server = new gRPCServer( ServerBuilder .forPort() .addService( SumGrpcAkkaStream.bindService( new gRPCAkkaStreamService ) ).build() ) server.start() // UnaryServer.blockUntilShutdown() scala.io.StdIn.readLine() mat.shutdown() system.terminate() }
src/main/scala/gRPCAkkaStreamClient.scala
package learn.grpc.akka.stream.client import learn.grpc.akka.stream.services.sum._ import java.util.logging.Logger import akka.stream.scaladsl._ import akka.NotUsed import akka.actor.ActorSystem import akka.stream.{ActorMaterializer, ThrottleMode} import scala.concurrent.duration._ import io.grpc._ class gRPCAkkaStreamClient(host: String, port: Int) { val logger: Logger = Logger.getLogger(classOf[gRPCAkkaStreamClient].getName) val channel = ManagedChannelBuilder .forAddress(host,port) .usePlaintext(true) .build() val stub = SumGrpcAkkaStream.stub(channel) def addPair(num1: Int, num2: Int): Source[String,NotUsed] = { logger.info(s"Requesting to add $num1, $num2") Source .single(NumPair(num1,num2)) .via(stub.sumPair) .map(r => s"the result: ${r.result}") } def genIncNumbers(len: Int): Source[Int,NotUsed] = { logger.info(s"Requesting to produce ${len} inc numbers") Source .single(Num(len)) .via(stub.genIncsFrom) .map(n => n.num) } def sumManyNumbers(nums: Seq[Int]): Source[String,NotUsed] = { logger.info(s"Requesting to sum up ${nums}") Source(nums.map(Num(_)).to[collection.immutable.Iterable]) .throttle(1, 500.millis, 1, ThrottleMode.shaping) .map { n => logger.info(s"Sending number: $n") n } .via(stub.sumStreamNums) .map(r => s"the result: ${r.result}") } def ContSum(nums: Seq[Int]): Source[String,NotUsed] = { logger.info(s"Requesting to sum up ${nums}") Source(nums.map(Num(_)).to[collection.immutable.Iterable]) .throttle(1, 500.millis, 1, ThrottleMode.shaping) .map { n => logger.info(s"Sending number: $n") n } .via(stub.keepAdding) .map(r => s"current sum = ${r.result}") } } object UnaryCallClient extends App { implicit val system = ActorSystem("UnaryClient") implicit val mat = ActorMaterializer.create(system) val client = new gRPCAkkaStreamClient("localhost", 50051) client.addPair(29,33).runForeach(println) scala.io.StdIn.readLine() mat.shutdown() system.terminate() } object ServerStreamingClient extends App { implicit val system = ActorSystem("ServerStreamingClient") implicit val mat = ActorMaterializer.create(system) val client = new gRPCAkkaStreamClient("localhost", 50051) client.genIncNumbers(5).runForeach(println) scala.io.StdIn.readLine() mat.shutdown() system.terminate() } object ClientStreamingClient extends App { implicit val system = ActorSystem("ClientStreamingClient") implicit val mat = ActorMaterializer.create(system) val client = new gRPCAkkaStreamClient("localhost", 50051) client.sumManyNumbers(Seq(12,4,8,19)).runForeach(println) scala.io.StdIn.readLine() mat.shutdown() system.terminate() } object BiDiStreamingClient extends App { implicit val system = ActorSystem("BiDiStreamingClient") implicit val mat = ActorMaterializer.create(system) val client = new gRPCAkkaStreamClient("localhost", 50051) client.ContSum(Seq(12,4,8,19)).runForeach(println) scala.io.StdIn.readLine() mat.shutdown() system.terminate() }