容器編排Kubernetes之Heapster源碼解讀
- 莫名的採集不到某個node的數據,至今仍然懵逼,預估是時間差的鍋
- 莫名的得不到所有的Namespace(這是Grafana的坑,數據是採集到了的)
- 文檔與代碼不匹配,比如與apiserver相關的命令行參數
- HPA controller代碼有bug,已經提PR到官方,後續說明
因為有這些坑的存在,我們不得不深入源代碼去解讀heapster背後的故事。這篇文章就簡要的給大家分析其源碼,幫助大家加深理解。
Heapster官方代碼庫中其實包含兩個子項目,eventer和metrics。eventer從kubernetes集群中獲取事件列表並做持久化存儲。而metrics項目從集群節點讀取監控指標數據,處理後做持久化存儲,並提供API供其他組件使用。
eventer我還沒有接觸過,今天暫時不講。HPA controller主要使用的是metrics提供的API,我深究的也就是metrics的源碼。metrics其實簡單來說,分為了4部分,數據輸入source、數據處理processor、數據持久化sink及API。
入口函數
我們首先從入口函數看起(已精簡):
func main() {ntopt := options.NewHeapsterRunOptions()ntopt.AddFlags(pflag.CommandLine)ntflag.InitFlags() // 初始化命令行參數nntlogs.InitLogs()ntdefer logs.FlushLogs() // 初始化日誌nntsetLabelSeperator(opt) // label分隔符,默認逗號ntsetMaxProcs(opt) // 設置並行CPU數量ntif err := validateFlags(opt); err != nil {nttglog.Fatal(err)nt} // 檢查參數nntkubernetesUrl, err := getKubernetesAddress(opt.Sources)ntif err != nil {nttglog.Fatalf("Failed to get kubernetes address: %v", err)nt} // 獲取kubernetes的地址nntsourceManager := createSourceManagerOrDie(opt.Sources) // 解析source URL,生成一個source管理者對象nntsinkManager, metricSink, historicalSource := createAndInitSinksOrDie(opt.Sinks, opt.HistoricalSource) // 解析 sink URLs,生成數據導出對象。無論是否傳遞metric對象都會生成metric對象,其中指定為HistoricalSource URL的對象必須支持歷史數據查詢。nntpodLister, nodeLister := getListersOrDie(kubernetesUrl) // 創建pod和node列表監聽對象ntdataProcessors := createDataProcessorsOrDie(kubernetesUrl, podLister) // 創建一系列數據處理對象,諸如pod,namespace,node,clusternntman, err := manager.NewManager(sourceManager, dataProcessors, sinkManager,nttopt.MetricResolution, manager.DefaultScrapeOffset, manager.DefaultMaxParallelism)ntif err != nil {nttglog.Fatalf("Failed to create main manager: %v", err)nt}ntman.Start() // 創建管理對象,將數據輸入、處理、導出集中管理nntif opt.EnableAPIServer {ntt// Run API server in a separate goroutinenttcreateAndRunAPIServer(opt, metricSink, nodeLister, podLister)nt} // 啟動apiserver,其作用還未搞清楚。好像是過渡期代碼nntmux := http.NewServeMux()ntpromHandler := prometheus.Handler()nthandler := setupHandlers(metricSink, podLister, nodeLister, historicalSource, opt.DisableMetricExport)nthealthz.InstallHandler(mux, healthzChecker(metricSink))ntif len(opt.TLSCertFile) > 0 && len(opt.TLSKeyFile) > 0 {nttstartSecureServing(opt, handler, promHandler, mux, addr)nt} else {nttmux.Handle("/", handler)nttmux.Handle("/metrics", promHandler)nttglog.Fatal(http.ListenAndServe(addr, mux))nt}n} // 設置健康檢查、監控及debug handlern
Manager
梳理完main函數,自然而然就知道我們的重點在manager上,深入manager的代碼,發現其主要邏輯就是持續去source讀取數據,然後讓數據通過一系列的processors,最後再將數據持久化到sink中。看代碼:
func (rm *realManager) Housekeep() {ntfor {ntt// Always try to get the newest metricsnttnow := time.Now()nttstart := now.Truncate(rm.resolution)nttend := start.Add(rm.resolution)ntttimeToNextSync := end.Add(rm.scrapeOffset).Sub(now) // 時間同步,默認取過去一分鐘以內的數據,延遲5秒拉取數據nnttselect {nttcase <-time.After(timeToNextSync):ntttrm.housekeep(start, end)nttcase <-rm.stopChan:ntttrm.sink.Stop()ntttreturnntt}nt}n}nnfunc (rm *realManager) housekeep(start, end time.Time) {ntselect {ntcase <-rm.housekeepSemaphoreChan:ntt// ok, good to gontcase <-time.After(rm.housekeepTimeout):nttglog.Warningf("Spent too long waiting for housekeeping to start")nttreturnnt}nntgo func(rm *realManager) {ntt// should always give back the semaphorenttdefer func() { rm.housekeepSemaphoreChan <- struct{}{} }()nttdata := rm.source.ScrapeMetrics(start, end) // 從source讀取數據nnttfor _, p := range rm.processors {ntttnewData, err := process(p, data)ntttif err == nil {nttttdata = newDatanttt} else {nttttglog.Errorf("Error in processor: %v", err)nttttreturnnttt}ntt} // processors鏈式處理數據nntt// Export data to sinksnttrm.sink.ExportData(data) // 導出數據到sinknnt}(rm) // 協程主要目的是不阻塞managern}n
看完大概流程代碼,我們大概知道了heapster主要做的事情。下面我們就分析數據源、數據處理、數據導出。
數據源
Heapster支持兩種類型的數據源,kubernetes和kubernetes.summary_api。前者通過監聽node節點,使用cadvisor獲取統計信息。後者使用node節點的summaryApi獲取統計信息。雖然獲取數據的途徑不同,但是最後獲取的數據是相同的。
數據處理
數據處理對象只需要實現DataProcessor介面即可。Name方法無非是返回該處理對象的名稱,Process方法接收數據流,處理後返回該數據流。Heapster實現的數據處理對象有:
- cluster_aggregator:聚合ns的監控指標到cluster下
- namespace_aggregator:聚合pod監控指標到namespace下
- namespace_based_enricher:給所有的監控數據加上namespace id
- node_aggregator:聚合pod監控指標到node下
- node_autoscaling_enricher:聚合node資源使用情況
- pod_aggregator:聚合container監控指標到pod下
- pod_based_enricher:簡單說就是給container加上pod的信息
- rate calculator:計算時間,使用率等指標
數據導出
可以同時存在多個數據導出對象,統一包裹在sinkManager里,當有數據需要導出時,數據傳遞到sinkManager,然後由manager依次將數據傳遞給sinkHolders,sinkHolder再調用實際的sink導出數據。現在支持多種sink類型,諸如elasticsearch、gcm、influxdb、kafka、log、opentsdb等,可自由配置。
結語
如此,我們大致了解了Heapster的源碼結構。後續細緻的閱讀,大家可以自行閱讀代碼。
推薦閱讀:
※攜程容器雲優化實踐
※Kubernetes 在華為全球 IT 系統中的實踐 | 架構師實踐日
※從 MVC 到微服務,技術演變的必經之路 | 架構師實踐日
※容器編排之Kubernetes多租戶網路隔離
TAG:容器云 | Kubernetes | 监控 |