PlayScala技巧 - 實時同步MongoDB高可用方案

PlayScala技巧 - 實時同步MongoDB高可用方案

來自專欄 PlayScala開發

1 如何實時同步MongoDB?

MongoDB 從 3.6 開始為開發者提供了 Change Streams 功能,利用 Change Streams 功能可以非常方便地監聽指定 Collection 上的數據變化。例如在 mongo shell 中,我們可以通過如下方式監聽 shopping 資料庫 order 表上的變化:

watchCursor = db.getSiblingDB("shopping").order.watch()while (!watchCursor.isExhausted()){ if (watchCursor.hasNext()){ printjson(watchCursor.next()); }}

2 在Play中如何操作?

利用 Play Mongo 可以方便地實現數據監聽功能,並且我們可以將 Change Stream 轉換成 Akka Stream,然後以流的方式處理指定 Collection 上的數據變化,

mongo .collection[Order] .watch() .fullDocument .toSource .groupedWithin(10, 1000.millis) .throttle(elements = 1, per = 1.second, maximumBurst = 1, ThrottleMode.shaping) .runForeach{ seq => // ... }

上面的代碼實現了以下幾個功能:

  • 將從 Change Stream 接收到的元素進行緩衝,以方便批處理,當滿足下面任意一個條件時便結束緩衝向後傳遞:
    • 緩衝滿10個元素
    • 緩衝時間超過了1000毫秒
  • 對緩衝後的元素進行流控,每秒只允許通過1個元素

3 如何實現高可用?

上面的代碼並沒有考慮可用性,如果在監聽過程中發生了網路錯誤,如何從錯誤中恢復呢? 上面的實現代碼底層是基於官方的 mongo-java-driver 實現的,關於可用性官方文檔有如下描述:

Change streams provide a way to watch changes to documents in a collection. To improve the usability of this new stage, the MongoCollection API includes a new watch method. The ChangeStreamIterable sets up the change stream and automatically attempts to resume if it encounters a potentially recoverable error.

文檔中提及程序可以自動從可恢復的錯誤中恢復。經測試驗證,如果網路中斷在 30 秒以內均屬於可恢復錯誤;但是如果大於 30 秒,則會報連接超時錯誤並且無法從錯誤中自動恢復:

com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches ReadPreferenceServerSelector{readPreference=primary}. Client view of cluster state is {type=UNKNOWN, servers=[{address=127.0.0.1:27117, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused}}] at com.mongodb.internal.connection.BaseCluster.createTimeoutException(BaseCluster.java:401) at com.mongodb.internal.connection.BaseCluster.handleServerSelectionRequest(BaseCluster.java:309) at com.mongodb.internal.connection.BaseCluster.access$800(BaseCluster.java:65) at com.mongodb.internal.connection.BaseCluster$WaitQueueHandler.run(BaseCluster.java:482) at java.lang.Thread.run(Thread.java:748)

幸運的是,Akka Stream 的 RestartSource 可以幫我們解決這種不可恢復錯誤,解決方式就是通過指數規避(exponential back-off)方式不斷重試。下面是一個通用的創建 RestartSource 的方法實現:

def restartSource(colName: String): Source[ChangeStreamDocument[JsObject], _] = { RestartSource.withBackoff( minBackoff = 3.seconds, maxBackoff = 10.seconds, randomFactor = 0.2, maxRestarts = 1000000 ) { () ? Logger.warn(s"Creating source for watching ${colName}.") mongo.collection(colName).watch().fullDocument.toSource }}

通過 Backoff 參數可以指定重試策略:

  • minBackoff 最小重試時間間隔
  • maxBackoff 最大重試時間間隔
  • randomFactor 設置一個隨機的浮動因子,使得每次計算的間隔有些許差異
  • maxRestarts 最大重試次數

當發生錯誤時,RestartSource 會嘗試重新創建一個 Source:

Logger.warn(s"Creating source for watching ${colName}.")mongo.collection(colName).watch().fullDocument.toSource

完整代碼如下:

val colName = "common-user"restartSource(colName) .groupedWithin(10, 1000.millis) .throttle(elements = 1, per = 1.second, maximumBurst = 1, ThrottleMode.shaping) .runForeach{ seq => try { Logger.info(seq.toString()) } catch { case t: Throwable => Logger.error(s"Watch change stream of ${colName} error: ${t.getMessage}", t) } }

需要注意的是 runForeach 中需要顯式捕獲異常並處理,否則會導致 Source 結束並退出。

轉載請註明來源:Scala & Akka & Play Framework 技術交流平台

推薦閱讀:

TAG:MongoDB | NoSQL | Play!框架 |