實時流處理系統反壓機制(BackPressure)綜述

實時流處理系統反壓機制(BackPressure)綜述

來自專欄實時流處理

反壓機制(BackPressure)被廣泛應用到實時流處理系統中,流處理系統需要能優雅地處理反壓(backpressure)問題。反壓通常產生於這樣的場景:短時負載高峰導致系統接收數據的速率遠高於它處理數據的速率。許多日常問題都會導致反壓,例如,垃圾回收停頓可能會導致流入的數據快速堆積,或者遇到大促或秒殺活動導致流量陡增。反壓如果不能得到正確的處理,可能會導致資源耗盡甚至系統崩潰。反壓機制就是指系統能夠自己檢測到被阻塞的Operator,然後系統自適應地降低源頭或者上游的發送速率。目前主流的流處理系統 Apache Storm、JStorm、Spark Streaming、S4、Apache Flink、Twitter Heron都採用反壓機制解決這個問題,不過他們的實現各自不同。

不同的組件可以不同的速度執行(並且每個組件中的處理速度隨時間改變)。 例如,考慮一個工作流程,或由於數據傾斜或任務調度而導致數據被處理十分緩慢。 在這種情況下,如果上游階段不減速,將導致緩衝區建立長隊列,或導致系統丟棄元組。 如果元組在中途丟棄,那麼效率可能會有損失,因為已經為這些元組產生的計算被浪費了。並且在一些流處理系統中比如Strom,會將這些丟失的元組重新發送,這樣會導致數據的一致性問題,並且還會導致某些Operator狀態疊加。進而整個程序輸出結果不準確。第二由於系統接收數據的速率是隨著時間改變的,短時負載高峰導致系統接收數據的速率遠高於它處理數據的速率的情況,也會導致Tuple在中途丟失。所以實時流處理系統必須能夠解決發送速率遠大於系統能處理速率這個問題,大多數實時流處理系統採用反壓(BackPressure)機制解決這個問題。下面我們就來介紹一下不同的實時流處理系統採用的反壓機制:

1.Strom 反壓機制

1.1 Storm 1.0 以前的反壓機制

對於開啟了acker機制的storm程序,可以通過設置conf.setMaxSpoutPending參數來實現反壓效果,如果下游組件(bolt)處理速度跟不上導致spout發送的tuple沒有及時確認的數超過了參數設定的值,spout會停止發送數據,這種方式的缺點是很難調優conf.setMaxSpoutPending參數的設置以達到最好的反壓效果,設小了會導致吞吐上不去,設大了會導致worker OOM;有震蕩,數據流會處於一個顛簸狀態,效果不如逐級反壓;另外對於關閉acker機制的程序無效;

1.2 Storm Automatic Backpressure

新的storm自動反壓機制(Automatic Back Pressure)通過監控bolt中的接收隊列的情況,當超過高水位值時專門的線程會將反壓信息寫到 Zookeeper ,Zookeeper上的watch會通知該拓撲的所有Worker都進入反壓狀態,最後Spout降低tuple發送的速度。

每個Executor都有一個接受隊列和發送隊列用來接收Tuple和發送Spout或者Bolt生成的Tuple元組。每個Worker進程都有一個單的的接收線程監聽接收埠。它從每個網路上進來的消息發送到Executor的接收隊列中。Executor接收隊列存放Worker或者Worker內部其他Executor發過來的消息。Executor工作線程從接收隊列中拿出數據,然後調用execute方法,發送Tuple到Executor的發送隊列。Executor的發送線程從發送隊列中獲取消息,按照消息目的地址選擇發送到Worker的傳輸隊列中或者其他Executor的接收隊列中。最後Worker的發送線程從傳輸隊列中讀取消息,然後將Tuple元組發送到網路中。

1. 當Worker進程中的Executor線程發現自己的接收隊列滿了時,也就是接收隊列達到high watermark的閾值後,因此它會發送通知消息到背壓線程。

2. 背壓線程將當前worker進程的信息註冊到Zookeeper的Znode節點中。具體路徑就是 /Backpressure/topo1/wk1下

3. Zookeepre的Znode Watcher監視/Backpreesure/topo1下的節點目錄變化情況,如果發現目錄增加了znode節點說明或者其他變化。這就說明該Topo1需要反壓控制,然後它會通知Topo1所有的Worker進入反壓狀態。

4.最終Spout降低tuple發送的速度。

2. JStorm 反壓機制

Jstorm做了兩級的反壓,第一級和Jstorm類似,通過執行隊列來監測,但是不會通過ZK來協調,而是通過Topology Master來協調。在隊列中會標記high water mark和low water mark,當執行隊列超過high water mark時,就認為bolt來不及處理,則向TM發一條控制消息,上游開始減慢發送速率,直到下游低於low water mark時解除反壓。

此外,在Netty層也做了一級反壓,由於每個Worker Task都有自己的發送和接收的緩衝區,可以對緩衝區設定限額、控制大小,如果spout數據量特別大,緩衝區填滿會導致下游bolt的接收緩衝區填滿,造成了反壓。

限流機制:jstorm的限流機制, 當下游bolt發生阻塞時, 並且阻塞task的比例超過某個比例時(現在默認設置為0.1),觸發反壓

限流方式:計算阻塞Task的地方執行線程執行時間,Spout每發送一個tuple等待相應時間,然後講這個時間發送給Spout, 於是, spout每發送一個tuple,就會等待這個執行時間。

Task阻塞判斷方式:在jstorm 連續4次採樣周期中採樣,隊列情況,當隊列超過80%(可以設置)時,即可認為該task處在阻塞狀態。

3. SparkStreaming 反壓機制

3.1 為什麼引入反壓機制Backpressure

默認情況下,Spark Streaming通過Receiver以生產者生產數據的速率接收數據,計算過程中會出現batch processing time > batch interval的情況,其中batch processing time 為實際計算一個批次花費時間, batch interval為Streaming應用設置的批處理間隔。這意味著Spark Streaming的數據接收速率高於Spark從隊列中移除數據的速率,也就是數據處理能力低,在設置間隔內不能完全處理當前接收速率接收的數據。如果這種情況持續過長的時間,會造成數據在內存中堆積,導致Receiver所在Executor內存溢出等問題(如果設置StorageLevel包含disk, 則內存存放不下的數據會溢寫至disk, 加大延遲)。Spark 1.5以前版本,用戶如果要限制Receiver的數據接收速率,可以通過設置靜態配製參數「spark.streaming.receiver.maxRate」的值來實現,此舉雖然可以通過限制接收速率,來適配當前的處理能力,防止內存溢出,但也會引入其它問題。比如:producer數據生產高於maxRate,當前集群處理能力也高於maxRate,這就會造成資源利用率下降等問題。為了更好的協調數據接收速率與資源處理能力,Spark Streaming 從v1.5開始引入反壓機制(back-pressure),通過動態控制數據接收速率來適配集群數據處理能力。

3.2 反壓機制Backpressure

Spark Streaming Backpressure: 根據JobScheduler反饋作業的執行信息來動態調整Receiver數據接收率。通過屬性「spark.streaming.backpressure.enabled」來控制是否啟用backpressure機制,默認值false,即不啟用。

SparkStreaming 架構圖如下所示:

SparkStreaming 反壓過程執行如下圖所示:

在原架構的基礎上加上一個新的組件RateController,這個組件負責監聽「OnBatchCompleted」事件,然後從中抽取processingDelay 及schedulingDelay信息. Estimator依據這些信息估算出最大處理速度(rate),最後由基於Receiver的Input Stream將rate通過ReceiverTracker與ReceiverSupervisorImpl轉發給BlockGenerator(繼承自RateLimiter).

4. Heron 反壓機制

當下游處理速度跟不上上游發送速度時,一旦StreamManager 發現一個或多個Heron Instance 速度變慢,立刻對本地spout進行降級,降低本地Spout發送速度, 停止從這些spout讀取數據。並且受影響的StreamManager 會發送一個特殊的start backpressure message 給其他的StreamManager ,要求他們對spout進行本地降級。 當其他StreamManager 接收到這個特殊消息時,他們通過不讀取當地Spout中的Tuple來進行降級。一旦出問題的Heron Instance 恢復速度後,本地的SM 會發送stop backpressure message 解除降級。

很多Socket Channel與應用程序級別的Buffer相關聯,該緩衝區由high watermark 和low watermark組成。 當緩衝區大小達到high watermark時觸發反壓,並保持有效,直到緩衝區大小低於low watermark。 此設計的基本原理是防止拓撲在進入和退出背壓緩解模式之間快速振蕩。

5. Flink 反壓機制

Flink 沒有使用任何複雜的機制來解決反壓問題,因為根本不需要那樣的方案!它利用自身作為純數據流引擎的優勢來優雅地響應反壓問題。下面我們會深入分析 Flink 是如何在 Task 之間傳輸數據的,以及數據流如何實現自然降速的。

Flink 在運行時主要由operatorsstreams兩大組件構成。每個 operator 會消費中間態的流,並在流上進行轉換,然後生成新的流。對於 Flink 的網路機制一種形象的類比是,Flink 使用了高效有界的分散式阻塞隊列,就像 Java 通用的阻塞隊列(BlockingQueue)一樣。還記得經典的線程間通信案例:生產者消費者模型嗎?使用 BlockingQueue 的話,一個較慢的接受者會降低發送者的發送速率,因為一旦隊列滿了(有界隊列)發送者會被阻塞。Flink 解決反壓的方案就是這種感覺。

在 Flink 中,這些分散式阻塞隊列就是這些邏輯流,而隊列容量是通過緩衝池來(LocalBufferPool)實現的。每個被生產和被消費的流都會被分配一個緩衝池。緩衝池管理著一組緩衝(Buffer),緩衝在被消費後可以被回收循環利用。這很好理解:你從池子中拿走一個緩衝,填上數據,在數據消費完之後,又把緩衝還給池子,之後你可以再次使用它。

5.1 Flink 網路傳輸中的內存管理

如下圖所示展示了 Flink 在網路傳輸場景下的內存管理。網路上傳輸的數據會寫到 Task 的 InputGate(IG) 中,經過 Task 的處理後,再由 Task 寫到 ResultPartition(RS) 中。每個 Task 都包括了輸入和輸入,輸入和輸出的數據存在 Buffer 中(都是位元組數據)。Buffer 是 MemorySegment 的包裝類。

  1. TaskManager(TM)在啟動時,會先初始化NetworkEnvironment對象,TM 中所有與網路相關的東西都由該類來管理(如 Netty 連接),其中就包括NetworkBufferPool。根據配置,Flink 會在 NetworkBufferPool 中生成一定數量(默認2048個)的內存塊 MemorySegment(關於 Flink 的內存管理,後續文章會詳細談到),內存塊的總數量就代表了網路傳輸中所有可用的內存。NetworkEnvironment 和 NetworkBufferPool 是 Task 之間共享的,每個 TM 只會實例化一個。
  2. Task 線程啟動時,會向 NetworkEnvironment 註冊,NetworkEnvironment 會為 Task 的 InputGate(IG)和 ResultPartition(RP) 分別創建一個 LocalBufferPool(緩衝池)並設置可申請的 MemorySegment(內存塊)數量。IG 對應的緩衝池初始的內存塊數量與 IG 中 InputChannel 數量一致,RP 對應的緩衝池初始的內存塊數量與 RP 中的 ResultSubpartition 數量一致。不過,每當創建或銷毀緩衝池時,NetworkBufferPool 會計算剩餘空閑的內存塊數量,並平均分配給已創建的緩衝池。注意,這個過程只是指定了緩衝池所能使用的內存塊數量,並沒有真正分配內存塊,只有當需要時才分配。為什麼要動態地為緩衝池擴容呢?因為內存越多,意味著系統可以更輕鬆地應對瞬時壓力(如GC),不會頻繁地進入反壓狀態,所以我們要利用起那部分閑置的內存塊。
  3. 在 Task 線程執行過程中,當 Netty 接收端收到數據時,為了將 Netty 中的數據拷貝到 Task 中,InputChannel(實際是 RemoteInputChannel)會向其對應的緩衝池申請內存塊(上圖中的①)。如果緩衝池中也沒有可用的內存塊且已申請的數量還沒到池子上限,則會向 NetworkBufferPool 申請內存塊(上圖中的②)並交給 InputChannel 填上數據(上圖中的③和④)。如果緩衝池已申請的數量達到上限了呢?或者 NetworkBufferPool 也沒有可用內存塊了呢?這時候,Task 的 Netty Channel 會暫停讀取,上游的發送端會立即響應停止發送,拓撲會進入反壓狀態。當 Task 線程寫數據到 ResultPartition 時,也會向緩衝池請求內存塊,如果沒有可用內存塊時,會阻塞在請求內存塊的地方,達到暫停寫入的目的。
  4. 當一個內存塊被消費完成之後(在輸入端是指內存塊中的位元組被反序列化成對象了,在輸出端是指內存塊中的位元組寫入到 Netty Channel 了),會調用 Buffer.recycle() 方法,會將內存塊還給 LocalBufferPool (上圖中的⑤)。如果LocalBufferPool中當前申請的數量超過了池子容量(由於上文提到的動態容量,由於新註冊的 Task 導致該池子容量變小),則LocalBufferPool會將該內存塊回收給 NetworkBufferPool(上圖中的⑥)。如果沒超過池子容量,則會繼續留在池子中,減少反覆申請的開銷。

5.2 Flink 反壓機制

下面這張圖簡單展示了兩個 Task 之間的數據傳輸以及 Flink 如何感知到反壓的:

  1. 記錄「A」進入了 Flink 並且被 Task 1 處理。(這裡省略了 Netty 接收、反序列化等過程)
  2. 記錄被序列化到 buffer 中。
  3. 該 buffer 被發送到 Task 2,然後 Task 2 從這個 buffer 中讀出記錄。

不要忘了:記錄能被 Flink 處理的前提是,必須有空閑可用的 Buffer。

結合上面兩張圖看:Task 1 在輸出端有一個相關聯的 LocalBufferPool(稱緩衝池1),Task 2 在輸入端也有一個相關聯的 LocalBufferPool(稱緩衝池2)。如果緩衝池1中有空閑可用的 buffer 來序列化記錄 「A」,我們就序列化並發送該 buffer。

這裡我們需要注意兩個場景:

  • 本地傳輸:如果 Task 1 和 Task 2 運行在同一個 worker 節點(TaskManager),該 buffer 可以直接交給下一個 Task。一旦 Task 2 消費了該 buffer,則該 buffer 會被緩衝池1回收。如果 Task 2 的速度比 1 慢,那麼 buffer 回收的速度就會趕不上 Task 1 取 buffer 的速度,導致緩衝池1無可用的 buffer,Task 1 等待在可用的 buffer 上。最終形成 Task 1 的降速。
  • 遠程傳輸:如果 Task 1 和 Task 2 運行在不同的 worker 節點上,那麼 buffer 會在發送到網路(TCP Channel)後被回收。在接收端,會從 LocalBufferPool 中申請 buffer,然後拷貝網路中的數據到 buffer 中。如果沒有可用的 buffer,會停止從 TCP 連接中讀取數據。在輸出端,通過 Netty 的水位值機制來保證不往網路中寫入太多數據(後面會說)。如果網路中的數據(Netty輸出緩衝中的位元組數)超過了高水位值,我們會等到其降到低水位值以下才繼續寫入數據。這保證了網路中不會有太多的數據。如果接收端停止消費網路中的數據(由於接收端緩衝池沒有可用 buffer),網路中的緩衝數據就會堆積,那麼發送端也會暫停發送。另外,這會使得發送端的緩衝池得不到回收,writer 阻塞在向 LocalBufferPool 請求 buffer,阻塞了 writer 往 ResultSubPartition 寫數據。

這種固定大小緩衝池就像阻塞隊列一樣,保證了 Flink 有一套健壯的反壓機制,使得 Task 生產數據的速度不會快於消費的速度。我們上面描述的這個方案可以從兩個 Task 之間的數據傳輸自然地擴展到更複雜的 pipeline 中,保證反壓機制可以擴散到整個 pipeline。

5.3 反壓實驗

另外,官方博客中為了展示反壓的效果,給出了一個簡單的實驗。下面這張圖顯示了:隨著時間的改變,生產者(黃色線)和消費者(綠色線)每5秒的平均吞吐與最大吞吐(在單一JVM中每秒達到8百萬條記錄)的百分比。我們通過衡量task每5秒鐘處理的記錄數來衡量平均吞吐。該實驗運行在單 JVM 中,不過使用了完整的 Flink 功能棧。

首先,我們運行生產task到它最大生產速度的60%(我們通過Thread.sleep()來模擬降速)。消費者以同樣的速度處理數據。然後,我們將消費task的速度降至其最高速度的30%。你就會看到背壓問題產生了,正如我們所見,生產者的速度也自然降至其最高速度的30%。接著,停止消費task的人為降速,之後生產者和消費者task都達到了其最大的吞吐。接下來,我們再次將消費者的速度降至30%,pipeline給出了立即響應:生產者的速度也被自動降至30%。最後,我們再次停止限速,兩個task也再次恢復100%的速度。總而言之,我們可以看到:生產者和消費者在 pipeline 中的處理都在跟隨彼此的吞吐而進行適當的調整,這就是我們希望看到的反壓的效果。

5.4 Flink 反壓監控

在 Storm/JStorm 中,只要監控到隊列滿了,就可以記錄下拓撲進入反壓了。但是 Flink 的反壓太過於天然了,導致我們無法簡單地通過監控隊列來監控反壓狀態。Flink 在這裡使用了一個 trick 來實現對反壓的監控。如果一個 Task 因為反壓而降速了,那麼它會卡在向 LocalBufferPool 申請內存塊上。那麼這時候,該 Task 的 stack trace 就會長下面這樣:

java.lang.Object.wait(Native Method)o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163)o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) <--- BLOCKING request[...]

那麼事情就簡單了。通過不斷地採樣每個 task 的 stack trace 就可以實現反壓監控。

Flink 的實現中,只有當 Web 頁面切換到某個 Job 的 Backpressure 頁面,才會對這個 Job 觸發反壓檢測,因為反壓檢測還是挺昂貴的。JobManager 會通過 Akka 給每個 TaskManager 發送TriggerStackTraceSample消息。默認情況下,TaskManager 會觸發100次 stack trace 採樣,每次間隔 50ms(也就是說一次反壓檢測至少要等待5秒鐘)。並將這 100 次採樣的結果返回給 JobManager,由 JobManager 來計算反壓比率(反壓出現的次數/採樣的次數),最終展現在 UI 上。UI 刷新的默認周期是一分鐘,目的是不對 TaskManager 造成太大的負擔。

總結

Flink 不需要一種特殊的機制來處理反壓,因為 Flink 中的數據傳輸相當於已經提供了應對反壓的機制。因此,Flink 所能獲得的最大吞吐量由其 pipeline 中最慢的組件決定。相對於 Storm/JStorm 的實現,Flink 的實現更為簡潔優雅,源碼中也看不見與反壓相關的代碼,無需 Zookeeper/TopologyMaster 的參與也降低了系統的負載,也利於對反壓更迅速的響應。


推薦閱讀:

怎麼看待樂視這次的eui升級?
其實,鎚子是羅永浩為自己量身定製的手機。
黑科技隨機報:怕不是在暗示太空開發要來了
藍色星球的和解:阿波羅-聯盟測試計劃
海爾電視為什麼運行內存那麼小?

TAG:科技 | 計算機 | 大數據 |