標籤:

伺服器端 API - 底端

除了 HTTP 客戶端部分,Akka HTTP 還提供了在 Akka Stream 上建立的一套內嵌式,響應流模式,全非同步化的 HTTP 1.1 伺服器端實現。

該實現突出了以下幾個特性

  • HTTP 持久連接的全線支持

  • HTTP 管道化的全線支持(譯註:非主流技術,支持的瀏覽器不多)

  • 非同步 HTTP 數據流(包括例如通過一個好用的 API 處理「分塊」傳輸轉碼)的全線支持

  • 可選性的 SSL/TLS 加密

  • WebSocket 支持

Akka HTTP 伺服器端的組件主要分兩層:

  1. 基礎的低抽象度伺服器端實現在 akka-http-core

  2. 高抽象度的功能模塊在 akka-http

所謂低抽象度指主要集中在處理一個 HTTP/1.1 伺服器端的最本質的功能如:

  • 連接管理

  • HTTP 信息和頭域的解析和生成

  • (對於請求和連擊的)超時管理

  • (對於透明管道處理的支持)響應排序

所有對於一個典型 HTTP 伺服器而言非核心的功能(例如,請求派送,文件傳遞,信息壓縮,等等等等)則留給更高的抽象層去處理,而不會在 akka-http-core-level 伺服器層實現。這樣的設計風格可以使伺服器端的核心實現保持小和輕,易於理解和維護。

開發者可以按照自己的具體需要選用低層 API 或者高層的路由 DSL (這個選擇易於把某些相對複雜的邏輯簡化)

注意事項

建議先閱讀前一章關於流式處理對於正文數據的影響。該章淺述了全棧流式處理的概念的影響,對於習慣開發非流式處理伺服器的思維可能會帶來一些不適應。

Streams 和 HTTP

Akka HTTP 伺服器端是建立在 Akka Streams 的基礎上的,很多功能都會直接用到相關的代碼和 API。

在連接的層面上,Akka HTTP 基本上提供了和 Working with streaming IO 一樣的介面:埠的綁定可看作連接請求的流。應用程序在這個流上獲取連接,對於每個連接,提供一個 Flow[HttpRequest, HttpResponse, _] 用於把請求 「翻譯」 成響應。

除了認為伺服器上綁定的一個埠是一個 Source[IncomingConnection],以及每個連接是一個帶有 Sink[HttpResponse]Source[HttpRequest],這種流式的抽象概念也應用到每個具體的 HTTP 信息上:HTTP 請求和響應的正文內容也被抽象成一個 Source[ByteString]。參考前文 HTTP 模型

啟動與關閉

在最基礎的角度而言,一個 Akka HTTP 伺服器是由 akka.http.scaladsl.Http 擴展里的 bind 函數開始綁定的。

import akka.actor.ActorSystemnimport akka.http.scaladsl.Httpnimport akka.stream.ActorMaterializernimport akka.stream.scaladsl._nnimplicit val system = ActorSystem()nimplicit val materializer = ActorMaterializer()nimplicit val executionContext = system.dispatchernnval serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =n Http().bind(interface = "localhost", port = 8080)nval bindingFuture: Future[Http.ServerBinding] =n serverSource.to(Sink.foreach { connection => // foreach materializes the sourcen println("Accepted new connection from " + connection.remoteAddress)n // ... 具體處理的部分省略n }).run()n

Http().bind 函數這裡用到的參數確立了需要綁定的 interface 和 port ,並申明了可以處理進入系統的 HTTP 連接請求。除此之外,這個函數還可以加入...以及開發者所需要的其它相關伺服器端的設定。

bind 函數的返回值是一個 Source[Http.IncomingConnection],應用程序必須從中抽取內容以便接收連接請求。 source 在沒有實體化為整個處理管道的一部分前,埠的綁定並不會執行。假如埠綁定失敗(比如說,埠已經被佔用),整個已經實體化的流處理管道會馬上終止並拋出一個相關的異常。當連接請求的消費端取消對來源的訂閱時,埠的綁定則會被釋放。開發者也可以用 Http.ServerBinding 類型實例里的 unbind 函數釋放綁定。Http.ServerBinding 還提供了獲取對應埠本地地址的功能,可用於綁定到埠 0 的情況(從而讓系統挑選可用埠)

請求至響應的處理流程

當一個新的連接建立後,它會作為一個帶有地址和相關函數 Http.IncomingConnection 類型的實體,以便向一個 Flow[HttpRequest, HttpResponse, _] 中輸送由該連接上流入的 http 請求。

請求的處理是通過調用一個處理器上的 handleWithXXX 函數進行的。那可以是以下任意一個:

  • handleWith 用於 一個 Flow[HttpRequest, HttpResponse, _]

  • handleWithSyncHandler 用於 HttpRequest => HttpResponse 類型的函數。

  • handleWithAsyncHandler 用於 HttpRequest => Future[HttpResponse] 類型的函數。

以下是一個完整的例子:

import akka.actor.ActorSystemnimport akka.http.scaladsl.Httpnimport akka.http.scaladsl.model.HttpMethods._nimport akka.http.scaladsl.model._nimport akka.stream.ActorMaterializernimport akka.stream.scaladsl.Sinknnimplicit val system = ActorSystem()nimplicit val materializer = ActorMaterializer()nimplicit val executionContext = system.dispatchernnval serverSource = Http().bind(interface = "localhost", port = 8080)nnval requestHandler: HttpRequest => HttpResponse = {n case HttpRequest(GET, Uri.Path("/"), _, _, _) =>n HttpResponse(entity = HttpEntity(n ContentTypes.`text/html(UTF-8)`,n "<html><body>Hello world!</body></html>"))nn case HttpRequest(GET, Uri.Path("/ping"), _, _, _) =>n HttpResponse(entity = "PONG!")nn case HttpRequest(GET, Uri.Path("/crash"), _, _, _) =>n sys.error("BOOM!")nn case r: HttpRequest =>n r.discardEntityBytes() // 一定要抽空流入的 HTTP 正文數量流n HttpResponse(404, entity = "Unknown resource!")n}nnval bindingFuture: Future[Http.ServerBinding] =n serverSource.to(Sink.foreach { connection =>n println("Accepted new connection from " + connection.remoteAddress)nn connection handleWithSyncHandler requestHandlern // 這裡等價於n // connection handleWith { Flow[HttpRequest] map requestHandler }n }).run()n

在這個例子裡面,一個請求的轉換處理過程是:通過請求管道上的 handleWithSyncHandler 調用一個 HttpRequest => HttpResponse 類型的函數完成。在不同的應用場景里,我們可以預見到開發者可以通過不同的方式向 Akka Streams 提供請求處理器。

如果應用程序里提供了一個 提供了一個 Flow,那麼應用程序就有責任對應每一個請求生成一個響應,並保證每個響應返回的順序必須與其對應的請求進入的順序是完全一致的(應用場景為:當 HTTP 管道開啟,並需要處理髮生重疊的多個請求時)。

當採用 handleWithSyncHandler handleWithAsyncHandler,又或使用管道操作上的 map 或者 mapAsync,這個需求則會自動滿足了。

請求/響應中正文數據的流式處理

流化 HTTP 信息中的正文數據是由 HttpEntity 的子類型們提供支持的。應用程序必須在接收請求(以及生成響應)時能應付已經被流式化的正文數據。

如果開發者能依賴 Akka HTTP 自己帶有的編集器/反編集器,那麼一些客制化類型的與正文數據間的轉化應該是很便利的。

關閉連接

當處理數據用的 Flow 取消了對上游來源對訂閱,又或者連接的另一端關閉時,HTTP 連接就會被關閉。有一種比較更便利的手法是強制加入一個 Connection: close 頭域至 HttpResponse 中。這個響應信息就會成為連接上的最後的一個,伺服器端會在送出這個響應後主動地關閉連接。

除此之外,還可以通過取消請求正文(例如:把連接掛到 Sink.cancelled 上 )或者消費部分正文數據(例如:用 take 這個組合鍵)導致關閉連接。為了避免以上的行為,正文數據的取消應該被掛到 Sink.ignore 上進行強制消除。

伺服器端的 HTTPS 配置

對應的相關文檔請參考:(譯註:還沒有翻譯,欠坑一個)

獨立運作的 HTTP 層應用範例

因其響應流式的天性,Akka HTTP 完全可以抽離其附在的 TCP 介面。對於大部分的應用程序而言,這個「功能」並不那麼重要,但某些情況下只需 HTTP 層(甚至更高層級)處理網路以外的數據來源也是很有用的。例如測試,除錯,底層事件數據流(比如,網路包重放)等應用情景。

對於伺服器端的獨立 HTTP 層可以作為如下定義的一個 BidiFlow:

/**n * 可以在任意 TCP 層上的構建一個 HTTP 伺服器的 HTTP 層的一個獨立 BidiFlown *n * {{{n * +------+n * HttpResponse ~>| |~> SslTlsOutboundn * | bidi |n * HttpRequest <~| |<~ SslTlsInboundn * +------+n * }}}n */ntype ServerLayer = BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed]n

開發者可以通過調用 Http().serverLayer 函數的兩個變種的任意一個(配合各種參數), 建立一個 Http.ServerLayer 類型的實例

控制伺服器多工處理的平行度

請求的處理可以有兩種手段進行平行化多工處理,一種是連接平行化,另一種是通過 HTTP 管道化在一個連接上同時接收到多個響應而送出多個請求。兩種情況下都需要客戶端控制流動的請求數。為了防止被太多的請求壓垮,Akka HTTP 可以限制在平行處理時最多可以使用的請求數量。

akka.http.server.max-connections,可以限定同時開啟的連接數量,而且應用與所有的 Http.bindAndHandle* 方法。如果開發使用 Http.bind,進入到系統的連接則會以 Source[IncomingConnection, ...] 類型存在。通過 Akka Stream 的組合子,可以通過反壓機制控制(即:用 throttle mapAysnc)連接的流入。

HTTP 管道化通常是不鼓勵使用的(甚至大部分的瀏覽器都是關閉掉的),但是 Akka HTTP 仍然有提供全面的技術支持。相關的限制設置有兩層,首先,akka.http.server.pipeline-limit 用於配置防止過多(超過這個設置值的)請求數被送往處理流,處理流內部則可以另外加其它流量控制。如果開發者用 Http.bindAndHandleSyncHttp.bindAndHandleAsync 接入口,則可以通過參數設定平行度(預設值為1,即管道化不開通)來控制一個連接上可以同時接收的請求數量。如果開發者使用的是 Http.bindAndHandle 或者 Http.bind, 那麼用戶提供的處理流則完全可以通過反壓機制自主控制同時接收的請求。在這種情況下,開發者可以使用 Akka Stream 的 mapAysnc 組合介面以及一個平行度參數去控制同時處理的請求。

如何處理在底層API里出現的 HTTP 伺服器故障

在啟動或者運行一個 HTTP 伺服器的過程中,可能有著各種出現異常的情況。Akka 會預定把所有的異常記錄到日誌中,但有時候開發者可能想處理這些異常而不只是記錄它們。例如關閉 actor 系統,或者通知一些外部的監察工具。

當建立和具現化一個 HTTP 伺服器(同理,對單純的 TCP 伺服器也是)時,有很多地方都可以出現故障。整個 akka 運行棧上不同層都會著有不同的故障出現,從無法啟動伺服器,到無法反編集一個 HttpRequest。從最外層到最裡層的故障例子可以有:

  • 綁定到指定地址和埠時的故障

  • 接受一個新的 IncomingConnection 時的故障,例如操作系統沒有足夠的內存,或者可使用文件句柄不足。

  • 在處理一個連接上數據時發生的故障,例如進入系統的 HttpRequest 格式有誤。

這章就是介紹如何處理各種不同的故障情景,以及哪些情景下,故障可能會發生。

綁定故障

第一類故障是伺服器無法綁定到指定的埠。例如,當所需要的埠已經被其它程序佔用,又或者被特權化(意即:只能由 root 使用)。這種情況下 bindingFuture 裡面的非同步行為會馬上異常結束,開發者如果希望處理該異常可以通過監聽在 Future 的結束狀態。

mport akka.actor.ActorSystemnimport akka.http.scaladsl.Httpnimport akka.http.scaladsl.Http.ServerBindingnimport akka.stream.ActorMaterializernnimport scala.concurrent.Futurennimplicit val system = ActorSystem()nimplicit val materializer = ActorMaterializer()nimplicit val executionContext = system.dispatchernn// 假設操作系統不讓你使用埠 80 進行綁定.nval (host, port) = ("localhost", 80)nval serverSource = Http().bind(host, port)nnval bindingFuture: Future[ServerBinding] = serverSourcen .to(handleConnections) // Sink[Http.IncomingConnection, _]n .run()nnbindingFuture.onFailure {n case ex: Exception =>n log.error(ex, "Failed to bind to {}:{}!", host, port)n}n

當伺服器成功綁定到埠時, Source[IncomingConnection, _] 則開始運作並開始輸送接入的連接。這個 source 技術上可以也知會故障的發生,但是這種情況應該只會在很突發的情況下,(例如文件句柄不足,或者系統內存不夠等)才無法接收新的連接請求。Akka Streams 系統里處理危機是很直觀的,異常信號發出後,會從發生點一直流到流水線的最終點。

連接信號源故障

在以下的例子中我們增加了一個客制化的 GraphStage (參考 客制化流水線)作為流程故障的處理者。我們向一個 failureMonitor actor 發送異常的起因,然後讓這個 Actor 處理後續的事情:可能是重啟伺服器,也可能是關閉 ActorSystem,總而言之,不再是當前節點需要關心的事了。

import akka.actor.ActorSystemnimport akka.actor.ActorRefnimport akka.http.scaladsl.Httpnimport akka.stream.ActorMaterializernimport akka.stream.scaladsl.Flownnimplicit val system = ActorSystem()nimplicit val materializer = ActorMaterializer()nimplicit val executionContext = system.dispatchernnimport Http._nval (host, port) = ("localhost", 8080)nval serverSource = Http().bind(host, port)nnval failureMonitor: ActorRef = system.actorOf(MyExampleMonitoringActor.props)nnval reactToTopLevelFailures = Flow[IncomingConnection]n .watchTermination()((_, termination) => termination.onFailure {n case cause => failureMonitor ! causen })nnserverSourcen .via(reactToTopLevelFailures)n .to(handleConnections) // Sink[Http.IncomingConnection, _]n .run()n

連接故障

第三種會出現的故障是當連接建立好之後,突然斷掉:例如用戶取消了底層的TCP連接。我們可以用上面同樣的方案處理這個故障,不同的是要處理練級的 Flow

import akka.actor.ActorSystemnimport akka.http.scaladsl.Httpnimport akka.http.scaladsl.model._nimport akka.stream.ActorMaterializernimport akka.stream.scaladsl.Flownnimplicit val system = ActorSystem()nimplicit val materializer = ActorMaterializer()nimplicit val executionContext = system.dispatchernnval (host, port) = ("localhost", 8080)nval serverSource = Http().bind(host, port)nnval reactToConnectionFailure = Flow[HttpRequest]n .recover[HttpRequest] {n case ex =>n // 假設這裡有異常處理n throw exn }nnval httpEcho = Flow[HttpRequest]n .via(reactToConnectionFailure)n .map { request =>n // 簡單地在流水線上返回 (!) "echo" 響應:n HttpResponse(entity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, request.entity.dataBytes))n }nnserverSourcen .runForeach { con =>n con.handleWith(httpEcho)n }n

以上列舉的都是特性或多或少都是跟基礎架構有關,不是綁定就是連接出問題。很多時候,開發者不需要發掘太深這些意外的成因,反正 akka 也只是把這裡錯誤寫到日誌里,對於這類問題也算是一種合理的默認處理手段。

如果要深入一點學習如何在路由層(開發者具體的業務相關代碼所在地)處理異常,可以參考異常處理,該章節的內容集中在解釋在路由里拋出的異常可以如何對應並轉換成攜帶錯誤碼以及易於理解的異常狀態描述的 HttpResponse

推薦閱讀:

WebSocket 淺析
SSL協議之數據加密過程詳解
談談 HTTPS

TAG:HTTP | Akka |