標籤:

高層服務端 API(一)

高層服務端 API

除底層服務端 API外,Akka HTTP 提供非常靈活的 路由 DSL,用於優雅定義 RESTful web service,補充了底層 API 的空檔,提供典型 Web 伺服器、框架的大部分功能,比如 URI 解構、內容協商、靜態內容服務。

注意

建議先閱讀 請求、響應實體的流式本質 一節,該節講解了底層的全棧流的概念,這對來自非 streaming first HTTP 客戶端背景的用戶可能有點出乎意料。

最簡單的例子

如下,是使用路由 DSL 實現的,完整的、非常基礎的 Akka HTTP 應用:

import akka.actor.ActorSystemnimport akka.http.scaladsl.Httpnimport akka.http.scaladsl.model._nimport akka.http.scaladsl.server.Directives._nimport akka.stream.ActorMaterializernimport scala.io.StdInnnobject WebServer {n def main(args: Array[String]) {nn implicit val system = ActorSystem("my-system")n implicit val materializer = ActorMaterializer()n // needed for the future flatMap/onComplete in the endn implicit val executionContext = system.dispatchernn val route =n path("hello") {n get {n complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))n }n }nn val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)nn println(s"Server online at http://localhost:8080/nPress RETURN to stop...")n StdIn.readLine() // let it run until user presses returnn bindingFuturen .flatMap(_.unbind()) // trigger unbinding from the portn .onComplete(_ => system.terminate()) // and shutdown when donen }n}n

該例子在 localhost 上啟動 HTTP 伺服器,並響應 `/hello` 路徑的 GET 請求。

API 可能變化

下例使用了實驗性特性,且在以後的 Akka HTTP 中,其 API 極有可能變化。更多信息,請查看 Akka 文檔中的 Binary Compatibility Rules)。

import akka.http.scaladsl.model.{ ContentTypes, HttpEntity }nimport akka.http.scaladsl.server.HttpAppnimport akka.http.scaladsl.server.Routenn// Server definitionnobject WebServer extends HttpApp {n override def routes: Route =n path("hello") {n get {n complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))n }n }n}nn// Starting the servernWebServer.startServer("localhost", 8080)n

查看 HttpApp Bootstrap 獲取更多此類情動伺服器的方法。

複雜一點的例子

下例展示了 Akka HTTP 的路由定義,可以稍微感受下使用路由 DSL 定義真實 API 的樣子:

import akka.actor.{ActorRef, ActorSystem}nimport akka.http.scaladsl.coding.Deflatenimport akka.http.scaladsl.marshalling.ToResponseMarshallernimport akka.http.scaladsl.model.StatusCodes.MovedPermanentlynimport akka.http.scaladsl.server.Directives._nimport akka.http.scaladsl.unmarshalling.FromRequestUnmarshallernimport akka.pattern.asknimport akka.stream.ActorMaterializernimport akka.util.Timeoutnn// types used by the API routesntype Money = Double // only for demo purposes, dont try this at home!ntype TransactionResult = Stringncase class User(name: String)ncase class Order(email: String, amount: Money)ncase class Update(order: Order)ncase class OrderItem(i: Int, os: Option[String], s: String)nn// marshalling would usually be derived automatically using librariesnimplicit val orderUM: FromRequestUnmarshaller[Order] = ???nimplicit val orderM: ToResponseMarshaller[Order] = ???nimplicit val orderSeqM: ToResponseMarshaller[Seq[Order]] = ???nimplicit val timeout: Timeout = ??? // for actor asksnimplicit val ec: ExecutionContext = ???nimplicit val mat: ActorMaterializer = ???nimplicit val sys: ActorSystem = ???nn// backend entry pointsndef myAuthenticator: Authenticator[User] = ???ndef retrieveOrdersFromDB: Seq[Order] = ???ndef myDbActor: ActorRef = ???ndef processOrderRequest(id: Int, complete: Order => Unit): Unit = ???nnval route = {n path("orders") {n authenticateBasic(realm = "admin area", myAuthenticator) { user =>n get {n encodeResponseWith(Deflate) {n complete {n // marshal custom object with in-scope marshallern retrieveOrdersFromDBn }n }n } ~n post {n // decompress gzipped or deflated requests if requiredn decodeRequest {n // unmarshal with in-scope unmarshallern entity(as[Order]) { order =>n complete {n // ... write order to DBn "Order received"n }n }n }n }n }n } ~n // extract URI path element as Intn pathPrefix("order" / IntNumber) { orderId =>n pathEnd {n (put | parameter(method ! "put")) {n // form extraction from multipart or www-url-encoded formsn formFields((email, total.as[Money])).as(Order) { order =>n complete {n // complete with serialized Future resultn (myDbActor ? Update(order)).mapTo[TransactionResult]n }n }n } ~n get {n // debugging helpern logRequest("GET-ORDER") {n // use in-scope marshaller to create completer functionn completeWith(instanceOf[Order]) { completer =>n // customn processOrderRequest(orderId, completer)n }n }n }n } ~n path("items") {n get {n // parameters to case class extractionn parameters((size.as[Int], color ?, dangerous ? "no"))n .as(OrderItem) { orderItem =>n // ... route using case class instance created fromn // required and optional query parametersn complete("") // hiden }n }n }n } ~n pathPrefix("documentation") {n // optionally compresses the response with Gzip or Deflaten // if the client accepts compressed responsesn encodeResponse {n // serve up static content from a JAR resourcen getFromResourceDirectory("docs")n }n } ~n path("oldApi" / Remaining) { pathRest =>n redirect("http://oldapi.example.com/" + pathRest, MovedPermanently)n }n}n

處理伺服器錯誤

在初始化或運行 Akka HTTP 伺服器時,很多場景都可能發生錯誤。默認情況下,Akka 將列印這些錯誤,有時除簡單列印外,還需要其他措施,比如關閉 Actor 系統,或通知外部監控等。

綁定錯誤

第一種錯誤類型是,伺服器無法綁定到指定埠,例如當該埠已被佔用,或無該埠的許可權(比如只有 root 用戶可用)。該場景下 binding future 將立刻失敗,可以通過監聽該 `Future` 的完成事件來做出反應:

import akka.actor.ActorSystemnimport akka.http.scaladsl.Httpnimport akka.http.scaladsl.Http.ServerBindingnimport akka.http.scaladsl.server.Directives._nimport akka.stream.ActorMaterializernnimport scala.concurrent.Futurennobject WebServer {n def main(args: Array[String]) {n implicit val system = ActorSystem()n implicit val materializer = ActorMaterializer()n // needed for the future foreach in the endn implicit val executionContext = system.dispatchernn val handler = get {n complete("Hello world!")n }nn // lets say the OS wont allow us to bind to 80.n val (host, port) = ("localhost", 80)n val bindingFuture: Future[ServerBinding] =n Http().bindAndHandle(handler, host, port)nn bindingFuture.failed.foreach { ex =>n log.error(ex, "Failed to bind to {}:{}!", host, port)n }n }n}n

注意

若要從更底層視角了解各種失敗場景,參考 底層 API 中如何處理伺服器失敗。

路由 DSL 中的錯誤與異常

路由 DSL 中發生的錯誤、異常由 `ExceptionHandler` 處理,更多細節參考 Exception Handling。ExceptionHandler 將異常轉化為 HttpResponse,其中包含錯誤碼和錯誤描述信息。

文件上傳

FileUploadDirectives ? Akka HTTP 介紹支持文件上傳的指令。

通過接受 Multipart.FormData 實體,可以實現瀏覽器表單文件上傳,因為消息體、消息體 payload 是 `Source`,並非立即可得,所以需要消費它們的流。

下面是一個簡單例子,將接受到的文件寫入磁碟的臨時文件中,將表單欄位寫入虛擬的資料庫:

val uploadVideo =n path("video") {n entity(as[Multipart.FormData]) { formData =>nn // collect all parts of the multipart as it arrives into a mapn val allPartsF: Future[Map[String, Any]] = formData.parts.mapAsync[(String, Any)](1) {nn case b: BodyPart if b.name == "file" =>n // stream into a file as the chunks of it arrives and return a futuren // file to where it got storedn val file = File.createTempFile("upload", "tmp")n b.entity.dataBytes.runWith(FileIO.toPath(file.toPath)).map(_ =>n (b.name -> file))nn case b: BodyPart =>n // collect form field valuesn b.toStrict(2.seconds).map(strict =>n (b.name -> strict.entity.data.utf8String))nn }.runFold(Map.empty[String, Any])((map, tuple) => map + tuple)nn val done = allPartsF.map { allParts =>n // You would have some better validation/unmarshalling heren db.create(Video(n file = allParts("file").asInstanceOf[File],n title = allParts("title").asInstanceOf[String],n author = allParts("author").asInstanceOf[String]))n }nn // when processing have finished create a response for the usern onSuccess(allPartsF) { allParts =>n complete {n "ok!"n }n }n }n }n

除將文件寫入臨時文件,還可以在文件到來時對其進行處理。下例接受 `.csv` 文件,將其解析成行,然後將其發送到 Actor 中進一步處理:

val splitLines = Framing.delimiter(ByteString("n"), 256)nnval csvUploads =n path("metadata" / LongNumber) { id =>n entity(as[Multipart.FormData]) { formData =>n val done: Future[Done] = formData.parts.mapAsync(1) {n case b: BodyPart if b.filename.exists(_.endsWith(".csv")) =>n b.entity.dataBytesn .via(splitLines)n .map(_.utf8String.split(",").toVector)n .runForeach(csv =>n metadataActor ! MetadataActor.Entry(id, csv))n case _ => Future.successful(Done)n }.runWith(Sink.ignore)nn // when processing have finished create a response for the usern onSuccess(done) { _ =>n complete {n "ok!"n }n }n }n }n

配置服務端 HTTPS

查閱 Server-Side HTTPS Support 獲取更多在服務端配置、使用 HTTPS 的細節。

備註:知乎的編輯器確實很難用,原文在 github 上,點擊 此處 查看原文,歡迎提 issue 和 PR。

推薦閱讀:

Scala數據分析 Step:1=>解析xlsx

TAG:Akka | Scala |