Golang Websocket 實踐

這裡先簡單介紹一下 websocket,確實只是簡單介紹一下。

1. 應用場景

有些場景下,比如交易 K 線,我們需要前端對後端進行輪詢來不斷獲取或者更新資源狀態。輪詢的問題毫無以為是一種笨重的方式,因為每一次 http 請求除了本身的資源信息傳輸外還有三次握手以及四次揮手。替代輪詢的一種方案是復用一個 http 連接,更準確的復用同一個 tcp 連接。這種方式可以是 http 長連接,也可以是 websocket。

2. websocket 和 http 長連接的區別

首先 websocket 和 http 是完全不同的兩種協議,雖然底層都是 tcp/ip。http 長連接也是屬於 http 協議。http 協議和 websocket 的最大區別就是 http 是基於 request/response 模式,而 websocket 的 client 和 server 端卻可以隨意發起 data push,比如服務端向 app 端的消息下發就比較適合使用 websocket(這種場景下使用 http 長連接也是可以,client 端定時向 server 端發送消息,比如 heatbeat,然後 server 端要 push 的消息以 response 的形式返回給 client)。

這裡 gist.github.com/legendt 我寫一個 github gist 代碼片段,給大家體驗一下。

3. Golang 最佳實踐

這裡先定義一下我們的使用場景:交易所有很多數據,比如 K 線,比如盤口數據都是在定時刷新的,這裡就可以用 websocket 來做。簡單來說,前端向後端請求特定的數據,比如 K 線數據,前端和後端建立 websocket 連接,後端持續不斷返回信息給前端。

在我們編寫 websocket 介面之前,需要略微考慮一下如何抽象,如何設計我們 websocket 框架從而保證代碼的良好的擴展性。

3.1 Hub

首先 hub 是什麼東西,下圖是 google image 查出來的結果。簡單做個類比,圖片中的 USB 3.0 口(藍色)就相當於一個個 tcp 連接,上面匯總的介面就是我們 hub 的上流數據源。

在我第一時間想去定義 hub 的粒度想到的是使用 controller,也就是請求的 router。但是後來想了一下這樣設計太複雜了,因為一個 router 的參數有很多種,不同參數可能就對應不同數據。

那麼應該怎麼去定義呢?不是從功能性上去定義,而是從數據源上定義。我們只要簡單看一下需要提供多少類不停更新的數據,這裡的每一類就對應一個 hub。

3.2 Broadcast

通過 3.1 我們定義了 hub,下面要考慮的就是如何去做廣播。

最簡單的方式遍歷一個 hub 上面所有的 conn 然後進行 conn.Write()。這種方法非常的簡單粗暴,問題也很明顯:每個 conn.Write() 都是一個網路 IO,我們這是在串列地處理多個網路 IO,低效。

串列改並行。我們還是遍歷 hub 上面所有的 conn,然後每一個 conn.Write() 起一個 goroutine 去做,這樣其實就是 IO 多路復用。

思考一下上面這種方式還有沒有問題。其實是有的:擴展性的問題。如果 websocket 的介面參數比較多,我們要根據參數對不同的 conn 返回不同的結果,那麼應該怎麼做的?也很簡單,對上面的 conn 進行一次封裝,封裝成一個 struct。我在很久以前一篇文章討論函數的擴展性的時候也說過將函數形參設計成 struct 是一種不錯的擴展方式。

3.3 Hub 數據感知

接 3.2,broadcast 的數據怎麼得到,主動去信息源拉,還是別人 push 過來?最簡單的實現方式是構造生產者-消費者模型,而在 golang 中實現生產者-消費者模型尤其簡單。結合到我們這裡,我們只需要在 hub 中定一個 channel 即可。

我的理解,要廣播的數據如何生存應該都是業務邏輯,不應該和基礎框架耦合在一起。

4. talk is cheap, show me the code

代碼以下面兩個 package 為例:

  1. github.com/astaxie/beeg
  2. github.com/gorilla/webs

Controller 處理。

type WsController struct { beego.Controller}var upgrader = websocket.Upgrader{ ReadBufferSize: maxMessageSize, WriteBufferSize: maxMessageSize,}func (this *WsController) WSTest() { defer this.ServeJSON() ws, err := upgrader.Upgrade(this.Ctx.ResponseWriter, this.Ctx.Request, nil) // 這裡 ws 就是 websocket.Conn,是 websocket 對 net.Conn 的封裝 if err != nil { this.Data["json"] = "fail" return } // WsClient 是我們對 websocket.Conn 的再一層封裝,後面細說 wsClient := &WsClient{ WsConn: ws, WsSend: make(chan []byte, maxMessageSize), HttpRequest: this.Ctx.Request, //記錄請求參數 } service.ServeWsExample(wsClient)}

WsClient 結構。

type WsClient struct { WsConn *websocket.Conn WsSend chan []byte HttpRequest http.Request }

WsClient 有兩個基本方法:對 client 端發送過來的數據進行處理,以及對 server 端下發的數據進行處理。這使用函數作為參數,也是為了實現最大的靈活性,但是函數參數的設計不一定是最合適的,如果大家有更合適的,歡迎指教。

func (client *WsClient) ReadMsg(fn func(c *WsClient, s string)) { for { _, msg, err := client.WsConn.ReadMessage() if err != nil { break } fn(client, string(msg)) }}func (client *WsClient) WriteMsg(fn func(s string) error) { for { select { case msg, ok := <-client.WsSend: if !ok { client.WsConn.WriteMessage(websocket.CloseMessage, []byte{}) return } if err := fn(string(msg)); err != nil { return } } }}

Hub。

type WsHub struct { Clients map[*WsClient]bool // clients to be broadcast Broadcast chan string Register chan *WsClient UnRegister chan *WsClient LastMsg string // 最近一次的廣播內容。如果我們是 1 分鐘廣播一次,新來一個請求還沒有到廣播的時間,就返回最近一次廣播的內容 Identity string //可以用作做標誌}

Hub 包括一個 export 的 Run 方法和一個私有方法 broadCast()。

func (hub *WsHub) Run() { for { select { case c := <-hub.Register: hub.Clients[c] = true c.WsSend <- []byte(hub.LastMsg) break case c := <-hub.UnRegister: _, ok := hub.Clients[c] if ok { delete(hub.Clients, c) close(c.WsSend) } break case msg := <-hub.Broadcast: hub.LastMsg = msg hub.broadCast() break } }}func (hub *WsHub) broadCast() { for c := range hub.Clients { select { case c.WsSend <- []byte(hub.LastMsg): break default: close(c.WsSend) delete(hub.Clients, c) } }}

我們現在把 client 和 hub 串起來,也就是第一個例子中的 service.ServeWsExample(wsClient)

// 初始化func initWs() { WsHubs = make(map[string]*util.WsHub) hubList := []string{"hub1", "hub2", "hub2"} for _, hub := range hubList { WsHubs[hub] = &WsHub { Clients: make(map[*util.WsClient]bool), Broadcast: make(chan string), Register: make(chan *util.WsClient), UnRegister: make(chan *util.WsClient), //Identity: hub.String(), } go mockBroadCast(WsHubs[hub].Broadcast) go WsHubs[hub].Run() }}func mockBroadCast(broadCast chan string) { for { broadCast <- "hello world" time.Sleep(time.Second * 10) }}// controller 請求路由到相應的 ServeWsExample 函數func ServeWsExample(c *util.WsClient, pair string) { defer func() { WsHubs[pair].UnRegister <- c c.WsConn.Close() }() WsHubs[pair].Register <- c go c.WriteMsg(func(string) error {}) c.ReadMsg(func(*WsClient, string){})}

還有一點需要說明的是,這裡沒有寫出生成者(也就是向 Hub 發送數據的進程),因為生產者的寫法比較靈活,這裡還是簡單寫一個吧。

//initfunc init() { go Producer()}// 生產者func Producer() { for { // generate msg msg := "hello, I am legendtkl" // select the proper hub to send the msg WsHubs["hub1"].Broadcast <- msg }}

5. 寫在最後

工作之後一直思考的一個問題是,怎麼衡量代碼的擴展性以及如何寫出高擴展性的代碼?歡迎交流。

上面的 demo 代碼如果有需要,我打包一下傳到 github。


推薦閱讀:

springboot + websocket 例子
tornado如何實現非同步websocket推送?
一步一步教您用websocket+nodeJS搭建簡易聊天室(4)
TCP、WebSocket等網路協議簡單分析

TAG:Go語言 | WebSocket |