
A DataX Bug Fix Story

From the column 鴻的閱讀筆記

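During a data transfer task, I used DataX to move data from an FTP server into an Oracle database with the ftpreader and oraclewriter plugins, and configured the JSON job file as usual (sensitive values have been masked):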

{
  "setting": {},
  "job": {
    "setting": {
      "speed": {
        "channel": 15
      }
    },
    "content": [
      {
        "reader": {
          "name": "ftpreader",
          "parameter": {
            "protocol": "ftp",
            "host": "${ftp_host}",
            "port": ${ftp_port},
            "username": "${ftp_user}",
            "password": "${ftp_pwd}",
            "path": ["/path/to/data"],
            "column": [
              {"index": 0, "type": "string"},
              {"index": 1, "type": "string"}
            ],
            "encoding": "UTF-8",
            "fieldDelimiter": "|"
          }
        },
        "writer": {
          "name": "oraclewriter",
          "parameter": {
            "username": "${Oracle_user}",
            "password": "${Oracle_pwd}",
            "column": ["id", "number"],
            "batchSize": 65535,
            "preSql": ["truncate table table_name"],
            "connection": [
              {
                "table": ["table_name"],
                "jdbcUrl": "jdbc:oracle:thin:@${Oracle_host}:${Oracle_port}:${Oracle_sid}"
              }
            ]
          }
        }
      }
    ]
  }
}

However, the job failed after roughly 8 million rows:

java.lang.OutOfMemoryError: Java heap space
    at com.csvreader.CsvReader.updateCurrentValue(Unknown Source) ~[na:na]
    at com.csvreader.CsvReader.checkDataLength(Unknown Source) ~[na:na]
    at com.csvreader.CsvReader.readRecord(Unknown Source) ~[na:na]
    at com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderUtil.splitBufferedReader(UnstructuredStorageReaderUtil.java:72) ~[na:na]
    at com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderUtil.doReadFromStream(UnstructuredStorageReaderUtil.java:288) ~[na:na]
    at com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderUtil.readFromStream(UnstructuredStorageReaderUtil.java:214) ~[na:na]
    at com.alibaba.datax.plugin.reader.ftpreader.FtpReader$Task.startRead(FtpReader.java:244) ~[na:na]
    at com.alibaba.datax.core.taskgroup.runner.ReaderRunner.run(ReaderRunner.java:57) ~[datax-core-0.0.1-SNAPSHOT.jar:na]
    at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_121]

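Seeing DataX run out of memory was surprising, since its memory management should not be that fragile. My first thought was to raise the available heap. The default Java options are -Xms1g -Xmx1g, so I added -j"-Xms4g -Xmx4g" at run time to raise the heap to 4 GB. The job still failed: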

java.lang.OutOfMemoryError: Java heap space
    at com.csvreader.CsvReader.updateCurrentValue(Unknown Source) ~[na:na]
    at com.csvreader.CsvReader.checkDataLength(Unknown Source) ~[na:na]
    at com.csvreader.CsvReader.readRecord(Unknown Source) ~[na:na]
    at com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderUtil.splitBufferedReader(UnstructuredStorageReaderUtil.java:72) ~[na:na]
    at com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderUtil.doReadFromStream(UnstructuredStorageReaderUtil.java:288) ~[na:na]
    at com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderUtil.readFromStream(UnstructuredStorageReaderUtil.java:214) ~[na:na]
    at com.alibaba.datax.plugin.reader.ftpreader.FtpReader$Task.startRead(FtpReader.java:244) ~[na:na]
    at com.alibaba.datax.core.taskgroup.runner.ReaderRunner.run(ReaderRunner.java:57) ~[datax-core-0.0.1-SNAPSHOT.jar:na]
    at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_121]

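The error was still an out-of-memory error, which suggested that the heap size was not the real problem. A closer look at the logs showed that the failure always occurred near line 8429056, so I downloaded the data from the FTP server to a local directory and extracted the lines around it:

sed -n 8429050,8429060p /path/to/data

I then changed the JSON job file to read the data locally (sensitive values have been masked):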

{
  "setting": {},
  "job": {
    "setting": {
      "speed": {
        "channel": 15
      }
    },
    "content": [
      {
        "reader": {
          "name": "txtfilereader",
          "parameter": {
            "path": ["/data/test.txt"],
            "column": [
              {"index": 0, "type": "string"},
              {"index": 1, "type": "string"}
            ],
            "encoding": "UTF-8",
            "fieldDelimiter": "|"
          }
        },
        "writer": {
          "name": "oraclewriter",
          "parameter": {
            "username": "${Oracle_user}",
            "password": "${Oracle_pwd}",
            "column": ["id", "number"],
            "batchSize": 65535,
            "preSql": ["truncate table table_name"],
            "connection": [
              {
                "table": ["table_name"],
                "jdbcUrl": "jdbc:oracle:thin:@${Oracle_host}:${Oracle_port}:${Oracle_sid}"
              }
            ]
          }
        }
      }
    ]
  }
}

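The job still failed, but this time with a different error (sensitive values have been masked). The message says that the column index is out of bounds: the source line has [1] column, but column [2] was requested: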

{"message":"您嘗試讀取的列越界,源文件該行有 [1] 列,您嘗試讀取第 [2] 列, 數據詳情[xx, xx|xx
]","record":[{"byteSize":16,"index":0,"rawData":"xx","type":"STRING"},{"byteSize":50,"index":1,"rawData":"xxx","type":"STRING"}],"type":"reader"}

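I suspected that the out-of-bounds column read might be what caused the out-of-memory error, so I enlarged the test data:

sed -n 8429050,8439060p /path/to/data

The job still failed, and the data detail in the message was longer. At this point it was fairly clear that DataX was reading past the column bounds. I gradually reduced the amount of data and inspected it, and finally found that the line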

xx|"xx$

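was the one causing the problem. The culprit appears to be the " character: the CSV reader treats it as a text qualifier, so the rest of the line, and presumably the lines after it, are read as part of one quoted field. According to the DataX documentation, the fix should be to add: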

"csvReaderConfig":{ "safetySwitch": false, "skipEmptyRecords": false, "useTextQualifier": false}

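to stop the CSV reader from treating " as a text qualifier. However, this had no effect at all. The log showed that none of the settings were applied: CsvReader was using its default values and csvReaderConfig was [null]: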

2018-07-17 09:24:35.084 [0-0-0-reader] INFO UnstructuredStorageReaderUtil - CsvReader使用默認值[{"captureRawRecord":true,"columnCount":0,"comment":"#","currentRecord":-1,"delimiter":"|","escapeMode":1,"headerCount":0,"rawRecord":"","recordDelimiter":"\u0000","safetySwitch":false,"skipEmptyRecords":true,"textQualifier":"\"","trimWhitespace":true,"useComments":false,"useTextQualifier":true,"values":[]}],csvReaderConfig值為[null]
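The effect of a stray " on a quote-aware CSV parser can be reproduced in a few lines. The sketch below uses Python's standard csv module purely as a stand-in; it is not the CsvReader class that DataX uses, and the sample rows are made up. It only illustrates why an active text qualifier (useTextQualifier is true in the log above) can make one unclosed quote swallow the following lines.

```python
import csv
import io

# Three pipe-delimited rows; the second one contains a stray double quote,
# similar in shape to the problem row in the article (made-up sample data).
data = 'a|b\nxx|"xx\nc|d\n'

# Quote-aware parsing: the '"' opens a quoted field that is never closed,
# so everything after it is absorbed into the same field.
quoted = list(csv.reader(io.StringIO(data), delimiter="|"))

# Quoting disabled: the '"' is treated as an ordinary character.
plain = list(csv.reader(io.StringIO(data), delimiter="|", quoting=csv.QUOTE_NONE))

print(len(quoted), "rows with quoting enabled")
print(len(plain), "rows with quoting disabled")
```

With quoting enabled, the third row ends up inside the second row's last field, which mirrors the ever-growing "data detail" seen in the error message as the test file got larger.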

I then tried adding further parameters one by one, such as:

"fileType": "csv"

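but none of them helped, which pointed to a problem in the source code. Starting from the stack trace above, I located the UnstructuredStorageReaderUtil class and traced the logic from readFromStream. It turned out that two methods, validateCsvReaderConfig and setCsvReaderConfig, handle the csvReaderConfig option. Checking the code carefully, I noticed that hdfsreader calls the two methods in a specific order, starting with: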

UnstructuredStorageReaderUtil.validateCsvReaderConfig(this.readerOriginConfig);

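So the intended order is validateCsvReaderConfig first, then setCsvReaderConfig. The txtfilereader and ftpreader code paths, however, never call validateCsvReaderConfig and go straight to setCsvReaderConfig. setCsvReaderConfig itself does not read the csvReaderConfig option from the job configuration; it relies on validateCsvReaderConfig having loaded it. I therefore added the following line in doReadFromStream, just before setCsvReaderConfig(csvReader);: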

validateCsvReaderConfig(readerSliceConfig);
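The ordering problem can be modelled with a small toy example. This is a simplified Python sketch, not the DataX source: the class UtilModel, its methods and the dictionary-based "reader" are hypothetical names invented for illustration, and the assumption that the parsed options live in shared state is inferred from the behaviour described above. The default values are taken from the log output shown earlier.

```python
class UtilModel:
    """Toy model: one method loads the option, another applies it."""

    # Shared state, filled only by validate_csv_reader_config (assumption).
    csv_reader_config_map = None

    @classmethod
    def validate_csv_reader_config(cls, reader_config: dict) -> None:
        # Load the csvReaderConfig option from the job configuration.
        value = reader_config.get("csvReaderConfig")
        if isinstance(value, dict):
            cls.csv_reader_config_map = dict(value)

    @classmethod
    def set_csv_reader_config(cls, csv_reader: dict) -> None:
        # Apply whatever was loaded; with nothing loaded, defaults remain.
        if cls.csv_reader_config_map:
            csv_reader.update(cls.csv_reader_config_map)


def new_reader() -> dict:
    # Defaults as reported in the log line quoted in the article.
    return {"useTextQualifier": True, "skipEmptyRecords": True, "safetySwitch": False}


job_parameter = {"csvReaderConfig": {"useTextQualifier": False, "skipEmptyRecords": False}}

# Order without the fix: set is called, validate never runs.
buggy = new_reader()
UtilModel.set_csv_reader_config(buggy)

# Order with the fix: validate first, then set.
fixed = new_reader()
UtilModel.validate_csv_reader_config(job_parameter)
UtilModel.set_csv_reader_config(fixed)

print("without validate:", buggy)
print("with validate:   ", fixed)
```

In this model the first reader keeps useTextQualifier enabled even though the job asks for it to be disabled, which is the same symptom as csvReaderConfig being reported as [null].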

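I rebuilt the plugin-unstructured-storage-util module, replaced the jar used by txtfilereader, and tested again. This time csvReaderConfig took effect, which essentially confirmed the diagnosis. After replacing the jars for txtfilereader and ftpreader in production and updating the JSON job files, the job ran successfully.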
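Narrowing the problem down by hand took several rounds of sed. A quicker first pass, sketched below as an assumption rather than part of the original workflow, is to flag every line that contains an odd number of qualifier characters. The function name find_unbalanced_quote_lines and the sample rows are hypothetical, and the check is only a heuristic: a legitimate quoted field that spans several lines would also be flagged.

```python
from typing import Iterable, Iterator, Tuple


def find_unbalanced_quote_lines(
    lines: Iterable[str], qualifier: str = '"'
) -> Iterator[Tuple[int, str]]:
    """Yield (1-based line number, line) for lines with an odd qualifier count."""
    for number, line in enumerate(lines, start=1):
        if line.count(qualifier) % 2 == 1:
            yield number, line.rstrip("\n")


# Made-up sample: only the second row has an unclosed quote.
sample = ['a|b\n', 'xx|"xx\n', 'c|"d"\n']
suspects = list(find_unbalanced_quote_lines(sample))
print(suspects)
```

For a real file, an open file object can be passed instead of the list, for example `with open(path, encoding="utf-8") as f: ...`, so the data is scanned line by line without loading it all into memory.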

Follow-up: this looks like a bug in the DataX plugins themselves, so I submitted a pull request to DataX.

WeChat public account: 鴻的閱讀筆記
