利用Kettle進行數據同步(上)

利用Kettle進行數據同步(上)

來自專欄我在微服務圈子裡混

寫這篇文章,是源於公司內部的一個常見需求:將生產環境的數據同步到測試環境,以便更方便的進行測試和bug定位。

起初是用的Navicat Premium這款DB管理軟體,功能非常強大了,足以滿足開發人員的日常工作需求,也包括上述的數據同步需求。

隨著公司業務日臻完善,對於數據的安全性提出了更為嚴格的要求。

實際上,是不允許將任何環境的數據同步至生產環境的,即是生產環境的數據是錯誤的。Navicat Premium有數據傳輸這一功能,能夠將一個資料庫傳輸並覆蓋到另外一個資料庫,沒有任何限制。

此外,為了避免開發人員直接接觸到生產資料庫,筆者將高許可權的賬號都收回了,只授予其只讀許可權,保留了數據查看的能力。若是如此,那麼數據同步的工作壓力就指向了筆者。

為了尋求更為方便、穩定的方式去完成這個數據同步的工作,筆者把思路聚焦在了ETL工具上了。

如果僅僅是為了完成數據同步的功能需求,把ETL的概念拿出來未免顯得有些班門弄斧了。考慮到以後還會有數據處理方面的需求,研究一款ETL工具勢在必行(在寫此篇文章的時候,就出現了一個導出Excel的功能需求)。

理論上,日常內部的ETL需求都可以通過「代碼 + 腳本」的方式實現。但是,在筆者看來都是無意義的重複造輪子,耗時耗力,如果能掌握一款ETL工具,無疑能減輕不少的工作量。

當然,市面上的ETL工具也不在少數,國內外的企業都有成品,但是本著開源免費,強大好用的原則,最終就只剩下kettle了。

使用kettle之前,還是需要有一些基礎知識背景。說直白點,如果給不懂技術的PM使用,都將會以一句「mmp」結束。全面學習kettle是沒有必要的,但是應該知道它能有哪些功能,大致能完成哪些工作,以便今後充分利用之。推薦閱讀如下文章:

賈東坡:開源ETL工具-kettle初識?

zhuanlan.zhihu.com圖標

ETL的過程就是一條工作流,以下是此次要實現的數據同步流程:

客戶端填充兩方資料庫的設置信息,如:host、user、password、database等,這些設置信息都將以variable的形式存在於kettle中。

因為指定的database中可能有多張表,所以在kettle內部中,循環的執行獲取數據,清空表,提交數據的流程。

當然,流程之間還是有些細節的,下面將講解如何用kettle搭建資料庫同步的工程。

此工程中的作業(Job)和轉換(Transformation)有嵌套關係,本著「自頂向下的設計,自底向上的實現」的原則,我們先將幾個子流程都配置好,再進行相關的串聯。

新建一個轉換,保存命名為「提交數據.ktr」。

準備兩個資料庫連接,主對象樹->右鍵DB連接->新建。

FROM DB

TO DB

可以看到,已經預留了DB相關的variable,使用${variable}的形式。

特別說明:密碼也是按照了這種形式填寫,kettle也能識別這是一個variable。

因為其他作業和轉換都用到了這兩個DB連接,可以將其設置成共享。

共享DB連接

新建Transformation,保存命名為「數據同步.ktr」。

提交數據的流程

T1:清空表。核心對象->腳本->執行SQL腳本。要清空的表名使用variable代替了,勾選「變數替換」的CheckBox。

清空表

T2:獲取表數據。核心對象->輸入->表輸入。獲取表裡的全部數據,選擇數據來源,表名使用variable代替了,勾選「替換SQL語句的替換」的CheckBox。

獲取表數據

T3:提交數據。核心對象->輸入->表輸出。選擇資料庫連接,目標表使用variable代替,提交記錄數量指的是一次commit的數據量,視實際的數據量情況而定,默認是1000條,並且建議勾選「使用批量插入」。

提交數據

因為需要確保清空表的操作先完成,所以做了一步阻塞。也就是不完成了T1:清空表的步驟,就不會進行T3:提交數據的步驟。

核心對象->流程->阻塞數據直到步驟都完成。

阻塞數據

再按住shift鍵,將各個步驟連接起來,提交數據的Transformation就完成了。

接下來是獲取表名的Transformation:

獲取表名

新建Transformation,保存命名為「獲取全量表名.ktr」。

T1:獲取表名,核心對象->輸入->獲取表名,選擇資料庫連接,勾選「包含表」即可,名稱欄位可以自定義,這裡設置的是table_name。

T1:獲取表名

T2:選擇欄位,核心對象->轉換->欄位選擇。主要是指定需要的欄位,顯然我們需要table_name欄位,也可以自定義改名,這裡改名成tablename。

選擇欄位

T3:複製記錄到結果,核心對象->作業->複製記錄到結果。主要是作為下一個步驟的輸入。

複製記錄到結果

再按住shift鍵,將各個步驟連接起來,獲取表名的Transformation就完成了。

接下來是一個中間的轉換過程,就是取出表名,然後設置到指定的變數中,以便提交數據的時候獲取${TABLENAME}。

新建轉換,保存命名為「獲取變數.ktr」。

T1:獲取表名變數值,核心對象->作業->從結果獲取記錄,指定要獲取的欄位名稱是tablename,就是上述獲取表名改名成的tablename。

T1:獲取表名變數值

T2:設置變數值,核心對象->作業->設置變數,設置TABLENAME變數,選擇變數作用範圍「Valid in the root job」。

T2:設置變數值

至此,所有的Transformation都完成了,需要通過Job來連接Transformation了。

新建Job,保存命名為「獲取變數-數據同步.kjb」,分別在核心對象中添加一個START,和兩個轉換,兩個轉換分別對應了上述已經準備好的「獲取變數.ktr」和「數據同步.ktr」,連接起來即可。

獲取變數-提交數據作業

接下來是最後一個Job,新建Job,保存命名為「entrypoint.kjb」。添加一個START,一個轉換,一個作業和一個成功結束節點。

entrypoint

獲取全量表名返回的結果是一個列表,所以「獲取變數-數據同步」作業需要循環執行。在設置作業的時候,勾選「執行每一個輸入行」。

S2:獲取變數-數據同步

至此,整個數據同步的工程已經搭建好了,即可運行entrypoint.kjb。在運行的時候,需要設置資料庫的相關參數。

做到這一步,還是不夠的,每次執行作業還需要輸入這麼多參數,還有可能會出現失誤。

kettle的強大之處就是還提供了Java API,可以基於此,做更高層次的抽象,使操作成本進一步降低。

下篇將講解如何實現一個基於kettle的數據同步系統。

推薦閱讀:

sql中插入中文問題
Quizlet Spanner 測試報告
HBase 資料庫學習筆記
學習SQL【8】-謂詞和CASE表達式
關係型資料庫的原理

TAG:數據挖掘 | ETL | 資料庫 |