標籤:

kubernetes1.9源碼閱讀 replication controller的Informer機制

replication controller是kube-controller-manager中一個重要的控制器,主要是rs進行控制,確保pods的數量恰好和rs的規定一致。因此replication controller主要對這兩類進行watch,一類是replicationset,另一類是pods。本文是replication controller的源碼閱讀筆記,會包括client-go的Informer機制,希望幫助開始閱讀kubernetes源碼的小夥伴們,更希望與對kubernetes源碼閱讀感興趣的小夥伴兒們交流,有錯誤的地方也希望能指出,共同進步。

類圖示意圖

入口程序

cmd/kube-controller-manager/controller-manager.go main

  1. 調用options.NewCMServer,構建CMServer;
  2. 調用app.Run方法,運行CMServer;

啟動CMServer

cmd/kube-controller-manager/app/controllermanager.go Run

  1. 調用createClients, 創建apiserver客戶端,通過REST方式訪問APIserver提供的API服務;
  2. 啟動協程調用go startHTTP,運行http Server;
  3. 調用record.NewBroadcaster,創建eventBroadcaster對象,接收EventBroadcaster發送的event,輸出到logging中,並輸出到EventSink,並使用recorder記錄"controller-mananger"的事件;
  4. 調用CreateControllerContext,在CreateControllerContext方法中,會調用informers.NewSharedInformerFactory,創建sharedInformerFactory(client-go/informers/factory.go)對象;
  5. 調用StartControllers,啟動Controllers;
  6. 調用ctx.InformerFactory.Start,在這裡是調用sharedInformerFactory.start(client-go/informers/factory.go);

7. saTokenControllerInitFunc和NewControllerInitializers定義了controllers的InitFunc;

startReplicationController

cmd/kube-controller-manager/app/core.go startReplicationController

  1. 協程啟動調用replicationcontroller.NewReplicationManager,構建ReplicationManager;
    1. 調用ctx.InformerFactory.Core().V1().Pods(),這裡調用sharedInformerFactory(client-go/informers/factory.go)對象的Core().V1().Pods()方法,將會構建PodInformer對象,
    2. 以此方式,創建ReplicationControllersInformer;
  2. 運行ReplicationManager;

NewReplicationManager

pkg/controller/replication/replication_set.go NewReplicationManager

  1. 在NewReplicationManager中,
    1. 首先,調用record.NewBroadcaster,創建eventBroadcaster對象,調用eventBroadcaster.StartLogging,接收EventBroadcaster發送的event,輸出到logging中;調用 eventBroadcaster.StartRecordingToSink,event輸出到EventSink,並調用eventBroadcaster.NewRecorder記錄"replication-controller"的事件;
    2. 將調用NewBaseController;
  2. 在NewBaseController方法中,
    1. 構建ReplicaSetController對象,包括了podControl,它定義了對Pod的操作,是由RealPodControl去調用apiserver完成創建實現;
    2. 將調用 rsInformer.Informer().AddEventHandler,這將調用rsInformer的構造函數NewReplicaSetInformer,rsInformer將event handler包裝成listerner,然後添加到s.processor.listeners中,並定義對象處理的回調函數AddFunc、UpdateFunc、DeleteFunc;
    3. 同時,調用rsInformer的Lister方法;
    4. 最後,調用rsInformer.Informer().HasSynced,判斷是否緩存完成;
    5. 以此方式,調用podInformer.Informer().AddEventHandler、podInformer的Lister方法及podInformer.Informer().HasSynced;
    6. 設置rsc.syncHandler;syncHandler負責pod與rc的同步,確保Pod副本數與rc規定的相同;

PodInformer

client-go/informers/core/v1/pod.go NewPodInformer

  1. 構建cache.listWatch對象,定義了ListFunc和WatchFunc;
  2. 調用cache.NewSharedIndexInformer;

NewSharedIndexInformer

client-go/tools/cache/shared_informer.go NewSharedIndexInformer

運行ReplicationManager

pkg/controller/replicaset/replica_set.go Run

  1. 調用controller.WaitForCacheSync方法,在controller.WaitForCacheSync中,將調用ca che.WaitForCacheSync;
  2. 調用rsc.worker, 將啟動workers調用rsc.syncHandler,syncHandler負責pod與rc的同步,確保Pod副本數與rc規定的相同;

sharedInformerFactory.Start

client-go/informers/factory.go Start

  1. 調用Informer.Run,這裡調用SharedIndexInformer.Run;

SharedIndexInformer.Run

client-go/tools/cache/shared_informer.go Run

  1. 調用NewDeltaFIFO,創建queue;
  2. 定義Deltas處理函數s.HandleDeltas;
  3. 調用New(cfg),構建sharedIndexInformer的controller;
  4. 調用s.cacheMutationDetector.Run,檢查緩存對象是否變化;
  5. 調用s.processor.run,將調用sharedProcessor.run,會調用Listener.run和Listener.pop,執行處理queue的函數;
  6. 調用s.controller.Run,構建Reflector,進行對etcd的緩存;

sharedIndexedInformer.controller.Run

client-go/tools/cache/controller.go Run

  1. 調用NewReflector,構建Reflector;
    1. Reflector對象,包括ListerWatcher、ObjectType、Queue、FullResyncPeriod;
    2. 調用r.run,將調用reflector.ListAndWatch,執行r.List、r.watch、r.watchHandler,進行對etcd的緩存;
    3. 調用c.processLoop,reflector向queue裡面添加數據,processLoop會不停去消費這裡這些數據;

controller.processLoop

client-go/tools/cache/controller.go processLoop

  1. cache.PopProcessFunc(c.config.Process)將前面Process函數傳遞進去;

DeltaFIFO.Pop

client-go/tools/cache/delta_fifo.go Pop

  1. 主要從f.items取出object,然後調用process函數進行處理;

處理DeltaFIFO

client-go/tools/cache/shared_informer.go HandleDeltas

  1. 調用s.process.distribute,將調用Listener.add,負責將watch的資源傳到listener;

Listener.add/pop/run

client-go/tools/cache/shared_informer.go sharedProcessor.run/add/pop;

  1. listenser的add函數負責將notify裝進pendingNotifications;
  2. pop函數取出pendingNotifications的第一個nofify,輸出到nextCh channel;
  3. run函數則負責取出notify,然後根據notify的類型(增加、刪除、更新)觸發相應的處理函數,這些函數在ReplicaSetController註冊,分別是:rsc.addPod、rsc.updatePod、rsc.deletePod、rsc.enqueueReplicaSet、rsc.updateRS、rsc.enqueueReplicaSet

rsc.addPod

pkg/controller/replicaset/replica_set.go addPod

  1. 首先會根據pod返回rc,當pod不屬於任何rc時,則返回。找到rc以後,更新rm.expectations.CreationObserved這個rc的期望值,也就是假如一個rc有4個pod,現在檢測到創建了一個pod,則會將這個rc的期望值減少,變為3。然後將這個rc放入隊列;
  2. 調用rsc.enqueueReplicaSet,將調用rsc.queue.Add;

rsc.worker

pkg/controller/replicaset/replica_set.go worker()

  1. 調用rsc.syncHandler,這裡會調用rsc.syncReplicaSet,syncReplicaSet負責pod與rc的同步,確保Pod副本數與rc規定的相同;


推薦閱讀:

docker + kubernetes=共生?相愛相殺?
Kubernetes v1.7新特性解析-本地數據卷
梁勝關於容器的年終總結,沒再提Docker
有人在嘗試使用Kubernetes嗎?
使用Heapster和Splunk監控Kubernetes運行性能

TAG:Kubernetes |