標籤:

jstorm/storm消息在spout和bolt間丟失?

1、spout從RocketMQ讀取消息,發送給bolt。

collector.emit(new Values(new String(msg.getBody())),msg.getMsgId());

2、spout進行emit之前和ack(或bolt的execute方法開始處)、fail時分別入3個介面表。

3、模擬持續發送1000條消息到RocketMQ,然後中間故意kill Topology並重啟幾次,結果介面表的數據量不一致。(emit1000條,ack幾百條,fail零條)

也就是消息從spout到bolt中間丟失了,甚至能丟失幾百條。用storm也做過測試也一樣的。

不知道是否我的處理有問題,如何能保證中間不丟消息,謝謝!


回答這個問題前必須要了解:

(1)整個JStorm kill topology的流程

(2)一個優秀的Spout應該具備什麼能力?

然後再來回答, 在任何情況下,不丟失數據

(1)JStorm Kill Topology 流程

jstorm的kill topology流程和storm的kill topology 流程完全不一樣, 做了大量優化,就是防止隨意kill topology時不丟失數據。

1.1 nimbus收到kill topology命令後, 將topology 狀態設置為Killed

1.2 Worker監聽ZK 的topology狀態,當發現topology不是active狀態時, 把topology狀態設置為deactive狀態, 當topology 狀態為deactive時, spout 不再執行nextTuple

1.3 如果kill topology沒有設置等待時間(jstorm kill $TopologyName xxx, xxx 就是等待時間), nimbus就會等待topology.message.timeout.secs 時間後(如果設置了等待時間,就等待設定時間), 執行zk remove操作。

1.4 supervisor 發現zk上topology 被remove後, 對本supervisor上所有該topology 的worker執行一遍kill 命令

1.5 supervisor等待task.cleanup.timeout.sec時間後(默認10秒)

1.6 supervisor 對本supervisor上每個worker執行5次kill -9 命令,保證該worker徹底被殺掉。

(2) Spout應該如何設計

一個想達到任何情況下不丟失數據的Spout, 不局限於RocketMq和Kafka 的Spout, 必須要滿足幾個條件:

2.1 所有發送tuple的操作必須在nextTuple中執行

2.2 打開acker機制(怎麼打開acker機制, 麻煩直接閱讀jstorm的wiki 尋求幫助), 並且當spout成功執行ack函數後, 才真正把RocketMq/Kafka的偏移量向前移。

建議使用 官方的rocketmq插件jstorm/MetaSpout.java at master · alibaba/jstorm · GitHub

並在使用該插件時:

2.3 打開流控, MetaClientConfig.META_SPOUT_FLOW_CONTROL 設置為true

2.4 關閉autoAck, MetaClientConfig.META_SPOUT_AUTO_ACK 設置為false

(3)Topology應該做一些額外的設置

3.1 打開acker機制,(如何打開acker機制,自己查看jstorm wiki)

3.2 設置task.cleanup.timeout.sec 保證worker有足夠的時間將cache的數據flush到DB, 並且保證task 定時或在cleanup時肯定flush到DB

3.3 kill topology時, 不要設置等待時間,或者設置大於topology.message.timeout.secs的等待時間。


推薦閱讀:

kafka-connect和kafka-stream的使用場景?

TAG:ApacheStorm |