標籤:

流式處理對 請求/響應 正文數據體的影響

(譯註:如果在概念上不是太清晰,建議先去閱讀 Akka Streams 的相關介紹。簡單而言,akka streams 把每台電腦變成一條條交錯的流水線,每個數據就好像傳送帶上的物件,輸送到不同的機件進行處理。)

Akka HTTP 的整個數據處理過程在每個抽象層都是流式化,那麼意味著 Akka Streams 帶來的反壓功能會作用在每一個層級(無論是最底層的 TCP 開始,到 HTTP 伺服器層,甚至到 HttpRequestHttpResponse 以及其 HttpEntity 等 相關 API )。

如果開發者習慣處理的客戶端是非流式/非響應式的,那麼 Akka HTTP 這種機制會對應用程序帶來一些意外的影響。這裡特別要說明的是,這意味著「消費 HTTP 正文的無力,對連接的另一端而言就是反壓的信號」。這是 akka http 的特性,因為這樣使得連接的其中某端可以一邊處理正文內容,一邊向另一端反推進入的壓力,從而防止在內存中無必要地過多緩存正文數據。

警告

在程序中消耗(或者拋棄掉)請求中的正文數據是必須的!如果意外地沒有進行處理,Akka HTTP 會認為在管道進來上的數據必須保持反壓,並使TCP 層上流通的數據停滯。特別是客戶端的開發無論 HttpResopnse 處於何種狀態都必須把正文數據完全消化掉。

客戶端的對 HTTP 正文數據的流式處理

消費 HTTP 響應中的正文數據 (客戶端)

客戶端最常碰到的應用場景自然是消費響應信息里的正文數據,使用方式可以通過消耗 dataBytes 數據源達成。(對伺服器端而言,則是用 BasicDirectives.extractDataBytes 之類的指令塊)

我們鼓勵開發者嘗試用不同的流化技巧去發掘 akka http 提供的底層架構的潛力。例如:把流入的分塊數據放到一個一個幀內,然後一行一行地轉換類型,最後把管道接到出口(如 File 或其它 Akka Streams 介面的 )Sink。

import java.io.Filennimport akka.actor.ActorSystemnimport akka.http.scaladsl.model._nimport akka.stream.ActorMaterializernimport akka.stream.scaladsl.{ FileIO, Framing }nimport akka.util.ByteStringnnimplicit val system = ActorSystem()nimplicit val dispatcher = system.dispatchernimplicit val materializer = ActorMaterializer()nnval response: HttpResponse = ???nnresponse.entity.dataBytesn .via(Framing.delimiter(ByteString("n"), maximumFrameLength = 256))n .map(transformEachLine)n .runWith(FileIO.toPath(new File("/tmp/example.out").toPath))nndef transformEachLine(line: ByteString): ByteString = ???n

然而有時候,整個正文數據流可能需要被轉化成一個 String 實例(就是說把所有的內容都讀入到內存中)。Akka HTTP 提供了一個特殊的 toStrict(timeout) 函數用於主動地(對應與響應性的處理)消費正文數據並全部載入到內存中。

import scala.concurrent.Futurenimport scala.concurrent.duration._nnimport akka.actor.ActorSystemnimport akka.http.scaladsl.model._nimport akka.stream.ActorMaterializernimport akka.util.ByteStringnnimplicit val system = ActorSystem()nimplicit val dispatcher = system.dispatchernimplicit val materializer = ActorMaterializer()nncase class ExamplePerson(name: String)ndef parse(line: ByteString): ExamplePerson = ???nnval response: HttpResponse = ???nn// toStrict 強制所以正文數據都讀入到內存中。nval strictEntity: Future[HttpEntity.Strict] = response.entity.toStrict(3.seconds)nn// 雖然我們使用同樣 API 去消費 dataBytes, 但現在他們都已經全部吸收到內存里了:nval transformedData: Future[ExamplePerson] =n strictEntity flatMap { e =>n e.dataBytesn .runFold(ByteString.empty) { case (acc, b) => acc ++ b }n .map(parse)n }n

拋棄 HTTP 響應中的正文數據 (客戶端)

有時候訪問 HTTP 服務時,我們並不太在意返回的數據內容(例如,只關心響應狀態碼),但正文作為返回信息的一部分,我們還是需要把它處理掉,不然就會導致底層 TCP 連接產生反壓。

discardEntityBytes 函數是一個方便的小工具,如果不需要正文數據,開發者可以調用這個函數直接忽略掉。它的實現很簡單,就是把進來的位元組碼直接連到 Sink.ignore 上。

下面的兩段代碼是等價的,並且對於流入伺服器端的 HTTP 請求,其工作原理也是一樣的。

import akka.actor.ActorSystemnimport akka.http.scaladsl.model.HttpMessage.DiscardedEntitynimport akka.http.scaladsl.model._nimport akka.stream.ActorMaterializernnimplicit val system = ActorSystem()nimplicit val dispatcher = system.dispatchernimplicit val materializer = ActorMaterializer()nnval response1: HttpResponse = ??? // 接收一個 HTTP 調用的返回nnval discarded: DiscardedEntity = response1.discardEntityBytes()nndiscarded.future.onComplete { done => println("Entity discarded completely!") }n

val response1: HttpResponse = ??? // 接收一個 HTTP 調用的返回 nnval discardingComplete: Future[Done] = response1.entity.dataBytes.runWith(Sink.ignore)nndiscardingComplete.onComplete(done => println("Entity discarded completely!"))n

伺服器端的對 HTTP 正文數據的流式處理

和客戶端一樣,HTTP 正文數據是直接連到數據 Streams (Streams 的來源則是從底層的 TCP 連接接收到)。所以,如果請求里的正文數據如果沒有被消耗的話,伺服器就會向連接反壓,期待用戶意識到需要對流入的數據進行處理。

請注意有些指令塊會強制使用一個隱式的 toStrict 操作,例如 entity(as[String]) 和類似的函數。

消費 HTTP 響應中的正文數據 (伺服器端)

消費流入正文數據的最簡單處理是把其轉化為一個實質的領域模型,如以下例子中的 entity 指令塊

import akka.actor.ActorSystemnimport akka.http.scaladsl.server.Directives._nimport akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._nimport akka.stream.ActorMaterializernimport spray.json.DefaultJsonProtocol._nnimplicit val system = ActorSystem()nimplicit val materializer = ActorMaterializer()n// 結尾部分 flatMap/onComplete 所需nimplicit val executionContext = system.dispatchernnfinal case class Bid(userId: String, bid: Int)nn// 由 spray-json 提供nimplicit val bidFormat = jsonFormat2(Bid)nnval route =n path("bid") {n put {n entity(as[Bid]) { bid =>n // 所有流入的正文數據接收完成並轉化為一個 Bidn complete("The bid was: " + bid)n }n }n }n

當然開發者也可以直接訪問原生的 dataBytes,和其相關的數據流,例如把數據管道連到 FileIO Sink,並用一個 Future[IoResult] 的結束來標示數據已經全部寫入到文件中。

import akka.actor.ActorSystemnimport akka.stream.scaladsl.FileIOnimport akka.http.scaladsl.server.Directives._nimport akka.stream.ActorMaterializernimport java.io.Filennimplicit val system = ActorSystem()nimplicit val materializer = ActorMaterializer()n// 結尾部分 flatMap/onComplete 所需nimplicit val executionContext = system.dispatchernnval route =n (put & path("lines")) {n withoutSizeLimit {n extractDataBytes { bytes =>n val finishedWriting = bytes.runWith(FileIO.toPath(new File("/tmp/example.out").toPath))nn // 我們只想當數據全部處理完時才回應n onComplete(finishedWriting) { ioResult =>n complete("Finished writing data: " + ioResult)n }n }n }n }n

拋棄 HTTP 請求中的正文數據 (伺服器端)

有時候,某些校驗(例如,檢查當前用戶是否有上載的許可權)會使開發者需要決定有時需要拋棄傳入的正文數據。

必須留意的是拋棄數據意味者數據的上傳還是會發生的,只是上傳到伺服器的數據應用程序不想要而已。這種處理方式對於某些場景如:應用對正文數據本身不感冒,但是又不想把連接關閉(下面會有演示),因為在連接上還有其他的請求正在等待處理。

開發者可以調用 discardEntityBytes 來強制拋棄流入的 HTTPRequest 中帶有的正文數據位元組流

import akka.actor.ActorSystemnimport akka.http.scaladsl.server.Directives._nimport akka.stream.ActorMaterializernimport akka.http.scaladsl.model.HttpRequestnnimplicit val system = ActorSystem()nimplicit val materializer = ActorMaterializer()n// 結尾部分 flatMap/onComplete 所需nimplicit val executionContext = system.dispatchernnval route =n (put & path("lines")) {n withoutSizeLimit {n extractRequest { r: HttpRequest =>n val finishedWriting = r.discardEntityBytes().futurenn // 我們只想當數據全部處理完時才回應n onComplete(finishedWriting) { done =>n complete("Drained all data from connection... (" + done + ")")n }n }n }n }n

和拋棄相關的做法是取消entity.dataBytes 數據流本身,這樣做的話 akka http 會強行關閉和客戶端建立的相關連接。這種處理的有效相關情景是:當檢測到相關用戶無權進行任何的數據上載行為,並可以關閉連接(而不是接收並忽略數據本身)。該行為可以通過把entity.dataBytes 數據流接到 Sink.cancelled 實現。Sink.cancelled 會取消掉整個數據流,並通知相關的連接進行關閉,從而強硬地卡斷了後續的請求。

import akka.actor.ActorSystemnimport akka.stream.scaladsl.Sinknimport akka.http.scaladsl.server.Directives._nimport akka.http.scaladsl.model.headers.Connectionnimport akka.stream.ActorMaterializernnimplicit val system = ActorSystem()nimplicit val materializer = ActorMaterializer()n// 結尾部分 flatMap/onComplete 所需nimplicit val executionContext = system.dispatchernnval route =n (put & path("lines")) {n withoutSizeLimit {n extractDataBytes { data =>n // 關閉連接, 方法一 (強制):n // 認為請求非法直接關掉連接:n data.runWith(Sink.cancelled) // "brutally" closes the connectionnn // 關閉連接, 方法二 (按部就班):n // 如果希望由客戶端接收到響應後斷開,n // 那麼抽空連接上的數據並返回一個 `Connection: Close` 頭域做響應:n respondWithHeader(Connection("close"))n complete(StatusCodes.Forbidden -> "Not allowed!")n }n }n }n

關閉連接的相關文檔:連接

未實現的部分:自動拋棄未使用的正文數據

在某些情況下,系統應該可以檢測到某個用戶的某個請求裡面的正文數據是不需要被處理的,系統應該可以直接拋出異常或者自動拋棄相關的數據流。這種高端功能還沒被開發組實現。

提示:相關功能的討論可以在 issue #183 以及 issue #117 看到,歡迎大家參與!

推薦閱讀:

Akka HTTP 文檔 (非官方漢化)- 導讀
雲計算的1024種玩法之零基礎入門
KaliRouter安裝與使用全指南
最經典的前端面試題之一,你能答出什麼幺蛾子?

TAG:Akka | HTTP |