雜談:我為什麼推薦 Apache Flink
在過去的一年多時間裡,我一直在隊伍里推廣 Apache Flink。很多朋友大概都知道,一年多以前,我負責的項目還在推 Spark,但如今我卻極力推薦大家嘗試使用 Flink。公司內外許多朋友開始問我:為什麼一年裡我的態度轉變如此之大,特別是當我司加大對 Spark 的支持力度的時候。所以我想,也許我可以把自己的一些經驗做一些總結。
我們部門用 Flink 的時間不長,應用領域限制在實時計算,所以我的經驗也只能圍繞一些我們實際用到的工程特性展開。至於其它一些特性比如機器學習支持之類,我也不甚了了。考慮到網上也有很多各種評測和討論,諸君不妨在 Google 上搜索相關的文章。
新上手的驚喜:小功能的大作用
剛上手 Flink 時,我驚喜地發現 Flink 添加了兩個很小但實用的功能,都恰到好處地解決了我們之前在用 Spark 時的一些工程上的痛點。這在一開始時給了我巨大的好感。
第一個好用的功能是項目模版。Flink 提供了 flink-quickstart-java 和 flink-quickstart-scala 插件,允許使用 Maven 的開發者創建統一的項目模版。熟悉 Maven 的朋友可能都了解,Maven 的 pom.xml 可以藉助各種插件完成極其靈活的功能,但需要程序員對插件有足夠多的了解才能駕馭。比如uber-jar 打包時去除不必要的依賴,在運行環境相對複雜的大型項目中十分必要,但很容易就因為配置不當混入額外的依賴包。在使用 Spark 的時候我們經常圖省事把需要的 Spark 運行時包組成 uber-jar,產生的 jar 包動輒上百 MB,集成測試時上傳更新非常不便。而 Flink 的模版則直接集成了一個良好的 uber-jar 配置,大部分 Flink 運行時包都被有效地排除了出去,最終產生的 jar 包經常只有數百 KB 到幾個 MB,上傳下載都節約了不少時間,大大提高了集成測試的效率。
另一個我迅速喜歡上的功能是 RichFunction 介面。Flink 的開發始於 Java 7 時期,因此其大部分操作符都是從某個介面繼承下來並擴展,直到 Java 8 開始才引入 lambda 作為操作符。作為習慣了 Spark 的開發者,我一開始也很自然地嫌棄麻煩的介面而盡量用 lambda,但很快地我就發現 Flink 的好處:它的 RichFunction 介面允許我實現一個 open() 函數,這恰恰是我用 lambda 時一直想要的功能。代碼示例如下:
public class MyFilter extennds RichFilterFunctttttion<String> { public void open(Configuration parameters) { // Call loadLibrary() } public boolean filter(String value) { // Call native functions via JNI } }
因為 open() 函數由 Flink 運行時保證在實際操作前被調用,我可以用它來完成一些重要的一次性工作,比如初始化 JNI 調用前必須的 loadLibrary() 操作。在使用 Spark 時,因為沒有 open() 函數,我必須利用 Scala 的 lazy val 之類的語言機制來完成這類動作。然而 lazy val 本身一些限制導致它在一些場景下並不容易使用(比如用 main() 函數傳入的參數來初始化 lazy val),加上 Spark 代碼在 worker 節點上的執行方式需要經過反序列化,導致全局變數的初始化很容易出錯,導致我不得不退化到引入靜態類,用更多的代碼解決問題。有了 open() 函數後,雖然看上去繁瑣,但我可以完全控制整個初始化動作,而且去掉了很多繁瑣的全局初始化操作,代碼也更加簡潔。
後續使用中的思考:流式處理中的 Fail fast
在使用 Flink 一段時間後,我們注意到 Flink 一個很有意思的行為:如果一個 task manager 節點因為機器損壞而掉線,整個程序就會直接終止並從 checkpoint 恢復,而不會保持程序運行並從集群中重新申請一台空白機器進行恢復。這個行為是 Flink 官方文檔確認的:
In case of a program failure (due to machine-, network-, or software failure), Flink stops the distributed streaming dataflow. The system then restarts the operators and resets them to the latest successful checkpoint. The input streams are reset to the point of the state snapshot. Any records that are processed as part of the restarted parallel dataflow are guaranteed to not have been part of the previously checkpointed state.
事實上,這個實現方式在公司內部討論時曾引起過相當大的爭議。很多同事以此作為 Flink 設計有重大缺陷的範例大加批評。一開始我對此也頗為迷惑,但經過一段實踐之後,我的看法有所改變;事實上,如今我更傾向於認為,也許 Flink 的處置才是最合適的解法。
我可以理解這個行為大受批評的原因:它違反許多人對分散式計算的直覺。理想情況下,一個好的分散式程序似乎應該能夠在單個節點掉線之後用一個新的節點代替,並繼續計算。但在流式處理的上下文下,這個動作其實不容易做到。出於性能優化和實現難度的考慮,許多帶狀態的操作都需要將狀態保存在固定的機器上(比如 Spark 的 mapWithState()),然後將整體的狀態以 checkpoint 的形式定期保存。雖然理論上我們可以將狀態單獨複製若干份分別存儲在多台機器上以備恢復,但這個做法和 checkpoint 的功能重複,而且更容易出錯——如果整個集群里所有機器都在持續向前處理,而只有一台機器狀態是從災難恢復的,那就意味著它的狀態和整體相比會滯後一段時間;這種情況下,要麼所有機器停下來等它追上來,要麼就保持不一致繼續跑下去。前者無法避免延遲,效果和整個程序重啟並無大的區別;而後者則在很多場景下則完全不可接受。如果非要如此一條路走到黑,也許更好的方法是將狀態整個剝離出來作為一個有可靠性保證的獨立服務,但那是將問題轉移到了一個新的服務里。幾經考慮之後我最終相信,這個看似不合理的設計是有其內在的合理性的。
選擇接受 Flink 的方案之後,那麼對應的修補也就有了方向。既然 Flink 的錯誤恢復依賴快速的 checkpoint,那麼我們就在加快程序崩潰檢查和提高 HDFS 性能兩方面下功夫。經過一段時間的調整,我和同事們成功優化了我們的 HDFS 系統,保證單個 app 每分鐘穩定存入 300GB 的 checkpoint,這讓我們 Flink 程序的斷流恢復時間縮短到三分鐘左右,基本解決了災難恢復時的延遲問題。
對一些迷思的回應:Flink vs. Spark
網上早已有許多關於 Spark vs. Flink 的文章,我自己也拜讀過不少。據實而言,我對一些觀點並不十分認同;然而有些討論流傳頗遠,也影響了很多朋友對兩者的觀點。我嘗試地總結了一些,一來是梳理自己的看法,二來也是給問我問題的朋友們一個回應。
第一個常見的爭論是:Flink 比 Spark 好,是因為 Flink 是 per-event 的,而 Spark 是基於 mini-batch 的。事實上,在我的觀察中,這個特性差異在大部分項目的技術選型中並不起決定性作用。因為除了一部分諸如入侵檢測之類的特定業務類型要求嚴格的單個請求立即處理之外,大部分實時處理業務都允許有一定的延遲。在我的觀察中,Spark 的 mini-batch 在分鐘級別已經足夠穩定,足以支撐產品級別的業務。
有了第一個爭論,也就不難注意到另一個新近的說法:2018 年二月發布的 Spark 2.3 已經支持 continuous processing,所以 Flink 唯一的競爭優勢已經消失。其實這個論調不值得一駁,因為 Spark 官方文檔已經明確指出,這個特性如今只適用於簡單操作。回想一下 Spark 從 1.4 引入 DataFrame 到 2.2 真正宣布穩定可用(GA),我們不難預見,Spark 社區很可能還需要幾年的時間來持續完善 continuous processing。
As of Spark 2.3, only the following type of queries are supported in the continuous processing mode.
> Operations: Only map-like Dataset/DataFrame operations are supported in continuous mode, that is, only projections (select
,map
,flatMap
,mapPartitions
, etc.) and selections (where
,filter
, etc.).
第三個爭論:Flink 宣稱的 event time 和 watermark 機制 Spark 已經在 2.2 版本中也提供了,所以 Spark 和 Flink 一樣好用。事實上,且不說 Spark 引入 event time 和 watermark 比 Flink 晚得多(event time 在 2.0 開始加入,watermark 則到 2.2 才得到完整支持),即便是現在的 2.3 版本,Spark 也只支持相對簡單的 fixed watermark, Flink 支持的 watermark 動作則可以通過繼承介面擴展出更加定製化的邏輯,所以 Flink 仍然有相當的優勢。
不算結語的結語:我們仍在前進
我們對 Flink 的使用才剛剛開始。去年我們用在我司後台的一個功能上,成功地實現了處理延遲從小時級到秒級的跨越;而最近一段時間的項目中,我的團隊正在嘗試著結合 Flink 流式處理和統計學習的一些方法,對公司的一些數據源進行挖掘,以期找到一些有趣的特性。限於公司商業機密,我無法在專欄里討論項目細節。也許在不久的將來我們有機會將其產品化,那麼大家也就都能見到了。
推薦閱讀:
※阿里雲表格存儲負載均衡實踐
※各大公司分散式存儲領域相關論文
※快速打造分散式深度學習訓練平台
※分散式系統設計:批處理模式之事件驅動的批處理