Pyflow : 一個基於工作流的編程模型(Flow Based Programing) 工具
來自專欄剛是程序員
Flow Based Programing 是由J. Paul Rodker Morrison在很早以前提出的一種編程範式。
維基百科對FBP的定義如下:
In computer programming, flow-based programming (FBP) is a programming paradigm that defines applications as networks of "black box" processes, which exchange data across predefined connections by message passing, where the connections are specified externally to the processes. These black box processes can be reconnected endlessly to form different applications without having to be changed internally. FBP is thus naturally component-oriented.
在github的這個https://github.com/samuell/awesome-fbp項目內列舉了很多不同語言對該範式的實現以及一些資料,大家可以參考。
很多年前我用Python開發了一個基於流概念的數據處理工具。當時主要是想解決讓不懂編程的數據工程師能夠通過構建圖形化的數據流來達到數據獲取,變形和抽取的功能。這麼多年過去了,我整理了一下代碼,豐富了一下基本功能,構建了簡單的運行UI,算是有個初步的雛型,看看能不能分享給社區做些貢獻。
項目在這裡:
- https://github.com/gangtao/pyflow
- 碼雲地址 https://gitee.com/gangtao/pyflow
其實利用Flow的概念在軟體項目中很常見。例如:
- Apache NiFi
- DAG in spark
- AWS Step Functions
- Azure ML Studio
- TensorBoard from Tensorflow
- Scratch programing language
- argo an open source container-native workflow engine
我么下面來看看這個項目的基本概念和如何使用吧。
Flow的基本概念
Flow的基本概念很簡單,就是一個有向無環圖(DAG),數據在節點間流動。
- 節點 Node 節點是組成流的主要單元,負責對流入節點的數據進行處理,並輸出到後續節點進行進一步的處理。
- 埠 Port每個節點擁有輸入和輸出埠,輸入埠負責數據流入節點,輸出埠負責數據流出節點。每個節點都可能擁有一個或者多個輸入和輸出埠。
- 連接 Link一個節點的輸出埠連接到另一個節點的輸入埠,節點處理好的數據通過連接流入其後的節點。
Flow的實現
Pyflow對Flow的實現基本思路就是用一個Python的函數function實現一個節點,輸入埠映射為函數的輸入參數。輸出埠映射為函數的返回值。
流中有一個節點被設置為終點節點(End Node),通過節點間的連接關係,以終點節點開始通過連接搜索所有的依賴關係(樹形查找),得到一個節點運行的棧。例如上圖,我們就可以得到一個 [node1,node2, node3] 這樣的棧。按順序出棧的方式執行每一個節點的功能就可以運行整個流。(注意,這是一個簡單版本的Flow的實現,仍然是一個批處理,不是streaming)
需要假定每一個節點的功能是無狀態的,這樣就可以利用輸入輸出埠對計算結果進行緩存,但輸入值是已經運算過的值的時候,不需要運算,直接返回已經計算過的值。
Pyflow的功能
節點
我們先介紹一下Pyflow的節點管理功能呢。Pyflow支持動態的創建和修改節點。如下圖:
左邊的節點列表列舉出當前所有已經定義的節點。
右邊包含一個節點的代碼編輯面板和一個節點屬性面板。
節點屬性列出選中節點的ID,Title,Port等屬性內容。
節點編輯面板提供對節點功能和代碼的編輯。代碼是節點對應的Python函數。利用函數的注釋生成對應的元數據,也就是埠的屬性。
上圖是一個節點定義了邏輯操作And,輸入埠是a和b兩個布爾值,輸出是a and b
上圖的按鈕分別對應以下功能:
- 增加一個新的節點
- 存儲當前的節點
- 刪除當前的節點
- 對當前的節點進行測試
例如之前And的例子,我們修改代碼後,點擊
對該節點的功能進行測試。
流
定義好節點後,我們就可以在流管理的界面創建和運行數據流。
左邊的面板是所有應定義好的節點,用戶可以通過拖拽的方式把節點拖到右邊的FLow面板來構建一個數據流。
當選中一個節點的時候,對應的Inspector面板會顯示當前選中節點的屬性,包含
- ID
流中該節點的唯一標誌
- Specification ID該節點對應的Spec的ID,Spec可以理解為該節點的類,而該節點在流中是該類的一個實例。
- Title節點的名字
- Input Port輸入埠的信息,用戶可以對沒有連接的埠給出輸入值。對於已經連接的埠,會給出連接信心。
- Output Port輸出埠的信息。同樣對於有連接的給出連接信息
- Status當前節點運行的狀態
- Error運行的出錯信息
- Action點擊
按鈕,會把當前節點設置為終點節點,並運行當前的數據流。
上面的按鈕分別定義了:
- 創建新的數據流
- 載入已經保存的數據流
- 保存當前的數據流
- 顯示當前數據流的JSON表示
- 刪除選中的節點
倉庫
Pyflow內置一個基於Sqlite的數據倉庫,用於保存所有的節點Sepc定義和數據流。用戶可以使用action菜單的Dump和Load功能來載入或者導出。
Pyflow的幾個例子
Pyflow給出了幾個數據流的例子。
數學運算
pyflow.sample.math:Math是一個數學運算的例子。
這個例子運算 (2+3)X(4+5)/ 5
邏輯運算
pyflow.sample.xor:Xor是利用基本的邏輯操作實現異或(XOR)。數據流如下:
異或只有在兩個輸入不同的時候才輸出真值。
Pyflow給出數據流運行的每一步的結果,讓我們可以直觀的理解整個數據流的處理過程。
利用多輸出,我們可以得到一個簡化版本的異或。
這裡Self and Not節點的輸出埠有兩個,同時輸出輸入值和輸入的非值。整個數據流變得更簡潔。
錯誤處理
pyflow.sample.error:Error是一個對出錯處理的演示。
該數據流演示了如果數據流中一個節點出錯的情形。
由於數據流中的一個節點出錯,該數據流無法正常完成,流運行的結果給出每一步的結果,出錯節點之後對出錯節點有依賴的節點處於Skip狀態,沒有運行。當出錯被恢復後,已經運行成功的節點可以直接從緩存中獲取之前運行成功的值。
Flow的優缺點
Flow的優點:
- 直觀
- 可重用性
- 利用埠緩存提高效率
Flow的缺點或局限:
- 節點的定義很困難,如果單個節點很大,負責很多東西,則可重用性變差;如果單個節點和簡單,則flow會變的很複雜。
- 並非所有的功能都能映射為無狀態的節點,例如取當前時間,產生隨機數。
- 不支持反饋,這個是當前實現的限制,由於要構建運行棧,不支持Flow中有環路。
總結
利用Flow的概念在編程中不是什麼新鮮概念,我希望我的工具能給大家帶來一些幫助。如果大家喜歡也可以和我一起來完善這個工具,這裡有幾個想法:
- 當前的運行核心不支持stream運算,想要支持,對核心改動比較大,但也不是做不到。
- 當前的節點是運行在一個獨立的進程中,更好的做法是和容器結合,讓每一個節點成為一個容器,這樣更安全,也有更多的實用性。
- 我的代碼sucks,請大家多多批評。我不是一個前端工程師,前端要實現這麼多功能太累了。
- Python 2 升級到 3,想想頭就疼
項目已經發布到dockerhub了,大家只要運行:
docker run -P naughtytao/pyflow:latest
推薦閱讀:
※編程的思考其二
※編譯原理學習
※Ian Goodfellow 最新問答:11 歲學會編程,因為喜歡遊戲編程入坑深度學習
※《python編程 從入門到實踐》筆記
※最為經典的計算機編程語言之一 C語言