ScalaPB(5):用akka-stream实现reactive-gRPC

在前面几篇讨论里我们介绍了scala-gRPC的基本功能和使用方法,我们基本确定了选择gRPC作为一种有效的内部系统集成工具,主要因为下面gRPC支持的几种服务模式:

、Unary-Call:独立的一对client-request/server-response,是我们常用的http交互模式

、Server-Streaming:client发出一个request后从server端接收一串多个response

、Client-Streaming:client向server发送一串多个request后从server接收一个response

、Bidirectional-Streaming:由client首先发送request启动连接,然后在这个连接上两端可以不断交互信息。

很明显,gRPC支持双向的streaming。那么如果能把gRPC中ListenableFuture和StreamObserver这两种类型转成akka-stream的基本类型应该就能够实现所谓的reactive-gRPC了。如果我们能用akka-stream编程方式实现gRPC服务调用的话,可能会遭遇下面的场景:在服务端我们只需要实现一种akka-stream的Flow把进来的request转化成出去的response,如下:

// Unary case
Flow[Request].map(computeResponse)
 
// Server streaming
Flow[Request].flatMapConcat(computeResponses)
 
// Client streaming
Flow[Request].fold(defaultResponse)(computeResponse)
 
// Bidirectional streaming
Flow[Request].flatMapConcat(computeResponses)

当然,这是个akka-stream Flow,我们可以在这个Flow里调用任何akka-stream提供的功能,如:

Flow[Request]
  .throttle(1, 10.millis, 1, ThrottleMode.Shaping)
  .map(computeResponse)

在客户端我们可以直接经客户端stub调用Flow,如下:

Source
  .single(request)
  .via(stub.doSomething)
  .runForeach(println)

刚好,beyond-the-lines gRPCAkkaStream开源项目提供这么一种gRPC StreamObserver到aka-stream Flow转换桥梁。下面是gRPCAkkaStream的使用示范。先从Unary-Call开始:下面是.proto文件的IDL服务描述:

syntax = "proto3";
package learn.grpc.akka.stream.services;
message NumPair {
   int32 num1 = ;
   int32 num2 = ;
}
message Num {
   int32 num = ;
}
message SumResult {
   int32 result = ;
}
service SumNumbers {
   rpc SumPair(NumPair) returns (SumResult) {}
}

我们看看编译后自动产生的SumGrpcAkkaStream.scala文件中一些相关类型和函数:

服务界面描述:

trait SumNumbers extends AbstractService {
    override def serviceCompanion = SumNumbers
    def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed]
}

我们看到服务函数sumPair是一个akka-stream Fow[NumPair,SumResult,NotUsed]。下面是具体实现SumNumbers.sumPair代码:

class gRPCAkkaStreamService extends SumGrpcAkkaStream.SumNumbers {
  val logger: Logger = Logger.getLogger(classOf[gRPCAkkaStreamService].getName)
    override def sumPair: Flow[NumPair, SumResult, NotUsed] = {
      logger.info(s"*** calling sumPair ...  ***")
      Flow[NumPair].map {
        case NumPair(a,b) => {
          logger.info(s"serving ${a} + ${b} = ???")
          SumResult(a + b)
        }
      }
    }

产生的客户端stub源代码如下:

class SumNumbersStub(
    channel: Channel,
    options: CallOptions = CallOptions.DEFAULT
  ) extends AbstractStub[SumNumbersStub](channel, options) with SumNumbers {
    override def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed] =
      Flow[learn.grpc.akka.stream.services.sum.NumPair].flatMapConcat(request =>
        Source.fromFuture(
          Grpc.guavaFuture2ScalaFuture(
            ClientCalls.futureUnaryCall(channel.newCall(METHOD_SUM_PAIR, options), request)
          )
        )
      )
  
 def stub(channel: Channel): SumNumbersStub = new SumNumbersStub(channel)

我们可以通过stub来调用sumPair方法,如下:

val channel = ManagedChannelBuilder
      .forAddress(host,port)
      .usePlaintext(true)
      .build()

  val stub = SumGrpcAkkaStream.stub(channel)

  def addPair(num1: Int, num2: Int): Source[String,NotUsed] = {
    logger.info(s"Requesting to add $num1, $num2")
    Source
      .single(NumPair(num1,num2))
      .via(stub.sumPair)
      .map(r => s"the result: ${r.result}")
  }

下面是Unary-Call的具体调用方式:

object UnaryCallClient extends App {
  implicit val system = ActorSystem("UnaryClient")
  implicit val mat = ActorMaterializer.create(system)

  val client = new gRPCAkkaStreamClient("localhost", 50051)

  client.addPair(29,33).runForeach(println)

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()

}

在Server-Streaming中一个request返回的是stream of responses。IDL的描述如下:

service SumNumbers {
   rpc SumPair(NumPair) returns (SumResult) {}
   rpc GenIncsFrom(Num) returns (stream Num) {}
}

编译后自动产生的service trait如下:

trait SumNumbers extends AbstractService {
    override def serviceCompanion = SumNumbers
    def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed]
    def genIncsFrom: Flow[learn.grpc.akka.stream.services.sum.Num, learn.grpc.akka.stream.services.sum.Num, NotUsed]
  }

这个服务函数genIncsFrom是Flow[Num,Num,NotUsed],它的具体实现如下:

class gRPCAkkaStreamService extends SumGrpcAkkaStream.SumNumbers {
  val logger: Logger = Logger.getLogger(classOf[gRPCAkkaStreamService].getName)
  override def genIncsFrom: Flow[Num, Num, NotUsed] = {
    logger.info("*** calling genIncsFrom")
    Flow[Num].mapConcat {
      n => ( to n.num).map {m =>
        logger.info(s"genIncFrom producing num: ${m}")
        Num(m)
      }
    }
  }
}

因为输出response是一个stream,可以用mapConcat展平Seq来产生一个。在客户方调用服务函数genIncsFrom的方式如下:

def genIncNumbers(len: Int): Source[Int,NotUsed] = {
    logger.info(s"Requesting to produce ${len} inc numbers")
    Source
      .single(Num(len))
      .via(stub.genIncsFrom)
      .map(n => n.num)
  }

我们还是用runForeach来运算这个Source:

object ServerStreamingClient extends App {
  implicit val system = ActorSystem("ServerStreamingClient")
  implicit val mat = ActorMaterializer.create(system)

  val client = new gRPCAkkaStreamClient("localhost", )

  client.genIncNumbers().runForeach(println)

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()

}

再来看看Client-Streaming是如何通过reactive-stream实现的。IDL服务描述如下:

service SumNumbers {
   rpc SumPair(NumPair) returns (SumResult) {}
   rpc GenIncsFrom(Num) returns (stream Num) {}
   rpc SumStreamNums(stream Num) returns (SumResult) {}
}

自动产生的service接口如下:

trait SumNumbers extends AbstractService {
    override def serviceCompanion = SumNumbers
    def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed]
    def genIncsFrom: Flow[learn.grpc.akka.stream.services.sum.Num, learn.grpc.akka.stream.services.sum.Num, NotUsed]
    def sumStreamNums: Flow[learn.grpc.akka.stream.services.sum.Num, learn.grpc.akka.stream.services.sum.SumResult, NotUsed]
  }

sumStreamNums Flow实现如下:

override def sumStreamNums: Flow[Num, SumResult, NotUsed] = {
    logger.info("*** calling sumStreamNums")
    Flow[Num].fold(SumResult()) {
      case (a, b) =>
        logger.info(s"receiving operand ${b.num}")
        SumResult(b.num + a.result)
    }
  }

request是一个stream,可以用aggregation来汇总成一个response。在客户端调用stub.sumStreamNums:

def sumManyNumbers(nums: Seq[Int]): Source[String,NotUsed] = {
    logger.info(s"Requesting to sum up ${nums}")
    Source(nums.map(Num(_)).to[collection.immutable.Iterable])
      .via(stub.sumStreamNums)
      .map(r => s"the result: ${r.result}")
    }

object ClientStreamingClient extends App {
  implicit val system = ActorSystem("ClientStreamingClient")
  implicit val mat = ActorMaterializer.create(system)

  val client = new gRPCAkkaStreamClient("localhost", 50051)

  client.sumManyNumbers(Seq(12,4,8,19)).runForeach(println)

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()

}

最后我们示范一下BiDirectional-Streaming。先用IDL定义一个流输入输出的服务函数keepAdding:

service SumNumbers {
   rpc SumPair(NumPair) returns (SumResult) {}
   rpc GenIncsFrom(Num) returns (stream Num) {}
   rpc SumStreamNums(stream Num) returns (SumResult) {}
   rpc KeepAdding(stream Num) returns (stream SumResult) {}
}

这个函数的实现代码:

override def keepAdding: Flow[Num, SumResult, NotUsed] = {
    Flow[Num].scan(SumResult()) {
      case (a,b) =>
        logger.info(s"receiving operand ${b.num}")
        SumResult(b.num + a.result)
    }
  }

这个服务函数的作用是把一串输入数字逐个相加并输出当前结果。我们可以用scan来实现这样的功能。下面是客户端调用服务的示范代码:

def ContSum(nums: Seq[Int]): Source[String,NotUsed] = {
    logger.info(s"Requesting to sum up ${nums}")
    Source(nums.map(Num(_)).to[collection.immutable.Iterable])
      .throttle(1, 500.millis, 1, ThrottleMode.shaping)
      .map { n =>
        logger.info(s"Sending number: $n")
        n
      }
      .via(stub.keepAdding)
      .map(r => s"current sum = ${r.result}")
  }

用下面这段代码运算:

object BiDiStreamingClient extends App {
  implicit val system = ActorSystem("BiDiStreamingClient")
  implicit val mat = ActorMaterializer.create(system)

  val client = new gRPCAkkaStreamClient("localhost", 50051)

  client.ContSum(Seq(12,4,8,19)).runForeach(println)

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()

}

好,下面是本次讨论涉及的所有源代码:

project/scalapb.sbt

addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")

resolvers += Resolver.bintrayRepo("beyondthelines", "maven")

libraryDependencies ++= Seq(
  "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1",
  "beyondthelines"         %% "grpcakkastreamgenerator" % "0.0.5"
)

build.sbt

import scalapb.compiler.Version.scalapbVersion
import scalapb.compiler.Version.grpcJavaVersion

name := "gRPCAkkaStreamDemo"

version := "0.1"

scalaVersion := "2.12.6"

resolvers += Resolver.bintrayRepo("beyondthelines", "maven")

libraryDependencies ++= Seq(
  "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
  "io.grpc" % "grpc-netty" % grpcJavaVersion,
  "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,
  "io.monix" %% "monix" % "2.3.0",
  // for GRPC Akkastream
  "beyondthelines"         %% "grpcakkastreamruntime" % "0.0.5"
)

PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value,
  // generate the akka stream files
  grpc.akkastreams.generators.GrpcAkkaStreamGenerator() -> (sourceManaged in Compile).value
)

src/main/protobuf/sum.proto

syntax = "proto3";

package learn.grpc.akka.stream.services;


message NumPair {
   int32 num1 = ;
   int32 num2 = ;
}

message Num {
   int32 num = ;
}

message SumResult {
   int32 result = ;
}

service SumNumbers {
   rpc SumPair(NumPair) returns (SumResult) {}
   rpc GenIncsFrom(Num) returns (stream Num) {}
   rpc SumStreamNums(stream Num) returns (SumResult) {}
   rpc KeepAdding(stream Num) returns (stream SumResult) {}
}

src/main/scala/gRPCAkkaStreamService.scala

package learn.grpc.akka.stream.services.impl

import akka.NotUsed
import akka.stream.scaladsl.Flow
import learn.grpc.akka.stream.services.sum._
import java.util.logging.Logger

class gRPCAkkaStreamService extends SumGrpcAkkaStream.SumNumbers {
  val logger: Logger = Logger.getLogger(classOf[gRPCAkkaStreamService].getName)

  override def sumPair: Flow[NumPair, SumResult, NotUsed] = {
    logger.info(s"*** calling sumPair ...  ***")
    Flow[NumPair].map {
      case NumPair(a, b) => {
        logger.info(s"serving ${a} + ${b} = ???")
        SumResult(a + b)
      }
    }
  }

  override def genIncsFrom: Flow[Num, Num, NotUsed] = {
    logger.info("*** calling genIncsFrom ... ***")
    Flow[Num].mapConcat {
      n =>
        ( to n.num).map { m =>
          logger.info(s"genIncFrom producing num: ${m}")
          Num(m)
        }
    }
  }

  override def sumStreamNums: Flow[Num, SumResult, NotUsed] = {
    logger.info("*** calling sumStreamNums ... ***")
    Flow[Num].fold(SumResult()) {
      case (a, b) =>
        logger.info(s"receiving operand ${b.num}")
        SumResult(b.num + a.result)
    }
  }

  override def keepAdding: Flow[Num, SumResult, NotUsed] = {
    Flow[Num].scan(SumResult()) {
      case (a,b) =>
        logger.info(s"receiving operand ${b.num}")
        SumResult(b.num + a.result)
    }
  }
}

src/main/scala/gRPCAkkaStreamServer.scala

package learn.grpc.akka.stream.server

import java.util.logging.Logger

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import io.grpc.Server
import learn.grpc.akka.stream.services.impl.gRPCAkkaStreamService
import io.grpc.ServerBuilder
import learn.grpc.akka.stream.services.sum._
class gRPCServer(server: Server) {

  val logger: Logger = Logger.getLogger(classOf[gRPCServer].getName)

  def start(): Unit = {
    server.start()
    logger.info(s"Server started, listening on ${server.getPort}")
    sys.addShutdownHook {
      // Use stderr here since the logger may has been reset by its JVM shutdown hook.
      System.err.println("*** shutting down gRPC server since JVM is shutting down")
      stop()
      System.err.println("*** server shut down")
    }
    ()
  }

  def stop(): Unit = {
    server.shutdown()
  }

  /**
    * Await termination on the main thread since the grpc library uses daemon threads.
    */
  def blockUntilShutdown(): Unit = {
    server.awaitTermination()
  }
}

object DemoServer extends App {
  implicit val system = ActorSystem("UnaryServer")
  implicit val mat = ActorMaterializer.create(system)
  val server = new gRPCServer(
    ServerBuilder
      .forPort()
        .addService(
          SumGrpcAkkaStream.bindService(
           new gRPCAkkaStreamService
          )
      ).build()
  )
  server.start()
//  UnaryServer.blockUntilShutdown()
  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()

}

src/main/scala/gRPCAkkaStreamClient.scala

package learn.grpc.akka.stream.client
import learn.grpc.akka.stream.services.sum._
import java.util.logging.Logger

import akka.stream.scaladsl._
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import scala.concurrent.duration._
import io.grpc._
class gRPCAkkaStreamClient(host: String, port: Int) {
  val logger: Logger = Logger.getLogger(classOf[gRPCAkkaStreamClient].getName)

  val channel = ManagedChannelBuilder
      .forAddress(host,port)
      .usePlaintext(true)
      .build()

  val stub = SumGrpcAkkaStream.stub(channel)

  def addPair(num1: Int, num2: Int): Source[String,NotUsed] = {
    logger.info(s"Requesting to add $num1, $num2")
    Source
      .single(NumPair(num1,num2))
      .via(stub.sumPair)
      .map(r => s"the result: ${r.result}")
  }
  def genIncNumbers(len: Int): Source[Int,NotUsed] = {
    logger.info(s"Requesting to produce ${len} inc numbers")
    Source
      .single(Num(len))
      .via(stub.genIncsFrom)
      .map(n => n.num)
  }
  def sumManyNumbers(nums: Seq[Int]): Source[String,NotUsed] = {
    logger.info(s"Requesting to sum up ${nums}")
    Source(nums.map(Num(_)).to[collection.immutable.Iterable])
      .throttle(1, 500.millis, 1, ThrottleMode.shaping)
      .map { n =>
        logger.info(s"Sending number: $n")
        n
      }
      .via(stub.sumStreamNums)
      .map(r => s"the result: ${r.result}")
    }
  def ContSum(nums: Seq[Int]): Source[String,NotUsed] = {
    logger.info(s"Requesting to sum up ${nums}")
    Source(nums.map(Num(_)).to[collection.immutable.Iterable])
      .throttle(1, 500.millis, 1, ThrottleMode.shaping)
      .map { n =>
        logger.info(s"Sending number: $n")
        n
      }
      .via(stub.keepAdding)
      .map(r => s"current sum = ${r.result}")
  }
}

object UnaryCallClient extends App {
  implicit val system = ActorSystem("UnaryClient")
  implicit val mat = ActorMaterializer.create(system)

  val client = new gRPCAkkaStreamClient("localhost", 50051)

  client.addPair(29,33).runForeach(println)

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()

}

object ServerStreamingClient extends App {
  implicit val system = ActorSystem("ServerStreamingClient")
  implicit val mat = ActorMaterializer.create(system)

  val client = new gRPCAkkaStreamClient("localhost", 50051)

  client.genIncNumbers(5).runForeach(println)

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()

}

object ClientStreamingClient extends App {
  implicit val system = ActorSystem("ClientStreamingClient")
  implicit val mat = ActorMaterializer.create(system)

  val client = new gRPCAkkaStreamClient("localhost", 50051)

  client.sumManyNumbers(Seq(12,4,8,19)).runForeach(println)

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()

}

object BiDiStreamingClient extends App {
  implicit val system = ActorSystem("BiDiStreamingClient")
  implicit val mat = ActorMaterializer.create(system)

  val client = new gRPCAkkaStreamClient("localhost", 50051)

  client.ContSum(Seq(12,4,8,19)).runForeach(println)

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()

}

相关推荐