Rx 的編程方式(一)

1. Observables & Reactive

先來一個簡單直觀的例子:

const { Observable } = require("rxjs");const source$ = Observable.of([1, 2, 3]);source$.subscribe(x => console.log(x));

過濾器節點:subscribe

2. Declarative Transformation( 聲明式轉換 )

如果我們想要平時開發的數據轉換功能,可以使用一些類似管道的工具來做。

Observable.of(1, 2, 3) .map(n => n * 2) .subscribe(x => console.log(x));

過濾器節點:map、subscribe。我所理解的就像一條溪流流動的水,然後 map 這類 API 就像水桶,將水裝入進行加工,當然,其他 map 的特性先不用細緻了解。

3. Lazy Transformation( 懶執行透明特性 )

Observable 能夠在流動的過程中進行選擇,所謂的懶特性就是不會像 Promise 一樣,給了一個數據承認,就一定會讓你接受,你可選擇不接受或者現在不接受,先這樣子理解。

Observable.range(1, 100) // 轉換管道 .map(n => n * 2) // 攔截節點 .filter(n => n > 4) // 懶獲取 .take(2) .subscribe(x => { console.log(x); });

4. DOM Event

Rx 其實是可以獨立與任何框架使用了,也可以按需載入想用的方法,對於 Rx,其實和 Zone 一樣,有了自己的保護圈和玩法,如果你要玩它,就需要進入它的圈子,大多數的相關 API 就是 `from` 的特性,對於 Rx 5.0 模塊化做的更好,以及渲染 Timeline 上更可控。

<button id="btn"></button><h1 id="out">0</h1>

const btn = document.getElementById("btn");const out = document.getElementById("out");/* eslint-disable no-undef */const { Observable } = Rx;Observable.fromEvent(btn, "click") // mapTo 能映射一個常量,map 需要一個函數 .mapTo(1) .scan((acc, cur) => acc + cur, 0) .subscribe(count => { out.innerHTML = count; });

其中主要依賴了:

rxjs/Observable.jsrxjs/observable/fromEvent.jsrxjs/operator/mapTo.jsrxjs/operator/scan.js

如果是打包的方式可以按需載入,如果是 script 可以直接引入 `rxjs/bundles/Rx.js`,不過會比較大,所以最好是按需提取到公用的 external 文件中,也能使用 lite 的版本,是常用的精簡版。

如果有兩個事件怎麼辦呢?利用流的合併特性可以做:

// 加法流,然後轉換成數據 1 流入給下一個輸入const { Observable } = Rx;const inSource$ = Observable.fromEvent(inBtn, "click").mapTo(1);// 減法流,然後轉換成數據 -1 流入給下一個輸入const outSource$ = Observable.fromEvent(outBtn, "click").mapTo(-1);// 合併成最後需要操作的流const source$ = Observable.merge(inSource$, outSource$);// 操作流並且訂閱流的 next 函數source$.scan((acc, cur) => { if(acc + cur < 0) { return 0; } return acc + cur;}, 0).subscribe(count => { print.innerHTML = count;});

5. Async HTTP

前端主要除了這種簡單的數據操作,更多的是要看非同步場景下的複雜度,下面用 `Promise`、`async/await`、`Obserable` 三者的寫法來對比。

(1) Promise方案

const data = fetchOrders().then(res => { if(res.status === 200) { return res.data; }}); data.then(orders => orders.filter(order => order.text === "Bob")) .then(orders => orders.map(order => order.id)) .then(ids => console.log(ids));function fetchOrders() { return axios.post("orders.json");}

(2) async/await

function fetchOrders() { return axios.post("orders.json");}renderOrders();async function renderOrders() { const res = await fetchOrders(); if(res.status === 200) { const data = res.data; data.filter(order => order.text === "Bob") .map(order => order.id) .forEach(id => console.log(id)); }}

(3) Observable

const { Observable } = Rx;function fetchOrders() { const promise = axios.post("orders.json"); return Observable.fromPromise(promise);} const hook = fetchOrders() .switchMap(res => { let data = []; if(res.status === 200) { data = res.data; } return Observable.from(data); }) .filter(order => order.text === "Bob") .map(order => order.id) .filter(id => id !== void 0) .subscribe(id => { console.log(id); });

6. Cancellation( 可取消的特性 )

其實這個特性很好用,可以隨時取消你訂閱的流,這是 `Promise` 做不到的,一旦發起承認就無法取消。

hook.unsubscribe();

7. Create HTTP Request

上面的 ajax 使用的是 axios 封裝的,然後返回的是 Promise 的對象,如果要用 Observable 的生態,那麼就需要利用 `fromPromise` 進行轉化,這看上去不是特別好,所以能夠直接利用 Observable 生態下的 ajax 進行操作。

與之前方案需要修改的地方有 2 個:

1. fetchOrders 函數請求 ajax 的方式

2. ajax 響應返回的對象不一樣,畢竟之前使用的 axios

// 1. 使用 Observable.ajaxfunction fetchOrders() { return Observable.ajax.post("orders.json");}// 2. 修改一下 ajax 響應基礎數據的處理if(res.status === 200) { // res.data => res.response, ng2 如果不是 json 需要調用 json() 方法 data = res.response;}

8. Observable.create 的一些細節

(1) Obserable 的組成

我之前仿造過一個簡單 Obserable 的實現:fegg/Observable

這裡用簡單的結構來說一下:

class Observable { constructor (fn) { this.fn = fn; } static create(fn) { return new Observable(fn); } subscribe(next, error, complete) { if (typeof next !== "function") { return this.fn(next); } return this.fn({ next, error: error || () => {}, complete: complete || () => {} }); }}

從上面代碼可以看出來,構造器主要接收一個函數,然後 `create` 方法主要返回的是一個 Obserable 實例。然後 `subscribe` 方法接收的是三個參數,如果我們傳入的是一個 function 那麼就會調用 `this.fn` 傳入一個對象,這個對象有三個屬性值:`next、error、complete`,默認情況下 `next` 是必須傳入的,其餘的是可選,為了安全,自己手動創建的最好調用一下 `complete`。

這樣看上來其實也挺簡單的。下來回到 rxjs 本身來看一下 `create` 的用法:

function fetchSomeone() { return Observable.create(observer => { observer.next("ok"); observer.complete(); });}fetchSomeone().subscribe(x => console.log(x));// 或者如下寫法fetchSomeone().subscribe({ next: (x) => console.log(x), complete: () => {}});

這裡用法比較簡單,就是傳入一個常量字元串 `ok`,然後在這個 Observable 被訂閱的時候傳給訂閱者。

剩下的一些內容單獨來看:

(1) Error Handle( 錯誤的處理機制 )

(2) Hot and Cold 數據流以及它們之間的轉換關係

(3) 流節點之間的轉換

(4) 流節點的並發問題

後面可能會比較細節,如果有興趣可以直接看下面的 PPT,我主要也是按 PPT 的例子和思路來寫。

參考的PPT: DevNexus 2017: The Rise of Async JavaScript

原文出自:小擼博客-60sky


推薦閱讀:

前端 UI組件化的一些思考
在React.js中使用PureComponent的重要性和使用方式
koa 實現 react-view 原理
react 行,我等你

TAG:RxJS | React | Angular? |