基於消息傳遞的並發模型
來自專欄升級之路7 人贊了文章
An object oriented language is a language with good support for objects.
A concurrency oriented language has good support for concurrency.--Joe Armstrong
兩類通用並發模型:參考七周七並發模型
- 共享內存型Shared Memory
- 線程Threads
- 鎖Locks
- 互斥l量Mutexes
- 消息傳送型(CSP和Actor模型)
- 進程Processes
- 消息Messages
- 不共享數據(狀態)No shared data
重點介紹消息傳送型的兩種模型Actor和CSP(Communicating Sequential Process)的各項對比,主要目的:除了常用的Python、Java等用的並發模型之外,還存在這麼個東西。
先看兩段代碼
代碼示例對比
使用Erlang代碼和Go代碼分別實現列印服務print_server,用來對比模型使用差異
Actor模型-Erlang代碼
%%%-------------------------------------------------------------------%%% @author Suncle%%% @doc%%% print_server%%% @end%%% Created : 2017/12/18 14:53%%%--------------------------------------------------------------------module(print_server).-author("Flowsnow").%% API-export([print_server/0, start_print_server/0, send_msg/2]).print_server() -> receive Msg -> io:format("print_server received msg: ~p~n", [Msg]), print_server() end.start_print_server() -> Pid = spawn(?MODULE, print_server, []), Pid.send_msg(Msg, Pid) -> Pid ! Msg, io:format("send_normal_msg: ~p~n", [Msg]).
Erlang shell輸出結果如下:
1> c("print_server.erl").{ok,print_server}2> Pid = print_server:start_print_server().<0.39.0>3> print_server:send_msg("hello", Pid).send_normal_msg: "hello"print_server received msg: "hello"ok
以上print_server使用的是最原始的Erlang語法實現的,也可以使用OTP gen_server原語實現更加清晰易懂。
CSP模型-Go代碼
print函數從channel讀取消息並阻塞,直到主函數向channel寫入hello消息
package mainimport ( "fmt" "time")func main() { c := make(chan string) go print(c) time.Sleep(1 * time.Second) fmt.Println("main function: start writing msg") c <- "hello" var input string fmt.Scanln(&input)}func print(c <-chan string) { fmt.Println("print function: start reading") fmt.Println("print function: reading: " + <-c) time.Sleep(1 * time.Second)}
輸出結果如下:
D:workspaceGo>go run print_server.goprint function: start readingmain function: start writing msgprint function: reading: hello
模型圖對比
Actor
Actor1發送消息到Actor2的郵箱中,郵箱本質是隊列,由Actor2消費
CSP
Process1在Channel的寫入端添加消息,Process2在channel的讀取端讀取消息
基本特性對比
Actor
- 基於消息傳遞message-passing
- 消息和信箱機制:消息非同步發送
- 保留可變狀態但不共享
- 失敗檢測和任其崩潰
- 重點在於發送消息時的實體
CSP
- 基於消息傳遞message-passing
- 順序進程Sequential processes
- 通過channel同步通信Synchronous communication through channels
- 頻道交替復用Multiplexing of channels with alternation
- 重點在於發送消息時使用的通道channel
通信語義對比
Actor
Actor1等待消息並阻塞,直到Actor2發送消息給Actor1
Actor2發送消息給Actor3,暫存在Actor3的Mailbox中,直到Actor3接受並處理
CSP
Process1讀取channel因沒有消息阻塞,直到Process2向該channel添加消息
Process2向channel添加消息並阻塞,直到Process3讀取該channel消息
Erlang實現簡易銀行賬戶
使用Erlang原語,代碼如下:
https://gist.github.com/Flowsnow/5da4565718bb6c3ec3f0a79cfedf0b00使用OTP的gen_server,代碼如下:
https://gist.github.com/Flowsnow/18a580313ac0b7ea54e5eddd9e2b2265Erlang小項目:IP資料庫
使用Erlang/OTP實現的IP資料庫,可以根據IP查詢到具體的國家省份等,代碼如下:
Flowsnow/ip_db不一樣的Erlang特性
- Let it crash思想:值得借鑒
- 變數是不可變的,變數一旦賦予值就無法再改變:帶來的好處就是沒有可變狀態,就不需要內存共享,也就不需要有鎖
- Erlang進程之間的唯一交互方式就是消息傳遞:Erlang中沒有像C++那樣,進程間擁有多種不同的交互方式(管道、消息隊列、存儲共享等等)
FAQ
為什麼沒有容量自動增大的緩衝區?
即使現在有一個看上去永不枯竭的資源,總有一天這個資源還是會被用盡的。可能是因為時過境遷,當初的老程序現在需要解決更大規模的問題;也可能是存在一個bug,消息沒有被及時處理,導致被堆積。如果沒有思考緩衝區塞滿時的對策,那麼在未來的某個時間就有可能出現一個破壞性極強,隱蔽性極深且難以診斷的bug。最好的策略是在現在就思考如何處理緩存區被塞滿的情況,將問題消滅在萌芽階段。
因此常用的緩存區類型有三種:阻塞型(blocking),棄用新值型(dropping),移出舊值型(sliding)
Python有什麼消息傳遞並發模型?
Actor模型pykka:https://github.com/jodal/pykka
CSP模型pycsp:https://github.com/runefriborg/pycsp/wiki/Getting_Started_With_PyCSP
圖片均來源於here!
參考:
- Communicating Sequential Processes (CSP)-An alternative to the actor model
- Concurrency Oriented Programming In Erlang-Joe Armstrong.pdf
推薦閱讀:
※Go並發調度器解析之實現一個協程池
※Galera Cluster—MySQL新型的高並發集群架構
※高並發下MySQL架構性能升級
※互聯網高並發大流量訪問的處理及解決方法
※golang簡單key/value資料庫(二)