Bash 的威力

引子

工作中,我們常常要處理浩瀚如煙的日誌。有時候,看日誌是為了處理軟體中的 bug,更多的時候,是為了從中獲取有用的信息 —— 這信息可以用來做 behavior analysis,可以用來做 anomaly detection,甚至也可以做 data recovery。

這個周末兩天,家裡的活動不少,有一搭沒一搭的,在做一個 log parser。這 log parser 說難不難,就是解析幾十G 日誌文件,從中抓取一些特定的數據,存在資料庫中。日誌的格式很固定,是標準的 tsv,單條記錄解析起來非常容易;說簡單也不簡單,上百個文件都以 gzip 形式存儲在 AWS S3 上,想要高效地處理並不容易。

最直觀的方案是寫一個 spark job 去處理。在 spark 的生態圈裡,處理大量 gzip 文件非常簡便,幾十行代碼就能搞定。如果沒有立等可用的 spark cluster 呢?總不能為了這樣一個任務去配置一個 spark cluster 吧。這個方案我們且放下不表。

第二個方案是選用對 streaming 支持良好的語言來處理。S3 本身對 stream 支持很好,每個文件都可以一個 chunk 一個 chunk 地去讀取,讀下來的 chunk,再交給支持 stream 模式的 gzip module 解壓,然後對解壓後的數據的每一行,進行 tsv 的解析,寫入資料庫。這個方案聽起來簡單,實際操作起來小問題不少,尤其 gzip 解壓這塊,如果一個 chunk 解壓後的結果跨行,處理起來還是頗為頭疼的。

第三個方案是一次性 aws s3 sync 把要處理的文件拷貝到本地,然後挨個 gunzip 解壓,最後再一行行做 tsv 的解析,把結果寫入資料庫。

方案二的問題是代碼複雜,方案三雖然代碼簡單,但可操作性很差,佔用的磁碟空間太多不說,文件的來回讀寫還會大大延緩整個處理過程。

有沒有辦法結合方案二和方案三的優點?

有!我們只需要幾行 bash,再加上幾十行代碼寫的 line parser 即可。

使用 bash 和 pipe

bash 代碼如下:

#!/bin/bashBUCKET=tubitv-awesome-logsPREFIX=${1:-20171111}trap ctrl_c INTfunction ctrl_c() { echo "** User stopped the process with CTRL-C" exit $?}for key in `aws s3api list-objects --bucket $BUCKET --prefix $PREFIX | jq -r .Contents[].Key`do sleep 1 aws s3 cp s3://$BUCKET/$key - | gunzip -c | log_parserdone

這個 bash 腳本接受一個參數,是 S3 object 的 prefix(文件路徑),如果不傳,就用默認值。然後,我們用 aws s3api list-objects --bucket $BUCKET --prefix $PREFIX 這個 CLI 獲取 bucket 下所有含有 prefix 的 object。s3api 返回 JSON 格式的數據,我們可以用 jq 對其解析,獲取每個 object 的 Key,也就是文件名。注意這裡我們用了 pipe,aws CLI 的結果被 pipe 給了 jq。

稍微補充一下 pipe 的概念。在 unix 中,程序的輸出如果沒有特別指定,會輸出到一塊叫 stdout 的內存中。pipe 操作會將上一個程序的 stdout 的數據放入下一個程序的 stdin。

對於拿到的每個 key,我們循環處理,怎麼處理呢?

  1. 首先 aws s3 cp s3://$BUCKET/$key -。對於相關的文件,我們將其 cp 到 stdout。「-」 在這裡指代 stdout。
  2. 接下來,我們將拷貝下來的每個 chunk,pipe 給 gunzip -c。gunzip 處理 stdin 里的數據,解壓到 stdout —— 這裡 -c 告訴 gunzip 的輸出是 stdout。
  3. 最後,我們把 gunzip 的結果進一步 pipe 給我們自己撰寫的 log_parser。我們只需要妥善處理 log_parser,使其從 stdin 按行 讀取數據即可。

log_parser 可以用任意語言撰寫,由於其問題規模被 bash script 大大縮減,所以寫起來非常簡單,這裡放一個 elixir 的例子(具體 tsv 解析放下不表):

defp parse do case IO.read(:stdio, :line) do :eof -> :ok {:error, reason} -> IO.puts "Error: #{reason}" line -> try do line |> parse_line() |> process() rescue e -> IO.puts("Cannot parse: #{line}. error: #{inspect e}") endparse() endend

這段代碼從 stdin 裡面讀入一行,如果讀到 EOF,就結束,否則,會讀到一行數據。我們對這行數據進行處理即可。

漂亮不?這就是 unix 的美妙之處,當你 do one thing and do it well 時,整個生態系統會回饋你。

簡單不?夠簡單,但是,我們回頭再看看之前的 bash script,當你的機器上有 16 個 core,我們卻只能打滿一個 core 去處理數據,我們的內心深處還是對這種簡單感到不滿。能不能讓數據並發處理?

使用 xargs 讓任務並行處理

沒問題。我們不需要修改 log_parser,只需對 bash script 稍加調整:

#!/bin/bashBUCKET=tubitv-awesome-logsPREFIX=${1:-20171111}aws s3api list-objects --bucket $BUCKET --prefix $PREFIX | jq -r .Contents[].Key | xargs -n1 -P8 -I{} ./process_one.sh {}

之前的 bash script 主體部分不變,只不過我們使用 xargs -P8 來將若干個參數分別傳遞給一個新的腳本 process_one.sh 並行處理,並行的數量為 8。當然你可以將其換成 sysctl -n hw.ncpu (osx) 或者 nproc --all (linux) 使用具體的 core 數目。

process_one.sh 很簡單:

#!/bin/bashBUCKET=tubitv-awesome-logsKEY=$1aws s3 cp s3://$BUCKET/$KEY - | gunzip -c | edgecast_parser

這樣簡單處理之後,CPU 利用率一下子上來了:

到目前為止,我寫了 65 行 elixir 代碼處理 tsv,18 行 bash 腳本粘合一切,並在單機並發。很好很強大。如果全部用 elixir 撰寫,實現相同的功能,相同的能力,估計要幾百行代碼。事實上,周末我因為忘記了 databricks 的打開方式,沒法用 spark,只得搗鼓方案二,費了好大的功夫,額外引入了三四個庫,甚至修改了一個庫(elixir 沒有封裝地不錯的支持 stream 的 gunzip 的庫),代碼還只是個半成品。今天突然開竅,使用 bash offload 不必要的工作,僅花了一個小時(包括寫 test case),就搞定了上述的方案,且乾淨漂亮。

有同學說,這樣單機並發,還是有處理上的瓶頸 —— 其實如果問題真到了這一步,最好還是用 spark 解決。當然,用 erlang cluster 也是個不錯的替代方案。可以在一個集群下讓各個 elixir process 形成一個 cluster,然後其中一個 node 把每個 node 處理的 bucket 和 prefix 發布出去,然後各自執行上述的 bash script。當集群的機器數量是幾十這個量級,這個方案也算一個備選。

讀完本文,你也許會對我的這幾篇歷史文章感興趣:

  • 淺談unix之美
  • Pipe 之美
  • 從 Pipe 到 Flow

推薦閱讀:

功能安全中的軟體隔離(二)
軟考中級的軟體設計師難考嗎?
請問軟體開發與編程對電腦有什麼要求嗎?
有哪些硬體限制對軟體設計造成影響的例子?

TAG:Bash | 软件开发 | 软件设计 |