中國民生銀行大數據團隊的Flume實踐

作者 | 文喬

編輯 | Vincent

AI前線出品| ID:ai-front

AI 前線導語:「中國民生銀行伺服器的操作系統種類眾多,除 Linux 外,部分生產系統仍採用 AIX 和 HP-UNIX 操作系統,由於在 AIX 和 HP-UNIX 無法使用 Logstash 作為日誌採集端,在大數據基礎平台產品團隊經過一系列選型後,採用 Flume 作為 AIX 和 HP-UNIX 操作系統上日誌採集端。本文作者中國民生銀行總行信息技術部大數據基礎產品平台組,他將與我們分享民生銀行在 Flume 上的實踐」。

一. Flume 簡介

Apache Flume 是 Cloudera 公司開源的一款分散式、可靠、可用的服務,可用於從多種不同數據源收集、聚集、移動大量日誌數據到集中數據存儲中;它通過事務機制提供了可靠的消息傳輸支持,並自帶負載均衡機制來支撐水平擴展。尤其近幾年隨著 Flume 的不斷被完善以及升級版本的逐一推出,特別是 flume-ng 的推出,以及 Flume 內部的各種組件不斷豐富,用戶在開發的過程中使用的便利性得到很大的改善,現已成為 Apache 頂級社區項目之一。

二. 中國民生銀行 Flume 實踐

中國民生銀行伺服器的操作系統種類眾多,除 Linux 外,部分生產系統仍採用 AIX 和 HP-UNIX 操作系統,由於在 AIX 和 HP-UNIX 無法使用 Logstash 作為日誌採集端,在大數據基礎平台產品團隊經過一系列選型後,採用 Flume 作為 AIX 和 HP-UNIX 操作系統上日誌採集端。

2016 年我們在測試環境進行試驗,使用的版本是 Apache Flume 1.6,在使用 Taildir Source 組件和核心組件的過程中,發現其無法完全滿足我們的需求,例如:

  1. 若 filegroup 路徑中包含正則表達式,則無法獲取文件的完整路徑,在日誌入到 Elasticsearch 後無法定位日誌的路徑;
  2. Taildir Source 不支持將多行合併為一個 event,只能一行一行讀取文件;
  3. filegroup 配置中不支持目錄包含正則表達式,不便配置包含多個日期並且日期自動增長的目錄,例如 /app/logs/yyyymmdd/appLog.log;
  4. 在使用 Host Interceptor 時,發現只能保留主機名或者是 IP,二者無法同時保留。

在研究 Flume 源碼之後,我們在源碼上擴展開發。截至目前,我們為開源社區貢獻了 4 個 Patch,其中 FLUME-2955 已被社區 Merge 並在 1.7 版本中發布,另外我們在 Github 上開放了一個版本,將 FLUME-2960/2961/3187 三個 Patch 合併到 Flume 1.7 上,歡迎大家下載使用,

Github 地址:

github.com/tinawenqiao/,分支名 trunk-cmbc。

接下來本文將對每個 Issue 進行詳細介紹:

三. FLUME-2955

3.1 問題和需求

為了採集後綴為 log 的日誌文件,filegroups 設置如下:

agent.sources.s1.type = org.apache.flume.source.taildir.TaildirSourcenagent.sources.s1.filegroups = f1 nagent.sources.s1.filegroups.f1 = /app/logs/.*.logn

註:安卓手機端讀者查看代碼時可左右滑動閱讀完整代碼

若 /app/logs 目錄中存在 a.log、b.log、c.log 三個文件,在 Flume 1.6 版本中,雖然可以通過 headers..在 event 的 header 里放入自定義的 key 和 value,但是由於正則表達式匹配上了目錄中多個文件,所以無法通過該方法設置,這樣導致日誌數據入到 Elasticsearch 後,用戶從 Kibana 從查詢時無法定位到數據所在的日誌文件路徑。

3.2 解決辦法

增加 fileHeader 和 fileHeaderKey 兩個參數,兩個參數含義分別是:

修改類 ReliableTaildirEventReader 中 readEvents() 方法,根據配置文件的值,選擇是否在 event 的 header 里加入文件的路徑,主要代碼如下:

Map<String, String> headers = currentFile.getHeaders();nif (annotateFileName || (headers != null && !headers.isEmpty())) {n for (Event event : events) {n if (headers != null && !headers.isEmpty()) {n event.getHeaders().putAll(headers);n }n if (annotateFileName) {n event.getHeaders().put(fileNameHeader, currentFile.getPath());n }n }n}n

3.3 相關配置示例

agent.sources.s1.type = org.apache.flume.source.taildir.TaildirSourcenagent.sources.s1.filegroups = f1 nagent.sources.s1.filegroups.f1 = /app/logs/.*.lognagent.sources.s1.fileHeader = truenagent.sources.s1.fileHeaderKey = pathn

四. FLUME-2960

4.1 問題和需求

在實際應用寫日誌時,很多系統是根據日期生成日期目錄,每個日期目錄中包含一個或多個日誌文件,因此存在:

/app/logs/20170101/、/app/logs/20170102/、/app/logs/20170103/

等多個目錄,且 /app/logs/ 目錄下每天會自動生成新的日期目錄,但是根據 Taildir Source 中 filegroups.的描述,只支持文件名帶正則,因此 1.6 版本的 Taildir Source 無法滿足該需求。

4.2 解決辦法

增加 filegroups..parentDir 和 filegroups..filePattern 兩個參數,兩個參數含義分別是:

修改類 TaildirMatcher 中匹配文件的方法,相關代碼如下:

private List<File> getMatchingFilesNoCache() {n final List<File> result = Lists.newArrayList();n try {n Set options = EnumSet.of(FOLLOW_LINKS);n Files.walkFileTree(Paths.get(parentDir.toString()), options, Integer.MAX_VALUE,n new SimpleFileVisitor<Path>() {n @Overriden public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {n if (fileMatcher.matches(file.toAbsolutePath())) {n result.add(file.toFile());n }n return FileVisitResult.CONTINUE;n }n @Overriden public FileVisitResult visitFileFailed(Path file, IOException exc) {n return FileVisitResult.CONTINUE;n }n });n } n ...n}n

另外進行了配置參數的兼容性處理,用戶仍可保留以前的 filegroups 配置,不需單獨配置 parentDir 和 filePattern,程序會將 filegroups 中的文件的目錄賦值給 parentDir,文件名賦值給 filePattern。

需要注意的是:在 Taildir Source 中有個參數 cachePatternMatching,默認值是 true,其作用是緩存正則匹配的文件列表和消費文件的順序,若目錄中文件較多時,使用正則匹配比較耗時,設置該參數可提高性能,當發現文件的目錄修改後會刷新緩存列表。由於 filePattern 中可包含目錄,若 cachePatternMatching 設為 true,在 filePattern 的子目錄中新增文件,parentDir 的修改時間不變,此時新增的日誌文件不能被跟蹤到,因此,建議在 filePattern 包含目錄的情況下,將 cachePatternMatching 設置為 false

4.3 相關配置示例

agent.sources.s2.type = org.apache.flume.source.taildir.TaildirSourcenagent.sources.s2.filegroups = f1 f2nagent.sources.s2.filegroups.f1.parentDir = /app/log/nagent.sources.s2.filegroups.f1.filePattern = /APP.log.d{8}nagent.sources.s2.filegroups.f2.parentDir = /app/log/nagent.sources.s2.filegroups.f2.filePattern = /w/.*lognagent.sources.s2.cachePatternMatching = falsen

五. FLUME-2961

5.1 問題和需求

Taildir Source 按行讀取日誌,把每一行作為內容放入 flume event 的 body 中,對於以下這種每行就可以結束的日誌處理沒有問題:

13 七月 2016 23:37:30,580 INFO [lifecycleSupervisor-1-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start:62) - Configuration provider startingn13 七月 2016 23:37:30,585 INFO [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:134) - Reloading configuration file:conf/taildir.confn13 七月 2016 23:37:30,592 INFO [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1013) - Processing:s1n

但是對於類似 Java Stacktrace 的日誌,如果按上述處理,以下日誌被截斷成 9 個 flume event(一共 9 行)輸出,而我們希望這樣的日誌記錄,要作為 1 個 flume event,而不是 9 個輸出:

13 七月 2016 23:37:41,942 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.kafka.KafkaSink.process:229) - Failed to publish eventsnjava.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 2000067 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.n at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)n at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)n at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:200)n at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)n at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)n at java.lang.Thread.run(Thread.java:745)nCaused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 2000067 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.n

5.2 解決辦法

設計一個 buffer event 緩存多行內容,仿照 Logstash 的 codec/mulitline 插件配置,增加了如下參數:

主要修改了類 TailFile 里的 readEvents() 方法,相關代碼如下:

if (this.multiline) {n if (raf != null) { // when file has not closed yetn boolean match = this.multilinePatternMatched;n while (events.size() < numEvents) {n LineResult line = readLine();n if (line == null) {n break;n }n Event event = null;n logger.debug("TailFile.readEvents: Current line = " + new String(line.line) +n ". Current time : " + new Timestamp(System.currentTimeMillis()) +n ". Pos:" + pos +n ". LineReadPos:" + lineReadPos + ",raf.getPointer:" + raf.getFilePointer());n switch (this.multilinePatternBelong) {n case "next":n event = readMultilineEventNext(line, match);n break;n case "previous":n event = readMultilineEventPre(line, match);n break;n default:n break;n }n if (event != null) {n events.add(event);n }n if (bufferEvent != null) {n if (bufferEvent.getBody().length >= multilineMaxBytesn || Integer.parseInt(bufferEvent.getHeaders().get("lineCount")) == multilineMaxLines) {n flushBufferEvent(events);n }n }n }n }n if (needFlushTimeoutEvent()) {n flushBufferEvent(events);n }n}n

合併多行處理的方法代碼如下:

private Event readMultilineEventPre(LineResult line, boolean match)n throws IOException {n Event event = null;n Matcher m = multilinePattern.matcher(new String(line.line));n boolean find = m.find();n match = (find && match) || (!find && !match);n byte[] lineBytes = toOriginBytes(line);n if (match) {n /** If matched, merge it to the buffer event. */n mergeEvent(line);n } else {n /**n * If not matched, this line is not part of previous event when the buffer event is not null.n * Then create a new event with buffer events message and put the current line into then * cleared buffer event.n */n if (bufferEvent != null) {n event = EventBuilder.withBody(bufferEvent.getBody());n }n bufferEvent = null;n bufferEvent = EventBuilder.withBody(lineBytes);n if (line.lineSepInclude) {n bufferEvent.getHeaders().put("lineCount", "1");n } else {n bufferEvent.getHeaders().put("lineCount", "0");n }n long now = System.currentTimeMillis();n bufferEvent.getHeaders().put("time", Long.toString(now));n }n return event;n}nnprivate Event readMultilineEventNext(LineResult line, boolean match)n throws IOException {n Event event = null;n Matcher m = multilinePattern.matcher(new String(line.line));n boolean find = m.find();n match = (find && match) || (!find && !match);n if (match) {n /** If matched, merge it to the buffer event. */n mergeEvent(line);n } else {n /**n * If not matched, this line is not part of next event. Then merge the current line into then * buffer event and create a new event with the merged message.n */n mergeEvent(line);n event = EventBuilder.withBody(bufferEvent.getBody());n bufferEvent = null;n }n return event;n}n

3.3 相關配置示例

agent.sources.s3.multiline = truenagent.sources.s3.multilinePattern = ^AGENT_IP:nagent.sources.s3.multilinePatternBelong = previousnagent.sources.s3.multilineMatched = falsenagent.sources.s3.multilineEventTimeoutSeconds = 120nagent.sources.s3.multilineMaxBytes = 3145728nagent.sources.s3.multilineMaxLines = 3000n

六. FLUME-3187

6.1 問題和需求

為了獲取 Flume agent 所在機器的主機名或 IP,我們使用了主機名攔截器 (Host Interceptor),但是根據主機名攔截器的定義,只能保留主機名和 IP 中的一種,無法同時保留主機名和 IP。

Host Interceptor

This interceptor inserts the hostname or IP address of the host that this agent is running on. It inserts a header with key host or a configured key whose value is the hostname or IP address of the host, based on configuration.

6.2 解決辦法

將原來的 useIP 參數擴展,增加一個參數 useHostname,若同時設置為 true,可同時保留主機名和 IP;另外支持自定義主機名和 IP 地址在 event header 里的 key,參數如下:

修改了類 HostInterceptor 中的構造方法和攔截方法,相關代碼如下:

addr = InetAddress.getLocalHost();nif (useIP) {n ip = addr.getHostAddress();n}nif (useHostname) {n hostname = addr.getCanonicalHostName();n}n

6.3 相關配置示例

agent.sources.s4.interceptors = i1nagent.sources.s4.interceptors.i1.type = hostnagent.sources.s4.interceptors.i1.useIP = truenagent.sources.s4.interceptors.i1.useHostname = truenagent.sources.s4.interceptors.i1.ip = ipnagent.sources.s4.interceptors.i1.hostname = hostnamen

總結

目前上述 4 個 Patch 在我行 A 類和 B 類生產系統已實際運行使用,「擁抱開源,回饋開源」,我們用的是開源軟體,我們希望也能對開源軟體做出貢獻。後續我們將分享我行 ELK 日誌平台架構演進的詳細細節,敬請大家關注!

作者介紹:

文喬,工作於中國民生銀行總行信息技術部大數據基礎產品平台組,負責行內大數據管控平台的開發,天眼日誌平台主要參與人。微信 tinawenqiao,郵箱 wenqiao@cmbc.com.cn。


-全文完-

人工智慧已不再停留在大家的想像之中,各路大牛也都紛紛抓住這波風口,投入AI創業大潮。那麼,2017年,到底都有哪些AI落地案例呢?機器學習、深度學習、NLP、圖像識別等技術又該如何用來解決業務問題?

2018年1月11-14日,AICon全球人工智慧技術大會上,一些大牛將首次分享AI在金融、電商、教育、外賣、搜索推薦、人臉識別、自動駕駛、語音交互等領域的最新落地案例,應該能學到不少東西。目前大會8折報名倒計時,更多精彩可點擊閱讀原文詳細了解。

t.cn/Rl2MftP

推薦閱讀:

1-遊走HBase的Block Cache
科技與商業管理策略的結合:約翰霍普金斯大學MSIS項目
微軟、IBM、英特爾、三星、紅帽歡迎谷歌入群,怎麼來了一條狗?
從hive到Spark SQL
深度學習教程 Part 1

TAG:大数据 | 中国民生银行 |