誰才是PHP實現延遲消息隊列的最佳CP?
延遲消息
大白話來說,就是實現一種計劃任務的定時機制,希望在設定的時間到達後才觸發發送消息。
使用場景
舉兩個使用場景:
1、人為的控制,比如訂單系統里,用戶下單後規定,30分鐘內沒有支付,則自動取消該訂單。
2、程序處理比較耗時,比如發郵件,當郵件內容比較大、收件人多或者網路不好等,都有可能導致比較耗時,無法立即返回發送狀態。
初步的解決方案
Linux 里有 Crontab 定時任務,Windows 也有計劃任務。
1、使用定時任務每分鐘執行一次PHP腳本;
2、該腳本根據當前時間去查詢數據表,把符合條件的記錄(即時間已到的記錄)查出來發送即可。
那麼,這裡要思考的問題是,如果每條記錄因業務場景不同可能會比較耗時,如果不做處理則會阻塞後面的消息送達,還有可能因為腳本中斷導致後續消息記錄無法發送,輕則影響後續消息的發送時間,重則導致大量消息記錄積壓。那麼,此時需要做進一步處理,把查出來的消息記錄扔進 Redis 隊列,需要另起PHP進程去輪詢 Redis 隊列,取出消息來發送。
新的問題
一、由於PHP無法實現定時器功能,什麼時候啟動PHP進程合適?是使用長駐的PHP進程還是使用定時任務每分種查詢一次隊列?
二、啟動多少PHP進程合適?
三、如果一條消息因PHP進程意外退出導致沒有發送成功,如何回滾?
其實,如果沒有嚴格時效要求,我們可以這樣。可以想像,最壞的情況是消息延遲2分鐘(即從數據表裡查出來最多延遲1分鐘,然後再從Redis隊列拿出來最多延遲1分鐘)發送,如此的話使用定時任務每分種啟動PHP進程來查詢Redis 隊列即可。當然,這並不是最好的方式。
帶著上面的問題,我們再來看看需求方更細化的需求:
1、要求可以在添加任務時任意指定延遲時間觸發任務。比如精確到 30 秒種以後,或者幾分鐘、幾小時、幾天以後;
2、這種任務會出現比較多,有些消息重要且時效性要求高。
這個時候,單單使用 Crontab 定時任務已經無法滿足需求,需要尋找更好的解決方案。既然想到了消息隊列,那麼,我們是否可以從這方面切入,找到一個可以實現定時器功能的消息隊列,取代 Crontab 這種無法精確到秒的定時任務機制?
常見的兩種消息隊列
其實說到消息隊列,可能大家都會想到比較常見的 Rabbitmq、Redis。好,來看看是否是我們想要的。
1,Rabbitmq,原生不支持消息延遲,需要通過其它方式模擬。
比如,使用 Time To Live (TTL) + Dead Letter Exchanges(DLX)。 即進入這種隊列的消息在一定時間內超時會進入 exchange,然後再使用定時器,定時從 exchange 撈出來。
也可以使用插件 rabbitmq-delayed-message-exchange 來實現
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
遺憾的是,我們最需要的是定時器,因為PHP很難去實現一個定時觸發器。
2,Redis,原生不支持延遲消息隊列,可以通過設置過期時間,定時去隊列里撈過期的消息,但是存在過期消息被回收的風險。
更好的解決方案
以上兩種中間件都沒有集成我們最需要的定時器,而PHP這方面確實比較弱,沒有辦法去實現一個友好的定時器。那業界有沒有其它的解決辦法呢?
有的,那就是 Beanstalkd,輕量級消息中間件,原生支持延遲消息隊列,延遲時間精確到秒,絕對是PHP實現延遲消息隊列的最佳CP。
Beanstalkd,一個高性能、輕量級的分散式內存隊列系統,最初設計的目的是想通過後台非同步執行耗時的任務來降低高容量Web應用系統的頁面訪問延遲,支持過有9.5 million用戶的Facebook Causes應用。
其內部實現採用 libevent,伺服器-客戶端之間類似 memcached 輕量級 tcp 通訊協議,因此有很高的性能,這裡有個外國人做的測試對比:
Beanstalkd 利用任務(job) 代替消息(message) 的概念,每一個任務都有以下幾種狀態:
READY:需要立即處理的任務,當延時 (DELAYED) 任務到期後會自動成為當前任務;
DELAYED: 延遲執行的任務, 當消費者處理任務後, 可以用將消息再次放回 DELAYED 隊列延遲執行;
RESERVED:已經被消費者獲取, 正在執行的任務。Beanstalkd 負責檢查任務是否在 TTR(time-to-run) 內完成;
BURIED:保留的任務: 任務不會被執行,也不會消失,除非有人把它 「踢」 回隊列;
DELETED:消息被徹底刪除。
從生產者 - 消費者的角度去看狀態流轉:
從開發者開發的角度去看狀態流轉:
Beanstalkd 最大特點是基於 管道(tube)和 任務 (job)的工作隊列(work-queue),支持以下特性:
任務優先順序 (priority):
任務 (job) 可以有 0~2^32 個優先順序,0 代表最高優先順序。 beanstalkd 採用最大最小堆 (Min-max heap) 處理任務優先順序排序, 任何時刻調用 reserve 命令的消費者總是能拿到當前優先順序最高的任務, 時間複雜度為 O(logn)。
延時任務 (delay):
有兩種方式可以延時執行任務 (job):
1、生產者發布任務時指定延時;
2、當任務處理完畢後, 消費者再次將任務放入隊列延時執行 (RELEASE with <delay>)。這種機制可以實現分散式的 java.util.Timer,這種分散式定時任務的優勢是:如果某個消費者節點故障,任務超時重發 (time-to-run) 能夠保證任務轉移到另外的節點執行。
任務超時重發 (time-to-run):
Beanstalkd 把任務返回給消費者以後:消費者必須在預設的 TTR (time-to-run) 時間內發送 delete / release/ bury 改變任務狀態,否則 Beanstalkd 會認為消息處理失敗,然後把任務交給另外的消費者節點執行。如果消費者預計在 TTR (time-to-run) 時間內無法完成任務,也可以發送 touch 命令,它的作用是讓 Beanstalkd 從系統時間重新計算 TTR (time-to-run)。
任務預留 (buried):
如果任務因為某些原因無法執行,消費者可以把任務置為 buried 狀態讓 Beanstalkd 保留這些任務。管理員可以通過 peek buried 命令查詢被保留的任務,並且進行人工干預。簡單的, kick <n> 能夠一次性把 n 條被保留的任務踢回隊列。
下面來看看如何與 PHP 結合使用,解決前面提到的問題。
這裡推薦個簡潔的 PHP 客戶端庫:davidpersson/beanstalk https://github.com/davidpersson/beanstalk
我們需要一個生產者和一個消費者。把消息扔進消息隊列即為生產者,取出消息來處理即為消費者。
一個簡易的生產者:
public function producer()n{n $this->beanstalkd->useTube(default);n $n = 1;n while ($n) {n $delay = mt_rand(0, 30);n $this->beanstalkd->put(n 2, // priority.n $delay, // delay. 秒數n 3, // run timen "beanstalkd $n delay $delay" // The jobs body.n );n $n --;n }n}n
一個簡易的消費者:
public function consumer()n{n $this->beanstalkd->watch(default);n $limit = 10;nn echo start consumer .chr(10);n while ($limit) {n $job = $this->beanstalkd->reserve(5); // reserve 會阻塞進程,適當設置超時時間,比如 5 秒超時後進入下一次等待nn var_dump($job);nn if ($job) {nn //$jobStats = $this->beanstalkd->statsJob($job[id]);n $this->beanstalkd->delete($job[id]);n sleep(5);n// if ($jobStats[reserves] > 8) {n// $this->beanstalkd->bury($jobStats[id], $jobStats[pri]);n// }n cilog($job);n echo chr(10) . $limit . chr(10);n $limit --;n }n }nn echo end .chr(10);n}n
從代碼中可以看到,其實消費者進程是一個阻塞進程,使用一個循環去監聽行等待 beanstalkd 返回消息,拿到消息後再進程處理。
那為什麼是阻塞進程呢?
這是因為連接 Beanstalkd 服務端的客戶端是用 `fsockopen/pfsockopen` 去連接通信的,默認情況下採用阻塞模式開啟套接字連接,發送請求指令後將阻塞程序以等待響應。另外一個原因,這也是業務的需要,我們總是希望有一個進程去監聽服務端給我們返回的消息,以便拿到消息後進行處理,然後進入下一次等待,而不是執行一次就退出,或者說在服務端沒有返回消息時我們的消息處理程序還在不斷的循環執行,浪費資源。
在實際應用中,可能會產生各種類型的消息,消費者也會存在多個進程。所以我們還要考慮更為複雜的情況,比如:
1、一個消息執行超時了我們應該如何處理(包括消息發送失敗或PHP進程意外退出的情況)?
由於Beanstalk的運行機制,一個job,即一條消息取出後如果不手動刪掉或置為其它狀態,則該消息將重回消息隊列,由其它消費者程序處理。所以,為了避免一條消息重複處理,取出一條消息後,需要判斷是否已經處理過,以及處理完一條消息後應該刪除或置為其它狀態。
Beanstalkd 的每個 job 都有記錄被消費者讀取的次數,以及超時的次數,更多信息如下圖:
2、一個消費者進程每次啟動後執行多少條消息合適?或者說一個消費者進程持續運行多長時間比較合適?
這裡主要是為了PHP進程能在執行一段時間後自動退出,因為PHP不適合做一個常駐進程,PHP的設計目的也並非是後台服務,所以,更好的辦法是在跑了一段時間後自動退出,新起一個進程。我們可以通過設置執行消息的次數以及持續運行的時間來讓進程自動退出。你可能會說因為阻塞所以根本沒法實現讓程序在執行過程中判斷次數和運行時間。放心,Beanstalkd 可以在監聽服務端的時候設置超時間,即使用 reserve with timeout 來預訂 job,設定後,在監聽超時後將會進入下一次循環。
另外,Beanstalkd 也可以開啟binlog,如果遇到 Benstalkd 進程因為某些原因掛了,或者機器需要重啟時,Beanstalkd 都能輕鬆地從 binlog 恢復這些消息。然而,總有一些消息是比較重要的,我們需要詳細記錄這個消息的發送情況,這就需要我們把消息落地,記錄到資料庫中,下圖是一個記錄消息狀態的架構圖。
至此,我們已經解決了前面提到的問題一和問題三。
針對問題二,我們需要根據業務量增加或減少消息處理進程。為了更好地管理這些處理進程,推薦使用 supervisor 進程管理器,可以輕鬆地解決下面幾點:
1、把不同類型的消息處理進程分組;
2、很方便的設定啟動進程的數量;
3、自動維護每個進程。
參考資料:
https://github.com/kr/beanstalkd/blob/master/doc/protocol.zh-CN.md
http://in355hz.iteye.com/blog/1395727
http://blog.csdn.net/u014308482/article/details/53036770
推薦閱讀:
※2017 年了,這麼多前端框架,你會怎樣選擇?
※編寫webpack 插件
※前端周刊第62期:學習學習再學習
※為什麼 ++[[]][+[]]+[+[]] = 10?