Akka(38): Http:Entity of ByteString-数据传输基础
// Excerpt of the HttpEntity companion object: the `apply` overloads that build an entity from a
// String, an Array[Byte], a ByteString, or a Source[ByteString, Any]. The excerpt is elided (`...`).
object HttpEntity {
  // Implicit conversions: a bare String is sent as text/plain(UTF-8); raw bytes and ByteStrings
  // are sent as application/octet-stream.
  implicit def apply(string: String): HttpEntity.Strict = apply(ContentTypes.`text/plain(UTF-8)`, string)
  implicit def apply(bytes: Array[Byte]): HttpEntity.Strict = apply(ContentTypes.`application/octet-stream`, bytes)
  implicit def apply(data: ByteString): HttpEntity.Strict = apply(ContentTypes.`application/octet-stream`, data)

  // Text with an explicit non-binary content type: encoded with that content type's charset.
  def apply(contentType: ContentType.NonBinary, string: String): HttpEntity.Strict =
    if (string.isEmpty) empty(contentType)
    else apply(contentType, ByteString(string.getBytes(contentType.charset.nioCharset)))

  // Byte array with an explicit content type; a zero-length array yields the empty entity.
  def apply(contentType: ContentType, bytes: Array[Byte]): HttpEntity.Strict =
    if (bytes.length == 0) empty(contentType) else apply(contentType, ByteString(bytes))

  // ByteString with an explicit content type; every Strict overload above ends up here.
  def apply(contentType: ContentType, data: ByteString): HttpEntity.Strict =
    if (data.isEmpty) empty(contentType) else HttpEntity.Strict(contentType, data)

  // Streamed data of known length; a declared length of 0 yields the empty entity.
  def apply(contentType: ContentType, contentLength: Long, data: Source[ByteString, Any]): UniversalEntity =
    if (contentLength == 0) empty(contentType) else HttpEntity.Default(contentType, contentLength, data)

  // Streamed data of unknown length: built as a Chunked entity.
  def apply(contentType: ContentType, data: Source[ByteString, Any]): HttpEntity.Chunked =
    HttpEntity.Chunked.fromData(contentType, data)
  ...
// Predefined ContentType values; the text/* ones are bound to the UTF-8 charset.
object ContentTypes {
  val `application/json` = ContentType(MediaTypes.`application/json`)
  val `application/octet-stream` = ContentType(MediaTypes.`application/octet-stream`)
  val `text/plain(UTF-8)` = MediaTypes.`text/plain` withCharset HttpCharsets.`UTF-8`
  val `text/html(UTF-8)` = MediaTypes.`text/html` withCharset HttpCharsets.`UTF-8`
  val `text/xml(UTF-8)` = MediaTypes.`text/xml` withCharset HttpCharsets.`UTF-8`
  val `text/csv(UTF-8)` = MediaTypes.`text/csv` withCharset HttpCharsets.`UTF-8`

  // used for explicitly suppressing the rendering of Content-Type headers on requests and responses
  val NoContentType = ContentType(MediaTypes.NoMediaType)
}
/**
 * Sends `request` with `rentity` installed as its entity and reports the outcome on stdout.
 *
 * On a 200 OK response each chunk of the response entity is printed as a UTF-8 string; on any
 * other status the code is printed and the entity bytes are discarded; a failed request prints
 * the error message. The printing stream is started but not waited for.
 *
 * @param request the request to send; its entity is replaced by `rentity`
 * @param rentity the entity to send as the request body
 * @return the future of the response, completing after the reporting callback has run
 */
def runService(request: HttpRequest, rentity: RequestEntity) = {
  val futResp =
    Future.successful(rentity).flatMap { _ =>
      Http(sys).singleRequest(request.copy(entity = rentity))
    }

  futResp.andThen {
    case Failure(err) =>
      println(s"Download failed: ${err.getMessage}")
    case Success(response) =>
      response match {
        case HttpResponse(StatusCodes.OK, _, body, _) =>
          body.dataBytes.map(_.utf8String).runForeach(println)
        case HttpResponse(code, _, _, _) =>
          println(s"Download request failed, response code: $code")
          response.discardEntityBytes()
        case _ =>
          println("Unable to download rows!")
      }
  }
}
我们只需要对这个函数传入RequestEntity就可以了解返回Response里Entity的许多细节了。首先我们要求服务端发送一个纯字符串Hello World。服务端代码如下:
} ~ path("text") { get { complete("Hello World!") } ~
虽然complete("Hello World!")的写法看起来有些令人迷惑,不过complete应该是在内部做了字符串到ByteString的转换。我们可以用上面这个runService函数来证实这一点。下面是这个例子的调用:
// GET /text with an empty request entity; terminate the actor system once the call has finished,
// whether it succeeded or failed.
val reqText = HttpRequest(uri = s"http://localhost:8011/text")
val textCall = runService(reqText, HttpEntity.Empty)
textCall.andThen { case _ => sys.terminate() }
我们再试着发送一些数据给服务端,然后让服务端把结果通过response entity返回来:
} ~ path("text") { get { complete("Hello World!") } ~ post { withoutSizeLimit { extractDataBytes { bytes => val data = bytes.runFold(ByteString())(_ ++ _) onComplete(data) { t => complete(t) } } } }
我们看到服务端对request entity的操作是以ByteString进行的。客户端上传一串字符的request如下:
// POST a short text payload to /text and terminate the actor system once the call has finished.
val postText = HttpRequest(HttpMethods.POST, uri = s"http://localhost:8011/text")
val uploadText = HttpEntity(
  ContentTypes.`text/plain(UTF-8)`,
  // the request body, carried as a ByteString
  ByteString("hello world again")
)
runService(postText, uploadText)
  .andThen { case _ => sys.terminate() }
// A fresh, endless iterator of random Ints; built on demand so every materialization gets its own.
def randomInts: Iterator[Int] = Iterator.continually(Random.nextInt())

// Each random number becomes one newline-terminated ByteString chunk.
val numbers = Source
  .fromIterator(() => randomInts)
  .map { n => ByteString(s"$n\n") }

// Wrap the source with limitableByteSource so it conforms to the withoutSizeLimit constraint
// (presumably makes the stream subject to entity size-limit handling; confirm against the API docs).
val source = limitableByteSource(numbers)
// GET /random: stream a bounded prefix of the endless random-number source as binary data.
path("random") {
  get {
    complete(
      HttpEntity(
        ContentTypes.`application/octet-stream`,
        // cap the endless stream; 1000 chunks matches the complete server listing
        source.take(1000)
      )
    )
  } ~
// Revised response handling: wait for the whole response stream to be printed before the callback
// returns, so that callers terminating the actor system afterwards do not cut the output short.
futResp
  .andThen {
    case Success(r @ HttpResponse(StatusCodes.OK, _, entity, _)) =>
      val futEnt = entity.dataBytes.map(_.utf8String).runForeach(println)
      // blocks until the stream has been fully consumed; rethrows if the stream fails
      Await.result(futEnt, Duration.Inf)
      println("End of stream!!!")
    case Success(r @ HttpResponse(code, _, _, _)) =>
      println(s"Download request failed, response code: $code")
      r.discardEntityBytes()
    case Success(_) =>
      println("Unable to download rows!")
    case Failure(err) =>
      println(s"Download failed: ${err.getMessage}")
  }
// GET /random with an empty request entity; terminate the actor system once the call has finished.
val reqRandom = HttpRequest(uri = s"http://localhost:8011/random")
val randomCall = runService(reqRandom, HttpEntity.Empty)
randomCall.andThen { case _ => sys.terminate() }
// POST handler: lift the request size limit, fold every chunk of the request body into a single
// ByteString, and complete the response with the result of that fold.
post {
  withoutSizeLimit {
    extractDataBytes { body =>
      val collected = body.runFold(ByteString.empty)(_ ++ _)
      onComplete(collected)(result => complete(result))
    }
  }
}
// Endless stream of random Ints, each rendered as one newline-terminated ByteString chunk.
val numbers = Source.fromIterator(() => Iterator.continually(Random.nextInt()))
  .map(n => ByteString(s"$n\n"))
// make the source conform to the withoutSizeLimit constraint
val source = limitableByteSource(numbers)

// Request body: a bounded prefix of the endless stream, sent as binary data.
val bytes = HttpEntity(
  ContentTypes.`application/octet-stream`,
  // cap the upload at 10000 chunks; without a bound the request would never end
  source.take(10000)
)

// POST the stream to /random and terminate the actor system once the call has finished.
val postRandom = HttpRequest(HttpMethods.POST, uri = s"http://localhost:8011/random")
runService(postRandom, bytes)
  .andThen { case _ => sys.terminate() }
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._
import akka.util.ByteString
import akka.http.scaladsl.model.HttpEntity._
import scala.util.Random

/**
 * Demo HTTP server bound to localhost:8011.
 *
 *  - GET  /random streams 1000 newline-terminated random Ints as application/octet-stream
 *  - GET  /text   answers "Hello World!"
 *  - POST on either path folds the request body into one ByteString and sends it back
 */
object ServerEntity extends App {
  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher

  // Endless stream of random Ints, each rendered as one newline-terminated ByteString chunk.
  val numbers = Source.fromIterator(() => Iterator.continually(Random.nextInt()))
    .map(n => ByteString(s"$n\n"))

  // make the source conform to the withoutSizeLimit constraint
  val source = limitableByteSource(numbers)

  // Shared POST handler for both paths: lift the request size limit, fold all request chunks
  // into a single ByteString and complete with the result of that fold.
  private val echoRequestBody =
    post {
      withoutSizeLimit {
        extractDataBytes { bytes =>
          val data = bytes.runFold(ByteString())(_ ++ _)
          onComplete(data) { t =>
            complete(t)
          }
        }
      }
    }

  val route =
    path("random") {
      get {
        withoutSizeLimit {
          complete(
            HttpEntity(
              ContentTypes.`application/octet-stream`,
              // bounded prefix of the endless random-number stream
              source.take(1000)
            )
          )
        }
      } ~ echoRequestBody
    } ~
    path("text") {
      get {
        complete("Hello World!")
      } ~ echoRequestBody
    }

  val (port, host) = (8011, "localhost")

  val bindingFuture = Http().bindAndHandle(route, host, port)

  println(s"Server running at $host $port. Press any key to exit ...")

  // Blocks the main thread until a line is read from stdin.
  scala.io.StdIn.readLine()

  // Release the port first, then shut the actor system down whether or not unbinding succeeded.
  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())
}
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpEntity.limitableByteSource
import akka.http.scaladsl.model._
import scala.concurrent.duration._
import akka.util.ByteString
import scala.concurrent._
import scala.util._

/**
 * Demo HTTP client for the server on localhost:8011. Exactly one of the prepared requests is
 * enabled at a time; the others are left commented out.
 */
object ClientEntity extends App {
  implicit val sys = ActorSystem("ClientSys")
  implicit val mat = ActorMaterializer()
  implicit val ec = sys.dispatcher

  /**
   * Sends `request` with `rentity` installed as its entity and reports the outcome on stdout.
   *
   * On 200 OK every chunk of the response entity is printed as a UTF-8 string, followed by
   * "End of stream!!!". On any other status the code is printed and the entity bytes are
   * discarded. A failed request or a failed response stream prints the error message.
   *
   * The returned future completes only after the response stream has been fully consumed.
   * This is done by chaining on the stream's future rather than blocking inside a callback,
   * so no dispatcher thread is parked while the body downloads.
   *
   * @param request the request to send; its entity is replaced by `rentity`
   * @param rentity the entity to send as the request body
   * @return the response, available once its body has been printed or discarded
   */
  def runService(request: HttpRequest, rentity: RequestEntity): Future[HttpResponse] =
    Http(sys)
      .singleRequest(request.copy(entity = rentity))
      .flatMap { resp =>
        resp.status match {
          case StatusCodes.OK =>
            resp.entity.dataBytes
              .map(_.utf8String)
              .runForeach(println)
              .map { _ =>
                println("End of stream!!!")
                resp
              }
          case code =>
            println(s"Download request failed, response code: $code")
            resp.discardEntityBytes()
            Future.successful(resp)
        }
      }
      .andThen {
        case Failure(err) => println(s"Download failed: ${err.getMessage}")
      }

  val reqText = HttpRequest(uri = s"http://localhost:8011/text")
  // runService(reqText,HttpEntity.Empty)
  //   .andThen{case _ => sys.terminate()}

  val postText = HttpRequest(HttpMethods.POST, uri = s"http://localhost:8011/text")
  val uploadText = HttpEntity(
    ContentTypes.`text/plain(UTF-8)`,
    // the request body, carried as a ByteString
    ByteString("hello world again")
  )
  // runService(postText,uploadText)
  //   .andThen{case _ => sys.terminate()}

  val reqRandom = HttpRequest(uri = s"http://localhost:8011/random")
  // runService(reqRandom,HttpEntity.Empty)
  //   .andThen{case _ => sys.terminate()}

  // Endless stream of random Ints, each rendered as one newline-terminated ByteString chunk.
  val numbers = Source.fromIterator(() => Iterator.continually(Random.nextInt()))
    .map(n => ByteString(s"$n\n"))

  // make the source conform to the withoutSizeLimit constraint
  val source = limitableByteSource(numbers)

  val bytes = HttpEntity(
    ContentTypes.`application/octet-stream`,
    // bounded prefix of the endless stream; without a bound the upload would never end
    source.take(10000)
  )

  val postRandom = HttpRequest(HttpMethods.POST, uri = s"http://localhost:8011/random")
  runService(postRandom, bytes)
    .andThen { case _ => sys.terminate() }
}