解析 | openshift源碼簡析之pod網路配置(下)

解析 | openshift源碼簡析之pod網路配置(下)

前言

openshift底層是通過kubelet來管理pod,kubelet通過CNI插件來配置pod網路.openshift node節點在啟動的時會在一個goroutine中啟動kubelet, 由kubelet來負責pod的管理工作。

本文主要從源碼的角度入手,簡單分析在openshift環境下kubelet是如何通過調用openshift sdn插件來配置pod網路。

上一節分析了openshift-sdn插件是如何配置Pod網路的,本節分析openshift-sdn插件獲取Pod IP時cniServer的處理流程。

CNIServer流程

在上面的分析中我們知道,openshift-sdn插件是通過方法doCNIServerAdd向cniserver來請求IP的,那cniserver是如何處理請求的呢?我們先來看cniServer的邏輯。

cniServer的定義位於openshit代碼庫的pkg/network/node/cniserver/cniserver.go文件,定義如下:

type CNIServer struct { http.Server requestFunc cniRequestFunc rundir string config *Config}

它包括了一個http server,以及一個處理請求的handler cniRequestFunc, 還有一些配置相關的欄位。

cniSever的構造器方法位於pkg/network/node/cniserver/cniserver.go#L120, 內容如下:

// Create and return a new CNIServer object which will listen on a socket in the given pathfunc NewCNIServer(rundir string, config *Config) *CNIServer { router := mux.NewRouter() s := &CNIServer{ Server: http.Server{ Handler: router, }, rundir: rundir, config: config, } router.NotFoundHandler = http.HandlerFunc(http.NotFound) router.HandleFunc("/", s.handleCNIRequest).Methods("POST") return s}

從上面第13行的代碼可以看出,該server只處理一條POST方法的路由,處理請求的handler是handleCNIRequest這個方法,該方法的定義位於 pkg/network/node/cniserver/cniserver.go#L277,內容如下:

// Dispatch a pod request to the request handler and return the result to the// CNI server clientfunc (s *CNIServer) handleCNIRequest(w http.ResponseWriter, r *http.Request) { req, err := cniRequestToPodRequest(r) if err != nil { http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest) return } glog.V(5).Infof("Waiting for %s result for pod %s/%s", req.Command, req.PodNamespace, req.PodName) result, err := s.requestFunc(req) if err != nil { http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest) } else { // Empty response JSON means success with no body w.Header().Set("Content-Type", "application/json") if _, err := w.Write(result); err != nil { glog.Warningf("Error writing %s HTTP response: %v", req.Command, err) } }}

從第11行可以看出,該方法又是調用requestFunc這個方法來處理請求,請求結束後通過w.Write或者是http.Error返回調用者的response。requestFunc是在cniserver的Start的方法中傳入的,傳入的實際上是podManager的handleCNIRequest方法,該方法位於文件pkg/network/node/pod.go#L25,內容如下:

// Enqueue incoming pod requests from the CNI server, wait on the result,// and return that result to the CNI clientfunc (m *podManager) handleCNIRequest(request *cniserver.PodRequest) ([]byte, error) { glog.V(5).Infof("Dispatching pod network request %v", request) m.addRequest(request) result := m.waitRequest(request) glog.V(5).Infof("Returning pod network request %v, result %s err %v", request, string(result.Response), result.Err) return result.Response, result.Err}

在第5行該方法先通過addRequest方法把請求放到一個隊列裡面,然後調用第6行的waitRequest等待請求執行完成。addRequest定義位於pkg/network/node/pod.go#L240, 內容如下:

// Add a request to the podManager CNI request queuefunc (m *podManager) addRequest(request *cniserver.PodRequest) { m.requests <- request}

可以看出請求被放到了m.requests這個channel裡面,也就是在這裡用channel做的隊列。

waitRequest是從一個channel里取出結果,定義位於pkg/network/node/pod.go#L245,內容如下:

// Wait for and return the result of a pod requestfunc (m *podManager) waitRequest(request *cniserver.PodRequest) *cniserver.PodResult { return <-request.Result}

剛才說了addRequest會把請求放到m.requests這個隊列裡面,那隊列里的請求是如何被執行的呢?答案就是podManager在啟動時會在一個gorotine里調用processCNIRequests這個方法,該方法會循環的從m.requests這個channel裡面取出請求執行。processCNIRequests定義位於pkg/network/node/pod.go#L286,內容如下:

// Process all CNI requests from the request queue serially. Our OVS interaction// and scripts currently cannot run in parallel, and doing so greatly complicates// setup/teardown logicfunc (m *podManager) processCNIRequests() { for request := range m.requests { glog.V(5).Infof("Processing pod network request %v", request) result := m.processRequest(request) glog.V(5).Infof("Processed pod network request %v, result %s err %v", request, string(result.Response), result.Err) request.Result <- result } panic("stopped processing CNI pod requests!")}

可以看出該方法通過一個for循環不斷的從m.requests裡面取出請求,然後調用processRequest方法來處理請求,最後把處理的結果在放到request.Result裡面由上面的waitRequest來獲取。

我們來分析processRequest方法的執行邏輯,該方法定義位於pkg/network/node/pod.go#L296,內容如下:

func (m *podManager) processRequest(request *cniserver.PodRequest) *cniserver.PodResult { m.runningPodsLock.Lock() defer m.runningPodsLock.Unlock() pk := getPodKey(request) result := &cniserver.PodResult{} switch request.Command { case cniserver.CNI_ADD: ipamResult, runningPod, err := m.podHandler.setup(request) if ipamResult != nil { result.Response, err = json.Marshal(ipamResult) if err == nil { m.runningPods[pk] = runningPod if m.ovs != nil { m.updateLocalMulticastRulesWithLock(runningPod.vnid) } } } if err != nil { PodOperationsErrors.WithLabelValues(PodOperationSetup).Inc() result.Err = err } case cniserver.CNI_UPDATE: vnid, err := m.podHandler.update(request) if err == nil { if runningPod, exists := m.runningPods[pk]; exists { runningPod.vnid = vnid } } result.Err = err case cniserver.CNI_DEL: if runningPod, exists := m.runningPods[pk]; exists { delete(m.runningPods, pk) if m.ovs != nil { m.updateLocalMulticastRulesWithLock(runningPod.vnid) } } result.Err = m.podHandler.teardown(request) if result.Err != nil { PodOperationsErrors.WithLabelValues(PodOperationTeardown).Inc() } default: result.Err = fmt.Errorf("unhandled CNI request %v", request.Command) } return result}

可以看出該方法針對request.Command的三種不同取值有三部分邏輯來分別處理,我們重點分析Command等於cniserver.CNI_ADD時的邏輯,也就是前面調用openshift-sdn時傳遞ADD參數的處理邏輯。在Command等於cniserver.CNI_ADD部分的代碼主要是調用第9行的podHandler的setup方法,該方法的定義位於pkg/network/node/pod.go#L497,內容如下:

// Set up all networking (host/container veth, OVS flows, IPAM, loopback, etc)func (m *podManager) setup(req *cniserver.PodRequest) (cnitypes.Result, *runningPod, error) { defer PodOperationsLatency.WithLabelValues(PodOperationSetup).Observe(sinceInMicroseconds(time.Now())) pod, err := m.kClient.Core().Pods(req.PodNamespace).Get(req.PodName, metav1.GetOptions{}) if err != nil { return nil, nil, err } ipamResult, podIP, err := m.ipamAdd(req.Netns, req.SandboxID) if err != nil { return nil, nil, fmt.Errorf("failed to run IPAM for %v: %v", req.SandboxID, err) } // Release any IPAM allocations and hostports if the setup failed var success bool defer func() { if !success { m.ipamDel(req.SandboxID) if mappings := m.shouldSyncHostports(nil); mappings != nil { if err := m.hostportSyncer.SyncHostports(Tun0, mappings); err != nil { glog.Warningf("failed syncing hostports: %v", err) } } } }() // Open any hostports the pod wants var v1Pod v1.Pod if err := kapiv1.Convert_core_Pod_To_v1_Pod(pod, &v1Pod, nil); err != nil { return nil, nil, err } podPortMapping := kubehostport.ConstructPodPortMapping(&v1Pod, podIP) if mappings := m.shouldSyncHostports(podPortMapping); mappings != nil { if err := m.hostportSyncer.OpenPodHostportsAndSync(podPortMapping, Tun0, mappings); err != nil { return nil, nil, err } } vnid, err := m.policy.GetVNID(req.PodNamespace) if err != nil { return nil, nil, err } if err := maybeAddMacvlan(pod, req.Netns); err != nil { return nil, nil, err } ofport, err := m.ovs.SetUpPod(req.SandboxID, req.HostVeth, podIP, vnid) if err != nil { return nil, nil, err } if err := setupPodBandwidth(m.ovs, pod, req.HostVeth, req.SandboxID); err != nil { return nil, nil, err } m.policy.EnsureVNIDRules(vnid) success = true return ipamResult, &runningPod{podPortMapping: podPortMapping, vnid: vnid, ofport: ofport}, nil}

該方法的主要邏輯有兩個,一是第10行調用m.ipamAdd獲取IP,這裡涉及到IPAM,後面單獨分析;另一個是第49行調用ovs.SetUpPod設置OVS規則,後面也會單獨分析。

至此,openshfit-sdn請求IP時cniServer的處理流程分析結束,下節我們分析cniServer如何調用IPAM插件來管理IP。

IPAM

上面分析了openshfit-sdn請求IP時cniServer的處理流程,這一節我們分析cniServer調用IPAM插件來管理IP的邏輯。cniServer是調用IPAM插件host-local來做IP管理的,該插件位於/opt/cni/bin目錄,是一個預編譯的二進位可執行程序。本節將從IP的分配和釋放兩方面來分析cniServer跟host-local的交互流程。

IP分配

前面章節說了cniServer是調用了podManager的ipamAdd方法來獲取IP的,那它又是如何同host-local插件交互的呢,我們來展開分析。

ipamAdd方法的定義位於pkg/network/node/pod.go#L422, 內容如下:

// Run CNI IPAM allocation for the container and return the allocated IP addressfunc (m *podManager) ipamAdd(netnsPath string, id string) (*cni020.Result, net.IP, error) { if netnsPath == "" { return nil, nil, fmt.Errorf("netns required for CNI_ADD") } args := createIPAMArgs(netnsPath, m.cniBinPath, cniserver.CNI_ADD, id) r, err := invoke.ExecPluginWithResult(m.cniBinPath+"/host-local", m.ipamConfig, args) if err != nil { return nil, nil, fmt.Errorf("failed to run CNI IPAM ADD: %v", err) } // We gave the IPAM plugin 0.2.0 config, so the plugin must return a 0.2.0 result result, err := cni020.GetResult(r) if err != nil { return nil, nil, fmt.Errorf("failed to parse CNI IPAM ADD result: %v", err) } if result.IP4 == nil { return nil, nil, fmt.Errorf("failed to obtain IP address from CNI IPAM") } return result, result.IP4.IP.IP, nil}

上面代碼第7行先通過createIPAMArgs方法構建一個參數變數args,變數定義如下:

struct { Command string ContainerID string NetNS string PluginArgs [][2]string PluginArgsStr string IfName string Path string}

構建後的變數的Command的值是「ADD」,這樣在調用host-local時就會執行ADD相關的操作。

第8行通過invoke.ExecPluginWithResult來調用執行host-local插件,傳入了上面創建的參數變數args,同時傳入了一個變數ipamConfig,ipamConfig裡面包含了pod所在node的子網相關配置以及一些host-local插件的配置,內容類似如下:

{ "cniVersion":"0.3.1", "name":"examplenet", "ipam":{ "type":"host-local", "ranges":[ [ { "subnet":"203.0.113.0/24" } ] ], "dataDir":"/tmp/cni-example" }}

調用host-local類似如下命令:

echo { "cniVersion": "0.3.1", "name": "examplenet", "ipam": { "type": "host-local", "ranges": [ [{"subnet": "203.0.113.0/24"}]], "dataDir": "/tmp/cni-example" } } | CNI_COMMAND=ADD CNI_CONTAINERID=example CNI_NETNS=/proc/48776/ns/net CNI_IFNAME=eth0 CNI_PATH=/opt/cni/bin /opt/cni/bin/host-local

調用返回的resut的值類似:

{ "ips":[ { "version":"4", "address":"203.0.113.2/24", "gateway":"203.0.113.1" } ]}

獲取的IP信息以及網關信息在上面代碼的第22行返回給調用者,也就是第三節中分析的podManager的setup方法的第10行。

IP釋放

當cniServer接收到釋放IP的請求時,會調用podManager的ipamDel方法,定義位於pkg/network/node/pod.go#L445,內容如下:

// Run CNI IPAM release for the containerfunc (m *podManager) ipamDel(id string) error { args := createIPAMArgs("", m.cniBinPath, cniserver.CNI_DEL, id) err := invoke.ExecPluginWithoutResult(m.cniBinPath+"/host-local", m.ipamConfig, args) if err != nil { return fmt.Errorf("failed to run CNI IPAM DEL: %v", err) } return nil}

該方法的邏輯跟ipamAdd一樣,都是通過調用host-local插件來完成相應的操作,不同的是該方法在調用時傳入了一個Command等於CNI_DEL的args,這樣在調用host-local時就會執行IP釋放的相關操作。

host-local會把所有已經分配過的IP記錄到本地,也就是ipamConfig配置的dataDir目錄下,在openshit環境下是記錄到/var/lib/cni/networks/openshift-sdn目錄下。目錄下的內容類似如下:

[root@master227 ~]# ls /var/lib/cni/networks/openshift-sdn10.128.0.114 10.128.0.116 last_reserved_ip.0[root@master227 ~]#

上面列出的每一個以ip命名的文件都代表一個已經分配的IP,它的內容是該IP所在的pod的ID. 內容類似如下:

[root@master227 ~]# cat /var/lib/cni/networks/openshift-sdn/10.128.0.1147a1c2e242c2a2d750382837b81283952ad9878ae496195560f9854935d7e4d31[root@master227 ~]#

當分配IP時,host-local會在該目錄下添加一條記錄,釋放IP時會刪除相應的記錄。

關於host-local的邏輯不再作分析,後面會有單獨的章節來分析,有興趣的可以看看源碼,位於github.com/containernet代碼庫下。

至此,IPAM的邏輯分析結束,下一節我們分析cniServer是如何調用ovs controller來設置Pod ovs規則。

OVS規則設置

openshift底層的網路用的是ovs, 那麼在配置好pod IP之後,又是如何設置跟pod相關的ovs規則的呢?下面作一分析。

openshift node在啟動時會創建一個ovs controller,由它來完成ovs網路配置的各種操作。在第三節我們分析過,cniServer是通過調用ovs controller的SetUpPod方法來設置pod ovs規則,調用的代碼位於: pkg/network/node/pod.go#L544, 內容如下:

ofport, err := m.ovs.SetUpPod(req.SandboxID, req.HostVeth, podIP, vnid)

SetUpPod的定義位於pkg/network/node/ovscontroller.go#L267,內容如下:

func (oc *ovsController) SetUpPod(sandboxID, hostVeth string, podIP net.IP, vnid uint32) (int, error) { ofport, err := oc.ensureOvsPort(hostVeth, sandboxID, podIP.String()) if err != nil { return -1, err } return ofport, oc.setupPodFlows(ofport, podIP, vnid)}

在上面代碼的第2行,SetUpPod又調用了ensureOvsPort這個方法,該方法的定義位於pkg/network/node/ovscontroller.go#L227,內容如下:

func (oc *ovsController) ensureOvsPort(hostVeth, sandboxID, podIP string) (int, error) { return oc.ovs.AddPort(hostVeth, -1, fmt.Sprintf(`external-ids=sandbox="%s",ip="%s"`, sandboxID, podIP), )}

如代碼所示,該方法又調用了ovs的AddPort方法,我們再來分析AddPort方法。該方法的定義位於pkg/util/ovs/ovs.go#L31,內容如下:

func (ovsif *ovsExec) AddPort(port string, ofportRequest int, properties ...string) (int, error) { args := []string{"--may-exist", "add-port", ovsif.bridge, port} if ofportRequest > 0 || len(properties) > 0 { args = append(args, "--", "set", "Interface", port) if ofportRequest > 0 { args = append(args, fmt.Sprintf("ofport_request=%d", ofportRequest)) } if len(properties) > 0 { args = append(args, properties...) } } _, err := ovsif.exec(OVS_VSCTL, args...) if err != nil { return -1, err } ofport, err := ovsif.GetOFPort(port) if err != nil { return -1, err } if ofportRequest > 0 && ofportRequest != ofport { return -1, fmt.Errorf("allocated ofport (%d) did not match request (%d)", ofport, ofportRequest) } return ofport, nil}

分析上面的代碼你會發現,AddPort實際上是調用了底層的ovs-vsctl命令將pod的host端的虛擬網卡加入到了ovs網橋br0上,這樣br0上的流量就可以通過該網卡進入pod了。該方法的調用類似於下面的命令行,假設pod host端的網卡是veth3258a5e2:

ovs-vsctl --may-exist add-port br0 veth3258a5e2

接著回到SetUpPod方法,在第6行中調用了setupPodFlows來設置pod IP的ovs規則,該方法的定義位於pkg/network/node/ovscontroller.go#L233,內容如下:

func (oc *ovsController) setupPodFlows(ofport int, podIP net.IP, vnid uint32) error { otx := oc.ovs.NewTransaction() ipstr := podIP.String() podIP = podIP.To4() ipmac := fmt.Sprintf("00:00:x:x:x:x/00:00:ff:ff:ff:ff", podIP[0], podIP[1], podIP[2], podIP[3]) // ARP/IP traffic from container otx.AddFlow("table=20, priority=100, in_port=%d, arp, nw_src=%s, arp_sha=%s, actions=load:%d->NXM_NX_REG0[], goto_table:21", ofport, ipstr, ipmac, vnid) otx.AddFlow("table=20, priority=100, in_port=%d, ip, nw_src=%s, actions=load:%d->NXM_NX_REG0[], goto_table:21", ofport, ipstr, vnid) if oc.useConnTrack { otx.AddFlow("table=25, priority=100, ip, nw_src=%s, actions=load:%d->NXM_NX_REG0[], goto_table:30", ipstr, vnid) } // ARP request/response to container (not isolated) otx.AddFlow("table=40, priority=100, arp, nw_dst=%s, actions=output:%d", ipstr, ofport) // IP traffic to container otx.AddFlow("table=70, priority=100, ip, nw_dst=%s, actions=load:%d->NXM_NX_REG1[], load:%d->NXM_NX_REG2[], goto_table:80", ipstr, vnid, ofport) return otx.Commit()}

在上面代碼的第9行到第19行,分別調用了AddFlow來設置各種ovs規則,第9行到第10行設置了從pod出去的ARP/IP流量的規則,第16行設置了進入POD的ARP流量規則,第19行設置了進入POD的IP流量規則。 AddFlow實際上是調用了命令行工具ovs-ofctl來設置各種ovs規則。關於這些規則的詳細內容不再作分析,感興趣的同學可以自行研究。

至此,ovs規則的設置流程分析完畢,openshit pod網路配置的流程也全部分析完畢。


註:本文由優雲數智工程師牛敏國撰寫,閱讀原文:

解析 | openshift源碼簡析之pod網路配置(下)?

mp.weixin.qq.com圖標
推薦閱讀:

微軟的決絕:抱緊雲與 AI 的未來
雲計算、大數據的人才需求有多大?
雲計算對各行業變革的影響究竟有多大?
數據中心利用餘熱得不償失
為數據賦予超能力,阿里雲重磅推出Serverless數據

TAG:容器 | 雲計算 | 開源軟體 |