利用Kettle進行數據同步(下)
來自專欄我在微服務圈子裡混
上篇介紹了基於kettle的數據同步工程的搭建,entrypoint.kjb就是整個工程執行的入口。
為了進一步降低操作成本,讓整個數據同步過程更穩定、安全,需要進行更高層面的抽象,做成一個簡單易用的系統。
以下是應用截圖:
除了選擇數據源和資料庫之外,還加入了授權碼,意味著授權範圍內的用戶才能使用該系統。
因為是內部使用,授權用戶還沒實現後台管理,直接往應用資料庫里添加,所選擇的數據源和資料庫都是通過配置文件生成的。
文末會附上GitHub上的源碼地址,有需要的讀者,可以進行二次開發改造。
一、資料庫設計
資料庫名稱:kettle,目前有兩張表:
1、授權用戶表。表內記錄的用戶即可使用數據同步系統。
CREATE TABLE `authorized_user` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 用戶ID,自增, `user` varchar(128) NOT NULL COMMENT 用戶名,全局唯一, `token` varchar(20) NOT NULL COMMENT 用戶的授權碼,全局唯一, `status` char(1) NOT NULL DEFAULT A COMMENT 授權用戶狀態:A-已授權,R-未授權, `gmt_create` datetime NOT NULL COMMENT 創建時間, `gmt_modify` datetime NOT NULL COMMENT 最後修改時間, PRIMARY KEY (`id`), UNIQUE KEY `unique_index_token` (`token`) USING BTREE, UNIQUE KEY `unique_index_user` (`user`) USING BTREE) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT=授權用戶表
2、同步記錄表。記錄用戶的數據同步操作。
CREATE TABLE `sync_record` ( `sync` varchar(20) NOT NULL COMMENT 同步記錄主鍵, `ipv4` varchar(15) NOT NULL COMMENT ip地址, `from_db` varchar(100) NOT NULL COMMENT 源數據, `to_db` varchar(100) NOT NULL COMMENT 目標數據, `user` varchar(128) NOT NULL COMMENT 用戶名, `token` varchar(20) NOT NULL COMMENT 用戶的授權碼, `status` char(1) NOT NULL DEFAULT P COMMENT 同步狀態:P-正在執行,S-成功,F-失敗, `gmt_create` datetime NOT NULL COMMENT 同步創建時間, `gmt_modify` datetime NOT NULL COMMENT 最後修改時間, PRIMARY KEY (`sync`)) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT=同步記錄表;
二、程序設計
因為系統做得比較簡單實用,沒有什麼特別設計之處。筆者重點說三點:
1、數據源及其參數配置。
在application.yml配置文件中,有這麼一段配置:
env: entry-point: kettle/entrypoint.kjb databases: - taxi-user - taxi-account - taxi-trade - taxi-coupon - taxi-bi - taxi-system - taxi-credits - taxi-finance - taxi-notification - taxi-gateway from-dbs: - PROD - TEST - LOCAL to-dbs: - LOCAL - TEST db-settings: - name: LOCAL host: ***** port: 3306 user: ***** password: ***** - name: TEST host: ***** port: 3306 user: ***** password: ***** - name: PROD host: ***** port: 3306 user: ***** password: *****
利用了springboot的@ConfigurationProperties的註解。
@Setter@Getter@ConfigurationProperties(prefix = "env")public class EnvConfig { private List<String> databases; private List<String> fromDbs; private List<String> toDbs; private List<DBSetting> dbSettings; public DBSetting getDBConfig(String name) { if (StringUtils.isBlank(name)) return null; return dbSettings.stream().filter(dbSetting -> dbSetting.getName().equalsIgnoreCase(name)).findFirst().orElse(null); }}
當中的DBSetting的定義如下所示:
@Setter@Getter@NoArgsConstructorpublic class DBSetting { private String name; private String host; private String port = "3306"; private String user = "root"; private String password; public DBSetting(String host, String user, String password) { this.host = host; this.user = user; this.password = password; }}
通過客戶端傳來的參數,即可定位到對應的參數設置。
2、集成kettle的API。
因為kettle相關的jar包放在了自己搭建的nexus私服上,所以如果使用的是maven管理jar包的話,需要在settings.xml配置文件中做一點修改:
<mirror> <id>nexus</id> <url>公司內部的nexus的URL</url> <mirrorOf>*,!pentaho-releases</mirrorOf> </mirror>
其中的mirrorOf節點加上了!pentaho-releases,表示排除pentaho-releases。
然後,在springboot工程中的pom.xml中指定pentaho-releases的url。
<repositories> <repository> <id>pentaho-releases</id> <url>https://nexus.pentaho.org/content/groups/omni/</url> </repository></repositories>
接下來是核心的對接代碼,具體可以參照工程源碼。
JobMeta jobMeta = getJobMeta(new ClassPathResource(envConfig.getEntryPoint()));Job job = new Job(null, jobMeta);//設置Variablejob.setVariable("sync", sync);job.setVariable("TO_HOST", toDbSetting.getHost());job.setVariable("TO_DB", form.getDb());job.setVariable("TO_USER", toDbSetting.getUser());job.setVariable("TO_PASSWORD", toDbSetting.getPassword());job.setVariable("TO_PORT", toDbSetting.getPort());job.setVariable("FROM_HOST", fromDbSetting.getHost());job.setVariable("FROM_DB", form.getDb());job.setVariable("FROM_USER", fromDbSetting.getUser());job.setVariable("FROM_PASSWORD", fromDbSetting.getPassword());job.setVariable("FROM_PORT", fromDbSetting.getPort());job.start(); //開始執行Jobjob.waitUntilFinished(); //等待Job完成
3、非同步執行作業
因為一個Job的執行時間可能會很長,這個主要是看數據量的多少,所以一個request的來回可能會導致TIMEOUT,所以需要改為非同步的模式。
其核心的思想是:啟動新的線程,客戶端定時輪詢執行結果。
三、總結
筆者分兩篇文章介紹了如何利用kettle進行數據同步,並實現一個簡易的系統,降低操作成本和出錯率。
就介紹到這了,如有疑問,可以留言。
歡迎fork我的工程代碼。
https://github.com/liu-weihao/kettle推薦閱讀:
※千萬不要錯過!ICLR-2018精品論文解析
※了解一點模型部署與上線
※基礎數據分析——案例分析1(藥品銷售數據分析)
※如何構建一般時間序列問題的回歸解決方案
※【3-2】GAFATA 股票數據分析