如何形象的描述反應式編程中的背壓(Backpressure)機制?

按照https://github.com/ReactiveX/RxJava/wiki/Backpressure的描述和人講,大多數人很難聽懂。特別是Throttling、throttleLast、throttleFirst、debounce等概念。


上周日在知乎舉辦的線下 Meetup 中我重點講了這個概念。當時重點講它的原因就是,我發現目前網上大多數對 Backpressure 的解釋都是錯誤的。如果你認為你對 Backpressure 「有一定的理解,但不太能講清楚」,那麼你很可能其實並沒有理解它,因為 Backpressure 其實是一個非常簡單的概念

首先,Backpressure 並不是響應式編程(Reactive Programming,或者有的人喜歡按字直譯為「反應式編程」)獨有的;其次,Backpressure 並不是一種「機制」,也不是一種「策略」。Backpressure 其實是一種現象:在數據流從上游生產者向下游消費者傳輸的過程中,上游生產速度大於下游消費速度,導致下游的 Buffer 溢出,這種現象就叫做 Backpressure 出現。

編程中的 Backpressure 這個概念源自工程概念中的 Backpressure:在管道運輸中,氣流或液流由於管道突然變細、急彎等原因導致由某處出現了下游向上游的逆向壓力,這種情況稱作「back pressure」。這是一個很直觀的詞:向後的、往回的壓力——back pressure。可是,國內的熱力工程界對這個詞的正式翻譯是「背壓」,把「back」翻譯成了「背」,著實有點讓人無力吐槽。

相輔相成地,由於「back pressure」被國內翻譯為了「背壓」,那麼這個概念對於國內的程序員來說就更加難懂了,很多人對此或多或少加了一些自己的猜測:「背壓?來自背後的壓力?是說上游給下游的壓力太大了嗎?」

其實程序開發中的 Backpressure ,只是一種和工程上的 back pressure 相似的概念,我在這裡再重複一遍:在數據流從上游生產者向下游消費者傳輸的過程中,上游生產速度大於下游消費速度,導致下游的 Buffer 溢出,這種現象就叫做 Backpressure 出現。需要強調的是:這句話的重點不在於「上游生產速度大於下游消費速度」,而在於「Buffer 溢出」

Backpressure 和 Buffer 是一對相生共存的概念,只有設置了 Buffer,才有 Backpressure 出現;只要設置了 Buffer,一定存在出現 Backpressure 的風險。

不懂?我舉個實際的例子。

例如你是開發伺服器後端的,有一個 Socket 不斷地接收來自用戶的 http 請求來把用戶需要的網頁返回給用戶。你的伺服器所能承受的同時訪問用戶數是有上限的吧?比如說,你的伺服器主機的處理器和內存情況決定了,它最多只能承受 5000~6000 個用戶同時訪問,再多的話伺服器就有當掉的風險了。那麼你決定:把用戶數上限設置為 5000,當超出 5000 用戶數的時候,再有新的訪問就把它丟棄或者拒絕。那麼對於這個案例,5000 就是你對於用戶訪問數設置的 Buffer;第 5001 個用戶的訪問,就叫做造成了 Backpressure 的產生;而你的「丟棄或拒絕」的行為,就是對於 Backpressure 的處理。

我來多問幾個問題來把事情說得更加透徹一點。

為什麼要設置 Buffer?

因為下游消費速度小於上游生產速度(對用戶訪問的處理速度小於新訪問的出現速度)。

為什麼要丟棄 Backpressure 出現時的新事件?

  1. 因為處理不過來(本來就是因為處理不過來,所以才設置了 Buffer 的)
  2. 因為事件可丟棄

有人說了,卧槽卧槽要死要死,你敢說用戶的請求可以丟棄?打你哦。

是的,就是可以丟棄。由於消費速度可能會小於生產速度,所以才設置了 Buffer;而由於一些外部條件的限制(例如主機內存大小),所以 Buffer 需要有上限;而當 Backpressure 出現時,你其實已經在面臨「要麼丟棄新事件,要麼系統崩潰」的選擇。所以說是選擇,其實根本沒得選,只能選擇丟棄新事件。

所以明白了嗎?

生產速度大於消費速度,所以需要 Buffer;

外部條件有限制,所以 Buffer 需要有上限;

Buffer 達到上限這個現象,有一個簡化的等價詞叫做 Backpressure;

Backpressure 的出現其實是一種危險邊界,唯一的選擇是丟棄新事件。

這就是 Backpressure 的本質。

再給個實用性的總結:

永遠不要用「上游生產速度是否大於下游消費速度」來判斷你的某個模塊是否需要 Backpressure 的支持,因為現實場景是不可預估的,生產速度總是有一定的可能會大於下游消費的速度,所以 Buffer 是永遠需要的。再所以:

  1. 只要你的上游生產速度不會快到把系統搞崩潰,那麼不用設置 Buffer 上限(從而也就不用考慮 Backpressure),隨它去吧。例:按鈕點擊事件與處理點擊 -&> 就算這個事件處理很慢,就算這個用戶的手點抽筋了,他能點多快?
  2. 只有上游生產速度可能會快到把系統搞崩潰,並且事件是可以丟棄的,才需要設置 Buffer 上限。當 Buffer 有上限的時候,Backpressure 也就存在了出現的可能。一旦 Backpressure 出現,只能選擇丟棄,只是具體的丟棄策略可能不同(全部丟棄、只保留最新的一個而丟棄其餘的等等,但丟棄是不變的基本原則)。例:前面提到的服務端處理用戶請求。
  3. 如果上游生產速度可能會快到把系統搞崩潰,而事件也不可丟棄,怎麼辦?這個時候,你就要修改程序的設計了:修改代碼設計來規避風險,或者修改軟體設計、通過讓步的方式來從根源上避免問題發生。總之,這已經不是 Buffer 或者 Backpressure 能解決的問題了。

總結:Backpressure 指的是在 Buffer 有上限的系統中,Buffer 溢出的現象;它的應對措施只有一個:丟棄新事件。

Backpressure 只是一種現象,而不是一種機制;至於你說的 throttleFirst、debounce ,更不是某個機制中的一環,它們只是可以通過人為過濾的方式來降低生產速度,從而降低 Backpressure 出現的幾率罷了。*註:它們並不是專門用來降低生產速度的,只是可以這麼用。


謝邀。說下我個人的理解吧,如果有不確切的地方,歡迎交流。

在 Rx 中,只要生產者生產完數據,就把數據發出去了。同步的情況下,先後事件的消費會阻塞等待,這樣當然不會有什麼問題;但是非同步情況下,問題就來了,消費者消費的速度趕不上生產者發射數據的速度,消費者沒能力去消費生產者發過來的消息了,生活中供大於求,就會造成通貨膨脹,在 RxJava 里也一樣,供大於求,就會拋出異常(rx.exceptions.MissingBackpressureException),需要一些『宏觀調控』的政策。一些常見場景:快速點擊、資料庫查詢、鍵盤輸入、網路請求等等。

既然明確了問題癥結所在,那麼相應的解決方案就顯而易見了,既然『供大於求』,那麼

①要麼減少單位時間的供給;

②要麼浪費掉;

③要麼存起來,等到有能力了再去消費;

④要麼提高消費能力。

在 Rx 里,觀察者和被觀察者都已經確定了,要提高消費者(即觀察者)的消費能力,顯然是不行了,所以就剩下其他的解決方案了。

  1. 既然數據發送的過快,就可以選擇性地丟棄一些數據,題主提到的幾個操作符都是這種方案(throttle,debounce,onBackpressureDrop);
  2. 將多餘的暫時無法處理的數據緩存起來,直到消費者有能力處理(buffer,window,onBackpressureBuffer);
  3. 從生產者入手,降低生產者發送數據的速度。在 Subscriber 里有 request(long n) 方法,在 onStart 里調用這個方法,告訴生產者,消費者每次可以處理多少數據;在 onStart 里調用 request(0),然後通過提供 requestMore(long n) 方法去調用 request(n) ,可以實現生產者 lazy push。


謝邀。

看到這個問題我著實地想了半天,因為不太好回答。

題主肯定是能看懂那個文檔的,只是需要一個「形象的描述」。可以想像,ReactiveX本來是從眾多場景中「抽象」出來的,如果要用「形象的描述」來表達它,其實需要的是例子和應用場景。那下面我盡量以舉例子的方式來說一下我的理解。

首先,從大的方面說,這篇文檔的名字,雖然叫「Backpressure」(背壓),但卻是在講述一個更大的話題,「Flow Control」(流控)。Backpressure只是解決Flow Control的其中一個方案。

就像小學做的那道數學題:一個水池,有一個進水管和一個出水管。如果進水管水流更大,過一段時間水池就會滿(溢出)。這就是沒有Flow Control導致的結果。

而解決Flow Control有幾種思路呢?

(1)Backpressure,就是消費者需要多少,生產者就生產多少。這有點類似於TCP里的流量控制,接收方根據自己的接收窗口的情況來控制接收速率,並通過反向的ACK包來控制發送方的發送速率。這種方案只對於cold Observable有效。cold Observable是那些允許降低速率的發送源,比如兩台機器傳一個文件,速率可大可小,即使降低到每秒幾個位元組,只要時間足夠長,還是能夠完成的。相反的例子就是音視頻直播,速率低於某個值整個功能就沒法用了(這種類似於hot Observable)。

(2)節流(Throttling),說白了就是丟棄。消費不過來,就處理其中一部分,剩下的丟棄。至於處理哪些和丟棄哪些,就有不同的策略,也就是sample (or throttleLast)、throttleFirst、debounce (or throttleWithTimeout)這三種。還是舉音視頻直播的例子,在下游處理不過來的時候,就需要丟棄數據包。

(3)打包(buffer和window)。buffer和window基本一樣,只是輸出格式不太一樣。它們是把上游多個小包裹打成大包裹,分發到下游。這樣下游需要處理的包裹的個數就減少了。

(4)是一種特殊情況,阻塞住整個調用鏈(Callstack blocking)。之所以說這是一種特殊情況,是因為這種方式只適用於整個調用鏈都在一個線程上同步執行,這要求中間的各個operator都不能啟動新的線程。在平常使用中這種應該是比較少見的,因為我們經常使用subscribeOn或observeOn來切換執行線程,而且有些複雜的operator本身也會內部啟動新的線程來處理。另外,如果真的出現了完全同步的調用鏈,前面的(1)(2)(3)仍然有可能適用的,只不過這種阻塞的方式更簡單,不需要額外的支持。

舉個例子比較一下(1)和(4)。(4)相當於很多車行駛在盤山公路上,而公路只有一條車道。那麼排在最前面的第一輛車就擋住了整條路,後面的車也只能排在後面。而(1)相當於銀行辦業務時的窗口叫號,窗口主動叫某個號過去(相當於請求),那個人才過去辦理。

然後,從細的方面解釋一下sample,throttleFirst,debounce。以及onBackpressureBuffer,onBackpressureDrop,onBackpressureBlock和ConnectableObservable(multicast)。

sample就是throttleLast,採樣。類比一下音頻採樣,8kHz的音頻就是每125微秒采一個值。sample可以配置成,比如每100毫秒採樣一個值,但100毫秒內上游可能過來很多值,選那個值呢,就是選最後那個值。所以它也叫throttleLast。

throttleFirst跟sample類似,比如還是每100毫秒採樣一個值,但選這100毫秒內的第一個值。

debounce,也叫throttleWithTimeout,名字里就包含一個例子。比如,一個網路程序維護一個TCP連接,不停地收發數據,但中間沒數據可以收發的時候,就有間歇。這段間歇的時間,可以稱為idle time。當idle time超過一個預設值的時候,就算超時了(timeout),這個時候可能就需要把連接斷開了。實際上一些做server端的網路程序就是這麼工作的。每收發一個數據包之後,啟動一個計時器,等待idle time過去之後的超時,如果計時器到時之前,又有收發數據包的行為,那麼計時器重置,等待一個新的idle time。當計時器到時了,就time out了,這個連接就可以關閉了。debounce的行為,跟這個非常類似,可以用它來找到連續的收發事件之後idle time超時後的timeout事件。

最後還有一個新的問題需要說明。Backpressure有些Observable是支持的,有些不支持。但它們可以通過operator來轉化。

onBackpressureBuffer,onBackpressureDrop,onBackpressureBlock就可以把一個不支持Backpressure的Observable轉成一個支持Backpressure的Observable(即支持request請求)。但轉完之後的策略不太相同。

onBackpressureBuffer是不丟棄數據的處理方式。把上游收到的全部緩存下來,等下游來請求再發給下游。相當於一個水庫。但上游太快,就會buffer溢出。

onBackpressureDrop就是當上游來數據的時候,看下游有沒有需求,有需求就發給下游,否則上游來的數據就丟掉。

onBackpressureBlock也是看下游有沒有需求,下游沒有需求,不丟棄,但試圖堵住上游的入口(能不能真堵得住還得看上游的情況了),自己並不緩存。

相反,有時候一些operator也能把一個支持Backpressure的Observable變成一個不支持Backpressure的Observable。比如,ConnectableObservable就是這樣。它類似於把一條河的主幹,在下游分成若干支流(但不太一樣的是每條支流的水量都跟主幹一樣,是拷貝的)。那麼很好理解,下游某個支流想對上游產生背壓,是不太可能的,它阻止不了水流流向其它支流。


首先要明白 Rx 中的冷信號與熱信號,冷信號類似於拉取模型,通常都是接收到請求後才生成信號,所以一般不存在背壓的問題(如網路請求等)。而熱信號則是會主動產生數據(推送模型,不管消費者是否請求,如滑鼠移動事件等),當熱信號產生的速度遠大於訂閱者消費的速度,就會產生不平衡,過多的熱信號會擠壓,這時就需要一種背壓策略來解決這個問題。

Rx 提供了幾種解決方法:拋出異常、緩存、丟棄。這些策略都非常通俗易懂,緩存顧名思義,就是將未能來得及消費的信號暫時存在一個大小有限的 buffer 里,供訂閱者慢慢消費,這通常用於偶發性背壓。丟棄則是將來不及消費的信號直接忽略掉。

Rx 同時也提供了幾種 operator 來解決同類問題,比較典型的就是 throttle 和 debounce,但這幾種 operator 不管訂閱者消費速度如何,總是生效,也許訂閱者消費速度很快,但信號也會被節流。如果想靈活地應對這種生產消費速率不平衡的問題,就要配合相應 API 來使用背壓策略。

另外補充一下 throttle 和 debounce 的區別。

throttle:簡單來說,就是控制下游信號的產生頻率,結果就是信號總是每隔一段時間產生一次(當然前提是此時要有緩存的上游信號)。

debounce:等待上游信號穩定後再產生下游信號。結果就是只有當一個上游信號發出並停留一段時間沒有其他信號發出時才會產生下游信號。


簡單來說,背壓就是一種反饋機制。在一般的Push模型中,發布者既不知道也不關心訂閱者的處理速度,當數據的發布速度超過處理速度時,需要訂閱者自己決定是緩存還是丟棄。如果使用RP,決定權就交回給發布者,訂閱者只需要根據自己的處理能力問發布者請求相應數量的數據。你可能會問這不就是Pull模型嗎?其實是不同的。在Pull模型中,訂閱者每次處理完數據,都要重新發起一次請求拉取新的數據,而使用背壓,訂閱者只需要發起一次請求,就能連續不斷的重複請求數據。

下面兩張圖,幫助你更形象的理解什麼是背壓。

圖片出處:Dataflow and simplified reactive programming

兩張圖乍一看沒啥區別,但其實是完全兩種不同的背壓策略。第一張圖,發布速度(100/s)遠大於訂閱速度(1/s),但由於背壓的關係,發布者嚴格按照訂閱者的請求數量發送數據。第二張圖,發布速度(1/s)小於訂閱速度(100/s),當訂閱者請求100個數據時,發布者會積滿所需個數的數據再開始發送。可以看到,通過背壓機制,發布者可以根據各個訂閱者的能力動態調整發布速度。

進一步解讀可以參考響應式編程總覽


打個比方,兩個人填寫出貨單,A添加商品在一張紙質單據上,然後給B填寫買家地址.

A和B在同一個線程上,相當於A和B只有一隻筆,得輪著用.

A和B在不同線程,相當於各有各的筆.

buffer就A寫完了,放在一個卡盒尾部,B從卡盒頭取出.

backpressure就是說A寫的快,不能總是立刻放在盒子里,要看B的情況.

這個比喻應該能覆蓋rx的各種情況了吧,有空我再細化一下各種情景.


flow control:流控

back pressure: 反壓


推薦閱讀:

怎麼去矽谷做碼農?
圖像處理用python 還是matlab?
如何學習程序結構力學?

TAG:編程 | 反應式編程ReactiveProgramming |