kubernetes1.9源碼閱讀 replication controller的Informer機制
replication controller是kube-controller-manager中一個重要的控制器,主要是rs進行控制,確保pods的數量恰好和rs的規定一致。因此replication controller主要對這兩類進行watch,一類是replicationset,另一類是pods。本文是replication controller的源碼閱讀筆記,會包括client-go的Informer機制,希望幫助開始閱讀kubernetes源碼的小夥伴們,更希望與對kubernetes源碼閱讀感興趣的小夥伴兒們交流,有錯誤的地方也希望能指出,共同進步。
類圖示意圖
入口程序
cmd/kube-controller-manager/controller-manager.go main
- 調用options.NewCMServer,構建CMServer;
- 調用app.Run方法,運行CMServer;
啟動CMServer
cmd/kube-controller-manager/app/controllermanager.go Run
- 調用createClients, 創建apiserver客戶端,通過REST方式訪問APIserver提供的API服務;
- 啟動協程調用go startHTTP,運行http Server;
- 調用record.NewBroadcaster,創建eventBroadcaster對象,接收EventBroadcaster發送的event,輸出到logging中,並輸出到EventSink,並使用recorder記錄"controller-mananger"的事件;
- 調用CreateControllerContext,在CreateControllerContext方法中,會調用informers.NewSharedInformerFactory,創建sharedInformerFactory(client-go/informers/factory.go)對象;
- 調用StartControllers,啟動Controllers;
- 調用ctx.InformerFactory.Start,在這裡是調用sharedInformerFactory.start(client-go/informers/factory.go);
7. saTokenControllerInitFunc和NewControllerInitializers定義了controllers的InitFunc;
startReplicationController
cmd/kube-controller-manager/app/core.go startReplicationController
- 協程啟動調用replicationcontroller.NewReplicationManager,構建ReplicationManager;
- 調用ctx.InformerFactory.Core().V1().Pods(),這裡調用sharedInformerFactory(client-go/informers/factory.go)對象的Core().V1().Pods()方法,將會構建PodInformer對象,
- 以此方式,創建ReplicationControllersInformer;
- 運行ReplicationManager;
NewReplicationManager
pkg/controller/replication/replication_set.go NewReplicationManager
- 在NewReplicationManager中,
- 首先,調用record.NewBroadcaster,創建eventBroadcaster對象,調用eventBroadcaster.StartLogging,接收EventBroadcaster發送的event,輸出到logging中;調用 eventBroadcaster.StartRecordingToSink,event輸出到EventSink,並調用eventBroadcaster.NewRecorder記錄"replication-controller"的事件;
- 將調用NewBaseController;
- 在NewBaseController方法中,
- 構建ReplicaSetController對象,包括了podControl,它定義了對Pod的操作,是由RealPodControl去調用apiserver完成創建實現;
- 將調用 rsInformer.Informer().AddEventHandler,這將調用rsInformer的構造函數NewReplicaSetInformer,rsInformer將event handler包裝成listerner,然後添加到s.processor.listeners中,並定義對象處理的回調函數AddFunc、UpdateFunc、DeleteFunc;
- 同時,調用rsInformer的Lister方法;
- 最後,調用rsInformer.Informer().HasSynced,判斷是否緩存完成;
- 以此方式,調用podInformer.Informer().AddEventHandler、podInformer的Lister方法及podInformer.Informer().HasSynced;
- 設置rsc.syncHandler;syncHandler負責pod與rc的同步,確保Pod副本數與rc規定的相同;
PodInformer
client-go/informers/core/v1/pod.go NewPodInformer
- 構建cache.listWatch對象,定義了ListFunc和WatchFunc;
- 調用cache.NewSharedIndexInformer;
NewSharedIndexInformer
client-go/tools/cache/shared_informer.go NewSharedIndexInformer
運行ReplicationManager
pkg/controller/replicaset/replica_set.go Run
- 調用controller.WaitForCacheSync方法,在controller.WaitForCacheSync中,將調用ca che.WaitForCacheSync;
- 調用rsc.worker, 將啟動workers調用rsc.syncHandler,syncHandler負責pod與rc的同步,確保Pod副本數與rc規定的相同;
sharedInformerFactory.Start
client-go/informers/factory.go Start
- 調用Informer.Run,這裡調用SharedIndexInformer.Run;
SharedIndexInformer.Run
client-go/tools/cache/shared_informer.go Run
- 調用NewDeltaFIFO,創建queue;
- 定義Deltas處理函數s.HandleDeltas;
- 調用New(cfg),構建sharedIndexInformer的controller;
- 調用s.cacheMutationDetector.Run,檢查緩存對象是否變化;
- 調用s.processor.run,將調用sharedProcessor.run,會調用Listener.run和Listener.pop,執行處理queue的函數;
- 調用s.controller.Run,構建Reflector,進行對etcd的緩存;
sharedIndexedInformer.controller.Run
client-go/tools/cache/controller.go Run
- 調用NewReflector,構建Reflector;
- Reflector對象,包括ListerWatcher、ObjectType、Queue、FullResyncPeriod;
- 調用r.run,將調用reflector.ListAndWatch,執行r.List、r.watch、r.watchHandler,進行對etcd的緩存;
- 調用c.processLoop,reflector向queue裡面添加數據,processLoop會不停去消費這裡這些數據;
controller.processLoop
client-go/tools/cache/controller.go processLoop
- cache.PopProcessFunc(c.config.Process)將前面Process函數傳遞進去;
DeltaFIFO.Pop
client-go/tools/cache/delta_fifo.go Pop
- 主要從f.items取出object,然後調用process函數進行處理;
處理DeltaFIFO
client-go/tools/cache/shared_informer.go HandleDeltas
- 調用s.process.distribute,將調用Listener.add,負責將watch的資源傳到listener;
Listener.add/pop/run
client-go/tools/cache/shared_informer.go sharedProcessor.run/add/pop;
- listenser的add函數負責將notify裝進pendingNotifications;
- pop函數取出pendingNotifications的第一個nofify,輸出到nextCh channel;
- run函數則負責取出notify,然後根據notify的類型(增加、刪除、更新)觸發相應的處理函數,這些函數在ReplicaSetController註冊,分別是:rsc.addPod、rsc.updatePod、rsc.deletePod、rsc.enqueueReplicaSet、rsc.updateRS、rsc.enqueueReplicaSet
rsc.addPod
pkg/controller/replicaset/replica_set.go addPod
- 首先會根據pod返回rc,當pod不屬於任何rc時,則返回。找到rc以後,更新rm.expectations.CreationObserved這個rc的期望值,也就是假如一個rc有4個pod,現在檢測到創建了一個pod,則會將這個rc的期望值減少,變為3。然後將這個rc放入隊列;
- 調用rsc.enqueueReplicaSet,將調用rsc.queue.Add;
rsc.worker
pkg/controller/replicaset/replica_set.go worker()
- 調用rsc.syncHandler,這裡會調用rsc.syncReplicaSet,syncReplicaSet負責pod與rc的同步,確保Pod副本數與rc規定的相同;
推薦閱讀:
※docker + kubernetes=共生?相愛相殺?
※Kubernetes v1.7新特性解析-本地數據卷
※梁勝關於容器的年終總結,沒再提Docker
※有人在嘗試使用Kubernetes嗎?
※使用Heapster和Splunk監控Kubernetes運行性能
TAG:Kubernetes |