標籤:

用 RxJS 連接世界

本文是一系列介紹 RxJS 文章的第二篇,這一系列的文章將從一個小的例子開始,逐漸深入的講解 RxJS 在各種場景下的應用。對應的,也會有對 RxJS 各種操作符的講解。這篇文章將接著第一篇Hello Rx 中的例子,將更多的非同步業務(Http 請求) 接入我們的 Todo App 中。在例子中,會使用更多操作符(RxJS Operator) 來處理我們的業務,後續的文章中會詳細的講解這些操作符的作用和使用場景。

準備工作

首先在 GitHub - teambition/learning-rxjs: Learning RxJS step by stepclone 項目所需的 seed,並基於 article1 分支 checkout 一個你的 article2 分支。本文中所有涉及到 RxJS 的代碼將全部使用 TypeScript 編寫。

這篇文章中,我們將使用 RxJS 實現以下幾個功能:

  1. 按回車或點擊 add button 後發送一個請求,在請求返回結果後再清空輸入框,並將返回的結果變成一個 todo item。如果在請求返回結果前又一次按下回車或 add 按鈕,對比此時輸入框的值和上次發送的值是否相同,如果相同則不進行任何操作,如果不同則取消掉上次的請求並發送新的請求。
  2. 在點擊一個 todo item 的時候發送請求,間隔 300 毫秒內的點擊,只會發出一次請求。
  3. 在輸入框中每次輸入字元,都會在 200 毫秒後發送一個請求,搜索是否匹配到已存在的 todo item,如果已存在則高亮這個 todo item。如果在一次搜索的結果返回前輸入了新的字元,則取消掉前一個請求,再發一個搜索請求。

使用 switchMap 切換 Observable

為了實現需求,首先需要在原有的邏輯中加入請求的邏輯,我們可以在 lib.ts 中找到 mockHttpPost 方法:

export const mockHttpPost = (value: string): Observable<HttpResponse> => {n return Observable.create((observer: Observer<HttpResponse>) => {n let status = pendingn const timmer = setTimeout(() => {n const result = {n _id: ++dbIndex, value,n isDone: falsen }n searchStorage.set(result._id, result)n status = donen observer.next(result)n observer.complete()n }, random(10, 1000))n return () => {n clearTimeout(timmer)n if (status === pending) {n console.warn(post canceled)n }n }n })n}n

這裡我並沒有真正的發送一個 http 請求,在真實的業務場景中,將請求轉化成 Observable 的過程應該是這樣的:

Observable.create(observer => {n request(xxxx, response => {n // success callbackn observer.next(parse(response))n observer.complete()n }, err => {n // error callbackn observer.error(err)n })n // teardown logicn return () => request.abort()n})n

app.ts 中引入 mockHttpPost:

...nimport {n createTodoItem,n mockHttpPostn} from ./libnn...nnconst item$ = input$n .map(() => $input.value)n .filter(r => r !== )n .switchMap(mockHttpPost)n .map(data => createTodoItem(data.value))n .do((ele: HTMLLIElement) => {n $list.appendChild(ele)n $input.value = n })n .publishReplay(1)n .refCount()n

修改 createTodoItem helper,讓它支持傳入 HttpResponse 格式的數據:

// lib.tsnexport const createTodoItem = (data: HttpResponse) => {n const result = <HTMLLIElement>document.createElement(LI)n result.classList.add(list-group-item, `todo-item-${data._id}`)n result.setAttribute(data-id, `${data._id}`)n const innerHTML = `n ${data.value}n <button type="button" class="btn btn-default button-remove pull-right" aria-label="right Align">n <span class="glyphicon glyphicon-remove" aria-hidden="true"></span>n </button>n `n result.innerHTML = innerHTMLn return resultn}n

這樣 $item 部分的代碼可以簡化成:

const item$ = input$n .map(() => $input.value)n .filter(r => r !== )n .switchMap(mockHttpPost)n .map(createTodoItem)n .do((ele: HTMLLIElement) => {n $list.appendChild(ele)n $input.value = n })n .publishReplay(1)n .refCount()n

此時代碼運行的行為是這樣的:

  1. 直接輸入值並回車,todo item 像以前一樣被創建
  2. 輸入值,並在 todo item 生成前多次回車,可以看到請求被 cancel 了多次:

這裡的 switchMap 其實是 map and switch,而 switch 操作符的行為是:

如果 Observable 中流動的數據也是 Observable,switch 會將數據流中最新的一個 Observable 訂閱並將它的值傳遞給下一個操作符,然後取消訂閱之前的 Observable。

所以這裡的 switchMap 實際是:

const item$ = input$n .map(() => $input.value)n .filter(r => r !== )n .map(mockHttpPost)n .switch()n .map(createTodoItem)n...n

的縮寫。同樣的,之前用到的 mergeMap 也是 map and merge

如果你有興趣,可以嘗試下面的代碼觀察 switchMap 行為:

// 你可以在項目目錄下執行: npm i -g ts-node && ts-node example/switchMap.ts 觀察運行結果nimport { Observable, Observer } from rxjsnnconst stream = Observable.create((observer: Observer<number>) => {n let i = 0n const intervalId = setInterval(() => {n observer.next(++i)n }, 1000)n return () => clearInterval(intervalId)n})nnfunction createIntervalObservable(base: number) {n let i = 0n return Observable.create((observer: Observer<string>) => {n const intervalId = setInterval(() => {n observer.next(`base: ${base}, value: ${++i}`)n }, 200)n return () => {n clearInterval(intervalId)n console.log(`unsubscribe base: ${base}`)n }n })n}nnstream.switchMap(createIntervalObservable)n .subscribe(result => console.log(result))n

使用 distinct* 操作符過濾數據

但這裡的邏輯還有一點不足,我們輸入一個值並快速按下多次回車,前幾次的請求被 cancel ,但如果 input 的值不變我們其實不需要 cancel 掉這些請求,只需要忽略後幾次的點擊即可。可以使用 distinct 操作符實現這個需求:

const item$ = input$n .map(() => $input.value)n .filter(r => r !== )n .distinct()n .switchMap(mockHttpPost)n .map(createTodoItem)n .do((ele: HTMLLIElement) => {n $list.appendChild(ele)n $input.value = n })n .publishReplay(1)n .refCount()n

此時,如果在請求返回前不停的按下回車,只有在 input value 改變的時候才會 cancel 上一個請求:

使用 Subject 推送數據

此時還存在一個小問題,在生成 todo item 後再輸入與上次同樣的值並按下回車,這次的值會被 distinct 操作符過濾掉。為了解決這個問題,我們可以指定 distinct 操作符的第二個參數 flushes 來清除 distinct 操作符的緩存:

import { Observable, Subject } from rxjsn...nnconst clearInputSubject$ = new Subject<void>()nnconst item$ = input$n .map(() => $input.value)n .filter(r => r !== )n .distinct(null, clearInputSubject$)n .switchMap(mockHttpPost)n .map(createTodoItem)n .do((ele: HTMLLIElement) => {n $list.appendChild(ele)n $input.value = n clearInputSubject$.next()n })n .publishReplay(1)n .refCount()n

這裡出現的 Subject 既有 Observer 的功能,也有 Observable 的功能,但又有一些區別。上一篇講過了 Observable 是 unioncast 的,也就意味著 Observable 中一個值只會發送給一個訂閱者。而 publish/share 操作符可以將它們變成 muticast 的,但它依然是 lazy 的,也就是要有訂閱者它才會執行。而這裡的 Subject 與 Observable 相比,不僅是 muticast 的,而且是非 lazy 的,它可以在任意時刻任意地點推送數據,這些數據可以被任意多的訂閱者共享。

根據 Subject 的特性可以看出來,item$ 這個 publish 出來的 Observable 可以改寫成一個 Subject,有興趣的讀者可以自行嘗試(訂閱 input 並在 subscribe 中 next 值)。

使用 debounceTime 過濾重複的操作

我們已經實現了第一個需求,接下來要完成第二個 在點擊一個 todo item 的時候發送請求,在請求返回結果前的點擊都會被忽略。請求的邏輯和上一個一樣:

...nimport {n createTodoItem,n mockToggle,n mockHttpPostn} from ./libn...nnconst toggle$ = item$.mergeMap($todoItem => {nreturn Observable.fromEvent<MouseEvent>($todoItem, click)n .filter(e => e.target === $todoItem)n .mapTo({n data: {n _id: $todoItem.dataset[id],n isDone: $todoItem.classList.contains(done)n }, $todoItemn })n})n .switchMap(result => {n return mockToggle(result.data._id, result.data.isDone)n .mapTo(result.$todoItem)n })n...n

這裡短時間的重複點擊,會讓前一個點擊請求取消掉,但這與我們的需求不符,我們需要的是間隔 300 毫秒內的點擊,只會發出一次請求,debounceTime 操作符可以完成這個工作:

const toggle$ = item$.mergeMap($todoItem => {n return Observable.fromEvent<MouseEvent>($todoItem, click)n .debounceTime(300)n .filter(e => e.target === $todoItem)n .mapTo({n data: {n _id: $todoItem.dataset[id],n isDone: $todoItem.classList.contains(done)n }, $todoItemn })n})n .switchMap(result => {n return mockToggle(result.data._id, result.data.isDone)n .mapTo(result.$todoItem)n })n

debounce and switchMap,最小化使用你的資源

最後一個需求,需要同時使用 debounceTime 和 switchMap :

...nimport {n createTodoItem,n mockToggle,n mockHttpPost,n search,n HttpResponsen} from ./libnn...n// 後面的 search$ 與 enter 應該時從同一個 Observable 中轉換出來,這裡將 input 事件的 Observable publish 成 muticastnconst type$ = Observable.fromEvent<KeyboardEvent>($input, keydown)n .publish()n .refCount()nnconst enter$ = type$n .filter(r => r.keyCode === 13)nn...nconst search$ = type$.debounceTime(200)n .filter(evt => evt.keyCode !== 13)n .map(result => (<HTMLInputElement>result.target).value)n .switchMap(search)n .do((result: HttpResponse | null) => {n const actived = document.querySelectorAll(.active)n Array.prototype.forEach.call(actived, (item: HTMLElement) => {n item.classList.remove(active)n })n if (result) {n const item = document.querySelector(`.todo-item-${result._id}`)n item.classList.add(active)n }n })nnconst app$ = toggle$.merge(remove$, search$)n .do(r => {n console.log(r)n })n

試著用不同的速度輸入一系列的字元串,觀察控制台的響應。在 200 毫秒內的輸入被忽略,在 response 回來之前的輸入會讓前一個 request abort 掉。如果匹配到相同的 todo item 則會高亮它。

總結

一個簡陋的 Todo App 就此完成(delete 與 toggle 類似,有興趣可以自行實現),它涵蓋了一些 RxJS 擅長的領域:

  1. 將同步/非同步代碼抽象成同樣的形狀,並使用操作符加以組合
  2. 在需要的時候 cancel ,最大化節約資源

但可以明顯看出,在業務逐漸複雜以後,直接的組合 Observable 與 Observable 已經會讓數據流變得難以預測(hard to reason about),特別是在它們互相依賴互相派生的情況更加複雜的場景下。而大家知道,Flux/Redux 非常擅長處理這種場景,後面的文章中也會講到如何運用單向數據流的思想管理 Observable,以及如何使用 Redux Observable 將 RxJS 作為 Redux 的 Epics。


推薦閱讀:

RxJS 入門

TAG:RxJS | TypeScript |