Kubernetes Scheduler分析

調度是Kubernetes集群中進行容器編排工作重要的一環,在Kubernetes中,Controller Manager負責Pod等副本管理,Kubelet負責拉起Pod,而Scheduler就是負責安排Pod到具體的Node,它通過API Server提供的介面監聽Pods,獲取待調度pod,然後根據一系列的預選策略和優選策略給各個Node節點打分排序,然後將Pod調度到得分最高的Node節點上,然後由kubelet負責拉起Pod。架構流程如下:

如果Pod中指定了NodeName屬性,則無需Scheduler參與,Pod會直接被調度到NodeName指定的Node節點

1 調度策略

Kubernetes的調度策略分為Predicates(預選策略)和Priorites(優選策略),整個調度過程分為兩步:

  1. 預選策略,是強制性規則,遍歷所有的Node,按照具體的預選策略篩選出符合要求的Node列表,如沒有Node符合Predicates策略規則,那該Pod就會被掛起,直到有Node能夠滿足;
  2. 優選策略,在第一步篩選的基礎上,按照優選策略為待選Node打分排序,獲取最優者;

1.1 預選策略

隨著版本的演進Kubernetes支持的Predicates策略逐漸豐富,v1.0版本僅支持4個策略,v1.7支持15個策略,Kubernetes(v1.7)中可用的Predicates策略有:

  • MatchNodeSelector:檢查Node節點的label定義是否滿足Pod的NodeSelector屬性需求
  • PodFitsResources:檢查主機的資源是否滿足Pod的需求,根據實際已經分配的資源(request)做調度,而不是使用已實際使用的資源量做調度
  • PodFitsHostPorts:檢查Pod內每一個容器所需的HostPort是否已被其它容器佔用,如果有所需的HostPort不滿足需求,那麼Pod不能調度到這個主機上
  • HostName:檢查主機名稱是不是Pod指定的NodeName
  • NoDiskConflict:檢查在此主機上是否存在卷衝突。如果這個主機已經掛載了卷,其它同樣使用這個卷的Pod不能調度到這個主機上,不同的存儲後端具體規則不同
  • NoVolumeZoneConflict:檢查給定的zone限制前提下,檢查如果在此主機上部署Pod是否存在卷衝突
  • PodToleratesNodeTaints:確保pod定義的tolerates能接納node定義的taints
  • CheckNodeMemoryPressure:檢查pod是否可以調度到已經報告了主機內存壓力過大的節點
  • CheckNodeDiskPressure:檢查pod是否可以調度到已經報告了主機的存儲壓力過大的節點
  • MaxEBSVolumeCount:確保已掛載的EBS存儲卷不超過設置的最大值,默認39
  • MaxGCEPDVolumeCount:確保已掛載的GCE存儲卷不超過設置的最大值,默認16
  • MaxAzureDiskVolumeCount:確保已掛載的Azure存儲卷不超過設置的最大值,默認16
  • MatchInterPodAffinity:檢查pod和其他pod是否符合親和性規則
  • GeneralPredicates:檢查pod與主機上kubernetes相關組件是否匹配
  • NoVolumeNodeConflict:檢查給定的Node限制前提下,檢查如果在此主機上部署Pod是否存在卷衝突

已註冊但默認不載入的Predicates策略有:

  • PodFitsHostPorts
  • PodFitsResources
  • HostName
  • MatchNodeSelector

PS:此外還有個PodFitsPorts策略(計劃停用),由PodFitsHostPorts替代

1.2 優選策略

同樣,Priorites策略也在隨著版本演進而豐富,v1.0版本僅支持3個策略,v1.7支持10個策略,每項策略都有對應權重,最終根據權重計算節點總分,Kubernetes(v1.7)中可用的Priorites策略有:

  • EqualPriority:所有節點同樣優先順序,無實際效果
  • ImageLocalityPriority:根據主機上是否已具備Pod運行的環境來打分,得分計算:不存在所需鏡像,返回0分,存在鏡像,鏡像越大得分越高
  • LeastRequestedPriority:計算Pods需要的CPU和內存在當前節點可用資源的百分比,具有最小百分比的節點就是最優,得分計算公式:cpu((capacity – sum(requested)) * 10 / capacity) + memory((capacity – sum(requested)) * 10 / capacity) / 2
  • BalancedResourceAllocation:節點上各項資源(CPU、內存)使用率最均衡的為最優,得分計算公式:10 – abs(totalCpu/cpuNodeCapacity-totalMemory/memoryNodeCapacity)*10
  • SelectorSpreadPriority:按Service和Replicaset歸屬計算Node上分布最少的同類Pod數量,得分計算:數量越少得分越高
  • NodeAffinityPriority:節點親和性選擇策略,提供兩種選擇器支持:requiredDuringSchedulingIgnoredDuringExecution(保證所選的主機必須滿足所有Pod對主機的規則要求)、preferresDuringSchedulingIgnoredDuringExecution(調度器會盡量但不保證滿足NodeSelector的所有要求)
  • TaintTolerationPriority:類似於Predicates策略中的PodToleratesNodeTaints,優先調度到標記了Taint的節點
  • InterPodAffinityPriority:pod親和性選擇策略,類似NodeAffinityPriority,提供兩種選擇器支持:requiredDuringSchedulingIgnoredDuringExecution(保證所選的主機必須滿足所有Pod對主機的規則要求)、preferresDuringSchedulingIgnoredDuringExecution(調度器會盡量但不保證滿足NodeSelector的所有要求),兩個子策略:podAffinity和podAntiAffinity,後邊會專門詳解該策略
  • MostRequestedPriority:動態伸縮集群環境比較適用,會優先調度pod到使用率最高的主機節點,這樣在伸縮集群時,就會騰出空閑機器,從而進行停機處理。

已註冊但默認不載入的Priorites策略有:

  • EqualPriority
  • ImageLocalityPriority
  • MostRequestedPriority

PS:此外還有個ServiceSpreadingPriority策略(計劃停用),由SelectorSpreadPriority替代

2 源碼解析

scheduler在整個集群中負責pod的調度,對機器(node)進行篩選和過濾,選擇最合適的機器運行pod。這部分組件是以插件的形式存在的,如果需要定製調度演算法也是比較方便

scheduler是作為plugin放在k8s裡面,代碼在/plugin下面。代碼結構如下:

├── cmdn│ └── kube-schedulern│ ├── appn│ │ └── server.gon│ └── scheduler.gon└── pkg n └── schedulern ├── algorithmn...n

還是按照一貫的風格, main函數在plugin/cmd目錄下面, 新建app.NewSchedulerServer(),解析命令行參數,執行Run函數。

在Run這個介面,首先是生成masterClient的配置, 因為配置指定會有多種方式,因此需要進行合併,規則如下:

  • 啟動的時候通過kubeconfig顯示(ExplicitPath)指定, 優先順序最高,其次是Precedence欄位指定的其他文件,優先順序以此降低,也就是先解析的配置覆蓋後面解析的配置項,但是如果遇到新的配置項,就會被加入。
  • ClusterInfo: clientcmdapi.Cluster{Server: s.Master} 指定 api-server(master)的信息,這裡會覆蓋前面文件指定的集群信息
  • .kubeconfig 文件,如果解析到相對路徑,就會以.kubeconfig的父文件夾為父路徑,合併成絕對路徑

合併完配置, 新建一個RESTClient,

kubeClient, err := client.New(kubeconfig)n...nconfigFactory := factory.NewConfigFactory(kubeClient, util.NewTokenBucketRateLimiter(s.BindPodsQPS, s.BindPodsBurst))nconfig, err := s.createConfig(configFactory)n

createConfig會依次,直到調用

  • CreateFromConfig
  • CreateFromProvider
  • CreateFromKeys

來決策對應的調度演算法,默認情況下使用CreateFromProvider, providerName是AlgorithmProvider: factory.DefaultProvider, 也就是DefaultProvider調度演算法,然後調用CreateFromKeys,通過getFitPredicateFunctions獲得對應的調度演算法。 那這個演算法provider是什麼時候註冊進去的呢?

在./plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go的init可以看到

factory.RegisterAlgorithmProvider(factory.DefaultProvider, defaultPredicates(), defaultPriorities())n

在這裡進行了實際的註冊,其中

  • defaultPredicates 主要進行過濾
  • PodFitsHostPorts 過濾埠衝突的機器
  • PodFitsResources 判斷是否有足夠的資源
  • NoDiskConflict 沒有掛載點衝突
  • MatchNodeSelector 指定相同標籤的node調度
  • HostName 指定機器調度
  • defaultPriorities 主要進行篩選
  • LeastRequestedPriority : 使用公式cpu((capacity - sum(requested)) * 10 / capacity) + memory((capacity - sum(requested)) * 10 / capacity) / 2 來計算node的score
  • BalancedResourceAllocation : score = 10 - abs(cpuFraction-memoryFraction)*10,cpuFraction是已經分配的除以整機的CPU比例,也就是說資源碎片越小得分越低,表示分配更「均衡」
  • SelectorSpreadPriority 降低聚集度,盡量的降低同一個service或者rc上的pods的數目,也就是在predicate的時候盡量的降低衝突的概率

每個步驟的得分進行相加,最後選出最高得分的機器,如果有多個相同得分的機器, 就從中隨機選擇一個。 作為pod的調度目標機器。

algo :=scheduler.NewGenericScheduler(predicateFuncs, priorityConfigs, f.PodLister, r)n

創建GenericScheduler,使用篩選和過濾的函數

func NewGenericScheduler(predicates map[string]algorithm.FitPredicate, prioritizers []algorithm.PriorityConfig, pods algorithm.PodLister, random *rand.Rand) algorithm.ScheduleAlgorithm {n return &genericScheduler{n predicates: predicates,n prioritizers: prioritizers,n pods: pods,n random: random,n }n}n

GenericScheduler類實現了ScheduleAlgorithm介面的Schedule方法,Scheduler方式直接選出分配到的機器

type ScheduleAlgorithm interface {n //選出machinen Schedule(*api.Pod, NodeLister) (selectedMachine string, err error)n}n

回到創建配置文件的方法中:初始化scheduler.Config的NextPod介面,這裡就是每次調度的數據源。最後返回scheduler.config,配置創建完成

// Creates a scheduler from a set of registered fit predicate keys and priority keys.nnfunc (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String) (*scheduler.Config, error) {n ...n NextPod: func() *api.Pod {n pod := f.PodQueue.Pop().(*api.Pod)n glog.V(2).Infof("About to try and schedule pod %v", pod.Name)n return podn }, n ...n}n

然後新建新建一個事件廣播器,同時監聽對應的事件,並且通過EventSink將其存儲,代碼如下:

eventBroadcaster := record.NewBroadcaster()nconfig.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"})n

最後啟動scheduler,此次的config就是上面創建的config

sched := scheduler.New(config)nsched.Run()n

這裡就會啟動一個goroutine調用pkg/scheduler/scheduler.go文件的Run,也就是scheduleOne函數。

// Run begins watching and scheduling. It starts a goroutine and returns immediately.nfunc (s *Scheduler) Run() {n go util.Until(s.scheduleOne, 0, s.config.StopEverything)n}nn// Until loops until stop channel is closed, running f every period.n// Catches any panics, and keeps going. f may not be invoked ifn// stop channel is already closed. Pass NeverStop to Until if youn// dont want it stop.nfunc Until(f func(), period time.Duration, stopCh <-chan struct{}) {n for {n select {n case <-stopCh:n returnn default:n }n func() {n defer HandleCrash()n f()n }()n time.Sleep(period)n }n}nnfunc (s *Scheduler) scheduleOne() {n //首先獲取調度的podn pod := s.config.NextPod()n if s.config.BindPodsRateLimiter != nil {n //等待token變為可用狀態n s.config.BindPodsRateLimiter.Accept()n } nn glog.V(3).Infof("Attempting to schedule: %+v", pod)n start := time.Now()n defer func() {n metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))n }() n //進行實際調度,默認是調度演算法是上面提到的DefaultProvider,也就是執行具體的調度演算法n dest, err := s.config.Algorithm.Schedule(pod, s.config.NodeLister)nn metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))n if err != nil {n glog.V(1).Infof("Failed to schedule: %+v", pod)n s.config.Recorder.Eventf(pod, "FailedScheduling", "%v", err)n s.config.Error(pod, err)n returnn } nn //將pod綁定到noden b := &api.Binding{n ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name},n Target: api.ObjectReference{n Kind: "Node",n Name: dest,n }, n } n// We want to add the pod to the model if and only if the bind succeeds,n // but we dont want to race with any deletions, which happen asynchronously.n s.config.Modeler.LockedAction(func() {n bindingStart := time.Now()n // 發送調度結果給mastern err := s.config.Binder.Bind(b)n metrics.BindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart))n if err != nil {n glog.V(1).Infof("Failed to bind pod: %+v", err)n s.config.Recorder.Eventf(pod, "FailedScheduling", "Binding rejected: %v", err)n s.config.Error(pod, err)n returnn }n //記錄一條調度信息n s.config.Recorder.Eventf(pod, "Scheduled", "Successfully assigned %v to %v", pod.Name, dest)n // tell the model to assume that this binding took effect.n assumed := *podn assumed.Spec.NodeName = destn //激活被調度的podn s.config.Modeler.AssumePod(&assumed)n })n}n

推薦閱讀:

攜程容器雲實踐
Kubernetes 在華為全球 IT 系統中的實踐 | 架構師實踐日
攜程容器雲優化實踐

TAG:Kubernetes | 容器云 | Docker |