jstorm/storm消息在spout和bolt間丟失?
02-07
1、spout從RocketMQ讀取消息,發送給bolt。
collector.emit(new Values(new String(msg.getBody())),msg.getMsgId());2、spout進行emit之前和ack(或bolt的execute方法開始處)、fail時分別入3個介面表。3、模擬持續發送1000條消息到RocketMQ,然後中間故意kill Topology並重啟幾次,結果介面表的數據量不一致。(emit1000條,ack幾百條,fail零條)
也就是消息從spout到bolt中間丟失了,甚至能丟失幾百條。用storm也做過測試也一樣的。不知道是否我的處理有問題,如何能保證中間不丟消息,謝謝!
回答這個問題前必須要了解:
(1)整個JStorm kill topology的流程
(2)一個優秀的Spout應該具備什麼能力?然後再來回答, 在任何情況下,不丟失數據
(1)JStorm Kill Topology 流程
jstorm的kill topology流程和storm的kill topology 流程完全不一樣, 做了大量優化,就是防止隨意kill topology時不丟失數據。1.1 nimbus收到kill topology命令後, 將topology 狀態設置為Killed
1.2 Worker監聽ZK 的topology狀態,當發現topology不是active狀態時, 把topology狀態設置為deactive狀態, 當topology 狀態為deactive時, spout 不再執行nextTuple1.3 如果kill topology沒有設置等待時間(jstorm kill $TopologyName xxx, xxx 就是等待時間), nimbus就會等待topology.message.timeout.secs 時間後(如果設置了等待時間,就等待設定時間), 執行zk remove操作。1.4 supervisor 發現zk上topology 被remove後, 對本supervisor上所有該topology 的worker執行一遍kill 命令1.5 supervisor等待task.cleanup.timeout.sec時間後(默認10秒)1.6 supervisor 對本supervisor上每個worker執行5次kill -9 命令,保證該worker徹底被殺掉。
(2) Spout應該如何設計
一個想達到任何情況下不丟失數據的Spout, 不局限於RocketMq和Kafka 的Spout, 必須要滿足幾個條件:2.1 所有發送tuple的操作必須在nextTuple中執行2.2 打開acker機制(怎麼打開acker機制, 麻煩直接閱讀jstorm的wiki 尋求幫助), 並且當spout成功執行ack函數後, 才真正把RocketMq/Kafka的偏移量向前移。
建議使用 官方的rocketmq插件jstorm/MetaSpout.java at master · alibaba/jstorm · GitHub
並在使用該插件時:2.3 打開流控, MetaClientConfig.META_SPOUT_FLOW_CONTROL 設置為true2.4 關閉autoAck, MetaClientConfig.META_SPOUT_AUTO_ACK 設置為false(3)Topology應該做一些額外的設置3.1 打開acker機制,(如何打開acker機制,自己查看jstorm wiki)3.2 設置task.cleanup.timeout.sec 保證worker有足夠的時間將cache的數據flush到DB, 並且保證task 定時或在cleanup時肯定flush到DB3.3 kill topology時, 不要設置等待時間,或者設置大於topology.message.timeout.secs的等待時間。推薦閱讀:
※kafka-connect和kafka-stream的使用場景?
TAG:ApacheStorm |