Notes on the etcd Watch Implementation
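I recently needed to implement an etcd-like Watch feature for a project. So I first studied how etcd implements Watch and wrote up the findings as study notes. These notes are based on v3.3.0.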
API
Like the rest of the etcd v3 API, Watch is defined in protobuf:
message WatchRequest {
  // request_union is a request to either create a new watcher or cancel an existing watcher.
  oneof request_union {
    WatchCreateRequest create_request = 1;
    WatchCancelRequest cancel_request = 2;
  }
}

message WatchResponse {
  ResponseHeader header = 1;
  // watch_id is the ID of the watcher that corresponds to the response.
  int64 watch_id = 2;
  // created is set to true if the response is for a create watch request.
  // The client should record the watch_id and expect to receive events for
  // the created watcher from the same stream.
  // All events sent to the created watcher will attach with the same watch_id.
  bool created = 3;
  // canceled is set to true if the response is for a cancel watch request.
  // No further events will be sent to the canceled watcher.
  bool canceled = 4;
  // compact_revision is set to the minimum index if a watcher tries to watch
  // at a compacted index.
  //
  // This happens when creating a watcher at a compacted revision or the watcher cannot
  // catch up with the progress of the key-value store.
  //
  // The client should treat the watcher as canceled and should not try to create any
  // watcher with the same start_revision again.
  int64 compact_revision = 5;

  // cancel_reason indicates the reason for canceling the watcher.
  string cancel_reason = 6;

  repeated mvccpb.Event events = 11;
}

service Watch {
  // Watch watches for events happening or that have happened. Both input and output
  // are streams; the input stream is for creating and canceling watchers and the output
  // stream sends events. One watch RPC can watch on multiple key ranges, streaming events
  // for several watches at once. The entire event history can be watched starting from the
  // last compaction revision.
  rpc Watch(stream WatchRequest) returns (stream WatchResponse) {
    option (google.api.http) = {
      post: "/v3beta/watch"
      body: "*"
    };
  }
}
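The following Go sketch makes the request_union oneof and the watch_id contract concrete. It shows how a single stream might dispatch create and cancel requests. The types are hand-written, hypothetical stand-ins for the generated etcdserverpb code, which differs in detail. How an unknown watch_id gets reported is this sketch's own choice, not etcd's behavior.

```go
package main

// Hypothetical stand-ins for the generated protobuf types; only the fields
// used by this sketch are modeled.

// WatchCreateRequest asks for a new watcher on a key.
type WatchCreateRequest struct {
	Key string
}

// WatchCancelRequest asks to cancel the watcher with the given ID.
type WatchCancelRequest struct {
	WatchID int64
}

// WatchRequest models the request_union oneof: exactly one field should be set.
type WatchRequest struct {
	Create *WatchCreateRequest
	Cancel *WatchCancelRequest
}

// WatchResponse keeps the watch_id, created, canceled and cancel_reason fields.
type WatchResponse struct {
	WatchID      int64
	Created      bool
	Canceled     bool
	CancelReason string
}

// streamState tracks the watchers created over one bidirectional stream.
type streamState struct {
	nextID   int64
	watchers map[int64]string // watch_id -> watched key
}

func newStreamState() *streamState {
	return &streamState{watchers: make(map[int64]string)}
}

// handle dispatches on the oneof and builds the matching response.
func (s *streamState) handle(req WatchRequest) WatchResponse {
	switch {
	case req.Create != nil:
		id := s.nextID
		s.nextID++
		s.watchers[id] = req.Create.Key
		// The client records this watch_id; events for this watcher carry it.
		return WatchResponse{WatchID: id, Created: true}
	case req.Cancel != nil:
		id := req.Cancel.WatchID
		if _, ok := s.watchers[id]; !ok {
			// Reporting an unknown ID this way is the sketch's own choice.
			return WatchResponse{WatchID: id, Canceled: true, CancelReason: "unknown watch_id"}
		}
		delete(s.watchers, id)
		// No further events will be sent for this watch_id.
		return WatchResponse{WatchID: id, Canceled: true}
	default:
		return WatchResponse{WatchID: -1, Canceled: true, CancelReason: "empty request"}
	}
}
```

Because all watchers on a stream share one ID space, a single stream can carry events for several watches at once, as the service comment describes.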
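The corresponding HTTP API is implemented with code auto-generated by grpc-gateway. In essence, that code translates an HTTP request into a gRPC request, which a gRPC client sends directly to the local gRPC server. Because grpc-gateway does not support true bidirectional streaming, a WebSocket proxy layer is needed (#8237):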
httpmux.Handle(
	"/v3beta/",
	wsproxy.WebsocketProxy( // grpc-websocket-proxy
		gwmux,
		wsproxy.WithRequestMutator(
			// Default to the POST method for streams
			func(incoming *http.Request, outgoing *http.Request) *http.Request {
				outgoing.Method = "POST"
				return outgoing
			},
		),
	),
)
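The mutator in the snippet does one thing: it rewrites the HTTP method. The sketch below isolates that step using only net/http. forwardHandshake is a hypothetical helper that imitates, in a greatly simplified way, how a proxy might derive the outgoing request. It is not grpc-websocket-proxy's actual code.

```go
package main

import "net/http"

// requestMutator has the same shape as the function passed to
// wsproxy.WithRequestMutator in the snippet above.
type requestMutator func(incoming, outgoing *http.Request) *http.Request

// forcePOST does what the snippet's mutator does: the WebSocket handshake
// arrives as a GET, but the gateway route for the Watch stream is a POST
// (post: "/v3beta/watch" in the proto's google.api.http option).
func forcePOST(incoming, outgoing *http.Request) *http.Request {
	outgoing.Method = http.MethodPost
	return outgoing
}

// forwardHandshake is a hypothetical stand-in for the proxy's internals:
// build a GET handshake, derive an outgoing request with the same URL, and
// let the mutator adjust it before it would be forwarded to the gateway.
func forwardHandshake(m requestMutator, rawURL string) (in, out *http.Request, err error) {
	in, err = http.NewRequest(http.MethodGet, rawURL, nil)
	if err != nil {
		return nil, nil, err
	}
	out, err = http.NewRequest(in.Method, in.URL.String(), nil)
	if err != nil {
		return nil, nil, err
	}
	return in, m(in, out), nil
}
```

Calling forwardHandshake(forcePOST, "http://127.0.0.1:2379/v3beta/watch") leaves the incoming request as a GET and returns an outgoing POST to the same path.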
WebSocket lets the client and server send messages in both directions. For its concepts and usage, see this article.
Implementation
The API is implemented by watchServer (etcdserver/api/v3rpc/watch.go), whose core is WatchableKV:
// WatchableKV is a KV that can be watched.
type WatchableKV interface {
	KV
	Watchable
}

// Watchable is the interface that wraps the NewWatchStream function.
type Watchable interface {
	// NewWatchStream returns a WatchStream that can be used to
	// watch events happened or happening on the KV.
	NewWatchStream() WatchStream
}
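The toy in-memory WatchableKV below illustrates how these interfaces layer on top of each other. The method sets of KV and WatchStream are cut down and hypothetical; the real mvcc interfaces deal with revisions, key ranges and much more. The store is also neither concurrency-safe nor protected against slow consumers.

```go
package main

// Event is a simplified change notification (hypothetical).
type Event struct {
	Key, Value string
}

// KV is a cut-down key-value store interface (hypothetical method set).
type KV interface {
	Put(key, value string)
	Get(key string) (string, bool)
}

// WatchStream lets a caller register keys and read their events from one channel.
type WatchStream interface {
	Watch(key string) int64
	Chan() <-chan Event
}

// Watchable and WatchableKV mirror the layering in mvcc.
type Watchable interface {
	NewWatchStream() WatchStream
}

type WatchableKV interface {
	KV
	Watchable
}

// memStore is a single-goroutine toy implementation.
type memStore struct {
	data    map[string]string
	streams []*memStream
}

type memStream struct {
	nextID int64
	keys   map[string]bool
	ch     chan Event
}

func newMemStore() *memStore {
	return &memStore{data: make(map[string]string)}
}

func (s *memStore) Get(key string) (string, bool) {
	v, ok := s.data[key]
	return v, ok
}

// Put stores the value and notifies every stream watching the key.
func (s *memStore) Put(key, value string) {
	s.data[key] = value
	for _, ws := range s.streams {
		if ws.keys[key] {
			// Blocks once the buffer is full; a real store must not stall writers.
			ws.ch <- Event{Key: key, Value: value}
		}
	}
}

func (s *memStore) NewWatchStream() WatchStream {
	ws := &memStream{keys: make(map[string]bool), ch: make(chan Event, 16)}
	s.streams = append(s.streams, ws)
	return ws
}

func (ws *memStream) Watch(key string) int64 {
	id := ws.nextID
	ws.nextID++
	ws.keys[key] = true
	return id
}

func (ws *memStream) Chan() <-chan Event { return ws.ch }

// Compile-time check that memStore satisfies WatchableKV.
var _ WatchableKV = (*memStore)(nil)
```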
The relationships among these components are roughly as shown in the figure:
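watchServer interacts with users through the gRPC-generated etcdserverpb code; each user corresponds to one gRPC stream. The Recv goroutine in watchServer keeps reading user requests from the gRPC stream and, depending on each request, issues Create/Cancel Watch commands to WatchStream. It also sends control messages to the Send goroutine over a Go channel. The Send goroutine handles each message according to its content (for example, cleaning up memory when a watch is canceled) and then returns the result to the user. The Send goroutine also listens on a channel for events coming from WatchStream and forwards them to the user.

Why put a WatchStream layer between watchServer and WatchableKV instead of calling WatchableKV directly? Because WatchableKV returns a watcher for each watch request. Watchers are flexible (for example, different watchers can send events to different channels), but the caller then has to manage their lifecycles. WatchStream takes over that lifecycle management. It also hides flexibility that watchServer does not need: for watchServer, having all watchers send events to a single channel is enough. This keeps WatchableKV extensible while making the Watch functionality that watchServer provides easier for upper layers to use. WatchableKV embeds KV. For reasons of space, these two types are not discussed here; I may cover them another time.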
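The division of labor between watchers and WatchStream can be sketched under a few simplifying assumptions. Here a watcher is just an ID, a key and a destination channel, and notify stands in for the store delivering a change. All names are hypothetical. The point is that the stream, not its caller, owns watcher creation and cleanup, and that every watcher it creates writes into the same shared channel.

```go
package main

import (
	"errors"
	"sync"
)

// Event is a change tagged with the ID of the watcher that produced it.
type Event struct {
	WatchID int64
	Key     string
}

// watcher is the flexible low-level object: it can deliver to any channel
// its creator chooses.
type watcher struct {
	id  int64
	key string
	ch  chan<- Event
}

// watchStream owns its watchers' lifecycles and funnels all of their events
// into one shared channel, which is all a watchServer-like caller needs.
type watchStream struct {
	mu       sync.Mutex
	nextID   int64
	watchers map[int64]*watcher
	ch       chan Event
}

func newWatchStream(buf int) *watchStream {
	return &watchStream{watchers: make(map[int64]*watcher), ch: make(chan Event, buf)}
}

// Watch creates a watcher that reports into the shared channel.
func (s *watchStream) Watch(key string) int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	id := s.nextID
	s.nextID++
	s.watchers[id] = &watcher{id: id, key: key, ch: s.ch}
	return id
}

var errWatcherNotExist = errors.New("watcher does not exist")

// Cancel removes a watcher; the stream does the bookkeeping, not its caller.
func (s *watchStream) Cancel(id int64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.watchers[id]; !ok {
		return errWatcherNotExist
	}
	delete(s.watchers, id)
	return nil
}

// notify stands in for the store delivering a change to matching watchers.
func (s *watchStream) notify(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, w := range s.watchers {
		if w.key == key {
			w.ch <- Event{WatchID: w.id, Key: key}
		}
	}
}

// Chan is the single channel a Send goroutine would read from.
func (s *watchStream) Chan() <-chan Event { return s.ch }
```

Because notify sends while holding the lock, a full buffer would stall it. A real implementation has to handle slow readers instead of blocking.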
This design is a good illustration of two design principles: the single-responsibility principle and the open-closed principle.
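In terms of single responsibility, each type and goroutine here focuses on one job:

- KV implements the key-value store (mvcc/kvstore.go).
- WatchableKV focuses on watching changes to KV (mvcc/watchable_store.go).
- WatchStream manages the watchers created by WatchableKV.
- watchServer delivers the changes passed along by WatchStream to users through etcdserverpb (etcdserver/api/v3rpc/watch.go).

Even the Send and Recv goroutines in watchServer are responsible only for sending and receiving, respectively. When they need to coordinate, they do it through control messages.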
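The Recv/Send split with control messages can be modeled with plain channels. This is a hypothetical, stripped-down sketch rather than etcd's code. Requests carry their own watch IDs, and the Send goroutine simply remembers canceled IDs so it can drop stray events; etcd's actual bookkeeping differs. What the sketch preserves is the shape: Recv never writes to the client, Send is the only writer, and the two coordinate solely through the control channel.

```go
package main

import "sync"

type request struct {
	create bool  // true = create, false = cancel
	id     int64 // watch ID (supplied by the caller in this sketch)
}

type event struct {
	id  int64
	key string
}

type response struct {
	id       int64
	created  bool
	canceled bool
	key      string // non-empty only for event responses
}

// serveStream runs the two goroutines and closes out once both are done.
func serveStream(in <-chan request, events <-chan event, out chan<- response) {
	ctrl := make(chan response)
	var wg sync.WaitGroup
	wg.Add(2)

	go func() { // Recv goroutine: reads requests, emits control messages only.
		defer wg.Done()
		defer close(ctrl)
		for req := range in {
			// Roughly where the real Recv would call WatchStream.Watch/Cancel.
			ctrl <- response{id: req.id, created: req.create, canceled: !req.create}
		}
	}()

	go func() { // Send goroutine: the only writer to out.
		defer wg.Done()
		ctrlIn, evIn := (<-chan response)(ctrl), events
		canceled := make(map[int64]bool) // bookkeeping owned by Send alone
		for ctrlIn != nil || evIn != nil {
			select {
			case c, ok := <-ctrlIn:
				if !ok {
					ctrlIn = nil
					continue
				}
				if c.canceled {
					canceled[c.id] = true
				}
				out <- c
			case ev, ok := <-evIn:
				if !ok {
					evIn = nil
					continue
				}
				if !canceled[ev.id] {
					out <- response{id: ev.id, key: ev.key}
				}
			}
		}
	}()

	wg.Wait()
	close(out)
}
```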
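In terms of the open-closed principle, these components can be extended through composition, inheritance (in Go, embedding) and similar techniques without modifying their code. Suppose we want a new WatchStream feature: events are no longer returned through one channel but distributed across several channels, while events for the same key keep their order. We would not need to modify WatchStream. Instead, we could create a new type, MultiWatchStream, to implement the feature (as mentioned above, the watchers returned by WatchableKV are flexible enough to distribute events). If there were no WatchStream type and this logic were written directly into WatchableKV, the same feature would likely require modifying WatchableKV itself. Modifying code is riskier than adding code, because changes can affect existing behavior.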
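Here is one possible sketch of the hypothetical MultiWatchStream. Rather than giving each watcher its own channel, this version sits downstream of a single event channel and shards events by an FNV-1a hash of the key. All events for a given key land on the same output channel, so their relative order is preserved, while different keys can be consumed in parallel. The sharding scheme is an assumption chosen for illustration.

```go
package main

import "hash/fnv"

// Event is a simplified change notification (hypothetical).
type Event struct {
	Key   string
	Value string
}

// MultiWatchStream fans events out to n channels, keyed by hash, so that
// per-key order is kept while unrelated keys may be processed concurrently.
type MultiWatchStream struct {
	chans []chan Event
}

func NewMultiWatchStream(n, buf int) *MultiWatchStream {
	m := &MultiWatchStream{chans: make([]chan Event, n)}
	for i := range m.chans {
		m.chans[i] = make(chan Event, buf)
	}
	return m
}

// shard maps a key to a fixed channel index using FNV-1a.
func (m *MultiWatchStream) shard(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(len(m.chans)))
}

// Dispatch forwards events, in arrival order, from one upstream channel
// (e.g. what a WatchStream exposes) to the shard channels, then closes them.
func (m *MultiWatchStream) Dispatch(in <-chan Event) {
	for ev := range in {
		m.chans[m.shard(ev.Key)] <- ev
	}
	for _, c := range m.chans {
		close(c)
	}
}

// Chans exposes the output channels as receive-only.
func (m *MultiWatchStream) Chans() []<-chan Event {
	out := make([]<-chan Event, len(m.chans))
	for i, c := range m.chans {
		out[i] = c
	}
	return out
}
```

The number of shards fixes the key-to-channel mapping. Changing it at runtime could break the ordering guarantee for keys that move to a different shard.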
Scalability
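Each client's Watch request establishes a gRPC stream, and every watched key creates an additional watcher in WatchableKV. As the number of watched keys grows, Watch scalability suffers. In etcd 3.2, etcd improved its scalability with the gRPC proxy (link). Clients no longer connect directly to the etcd server; they connect to the gRPC proxy first. The proxy merges multiple watches on the same key into a single Watch on the etcd server, which reduces the load on etcd. The etcd repo provides an architecture diagram, reproduced below for convenience (original link):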
            +-------------+
            | etcd server |
            +------+------+
                   ^ watch key A (s-watcher)
                   |
           +-------+-----+
           | gRPC proxy  | <-------+
           |             |         |
           ++-----+------+         |watch key A (c-watcher)
watch key A ^     ^ watch key A    |
(c-watcher) |     | (c-watcher)    |
    +-------+-+  ++--------+  +----+----+
    |  client |  |  client |  |  client |
    |         |  |         |  |         |
    +---------+  +---------+  +---------+
This approach works best for leader election, where many clients watch a few keys. It helps little when a large number of distinct keys must be watched.
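The coalescing idea can be sketched as a map from each key to the client channels (c-watchers) attached to a single upstream watch (s-watcher). This is a hypothetical model, not the gRPC proxy's code. opened merely counts the watches that would be opened on the etcd server, and events are fanned out synchronously.

```go
package main

import "sync"

// coalescer shares one upstream watch per key among all client watchers.
type coalescer struct {
	mu      sync.Mutex
	clients map[string][]chan string // key -> client channels (c-watchers)
	opened  int                      // upstream watches (s-watchers) opened
}

func newCoalescer() *coalescer {
	return &coalescer{clients: make(map[string][]chan string)}
}

// Watch registers a client watcher, opening an upstream watch only for the
// first watcher on a key.
func (c *coalescer) Watch(key string) <-chan string {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, ok := c.clients[key]; !ok {
		c.opened++ // stands in for a real Watch call to the etcd server
	}
	ch := make(chan string, 16)
	c.clients[key] = append(c.clients[key], ch)
	return ch
}

// Deliver fans one upstream event out to every client watching the key.
func (c *coalescer) Deliver(key, value string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, ch := range c.clients[key] {
		ch <- value
	}
}

// ServerWatchers reports how many upstream watches the server would see.
func (c *coalescer) ServerWatchers() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.opened
}
```

With N clients all watching the same K keys, the server sees K watchers instead of N*K, which is why the approach suits leader election. If every client watches different keys, no map entry is ever shared and nothing is saved.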
Recommended reading: