Akka(42): Http:身份验证 - authentication, authorization and use of raw headers
当我们把Akka-http作为数据库数据交换工具时,数据是以Source[ROW,_]形式存放在Entity里的。很多时候除数据之外我们可能还需要传递一些附加信息,如对数据的具体处理方式等。我们可以通过Akka-http的raw-header来传递这类附加的自定义消息:服务端使用Akka-http提供的raw-header筛选功能就可以把它读取出来。在客户端我们把附加消息放在HttpRequest的raw header里,如下:
import akka.http.scaladsl.model.headers._

// POST request to the rows endpoint, carrying the custom "action" raw header
// that tells the server what to do with the uploaded rows.
val request =
  HttpRequest(HttpMethods.POST, uri = s"http://localhost:8011/rows")
    .addHeader(RawHeader("action", "insert:county"))
在这里客户端注明上传数据应插入county表。服务端可以像下面这样获取这项信息:
// Branch on the optional "action" raw header supplied by the client.
optionalHeaderValueByName("action") {
  case Some(action) =>
    entity(asSourceOf[County]) { rows =>
      // Collect the name of every uploaded row while the entity stream is consumed.
      val namesReceived: Future[Vector[String]] =
        rows.runFold(Vector.empty[String])((names, county) => names :+ county.name)
      // Reply only once the stream has completed; a failed stream now fails the
      // route instead of being silently dropped after an early reply.
      onSuccess(namesReceived) { _ =>
        complete(s"Received rows for $action")
      }
    }
  case None =>
    complete("No action specified!")
}
Akka-http通过Credential类的Directive提供了authentication和authorization。在客户端可以用下面的方法提供自己的用户身份信息:
import akka.http.scaladsl.model.headers._

// Same upload request as before, now also carrying HTTP Basic credentials
// alongside the custom "action" raw header.
val request =
  HttpRequest(HttpMethods.POST, uri = s"http://localhost:8011/rows")
    .addHeader(RawHeader("action", "insert:county"))
    .addCredentials(BasicHttpCredentials("john", "p4ssw0rd"))
服务端对客户端的身份验证处理方法如下:
import akka.http.scaladsl.server.directives.Credentials

/** An authenticated caller, identified by the id supplied with the credentials. */
case class User(name: String)

/** User names accepted by the authorization check. */
val validUsers = Set("john", "peter", "tiger", "susan")

/**
  * Asynchronous authenticator for `authenticateBasicAsync`.
  *
  * @param credentials credentials extracted from the request
  * @return `Some(User(id))` when credentials were provided and their secret verifies
  *         against "p4ssw0rd"; `None` in every other case
  */
def userPassAuthenticator(credentials: Credentials): Future[Option[User]] = {
  // Dispatcher looked up by id from the actor system and passed explicitly to the Future.
  // NOTE(review): the id must be defined in the application's configuration - confirm.
  val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")
  credentials match {
    case p @ Credentials.Provided(id) =>
      Future {
        // potentially slow verification runs here, off the default dispatcher
        if (p.verify("p4ssw0rd")) Some(User(id)) else None
      }(blockingDispatcher)
    case _ => Future.successful(None)
  }
}

/**
  * Authorization check used with `authorizeAsync`.
  *
  * @param user the authenticated user
  * @return an already-completed future holding `true` when the user's name is in `validUsers`
  */
def hasPermissions(user: User): Future[Boolean] =
  Future.successful(validUsers.contains(user.name))
下面是Credential-Directive的使用方法:
// Authenticate the caller, authorize the resulting user, then accept the streamed rows.
authenticateBasicAsync(realm = "secure site", userPassAuthenticator) { user =>
  authorizeAsync(_ => hasPermissions(user)) {
    withoutSizeLimit {
      handleExceptions(postExceptionHandler) {
        optionalHeaderValueByName("action") {
          case Some(action) =>
            entity(asSourceOf[County]) { rows =>
              // Collect the name of every uploaded row while the entity stream is consumed.
              val namesReceived: Future[Vector[String]] =
                rows.runFold(Vector.empty[String])((names, county) => names :+ county.name)
              // Reply only once the stream has completed, so stream failures reach
              // the surrounding exception handler instead of being dropped.
              onSuccess(namesReceived) { _ =>
                complete(s"Received rows for $action sent from $user")
              }
            }
          case None =>
            complete(s"$user did not specify action for uploaded rows!")
        }
      }
    }
  }
}
下面是本次讨论的示范代码:
客户端:
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import scala.util._
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.common.EntityStreamingSupport
import akka.http.scaladsl.model._
import spray.json._

/** Combines spray-json support with the default JSON protocol. */
trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol

/** Row type exchanged with the server, together with its JSON format. */
object Converters extends MyFormats {
  case class County(id: Int, name: String)
  implicit val countyFormat: RootJsonFormat[County] = jsonFormat2(County)
}

/**
  * Demo client: streams five `County` rows as JSON to the server's rows endpoint,
  * sending a custom "action" raw header and HTTP Basic credentials with the request.
  */
object HttpClientDemo extends App {
  import Converters._

  implicit val sys = ActorSystem("ClientSys")
  implicit val mat = ActorMaterializer()
  implicit val ec = sys.dispatcher
  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()

  import akka.util.ByteString
  import akka.http.scaladsl.model.HttpEntity.limitableByteSource

  // The rows to upload.
  val source: Source[County, NotUsed] =
    Source(1 to 5).map { i => County(i, s"广西壮族自治区地市县编号 #$i") }

  /** Serializes one row to its JSON text, wrapped in a `ByteString`. */
  def countyToByteString(c: County) = {
    ByteString(c.toJson.toString)
  }

  val flowCountyToByteString: Flow[County, ByteString, NotUsed] =
    Flow.fromFunction(countyToByteString)

  val rowBytes = limitableByteSource(source via flowCountyToByteString)

  import akka.http.scaladsl.model.headers._

  // Request template: target endpoint, "action" raw header and Basic credentials.
  val request =
    HttpRequest(HttpMethods.POST, uri = s"http://localhost:8011/rows")
      .addHeader(RawHeader("action", "insert:county"))
      .addCredentials(BasicHttpCredentials("john", "p4ssw0rd"))

  // Request body: the serialized rows, declared as application/json.
  val data = HttpEntity(
    ContentTypes.`application/json`,
    rowBytes
  )

  /**
    * Sends `request` with `dataEntity` as its body and prints the outcome.
    *
    * @param request    request template (method, uri, headers)
    * @param dataEntity entity to attach to the request
    * @return the future response; the printing happens as a side effect on completion
    */
  def uploadRows(request: HttpRequest, dataEntity: RequestEntity): scala.concurrent.Future[HttpResponse] =
    Http(sys)
      .singleRequest(request.withEntity(dataEntity))
      .andThen {
        case Success(HttpResponse(StatusCodes.OK, _, entity, _)) =>
          // Print the response body as it arrives.
          entity.dataBytes.map(_.utf8String).runForeach(println)
        case Success(r @ HttpResponse(code, _, _, _)) =>
          // Any non-OK response: report the status and drain the unread entity.
          println(s"Upload request failed, response code: $code")
          r.discardEntityBytes()
        case Failure(err) =>
          println(s"Upload failed: ${err.getMessage}")
      }

  uploadRows(request, data)

  // Keep the process running until a line is read from stdin, then shut down.
  scala.io.StdIn.readLine()
  sys.terminate()
}
服务端:
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import scala.concurrent._
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._

/** Combines spray-json support with the default JSON protocol. */
trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol

/** Row type, its JSON format, and the sample rows served on GET. */
object Converters extends MyFormats {
  case class County(id: Int, name: String)
  val source: Source[County, NotUsed] =
    Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }
  implicit val countyFormat: spray.json.RootJsonFormat[County] = jsonFormat2(County)
}

/**
  * Demo server exposing the "rows" path:
  *  - GET streams the sample rows;
  *  - POST authenticates and authorizes the caller, then accepts a streamed upload
  *    whose handling is selected by the "action" raw header.
  */
object HttpServerDemo extends App {
  import Converters._

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher
  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
    .withParallelMarshalling(parallelism = 8, unordered = false)

  /** Turns a `RuntimeException` raised in the upload route into a 500 reply. */
  def postExceptionHandler: ExceptionHandler =
    ExceptionHandler {
      case _: RuntimeException =>
        extractRequest { req =>
          // Drain whatever is left of the request entity before replying.
          req.discardEntityBytes()
          complete((StatusCodes.InternalServerError.intValue, "Upload Failed!"))
        }
    }

  import akka.http.scaladsl.server.directives.Credentials

  /** An authenticated caller, identified by the id supplied with the credentials. */
  case class User(name: String)

  /** User names accepted by the authorization check. */
  val validUsers = Set("john", "peter", "tiger", "susan")

  /**
    * Asynchronous authenticator for `authenticateBasicAsync`.
    *
    * @param credentials credentials extracted from the request
    * @return `Some(User(id))` when credentials were provided and their secret verifies
    *         against "p4ssw0rd"; `None` in every other case
    */
  def userPassAuthenticator(credentials: Credentials): Future[Option[User]] = {
    // Dispatcher looked up by id from the actor system and passed explicitly to the Future.
    // NOTE(review): the id must be defined in the application's configuration - confirm.
    val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")
    credentials match {
      case p @ Credentials.Provided(id) =>
        Future {
          // potentially slow verification runs here, off the default dispatcher
          if (p.verify("p4ssw0rd")) Some(User(id)) else None
        }(blockingDispatcher)
      case _ => Future.successful(None)
    }
  }

  /**
    * Authorization check used with `authorizeAsync`.
    *
    * @param user the authenticated user
    * @return an already-completed future holding `true` when the user's name is in `validUsers`
    */
  def hasPermissions(user: User): Future[Boolean] =
    Future.successful(validUsers.contains(user.name))

  val route =
    path("rows") {
      get {
        complete {
          source
        }
      } ~
      post {
        authenticateBasicAsync(realm = "secure site", userPassAuthenticator) { user =>
          authorizeAsync(_ => hasPermissions(user)) {
            withoutSizeLimit {
              handleExceptions(postExceptionHandler) {
                optionalHeaderValueByName("action") {
                  case Some(action) =>
                    entity(asSourceOf[County]) { rows =>
                      // Collect the name of every uploaded row while the stream is consumed.
                      val namesReceived: Future[Vector[String]] =
                        rows.runFold(Vector.empty[String])((names, county) => names :+ county.name)
                      // Reply only once the stream has completed, so stream failures
                      // reach postExceptionHandler instead of being dropped.
                      onSuccess(namesReceived) { _ =>
                        complete(s"Received rows for $action sent from $user")
                      }
                    }
                  case None =>
                    complete(s"$user did not specify action for uploaded rows!")
                }
              }
            }
          }
        }
      }
    }

  val (port, host) = (8011, "localhost")

  val bindingFuture = Http().bindAndHandle(route, host, port)

  println(s"Server running at $host $port. Press any key to exit ...")

  // Block until a line is read from stdin, then unbind and stop the actor system.
  scala.io.StdIn.readLine()

  bindingFuture
    .flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())
}
相关推荐
渔民旁的松树 2020-05-26
摇摆少年梦的技术 2020-04-30
nobodyxiaomi 2020-01-08
TIGERXC 2019-11-04
求贤若渴礼贤下士 2019-09-06
摇摆少年梦的技术 2016-02-15
jiexray 2015-11-21
jiexray 2014-11-04
nobodyxiaomi 2014-08-12
摇摆少年梦的技术 2014-08-02
摇摆少年梦的技术 2013-11-26
jiexray 2012-08-20
nobodyxiaomi 2012-05-19
渔民旁的松树 2012-05-19
TIGERXC 2012-05-02
80931836 2019-06-27
nobodyxiaomi 2019-06-27
TIGERXC 2014-12-22
渔民旁的松树 2019-06-21