一些常見的並發編程錯誤
來自專欄 Linux15 人贊了文章
Go 是一個內置支持並發編程的語言。藉助使用 go
關鍵字去創建 協程(goroutine)(輕量級線程)和在 Go 中提供的 使用 信道 和 其它的並發 同步方法,使得並發編程變得很容易、很靈活和很有趣。
另一方面,Go 並不會阻止一些因 Go 程序員粗心大意或者缺乏經驗而造成的並發編程錯誤。在本文的下面部分將展示一些在 Go 編程中常見的並發編程錯誤,以幫助 Go 程序員們避免再犯類似的錯誤。
需要同步的時候沒有同步
代碼行或許 不是按出現的順序運行的。
在下面的程序中有兩個錯誤。
- 第一,在
main
協程中讀取b
和在新的 協程 中寫入b
可能導致數據爭用。 - 第二,條件
b == true
並不能保證在main
協程 中的a != nil
。在新的協程中編譯器和 CPU 可能會通過 重排序指令 進行優化,因此,在運行時b
賦值可能發生在a
賦值之前,在main
協程 中當a
被修改後,它將會讓部分a
一直保持為nil
。
package mainimport ( "time" "runtime")func main() { var a []int // nil var b bool // false // a new goroutine go func () { a = make([]int, 3) b = true // write b }() for !b { // read b time.Sleep(time.Second) runtime.Gosched() } a[0], a[1], a[2] = 0, 1, 2 // might panic}
上面的程序或者在一台計算機上運行的很好,但是在另一台上可能會引發異常。或者它可能運行了 N 次都很好,但是可能在第 (N+1) 次引發了異常。
我們將使用 sync
標準包中提供的信道或者同步方法去確保內存中的順序。例如,
package mainfunc main() { var a []int = nil c := make(chan struct{}) // a new goroutine go func () { a = make([]int, 3) c <- struct{}{} }() <-c a[0], a[1], a[2] = 0, 1, 2}
使用 time.Sleep
調用去做同步
我們先來看一個簡單的例子。
package mainimport ( "fmt" "time")func main() { var x = 123 go func() { x = 789 // write x }() time.Sleep(time.Second) fmt.Println(x) // read x}
我們預期程序將列印出 789
。如果我們運行它,通常情況下,它確定列印的是 789
。但是,這個程序使用的同步方式好嗎?No!原因是 Go 運行時並不保證 x
的寫入一定會發生在 x
的讀取之前。在某些條件下,比如在同一個操作系統上,大部分 CPU 資源被其它運行的程序所佔用的情況下,寫入 x
可能就會發生在讀取 x
之後。這就是為什麼我們在正式的項目中,從來不使用 time.Sleep
調用去實現同步的原因。
我們來看一下另外一個示例。
package mainimport ( "fmt" "time")var x = 0func main() { var num = 123 var p = &num c := make(chan int) go func() { c <- *p + x }() time.Sleep(time.Second) num = 789 fmt.Println(<-c)}
你認為程序的預期輸出是什麼?123
還是 789
?事實上它的輸出與編譯器有關。對於標準的 Go 編譯器 1.10 來說,這個程序很有可能輸出是 123
。但是在理論上,它可能輸出的是 789
,或者其它的隨機數。
現在,我們來改變 c <- *p + x
為 c <- *p
,然後再次運行這個程序。你將會發現輸出變成了 789
(使用標準的 Go 編譯器 1.10)。這再次說明它的輸出是與編譯器相關的。
是的,在上面的程序中存在數據爭用。表達式 *p
可能會被先計算、後計算、或者在處理賦值語句 num = 789
時計算。time.Sleep
調用並不能保證 *p
發生在賦值語句處理之前進行。
對於這個特定的示例,我們將在新的協程創建之前,將值保存到一個臨時值中,然後在新的協程中使用臨時值去消除數據爭用。
... tmp := *p + x go func() { c <- tmp }()...
使協程掛起
掛起協程是指讓協程一直處於阻塞狀態。導致協程被掛起的原因很多。比如,
- 一個協程嘗試從一個 nil 信道中或者從一個沒有其它協程給它發送值的信道中檢索數據。
- 一個協程嘗試去發送一個值到 nil 信道,或者發送到一個沒有其它的協程接收值的信道中。
- 一個協程被它自己死鎖。
- 一組協程彼此死鎖。
- 當運行一個沒有
default
分支的select
代碼塊時,一個協程被阻塞,以及在select
代碼塊中case
關鍵字後的所有信道操作保持阻塞狀態。
除了有時我們為了避免程序退出,特意讓一個程序中的 main
協程保持掛起之外,大多數其它的協程掛起都是意外情況。Go 運行時很難判斷一個協程到底是處於掛起狀態還是臨時阻塞。因此,Go 運行時並不會去釋放一個掛起的協程所佔用的資源。
在 誰先響應誰獲勝 的信道使用案例中,如果使用的 future 信道容量不夠大,當嘗試向 Future 信道發送結果時,一些響應較慢的信道將被掛起。比如,如果調用下面的函數,將有 4 個協程處於永遠阻塞狀態。
func request() int { c := make(chan int) for i := 0; i < 5; i++ { i := i go func() { c <- i // 4 goroutines will hang here. }() } return <-c}
為避免這 4 個協程一直處於掛起狀態, c
信道的容量必須至少是 4
。
在 實現誰先響應誰獲勝的第二種方法 的信道使用案例中,如果將 future 信道用做非緩衝信道,那麼有可能這個信息將永遠也不會有響應而掛起。例如,如果在一個協程中調用下面的函數,協程可能會掛起。原因是,如果接收操作 <-c
準備就緒之前,五個發送操作全部嘗試發送,那麼所有的嘗試發送的操作將全部失敗,因此那個調用者協程將永遠也不會接收到值。
func request() int { c := make(chan int) for i := 0; i < 5; i++ { i := i go func() { select { case c <- i: default: } }() } return <-c}
將信道 c
變成緩衝信道將保證五個發送操作中的至少一個操作會發送成功,這樣,上面函數中的那個調用者協程將不會被掛起。
在 sync
標準包中拷貝類型值
在實踐中,sync
標準包中的類型值不會被拷貝。我們應該只拷貝這個值的指針。
下面是一個錯誤的並發編程示例。在這個示例中,當調用 Counter.Value
方法時,將拷貝一個 Counter
接收值。作為接收值的一個欄位,Counter
接收值的各個 Mutex
欄位也會被拷貝。拷貝不是同步發生的,因此,拷貝的 Mutex
值可能會出錯。即便是沒有錯誤,拷貝的 Counter
接收值的訪問保護也是沒有意義的。
import "sync"type Counter struct { sync.Mutex n int64}// This method is okay.func (c *Counter) Increase(d int64) (r int64) { c.Lock() c.n += d r = c.n c.Unlock() return}// The method is bad. When it is called, a Counter// receiver value will be copied.func (c Counter) Value() (r int64) { c.Lock() r = c.n c.Unlock() return}
我們只需要改變 Value
接收類型方法為指針類型 *Counter
,就可以避免拷貝 Mutex
值。
在官方的 Go SDK 中提供的 go vet
命令將會報告潛在的錯誤值拷貝。
在錯誤的地方調用 sync.WaitGroup
的方法
每個 sync.WaitGroup
值維護一個內部計數器,這個計數器的初始值為 0。如果一個 WaitGroup
計數器的值是 0,調用 WaitGroup
值的 Wait
方法就不會被阻塞,否則,在計數器值為 0 之前,這個調用會一直被阻塞。
為了讓 WaitGroup
值的使用有意義,當一個 WaitGroup
計數器值為 0 時,必須在相應的 WaitGroup
值的 Wait
方法調用之前,去調用 WaitGroup
值的 Add
方法。
例如,下面的程序中,在不正確位置調用了 Add
方法,這將使最後列印出的數字不總是 100
。事實上,這個程序最後列印的數字可能是在 [0, 100)
範圍內的一個隨意數字。原因就是 Add
方法的調用並不保證一定會發生在 Wait
方法調用之前。
package mainimport ( "fmt" "sync" "sync/atomic")func main() { var wg sync.WaitGroup var x int32 = 0 for i := 0; i < 100; i++ { go func() { wg.Add(1) atomic.AddInt32(&x, 1) wg.Done() }() } fmt.Println("To wait ...") wg.Wait() fmt.Println(atomic.LoadInt32(&x))}
為讓程序的表現符合預期,在 for
循環中,我們將把 Add
方法的調用移動到創建的新協程的範圍之外,修改後的代碼如下。
... for i := 0; i < 100; i++ { wg.Add(1) go func() { atomic.AddInt32(&x, 1) wg.Done() }() }...
不正確使用 futures 信道
在 信道使用案例 的文章中,我們知道一些函數將返回 futures 信道。假設 fa
和 fb
就是這樣的兩個函數,那麼下面的調用就使用了不正確的 future 參數。
doSomethingWithFutureArguments(<-fa(), <-fb())
在上面的代碼行中,兩個信道接收操作是順序進行的,而不是並發的。我們做如下修改使它變成並發操作。
ca, cb := fa(), fb()doSomethingWithFutureArguments(<-c1, <-c2)
沒有等協程的最後的活動的發送結束就關閉信道
Go 程序員經常犯的一個錯誤是,還有一些其它的協程可能會發送值到以前的信道時,這個信道就已經被關閉了。當這樣的發送(發送到一個已經關閉的信道)真實發生時,將引發一個異常。
這種錯誤在一些以往的著名 Go 項目中也有發生,比如在 Kubernetes 項目中的 這個 bug 和 這個 bug。
如何安全和優雅地關閉信道,請閱讀 這篇文章。
在值上做 64 位原子操作時沒有保證值地址 64 位對齊
到目前為止(Go 1.10),在標準的 Go 編譯器中,在一個 64 位原子操作中涉及到的值的地址要求必須是 64 位對齊的。如果沒有對齊則導致當前的協程異常。對於標準的 Go 編譯器來說,這種失敗僅發生在 32 位的架構上。請閱讀 內存布局 去了解如何在一個 32 位操作系統上保證 64 位對齊。
沒有注意到大量的資源被 time.After
函數調用佔用
在 time
標準包中的 After
函數返回 一個延遲通知的信道。這個函數在某些情況下用起來很便捷,但是,每次調用它將創建一個 time.Timer
類型的新值。這個新創建的 Timer
值在通過傳遞參數到 After
函數指定期間保持激活狀態,如果在這個期間過多的調用了該函數,可能會有太多的 Timer
值保持激活,這將佔用大量的內存和計算資源。
例如,如果調用了下列的 longRunning
函數,將在一分鐘內產生大量的消息,然後在某些周期內將有大量的 Timer
值保持激活,即便是大量的這些 Timer
值已經沒用了也是如此。
import ( "fmt" "time")// The function will return if a message arrival interval// is larger than one minute.func longRunning(messages <-chan string) { for { select { case <-time.After(time.Minute): return case msg := <-messages: fmt.Println(msg) } }}
為避免在上述代碼中創建過多的 Timer
值,我們將使用一個單一的 Timer
值去完成同樣的任務。
func longRunning(messages <-chan string) { timer := time.NewTimer(time.Minute) defer timer.Stop() for { select { case <-timer.C: return case msg := <-messages: fmt.Println(msg) if !timer.Stop() { <-timer.C } } // The above "if" block can also be put here. timer.Reset(time.Minute) }}
不正確地使用 time.Timer
值
在最後,我們將展示一個符合語言使用習慣的 time.Timer
值的使用示例。需要注意的一個細節是,那個 Reset
方法總是在停止或者 time.Timer
值釋放時被使用。
在 select
塊的第一個 case
分支的結束部分,time.Timer
值被釋放,因此,我們不需要去停止它。但是必須在第二個分支中停止定時器。如果在第二個分支中 if
代碼塊缺失,它可能至少在 Reset
方法調用時,會(通過 Go 運行時)發送到 timer.C
信道,並且那個 longRunning
函數可能會早於預期返回,對於 Reset
方法來說,它可能僅僅是重置內部定時器為 0,它將不會清理(耗盡)那個發送到 timer.C
信道的值。
例如,下面的程序很有可能在一秒內而不是十秒時退出。並且更重要的是,這個程序並不是 DRF 的(LCTT 譯註:data race free,多線程程序的一種同步程度)。
package mainimport ( "fmt" "time")func main() { start := time.Now() timer := time.NewTimer(time.Second/2) select { case <-timer.C: default: time.Sleep(time.Second) // go here } timer.Reset(time.Second * 10) <-timer.C fmt.Println(time.Since(start)) // 1.000188181s}
當 time.Timer
的值不再被其它任何一個東西使用時,它的值可能被停留在一種非停止狀態,但是,建議在結束時停止它。
在多個協程中如果不按建議使用 time.Timer
值並發,可能會有 bug 隱患。
我們不應該依賴一個 Reset
方法調用的返回值。Reset
方法返回值的存在僅僅是為了兼容性目的。
via: https://go101.org/article/concurrent-common-mistakes.html
作者:go101.org 譯者:qhwdw 校對:wxy
本文由 LCTT 原創編譯,Linux中國 榮譽推出
推薦閱讀:
※《收穫》主編程永新罷看《文學報》對該報「新批評」專刊上發表的李建軍批評莫言的文章表示抗議
※小哥哥,學數控編程嗎?莫莫教你哦!
※C++性能榨汁機之指針與引用
※Ian Goodfellow 最新問答:11 歲學會編程,因為喜歡遊戲編程入坑深度學習
※python3零基礎快速通關