響應式編程(上):總覽
作者 | Emac
杏仁醫生架構師兼平台組負責人,關注微服務、DevOps領域。
引子:被譽為「中國大數據第一人」的塗子沛先生在其成名作《數據之巔》里提到,摩爾定律、社交媒體、數據挖掘是大數據的三大成因。IBM 的研究稱,整個人類文明所獲得的全部數據中,有 90% 是過去兩年內產生的。在此背景下,包括 NoSQL,Hadoop,Spark,Storm,Kylin 在內的大批新技術應運而生。其中以 RxJava 和 Reactor 為代表的響應式(Reactive)編程技術針對的就是經典的大數據 4V 定義(Volume,Variety,Velocity,Value)中的 Velocity,即高並發問題,而在剛剛發布的 Spring 5 中,也引入了響應式編程的支持。我將分上下兩篇與你分享與響應式編程有關的一些學習心得。本篇是上篇,以 Reactor 框架為例介紹響應式編程的幾個關鍵特性。
1. 響應式宣言
和敏捷宣言一樣,說起響應式編程,必先提到響應式宣言。
We want systems that are Responsive, Resilient, Elastic and Message Driven. We call these Reactive Systems. - The Reactive Manifesto
圖片出處:The Reactive Manifesto
不知道是不是為了向敏捷宣言致敬,響應式宣言中也包含了 4 組關鍵詞:
- Responsive: 可響應的。要求系統儘可能做到在任何時候都能及時響應。
- Resilient: 可恢復的。要求系統即使出錯了,也能保持可響應性。
- Elastic: 可伸縮的。要求系統在各種負載下都能保持可響應性。
- Message Driven: 消息驅動的。要求系統通過非同步消息連接各個組件。
可以看到,對於任何一個響應式系統,首先要保證的就是可響應性,否則就稱不上是響應式系統。從這個意義上來說,動不動就藍屏的 Windows 系統顯然不是一個響應式系統。
PS: 如果你贊同響應式宣言,不妨到官網上留下的你電子簽名,我的編號是 18989,試試看能不能找到我。
2. 響應式編程
In computing, reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. - Reactive programming - Wikipedia
在上述響應式編程(後面簡稱 RP)的定義中,除了非同步編程,還包含兩個重要的關鍵詞:
- Data streams:即數據流,分為靜態數據流(比如數組,文件)和動態數據流(比如事件流,日誌流)兩種。基於數據流模型,RP 得以提供一套統一的 Stream 風格的數據處理介面。和 Java 8 中的 Stream API 相比,RP API 除了支持靜態數據流,還支持動態數據流,並且允許復用和同時接入多個訂閱者。
- The propagation of change:變化傳播,簡單來說就是以一個數據流為輸入,經過一連串操作轉化為另一個數據流,然後分發給各個訂閱者的過程。這就有點像函數式編程中的組合函數,將多個函數串聯起來,把一組輸入數據轉化為格式迥異的輸出數據。
一個容易混淆的概念是響應式設計,雖然它的名字中也包含了「響應式」三個字,但其實和 RP 完全是兩碼事。響應式設計是指網頁能夠自動調整布局和樣式以適配不同尺寸的屏幕,屬於網站設計的範疇,而 RP 是一種關注系統可響應性,面向數據流的編程思想或者說編程框架。
特性
從本質上說,RP 是一種非同步編程框架,和其他框架相比,RP 至少包含了以下三個特性:
- 描述而非執行:在你最終調用
subscribe()
方法之前,從發布端到訂閱端,沒有任何事會發生。就好比無論多長的水管,只要水龍頭不打開,水管里的水就不會流動。為了提高描述能力,RP 提供了比 Stream 豐富的多的多的API,比如buffer()
,merge()
,onErrorMap()
等。 - 提高吞吐量: 類似於 HTTP/2 中的連接復用,RP 通過線程復用來提高吞吐量。在傳統的Servlet容器中,每來一個請求就會發起一個線程進行處理。受限於機器硬體資源,單台伺服器所能支撐的線程數是存在一個上限的,假設為T,那麼應用同時能處理的請求數(吞吐量)必然也不會超過T。但對於一個使用 Spring 5 開發的 RP 應用,如果運行在像 Netty 這樣的非同步容器中,無論有多少個請求,用於處理請求的線程數是相對固定的,因此最大吞吐量就有可能超過T。
- 背壓(Backpressure)支持:簡單來說,背壓就是一種反饋機制。在一般的 Push 模型中,發布者既不知道也不關心訂閱者的處理速度,當數據的發布速度超過處理速度時,需要訂閱者自己決定是緩存還是丟棄。如果使用 RP,決定權就交回給發布者,訂閱者只需要根據自己的處理能力問發布者請求相應數量的數據。你可能會問這不就是 Pull 模型嗎?其實是不同的。在 Pull 模型中,訂閱者每次處理完數據,都要重新發起一次請求拉取新的數據,而使用背壓,訂閱者只需要發起一次請求,就能連續不斷的重複請求數據。
適用場景
了解了 RP 的這些特性,你可能已經猜想到 RP 有哪些適用場景了。一般來說,RP 適用於高並發、帶延遲操作的場景,比如以下這些情況(的組合):
- 一次請求涉及多次外部服務調用
- 非可靠的網路傳輸
- 高並發下的消息處理
- 彈性計算網路
代價
Every coin has two sides.
和任何框架一樣,有優勢必然就有劣勢。RP 的兩個比較大的問題是:
- 雖然復用線程有助於提高吞吐量,但一旦在某個回調函數中線程被卡住,那麼這個線程上所有的請求都會被阻塞,最嚴重的情況,整個應用會被拖垮。
- 難以調試。由於 RP 強大的描述能力,在一個典型的 RP 應用中,大部分代碼都是以鏈式表達式的形式出現,比如
flux.map(String::toUpperCase).doOnNext(s -> LOG.info("UC String {}", s)).next().subscribe()
,一旦出錯,你將很難定位到具體是哪個環節出了問題。所幸的是,RP 框架一般都會提供一些工具方法來輔助進行調試。
3. Reactor 實戰
為了幫助你理解上面說的一些概念,下面我就通過幾個測試用例,演示 RP 的兩個關鍵特性:提高吞吐量和背壓。完整的代碼可參見我 GitHub 上的示例工程。
提高吞吐量
@Testn public void testImperative() throws InterruptedException {n _runInParallel(CONCURRENT_SIZE, () -> {n ImperativeRestaurantRepository.INSTANCE.insert(load);n });n }nn private void _runInParallel(int nThreads, Runnable task) throws InterruptedException {n ExecutorService executorService = Executors.newFixedThreadPool(nThreads);n for (int i = 0; i < nThreads; i++) {n executorService.submit(task);n }n executorService.shutdown();n executorService.awaitTermination(1, TimeUnit.MINUTES);n }nn @Testn public void testReactive() throws InterruptedException {n CountDownLatch latch = new CountDownLatch(CONCURRENT_SIZE);n for (int i = 0; i < CONCURRENT_SIZE; i++) {n ReactiveRestaurantRepository.INSTANCE.insert(load).subscribe(s -> {n }, e -> latch.countDown(), latch::countDown);n }n latch.await();n }n
用例解讀:
- 第一個測試用例使用的是多線程 + MongoDB Driver,同時起 100 個線程,每個線程往 MongoDB 中插入 10000 條數據,總共 100 萬條數據,平均用時15秒左右。
- 第二個測試用例使用的是 Reactor + MongoDB Reactive Streams Driver,同樣是插入 100 萬條數據,平均用時不到 10 秒,吞吐量提高了 50%!
背壓
在演示測試用例之前,先看兩張圖,幫助你更形象的理解什麼是背壓。
圖片出處:Dataflow and simplified reactive programming
兩張圖乍一看沒啥區別,但其實是完全兩種不同的背壓策略。第一張圖,發布速度(100/s)遠大於訂閱速度(1/s),但由於背壓的關係,發布者嚴格按照訂閱者的請求數量發送數據。第二張圖,發布速度(1/s)小於訂閱速度(100/s),當訂閱者請求100個數據時,發布者會積滿所需個數的數據再開始發送。可以看到,通過背壓機制,發布者可以根據各個訂閱者的能力動態調整發布速度。
@BeforeEachn public void beforeEach() {n // initialize publishern AtomicInteger count = new AtomicInteger();n timerPublisher = Flux.create(s ->n new Timer().schedule(new TimerTask() {n @Overriden public void run() {n s.next(count.getAndIncrement());n if (count.get() == 10) {n s.complete();n }n }n }, 100, 100)n );n }nn @Testn public void testNormal() throws InterruptedException {n CountDownLatch latch = new CountDownLatch(1);n timerPublishern .subscribe(r -> System.out.println("Continuous consuming " + r),n e -> latch.countDown(),n latch::countDown);n latch.await();n }nn @Testn public void testBackpressure() throws InterruptedException {n CountDownLatch latch = new CountDownLatch(1);n AtomicReference<Subscription> timerSubscription = new AtomicReference<>();n Subscriber<Integer> subscriber = new BaseSubscriber<Integer>() {n @Overriden protected void hookOnSubscribe(Subscription subscription) {n timerSubscription.set(subscription);n }nn @Overriden protected void hookOnNext(Integer value) {n System.out.println("consuming " + value);n }nn @Overriden protected void hookOnComplete() {n latch.countDown();n }nn @Overriden protected void hookOnError(Throwable throwable) {n latch.countDown();n }n };n timerPublisher.onBackpressureDrop().subscribe(subscriber);n new Timer().schedule(new TimerTask() {n @Overriden public void run() {n timerSubscription.get().request(1);n }n }, 100, 200);n latch.await();n }n
用例解讀:
- 第一個測試用例演示了在理想情況下,即訂閱者的處理速度能夠跟上發布者的發布速度(以 100ms 為間隔產生 10 個數字),控制台從 0 列印到 9,一共 10 個數字,和發布端一致。
- 第二個測試用例故意調慢了訂閱者的處理速度(每 200ms 處理一個數字),同時發布者採用了 Drop 的背壓策略,結果控制台只列印了一半的數字(0,2,4,6,8),另外一半的數字由於背壓的原因被發布者 Drop 掉了,並沒有發給訂閱者。
4 小結
通過上面的介紹,不難看出 RP 實際上是一種內置了發布者訂閱者模型的非同步編程框架,包含了線程復用,背壓等高級特性,特別適用於高並發、有延遲的場景。
下篇我將對剛剛發布的 Spring 5 中有關響應式編程的支持做一些簡單介紹,並詳解一個完整的 Spring 5 示例應用,敬請期待。
5 參考
- Understanding Reactive types:https://spring.io/blog/2016/04/19/understanding-reactive-types
- Designing, Implementing, and Using Reactive APIs:https://www.slideshare.net/SpringCentral/designing-implementing-and-using-reactive-apis
- Imperative to Reactive Web Applications:https://www.slideshare.net/SpringCentral/imperative-to-reactive-web-applications
全文完
我們正在招聘 Java 工程師,歡迎有興趣的同學投遞簡歷到 rd-hr@xingren.com 。
歡迎搜索關注微信公眾號:杏仁技術站(微信號 xingren-tech)。
推薦閱讀:
※漲姿勢!手Q刷一刷紅包後台設計總結
※mac下python爬蟲亂碼問題?
※你的後台夠強大嗎?
※Mac下Web開發為為什麼都用Sublime而不用VIM呢?
TAG:后台开发 | 响应式设计Responsivewebdesign |