MongoDB-Elasticsearch 實時數據導入

搜索功能是App必不可少的一部分,我們使用目前比較流行的Elasticsearch進行全文檢索。我們的數據主要存儲在MongoDB中,如何將這些數據導入到Elasticsearch中,並能一直保持同步呢?做法大致分為兩種:

  1. 在應用層操作,在讀寫MongoDB的同時讀寫Elasticsearch,比如mongoosastic,需要修改已有的業務代碼。
  2. 與業務無關,通過讀取MongoDB的replica oplog,將MongoDB產生的操作在Elasticsearch上replay,來實現單向同步。

為了減少老代碼修改成本,我們選擇了第二種方案,使用mongo-connector來進行數據同步。然而用著用著我們發現mongo-connector有一些問題:

  1. 有些數據需要關聯查詢,但是mongo-connector並不支持parent-child模型(其實有一個fork是支持的,但已經落後主分支一個版本,並且合進主分支的希望渺茫)。

  2. mongo-connector支持斷點續傳,但是恢復速度非常緩慢。
  3. mongo-connector可以設置每次處理的文檔數量,但坑爹的地方在於,到不了設置的數字,它始終不會寫入。比如,MongoDB一個表只有100個文檔,但是設置了batch的size為1000,於是那100個文檔這輩子也同步不到Elasticsearch中了。
  4. mongo-connector不會限速,直接把Elasticsearch寫炸了,但它不會管,接著寫,而且中間丟掉的數據就算後面有oplog裡面有update操作,也沒辦法恢復,會報出404錯誤。
  5. 在MongoDB裡面存了一張meta表,在Elasticsearch裡面也存了一個meta索引,裡面存了大量的timestamp,直接使Elasticsearch文檔總數翻倍。

(以上mongo-connector的缺點,如有誹謗,或許是我們不會用,懇請斧正。)

於是我們開始尋找更好用的工具,卻發現沒有好用的工具:

  1. Elasticsearch Rivers,曾經的官方同步工具。但該項目早已廢棄。
  2. Transporter,IBM旗下的Compose公司出品的同步工具。也不支持parent-child relationship,並且項目進度緩慢。

  3. elasticsearch-hadoop,先導到hadoop,再導到Elasticsearch。高射炮打蚊子,繞一大圈,不經濟。

沒辦法,只好自己用TypeScript寫一個,取名為mongo-es

mongo-es導入數據分為兩個階段:

  1. Scan:掃描整個MongoDB的collection,每條文檔都插入到Elasticsearch對應的index裡面。使用Bulk API,進行批量寫入。在掃描開始前記錄當前的時間點,供第二階段使用。

  2. Tail:從剛才記錄的時間點,或一個指定的時間點開始,將MongoDB的oplog在Elasticsearch上進行replay。使用RxJS的bufferWithTimeOrCount函數,既能批量寫入,又能保證同步延遲不會很長(一般是一秒左右)。

mongo-es比mongo-connector進步的地方有:

  1. 支持parent-child relationship,可以處理需要join的數據。

  2. 可以逆序Scan,先導入最新的數據,這對於出錯後重建索引快速恢復非常有用。
  3. 無需在兩邊存儲多餘元數據,只記錄oplog的timestamp。只要程序掛的時間不太長,oplog裡面還有這個timestamp,就能恢復。
  4. 遇到缺失文檔自動恢復。當因為不可控因素(如網路原因),導致某個本應已經同步了的文檔在Elasticsearch中不存在。這時如果oplog裡面遇到一個對該文檔的update操作,mongo-connector無法處理,列印出404錯誤。遇到這種情況時,mongo-es會回到MongoDB中,讀取到這個文檔,進行更新。
  5. 有限速功能,能夠限制每秒鐘讀取的文檔數量,避免把Elasticsearch壓垮。

當然了,mongo-connector是一個更加通用的程序,可以把文檔導到更多的地方。mongo-es只是把MongoDB的數據導入到Elasticsearch中,這樣比較未免有些不公平,但就在MongoDB到Elasticsearch這個使用場景下,還是好用不少的。

開發過程中踩過的坑:

  1. Scan階段使用stream,方便控制讀取速度。Tail階段使用cursor,配合noCursorTimeout參數,避免長時間沒有oplog時的超時錯誤。Tail階段如果用stream,即使是設置了noCursorTimeout,超時了也會報錯。
  2. 對於操作是update的oplog,oplog裡面有可能是一個完整的文檔,這時候直接就可以寫入。也有可能是$set或$unset操作,這時候要去Elasticsearch裡面取到舊的,完整的文檔,在內存里執行update後再寫入回去。最好不要直接讀MongoDB,以減少MongoDB負擔。
  3. 在內存中執行update時,也要檢查變化的欄位是否屬於我們需要的欄位。如果變化的都不是需要的欄位,可以忽略這次update操作,如果變化的欄位不在我們需要的範圍內,則應排除,以減少寫入次數。
  4. 有_parent的文檔是不能直接用_id訪問到的,因為它的routing是_parent,必須指定_parent的值才行。對於操作是update的oplog,我們只能拿到_id,拿不到_parent對應的欄位,所以這時要用es.search代替es.get,訪問每個分片,才能拿到文檔。
  5. Timestamp在js代碼里表示時low在前,high在後。在mongo shell裡面是反過來的。
  6. Bulk API傳入的body長度不能為0,遇到0的情況要跳過,否則會報錯。

現已開源至github,並發布到了npm,歡迎大家多多試用,多挑(ti)毛(xu)病(qiu)。


推薦閱讀:

微軟校園Hackathon南京站 無駭客 不青春
煎蛋段子爬蟲prototype
MongoDB的安裝和配置
MongoDB 存儲引擎 mongorocks 原理解析

TAG:MongoDB | Elasticsearch | ETL |