[譯] RxJS Observable 與 Promises 和 Async-Await 交互
原文鏈接: https://medium.com/@benlesh/rxjs-observable-interop-with-promises-and-async-await-bebb05306875
本文為 RxJS 中文社區 翻譯文章,如需轉載,請註明出處,謝謝合作! 如果你也想和我們一起,翻譯更多優質的 RxJS 文章以奉獻給大家,請點擊【這裡】
不時地會有人問我關於如何與 RxJS 配合使用 async 函數或 promises,還有更糟的,我被告之「事實」的真相是 async-await 和 Observables 並不能「在一起使用」。RxJS 從一開始就具備與 Promises 的高度互操作性。希望這篇文章能對此有所啟發。
如果可以接收 Observable,就可以接收 Promise
例如,如果使用 switchMap,你可以返回 Promise 來替代,就像返回 Observable 那樣。以下這些都是有效的:
// Observable: 每1秒發出自增數值乘以100,共發出10次nconst source$ = Observable.interval(1000)n .take(10)n .map(x => x * 100);n/**n * 返回 promise,它等待 `ms` 毫秒並發出 "done" n */nfunction promiseDelay(ms) {n return new Promise(resolve => {n setTimeout(() => resolve(done), ms);n });n}nn// 在 switchMap 中使用 promiseDelaynsource$.switchMap(x => promiseDelay(x)) // 正常運行n .subscribe(x => console.log(x)); nnsource$.switchMap(promiseDelay) // 更簡潔了n .subscribe(x => console.log(x)); nn// 或者使用 takeUntilnsource$.takeUntil(doAsyncThing(hi)) // 完全可以運行n .subscribe(x => console.log(x))nn// 或者類似這樣的奇怪組合nObservable.of(promiseDelay(100), promiseDelay(10000)).mergeAll()n .subscribe(x => console.log(x))n
使用 defer 使得返回 Promise 的函數可以重試
如果你可以訪問創建 promise 的函數,你可以使用 Observable.defer() 來包裝它,以使 Observable 可以在報錯時進行重試。
function getErroringPromise() {n console.log(getErroringPromise called);n return Promise.reject(new Error(sad));n}nnObservable.defer(getErroringPromise)n .retry(3)n .subscribe(x => console.log);nn// 輸出 "getErroringPromise called" 4次 (開始1次 + 3次重試), 然後報錯n
使用 defer() 定義使用 async-await 的 Observable
事實證明, defer 是個非常強大的小工具。你可以使用它,基本上是直接使用 async 函數,它會創建一個發出返回值及完成的 Observable 。
Observable.defer(async function() {n const a = await promiseDelay(1000).then(() => 1);n const b = a + await promiseDelay(1000).then(() => 2);n return a + b + await promiseDelay(1000).then(() => 3);n})n.subscribe(x => console.log(x)) // 輸出 7n
使用 forEach 訂閱 Observable 以在 async-await 中創建並發任務
這是 RxJS 中較少使用的功能,它來自 TC39 Observable 提議。訂閱 Observable 可不止一種方式! subscribe 是訂閱 Observable 的傳統方式,它返回用來取消數據流 Subscription 對象。而 forEach 以一種不可取消的方式來訂閱 Observable ,它接收一個函數用於每值,並返回 Promise,該 Promise 體現了 Observable 的完成和錯誤路徑。
const click$ = Observable.fromEvent(button, click);n/**n * 等待10次按鈕點擊,然後使用 fetch 將第10次點擊的時間戳發送給端點n */nasync function doWork() {n await click$.take(10)n .forEach((_, i) => console.log(`click ${i + 1}`));n return await fetch(n notify/tenclicks,n { method: POST, body: Date.now() }n );n}n
使用 toPromise() 和 async/await 將 Observable 最後發出的值作為 Promise 發出
toPromise 函數實際上是有些巧妙的,因為它並不是真正的「操作符」,而是以一種 RxJS 特定的方式來訂閱 Observable 並將其包裝成一個 Promise 。一旦 Observable 完成,Promise 便會 resolve Observable 最後發出的值。這意味著如果 Observable 發出值 「hi」 然後等待10秒才完成,那麼返回的 Promise 會等待10秒才 resolve 「hi」 。如果 Observable 一直不完成,那麼 Promise 便永遠不會 resolve 。
注意: 使用 toPromise() 是一種反模式,除非當你正在處理預期為 Promise 的 API, 比如 async-await
const source$ = Observable.interval(1000).take(3); // 0, 1, 2n// 等待3秒,然後輸出 "2"n// 因為 Observable 需要3秒才能完成,而 interval 發出從0開始自增的數字nasync function test() {n console.log(await source$.toPromise());n}n
Observables 和 Promises 能很好地一起使用
不可否認地,如果你的目標是響應式編程,那麼大多數時間裡你可能想要使用 Observable ,但是 RxJS 嘗試去儘可能地滿足大眾需求,畢竟當下 Promises 還是很受歡迎的。此外,在 async 函數中使用 RxJS Observables 和 forEach,為管理並發性和在 async-await 中「只能正常運行」的任務開啟了大量有趣的可能性。
想學習更多 RxJS 知識, 我可以親自教學或選擇在線學習,盡在rxworkshop.com!
推薦閱讀:
※如何看待基於 Atom 和 Lens 的狀態管理工具 Calmm-js 和 Focal?
TAG:RxJS | FunctionalReactiveProgramming | Promise |