高層服務端 API
除底層服務端 API外,Akka HTTP 提供非常靈活的 路由 DSL,用於優雅定義 RESTful web service,補充了底層 API 的空檔,提供典型 Web 伺服器、框架的大部分功能,比如 URI 解構、內容協商、靜態內容服務。
建議先閱讀 請求、響應實體的流式本質 一節,該節講解了底層的全棧流的概念,這對來自非 streaming first HTTP 客戶端背景的用戶可能有點出乎意料。
如下,是使用路由 DSL 實現的,完整的、非常基礎的 Akka HTTP 應用:
import akka.actor.ActorSystemnimport akka.http.scaladsl.Httpnimport akka.http.scaladsl.model._nimport akka.http.scaladsl.server.Directives._nimport akka.stream.ActorMaterializernimport scala.io.StdInnnobject WebServer {n def main(args: Array[String]) {nn implicit val system = ActorSystem("my-system")n implicit val materializer = ActorMaterializer()n // needed for the future flatMap/onComplete in the endn implicit val executionContext = system.dispatchernn val route =n path("hello") {n get {n complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))n }n }nn val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)nn println(s"Server online at http://localhost:8080/nPress RETURN to stop...")n StdIn.readLine() // let it run until user presses returnn bindingFuturen .flatMap(_.unbind()) // trigger unbinding from the portn .onComplete(_ => system.terminate()) // and shutdown when donen }n}n
該例子在 localhost 上啟動 HTTP 伺服器,並響應 `/hello` 路徑的 GET 請求。
API 可能變化
下例使用了實驗性特性,且在以後的 Akka HTTP 中,其 API 極有可能變化。更多信息,請查看 Akka 文檔中的 Binary Compatibility Rules)。
import akka.http.scaladsl.model.{ ContentTypes, HttpEntity }nimport akka.http.scaladsl.server.HttpAppnimport akka.http.scaladsl.server.Routenn// Server definitionnobject WebServer extends HttpApp {n override def routes: Route =n path("hello") {n get {n complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))n }n }n}nn// Starting the servernWebServer.startServer("localhost", 8080)n
查看 HttpApp Bootstrap 獲取更多此類情動伺服器的方法。
下例展示了 Akka HTTP 的路由定義,可以稍微感受下使用路由 DSL 定義真實 API 的樣子:
import akka.actor.{ActorRef, ActorSystem}nimport akka.http.scaladsl.coding.Deflatenimport akka.http.scaladsl.marshalling.ToResponseMarshallernimport akka.http.scaladsl.model.StatusCodes.MovedPermanentlynimport akka.http.scaladsl.server.Directives._nimport akka.http.scaladsl.unmarshalling.FromRequestUnmarshallernimport akka.pattern.asknimport akka.stream.ActorMaterializernimport akka.util.Timeoutnn// types used by the API routesntype Money = Double // only for demo purposes, dont try this at home!ntype TransactionResult = Stringncase class User(name: String)ncase class Order(email: String, amount: Money)ncase class Update(order: Order)ncase class OrderItem(i: Int, os: Option[String], s: String)nn// marshalling would usually be derived automatically using librariesnimplicit val orderUM: FromRequestUnmarshaller[Order] = ???nimplicit val orderM: ToResponseMarshaller[Order] = ???nimplicit val orderSeqM: ToResponseMarshaller[Seq[Order]] = ???nimplicit val timeout: Timeout = ??? // for actor asksnimplicit val ec: ExecutionContext = ???nimplicit val mat: ActorMaterializer = ???nimplicit val sys: ActorSystem = ???nn// backend entry pointsndef myAuthenticator: Authenticator[User] = ???ndef retrieveOrdersFromDB: Seq[Order] = ???ndef myDbActor: ActorRef = ???ndef processOrderRequest(id: Int, complete: Order => Unit): Unit = ???nnval route = {n path("orders") {n authenticateBasic(realm = "admin area", myAuthenticator) { user =>n get {n encodeResponseWith(Deflate) {n complete {n // marshal custom object with in-scope marshallern retrieveOrdersFromDBn }n }n } ~n post {n // decompress gzipped or deflated requests if requiredn decodeRequest {n // unmarshal with in-scope unmarshallern entity(as[Order]) { order =>n complete {n // ... write order to DBn "Order received"n }n }n }n }n }n } ~n // extract URI path element as Intn pathPrefix("order" / IntNumber) { orderId =>n pathEnd {n (put | parameter(method ! "put")) {n // form extraction from multipart or www-url-encoded formsn formFields((email, total.as[Money])).as(Order) { order =>n complete {n // complete with serialized Future resultn (myDbActor ? Update(order)).mapTo[TransactionResult]n }n }n } ~n get {n // debugging helpern logRequest("GET-ORDER") {n // use in-scope marshaller to create completer functionn completeWith(instanceOf[Order]) { completer =>n // customn processOrderRequest(orderId, completer)n }n }n }n } ~n path("items") {n get {n // parameters to case class extractionn parameters((size.as[Int], color ?, dangerous ? "no"))n .as(OrderItem) { orderItem =>n // ... route using case class instance created fromn // required and optional query parametersn complete("") // hiden }n }n }n } ~n pathPrefix("documentation") {n // optionally compresses the response with Gzip or Deflaten // if the client accepts compressed responsesn encodeResponse {n // serve up static content from a JAR resourcen getFromResourceDirectory("docs")n }n } ~n path("oldApi" / Remaining) { pathRest =>n redirect("http://oldapi.example.com/" + pathRest, MovedPermanently)n }n}n
在初始化或運行 Akka HTTP 伺服器時,很多場景都可能發生錯誤。默認情況下,Akka 將列印這些錯誤,有時除簡單列印外,還需要其他措施,比如關閉 Actor 系統,或通知外部監控等。
第一種錯誤類型是,伺服器無法綁定到指定埠,例如當該埠已被佔用,或無該埠的許可權(比如只有 root 用戶可用)。該場景下 binding future 將立刻失敗,可以通過監聽該 `Future` 的完成事件來做出反應:
import akka.actor.ActorSystemnimport akka.http.scaladsl.Httpnimport akka.http.scaladsl.Http.ServerBindingnimport akka.http.scaladsl.server.Directives._nimport akka.stream.ActorMaterializernnimport scala.concurrent.Futurennobject WebServer {n def main(args: Array[String]) {n implicit val system = ActorSystem()n implicit val materializer = ActorMaterializer()n // needed for the future foreach in the endn implicit val executionContext = system.dispatchernn val handler = get {n complete("Hello world!")n }nn // lets say the OS wont allow us to bind to 80.n val (host, port) = ("localhost", 80)n val bindingFuture: Future[ServerBinding] =n Http().bindAndHandle(handler, host, port)nn bindingFuture.failed.foreach { ex =>n log.error(ex, "Failed to bind to {}:{}!", host, port)n }n }n}n
若要從更底層視角了解各種失敗場景,參考 底層 API 中如何處理伺服器失敗。
路由 DSL 中的錯誤與異常
路由 DSL 中發生的錯誤、異常由 `ExceptionHandler` 處理,更多細節參考 Exception Handling。ExceptionHandler 將異常轉化為 HttpResponse,其中包含錯誤碼和錯誤描述信息。
FileUploadDirectives ? Akka HTTP 介紹支持文件上傳的指令。
通過接受 Multipart.FormData 實體,可以實現瀏覽器表單文件上傳,因為消息體、消息體 payload 是 `Source`,並非立即可得,所以需要消費它們的流。
val uploadVideo =n path("video") {n entity(as[Multipart.FormData]) { formData =>nn // collect all parts of the multipart as it arrives into a mapn val allPartsF: Future[Map[String, Any]] = formData.parts.mapAsync[(String, Any)](1) {nn case b: BodyPart if b.name == "file" =>n // stream into a file as the chunks of it arrives and return a futuren // file to where it got storedn val file = File.createTempFile("upload", "tmp")n b.entity.dataBytes.runWith(FileIO.toPath(file.toPath)).map(_ =>n (b.name -> file))nn case b: BodyPart =>n // collect form field valuesn b.toStrict(2.seconds).map(strict =>n (b.name -> strict.entity.data.utf8String))nn }.runFold(Map.empty[String, Any])((map, tuple) => map + tuple)nn val done = allPartsF.map { allParts =>n // You would have some better validation/unmarshalling heren db.create(Video(n file = allParts("file").asInstanceOf[File],n title = allParts("title").asInstanceOf[String],n author = allParts("author").asInstanceOf[String]))n }nn // when processing have finished create a response for the usern onSuccess(allPartsF) { allParts =>n complete {n "ok!"n }n }n }n }n
除將文件寫入臨時文件,還可以在文件到來時對其進行處理。下例接受 `.csv` 文件,將其解析成行,然後將其發送到 Actor 中進一步處理:
val splitLines = Framing.delimiter(ByteString("n"), 256)nnval csvUploads =n path("metadata" / LongNumber) { id =>n entity(as[Multipart.FormData]) { formData =>n val done: Future[Done] = formData.parts.mapAsync(1) {n case b: BodyPart if b.filename.exists(_.endsWith(".csv")) =>n b.entity.dataBytesn .via(splitLines)n .map(_.utf8String.split(",").toVector)n .runForeach(csv =>n metadataActor ! MetadataActor.Entry(id, csv))n case _ => Future.successful(Done)n }.runWith(Sink.ignore)nn // when processing have finished create a response for the usern onSuccess(done) { _ =>n complete {n "ok!"n }n }n }n }n
配置服務端 HTTPS
查閱 Server-Side HTTPS Support 獲取更多在服務端配置、使用 HTTPS 的細節。
備註:知乎的編輯器確實很難用,原文在 github 上,點擊 此處 查看原文,歡迎提 issue 和 PR。