分散式Snapshot和Flink Checkpointing簡介

摘要: 最近在學習Flink的Fault Tolerance,了解到Flink在Chandy Lamport Algorithm的基礎上擴展實現了一套分散式Checkpointing機制,這個機制在論文"Lightweight Asynchronous Snapshots for Distributed Dataflows"中進行了詳盡的描述。

原文

阿里巴巴實時計算部-崑崙

最近在學習Flink的Fault Tolerance,了解到Flink在Chandy Lamport Algorithm的基礎上擴展實現了一套分散式Checkpointing機制,這個機制在論文"Lightweight Asynchronous Snapshots for Distributed Dataflows"中進行了詳盡的描述。懷著對Lamport大神的敬仰,我分別下載研讀了兩篇論文,在這裡把一些閱讀的收穫記錄下來,希望能對學習Flink/Blink的同學能有些幫助。

Chandy Lamport Algorithm

我們先來看看Chandy Lamport Algorithm,「Distributed Snapshots: Determining Global States of a Distributed System」,此文應該是分散式SnapShot的開山之作,發佈於1985年(很多同學還沒有出生-_-),按照Lamport自己的說法,這篇文章是這麼來的:

「The distributed snapshot algorithm described here came about when I visited Chandy, who was then at the University of Texas in Austin. He posed the problem to me over dinner, but we had both had too much wine to think about it right then. The next morning, in the shower, I came up with the solution. When I arrived at Chandys office, he was waiting for me with the same solution.」

所以說,大神的世界我們不懂,一言不合就寫一篇論文。我們言歸正傳,開始介紹論文中描述的演算法。

分散式系統模型和狀態定義

分散式系統模型

分散式系統是一個包含有限進程和有限消息通道的系統,這些進程和通道可以用一個有向圖描述,其中節點表示進程,邊表示通道。如下圖所示:p、q分別是進程,c, c則是消息通道。

另外為了問題描述的簡潔,對上述模型還做了假設:消息通道只包含有限的buffer、消息保序、通道可靠等

分散式系統狀態(State)

所謂的Distributed Snapshot,就是為了保存分散式系統的State,那麼首先我們需要定義清楚什麼是分散式系統的State。考慮到上述分散式模型的定義,分散式系統State同樣是由「進程狀態」和「通道狀態」組成的。

  1. Event:分散式系統中發生的一個事件,在類似於Flink這樣的分散式計算系統中從Source輸入的新消息相當於一個事件。在論文中作者給出了數學化的定義,具體參考論文。
  2. 進程狀態:包含一個初始狀態(initial state),和持續發生的若干Events。初始狀態可以理解為Flink中剛啟動的計算節點,計算節點每處理一條Event,就轉換到一個新的狀態。
  3. 通道狀態:我們用在通道上傳輸的消息(Event)來描述一個通道的狀態。

在某一個時刻的某分散式系統的所有進程和所有通道狀態的組合,就是這個分散式系統的全局狀態。基於上述的雙進程雙通道的最簡分散式系統,為了描述演算法,作者設計了一個「單令牌狀態」轉換系統,兩個進程通過接收和發出令牌,會在S0、S1兩個State之間轉換,整個分散式系統則會在如下所示的4種全局狀態(Global State)之間轉換。

Distributed Snapshots

有了系統狀態和模型的定義,終於可以開始介紹分散式快照的演算法了。對於一個分散式快照演算法,我們有如下的兩點要求:

  1. 正確性:能夠準確的記錄每一個進程、通道的狀態,同時通過這些局部狀態,能夠準確表達一個分散式系統的全局狀態。這裡碰到的挑戰是,每個進程、通道沒法同時記錄自身的狀態,因為我們沒有一個全局的時鐘來保持狀態記錄的同步。
  2. 並行性:快照操作與分散式系統計算同時運行,但不能影響所有系統的正常功能,對性能、正確性等無影響。

按照上一小節的描述,全局狀態是進程和通道狀態的組合,在論文中,作者證明了通道狀態可以通過記錄進程狀態來記錄和恢復,並提出了下述的分散式snapshot演算法:

對於進程p、q,p->q通過通道c連接,通過以下步驟記錄global state

進程p啟動這個演算法,記錄自身狀態,並發出Marker。隨著Marker不斷的沿著分散式系統的相連通道逐漸傳輸到所有的進程,所有的進程都會執行演算法以記錄自身狀態和入射通道的狀態,待到所有進程執行完該演算法,一個分散式Snapshot就完成了記錄。Marker相當於是一個信使,它隨著消息流流經所有的進程,通知每個進程記錄自身狀態。且Marker對整個分散式系統的計算過程沒有任何影響。只要保證Marker能在有限時間內通過通道傳輸到進程,每個進程能夠在有限時間內完成自身狀態的記錄,這個演算法就能在有限的時間內執行完成。

以上就是這個演算法的最主要內容,演算法本身不是很複雜,但是Chandy和Lamport兩位大神在論文中展現的對問題分析和思考的過程真的很值得玩味,定義問題->定義分散式模型->推導演算法->分析特例->證明演算法的完備性,層層推進,環環相扣,缺一不可,演算法的數學之美展露無遺!

關於Chandy-Lamport Algorithm的主要介紹就到這裡,論文中還有一些關於某些特殊情況的證明,大家有興趣可以參考論文。

Flink Checkpointing的實現原理

Flink 分散式Checkpointing是通過Asynchronous Barrier Snapshots的演算法實現的,該演算法借鑒了Chandy-Lamport演算法的主要思想,同時做了一些改進,這些改進在論文"Lightweight Asynchronous Snapshots for Distributed Dataflows"中進行了詳盡的描述,結合這篇論文,我們來看看具體的實現。

Flink流式計算模型

Flink流式計算模型中包含Source Operator、Transformation Operators、Sink Operator等三種不同類型的節點如下圖所示,分別負責數據的輸入、處理、和輸出,對應計算拓撲的起點、中間節點和終點。計算模型的介紹不是我們的重點,細節請參考官方文檔-Concepts

Asynchronous Barrier Snapshots

這個演算法基本上是Chandy-Lamport演算法的變體,只在執行上有一些差別。論文中分別針對有向無環和有向有環的兩種計算拓撲圖,提出了兩種不同的演算法,其中後者是在前者的基礎上進行了修改,在實際的使用中,絕大部分的系統都是有向無環圖,因此我們只會針對前者進行介紹。

在ABS演算法中用Barrier代替了C-L演算法中的Marker,針對DAG的ABS演算法執行流程如下所示:

  1. Barrier周期性的被注入到所有的Source中,Source節點看到Barrier後,會立即記錄自己的狀態,然後將Barrier發送到Transformation Operator。
  2. 當Transformation Operator從某個input channel收到Barrier後,它會立刻Block住這條通道,直到所有的input channel都收到Barrier,此時該Operator就會記錄自身狀態,並向自己的所有output channel廣播Barrier。
  3. Sink接受Barrier的操作流程與Transformation Oper一樣。當所有的Barrier都到達Sink之後,並且所有的Sink也完成了Checkpoint,這一輪Snapshot就完成了。

下面這個圖展示了一個ABS演算法的執行過程:

下面是針對DAG拓撲圖的演算法偽代碼:

在這個演算法中Block Input實際上是有負面效果的,一旦某個input channel發生延遲,Barrier遲遲未到,這會導致Transformation Operator上的其它通道全部堵塞,系統吞吐大幅下降。但是這麼做的一個最大的好處就是能夠實現Exactly Once。我們來看看Flink文檔中的描述:

When the alignment is skipped, an operator keeps processing all inputs, even after some checkpoint barriers for checkpoint n arrived. That way, the operator also processes elements that belong to checkpoint n+1 before the state snapshot for checkpoint n was taken. On a restore, these records will occur as duplicates, because they are both included in the state snapshot of checkpoint n, and will be replayed as part of the data after checkpoint n.

不過Flink還是提供了選項,可以關閉Exactly once並僅保留at least once,以提供最大限度的吞吐能力。

本文僅從原理角度介紹了分散式Snapshot的基本原理以及Flink中的實現,從這篇文章出發,我們還需要閱讀相關的源代碼以及在實際的開發中去學習和理解。另外本文是基於我自己的理解寫就,難免有疏漏和錯誤,如果大家發現問題,可以留言或者直接聯繫我,我們一起討論。

更多技術乾貨敬請關注云棲社區知乎機構號:阿里云云棲社區 - 知乎


推薦閱讀:

快速打造分散式深度學習訓練平台
Elasticell和Jepsen測試

TAG:演算法 | 分散式系統 | input |