深入golang之---goroutine並發控制與通信

深入golang之---goroutine並發控制與通信

來自專欄 貪歡半晌

開發go程序的時候,時常需要使用goroutine並發處理任務,有時候這些goroutine是相互獨立的,而有的時候,多個goroutine之間常常是需要同步與通信的。另一種情況,主goroutine需要控制它所屬的子goroutine,總結起來,實現多個goroutine間的同步與通信大致有:

- 全局共享變數

- channel通信(CSP模型)

- Context包

本文章通過goroutine同步與通信的一個典型場景-通知子goroutine退出運行,來深入講解下golang的控制並發。

通知多個子goroutine退出運行

goroutine作為go語言的並發利器,不僅性能強勁而且使用方便:只需要一個關鍵字go即可將普通函數並發執行,且goroutine佔用內存極小(一個goroutine只佔2KB的內存),所以開發go程序的時候很多開發者常常會使用這個並發工具,獨立的並發任務比較簡單,只需要用go關鍵字修飾函數就可以啟用一個goroutine直接運行;但是,實際的並發場景常常是需要進行協程間的同步與通信,以及精確控制子goroutine開始和結束,其中一個典型場景就是主進程通知名下所有子goroutine優雅退出運行。

由於goroutine的退出機制設計是,goroutine退出只能由本身控制,不允許從外部強制結束該goroutine。只有兩種情況例外,那就是main函數結束或者程序崩潰結束運行;所以,要實現主進程式控制制子goroutine的開始和結束,必須藉助其它工具來實現。

控制並發的方法

實現控制並發的方式,大致可分成以下三類:

- 全局共享變數

- channel通信

- Context包

全局共享變數

這是最簡單的實現控制並發的方式,實現步驟是:

1. 聲明一個全局變數;

2. 所有子goroutine共享這個變數,並不斷輪詢這個變數檢查是否有更新;

3. 在主進程中變更該全局變數;

4. 子goroutine檢測到全局變數更新,執行相應的邏輯。

示例如下:

package mainimport ( "fmt" "time")func main() { running := true f := func() { for running { fmt.Println("sub proc running...") time.Sleep(1 * time.Second) } fmt.Println("sub proc exit") } go f() go f() go f() time.Sleep(2 * time.Second) running = false time.Sleep(3 * time.Second) fmt.Println("main proc exit")}

全局變數的優勢是簡單方便,不需要過多繁雜的操作,通過一個變數就可以控制所有子goroutine的開始和結束;缺點是功能有限,由於架構所致,該全局變數只能是多讀一寫,否則會出現數據同步問題,當然也可以通過給全局變數加鎖來解決這個問題,但那就增加了複雜度,另外這種方式不適合用於子goroutine間的通信,因為全局變數可以傳遞的信息很小;還有就是主進程無法等待所有子goroutine退出,因為這種方式只能是單向通知,所以這種方法只適用於非常簡單的邏輯且並發量不太大的場景,一旦邏輯稍微複雜一點,這種方法就有點捉襟見肘。

channel通信

另一種更為通用且靈活的實現控制並發的方式是使用channel進行通信。

首先,我們先來了解下什麼是golang中的channel:Channel是Go中的一個核心類型,你可以把它看成一個管道,通過它並發核心單元就可以發送或者接收數據進行通訊(communication)。

要想理解 channel 要先知道 CSP 模型:

CSP 是 Communicating Sequential Process 的簡稱,中文可以叫做通信順序進程,是一種並發編程模型,由 Tony Hoare 於 1977 年提出。簡單來說,CSP 模型由並發執行的實體(線程或者進程)所組成,實體之間通過發送消息進行通信,這裡發送消息時使用的就是通道,或者叫 channel。CSP 模型的關鍵是關注 channel,而不關注發送消息的實體。Go 語言實現了 CSP 部分理論,goroutine 對應 CSP 中並發執行的實體,channel 也就對應著 CSP 中的 channel。

也就是說,CSP 描述這樣一種並發模型:多個Process 使用一個 Channel 進行通信, 這個 Channel 連結的 Process 通常是匿名的,消息傳遞通常是同步的(有別於 Actor Model)。

先來看示例代碼:

package mainimport ( "fmt" "os" "os/signal" "sync" "syscall" "time")func consumer(stop <-chan bool) { for { select { case <-stop: fmt.Println("exit sub goroutine") return default: fmt.Println("running...") time.Sleep(500 * time.Millisecond) } }}func main() { stop := make(chan bool) var wg sync.WaitGroup // Spawn example consumers for i := 0; i < 3; i++ { wg.Add(1) go func(stop <-chan bool) { defer wg.Done() consumer(stop) }(stop) } waitForSignal() close(stop) fmt.Println("stopping all jobs!") wg.Wait()}func waitForSignal() { sigs := make(chan os.Signal) signal.Notify(sigs, os.Interrupt) signal.Notify(sigs, syscall.SIGTERM) <-sigs}

這裡可以實現優雅等待所有子goroutine完全結束之後主進程才結束退出,藉助了標準庫sync里的Waitgroup,這是一種控制並發的方式,可以實現對多goroutine的等待,官方文檔是這樣描述的:

A WaitGroup waits for a collection of goroutines to finish. The main goroutine calls Add to set the number of goroutines to wait for.

Then each of the goroutines runs and calls Done when finished. At the same time, Wait can be used to block until all goroutines have finished.

簡單來講,它的源碼里實現了一個類似計數器的結構,記錄每一個在它那裡註冊過的協程,然後每一個協程完成任務之後需要到它那裡註銷,然後在主進程那裡可以等待直至所有協程完成任務退出。

使用步驟:

1. 創建一個Waitgroup的實例wg;

2. 在每個goroutine啟動的時候,調用wg.Add(1)註冊;

3. 在每個goroutine完成任務後退出之前,調用wg.Done()註銷。

4. 在等待所有goroutine的地方調用wg.Wait()阻塞進程,知道所有goroutine都完成任務調用wg.Done()註銷之後,Wait()方法會返回。

該示常式序是一種golang的select+channel的典型用法,我們來稍微深入一點分析一下這種典型用法:

channel

首先了解下channel,可以理解為管道,它的主要功能點是:

  1. 隊列存儲數據
  2. 阻塞和喚醒goroutine

channel 實現集中在文件 runtime/chan.go 中,channel底層數據結構是這樣的:

type hchan struct { qcount uint // 隊列中數據個數 dataqsiz uint // channel 大小 buf unsafe.Pointer // 存放數據的環形數組 elemsize uint16 // channel 中數據類型的大小 closed uint32 // 表示 channel 是否關閉 elemtype *_type // 元素數據類型 sendx uint // send 的數組索引 recvx uint // recv 的數組索引 recvq waitq // 由 recv 行為(也就是 <-ch)阻塞在 channel 上的 goroutine 隊列 sendq waitq // 由 send 行為 (也就是 ch<-) 阻塞在 channel 上的 goroutine 隊列 // lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another Gs status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. lock mutex}

從源碼可以看出它其實就是一個隊列加一個鎖(輕量),代碼本身不複雜,但涉及到上下文很多細節,故而不易通讀,有興趣的同學可以去看一下,我的建議是,從上面總結的兩個功能點出發,一個是 ring buffer,用於存數據; 一個是存放操作(讀寫)該channel的goroutine 的隊列。

  • buf是一個通用指針,用於存儲數據,看源碼時重點關注對這個變數的讀寫
  • recvq 是讀操作阻塞在 channel 的 goroutine 列表,sendq 是寫操作阻塞在 channel 的 goroutine 列表。列表的實現是 sudog,其實就是一個對 g 的結構的封裝,看源碼時重點關注,是怎樣通過這兩個變數阻塞和喚醒goroutine的

由於涉及源碼較多,這裡就不再深入。

select

然後是select機制,golang 的 select 機制可以理解為是在語言層面實現了和 select, poll, epoll 相似的功能:監聽多個描述符的讀/寫等事件,一旦某個描述符就緒(一般是讀或者寫事件發生了),就能夠將發生的事件通知給關心的應用程序去處理該事件。 golang 的 select 機制是,監聽多個channel,每一個 case 是一個事件,可以是讀事件也可以是寫事件,隨機選擇一個執行,可以設置default,它的作用是:當監聽的多個事件都阻塞住會執行default的邏輯。

select的源碼在runtime/select.go ,看的時候建議是重點關注 pollorder 和 lockorder

  • pollorder保存的是scase的序號,亂序是為了之後執行時的隨機性。
  • lockorder保存了所有case中channel的地址,這裡按照地址大小堆排了一下lockorder對應的這片連續內存。對chan排序是為了去重,保證之後對所有channel上鎖時不會重複上鎖。

因為我對這部分源碼研究得也不是很深,故而點到為止即可,有興趣的可以去看看源碼啦!

具體到demo代碼:consumer為協程的具體代碼,裡面是只有一個不斷輪詢channel變數stop的循環,所以主進程是通過stop來通知子協程何時該結束運行的,在main方法中,close掉stop之後,讀取已關閉的channel會立刻返回該channel數據類型的零值,因此子goroutine里的<-stop操作會馬上返回,然後退出運行。

事實上,通過channel控制子goroutine的方法可以總結為:循環監聽一個channel,一般來說是for循環里放一個select監聽channel以達到通知子goroutine的效果。再藉助Waitgroup,主進程可以等待所有協程優雅退出後再結束自己的運行,這就通過channel實現了優雅控制goroutine並發的開始和結束。

channel通信控制基於CSP模型,相比於傳統的線程與鎖並發模型,避免了大量的加鎖解鎖的性能消耗,而又比Actor模型更加靈活,使用Actor模型時,負責通訊的媒介與執行單元是緊耦合的–每個Actor都有一個信箱。而使用CSP模型,channel是第一對象,可以被獨立地創建,寫入和讀出數據,更容易進行擴展。

殺器Context

Context通常被譯作上下文,它是一個比較抽象的概念。在討論鏈式調用技術時也經常會提到上下文。一般理解為程序單元的一個運行狀態、現場、快照,而翻譯中上下又很好地詮釋了其本質,上下則是存在上下層的傳遞,上會把內容傳遞給下。在Go語言中,程序單元也就指的是Goroutine。

每個Goroutine在執行之前,都要先知道程序當前的執行狀態,通常將這些執行狀態封裝在一個Context變數中,傳遞給要執行的Goroutine中。上下文則幾乎已經成為傳遞與請求同生存周期變數的標準方法。在網路編程下,當接收到一個網路請求Request,在處理這個Request的goroutine中,可能需要在當前gorutine繼續開啟多個新的Goroutine來獲取數據與邏輯處理(例如訪問資料庫、RPC服務等),即一個請求Request,會需要多個Goroutine中處理。而這些Goroutine可能需要共享Request的一些信息;同時當Request被取消或者超時的時候,所有從這個Request創建的所有Goroutine也應該被結束。

context在go1.7之後被引入到標準庫中,1.7之前的go版本使用context需要安裝golang.org/x/net/contex包,關於golang context的更詳細說明,可參考官方文檔:context

Context初試

Context的創建和調用關係是層層遞進的,也就是我們通常所說的鏈式調用,類似數據結構里的樹,從根節點開始,每一次調用就衍生一個葉子節點。首先,生成根節點,使用context.Background方法生成,而後可以進行鏈式調用使用context包里的各類方法,context包里的所有方法:

- func Background() Context

- func TODO() Context

- func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

- func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)

- func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

- func WithValue(parent Context, key, val interface{}) Context

這裡僅以WithCancel和WithValue方法為例來實現控制並發和通信:

話不多說,上碼:

package mainimport ( "context" "crypto/md5" "fmt" "io/ioutil" "net/http" "sync" "time")type favContextKey stringfunc main() { wg := &sync.WaitGroup{} values := []string{"https://www.baidu.com/", "https://www.zhihu.com/"} ctx, cancel := context.WithCancel(context.Background()) for _, url := range values { wg.Add(1) subCtx := context.WithValue(ctx, favContextKey("url"), url) go reqURL(subCtx, wg) } go func() { time.Sleep(time.Second * 3) cancel() }() wg.Wait() fmt.Println("exit main goroutine")}func reqURL(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() url, _ := ctx.Value(favContextKey("url")).(string) for { select { case <-ctx.Done(): fmt.Printf("stop getting url:%s
", url) return default: r, err := http.Get(url) if r.StatusCode == http.StatusOK && err == nil { body, _ := ioutil.ReadAll(r.Body) subCtx := context.WithValue(ctx, favContextKey("resp"), fmt.Sprintf("%s%x", url, md5.Sum(body))) wg.Add(1) go showResp(subCtx, wg) } r.Body.Close() //啟動子goroutine是為了不阻塞當前goroutine,這裡在實際場景中可以去執行其他邏輯,這裡為了方便直接sleep一秒 // doSometing() time.Sleep(time.Second * 1) } }}func showResp(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() for { select { case <-ctx.Done(): fmt.Println("stop showing resp") return default: //子goroutine里一般會處理一些IO任務,如讀寫資料庫或者rpc調用,這裡為了方便直接把數據列印 fmt.Println("printing ", ctx.Value(favContextKey("resp"))) time.Sleep(time.Second * 1) } }}

前面我們說過Context就是設計用來解決那種多個goroutine處理一個Request且這多個goroutine需要共享Request的一些信息的場景,以上是一個簡單模擬上述過程的demo。

首先調用context.Background()生成根節點,然後調用withCancel方法,傳入根節點,得到新的子Context以及根節點的cancel方法(通知所有子節點結束運行),這裡要注意:該方法也返回了一個Context,這是一個新的子節點,與初始傳入的根節點不是同一個實例了,但是每一個子節點裡會保存從最初的根節點到本節點的鏈路信息 ,才能實現鏈式。

程序的reqURL方法接收一個url,然後通過http請求該url獲得response,然後在當前goroutine里再啟動一個子groutine把response列印出來,然後從ReqURL開始Context樹往下衍生葉子節點(每一個鏈式調用新產生的ctx),中間每個ctx都可以通過WithValue方式傳值(實現通信),而每一個子goroutine都能通過Value方法從父goroutine取值,實現協程間的通信,每個子ctx可以調用Done方法檢測是否有父節點調用cancel方法通知子節點退出運行,根節點的cancel調用會沿著鏈路通知到每一個子節點,因此實現了強並發控制,流程如圖:

Context調用鏈路圖

該demo結合前面說的WaitGroup實現了優雅並發控制和通信,關於WaitGroup的原理和使用前文已做解析,這裡便不再贅述,當然,實際的應用場景不會這麼簡單,處理Request的goroutine啟動多個子goroutine大多是處理IO密集的任務如讀寫資料庫或rpc調用,然後在主goroutine中繼續執行其他邏輯,這裡為了方便講解做了最簡單的處理。

Context作為golang中並發控制和通信的大殺器,被廣泛應用,一些使用go開發http服務的同學如果閱讀過這些很多 web framework的源碼就知道,Context在web framework隨處可見,因為http請求處理就是一個典型的鏈式過程以及並發場景,所以很多web framework都會藉助Context實現鏈式調用的邏輯。有興趣可以讀一下context包的源碼,會發現Context的實現其實是結合了Mutex鎖和channel而實現的,其實並發、同步的很多高級組件萬變不離其宗,都是通過最底層的數據結構組裝起來的,只要知曉了最基礎的概念,上游的架構也可以一目了然。

context使用規範

最後,Context雖然是神器,但開發者使用也要遵循基本法,以下是一些Context使用的規範:

- Do not store Contexts inside a struct type; instead, pass a Context explicitly to each function that needs it. The Context should be the first parameter, typically named ctx;不要把Context存在一個結構體當中,顯式地傳入函數。Context變數需要作為第一個參數使用,一般命名為ctx;

  • Do not pass a nil Context, even if a function permits it. Pass context.TODO if you are unsure about which Context to use;即使方法允許,也不要傳入一個nil的Context,如果你不確定你要用什麼Context的時候傳一個context.TODO;
  • Use context Values only for request-scoped data that transits processes and APIs, not for passing optional parameters to functions;使用context的Value相關方法只應該用於在程序和介面中傳遞的和請求相關的元數據,不要用它來傳遞一些可選的參數;
  • The same Context may be passed to functions running in different goroutines; Contexts are safe for simultaneous use by multiple goroutines;同樣的Context可以用來傳遞到不同的goroutine中,Context在多個goroutine中是安全的;

參考鏈接

  • [1] deepzz.com/post/golang-
  • [2] flysnow.org/2017/05/12/
  • [3] golang.org/pkg/context/
  • [4]moye.me/2017/05/05/go-c

推薦閱讀:

TAG:Go語言 | 並發 | 協程 |