RxJS 入門
RxJS是什麼?
官方:RxJS 是 Reactive Extensions for JavaScript 的縮寫,起源於 Reactive Extensions,是一個基於可觀測數據流在非同步編程應用中的庫。RxJS 是 Reactive Extensions 在 JavaScript 上的實現。
一般說到RxJS,都會講他是基於流的響應式的結合觀察者和迭代器模式的一種庫。所以下面會從這幾個關鍵詞來講。
前置知識點
響應式編程(RP —— Reactive Programming)
響應式編程是一種面向數據流和變化傳播的編程範式。在編程語言中很方便地表達靜態或動態的數據流,而相關的計算模型會自動將變化的值通過數據流進行傳播。—— wikipedia
- 響應式編程是使用非同步數據流進行編程。常見的非同步數據流包含Event buses。用包含這些事件在內的任何東西創建數據流(Data stream),監聽他並作出響應。
- 只關注業務邏輯互相依賴的事件而不是實現細節
- 適用於大量和數據有關的事件交互,特別是高實時性要求
流
以點擊按鈕這一個事件流舉例:
多次點擊按鈕事件流舉例:
- 一個流就是一個不間斷的按照時間排序的序列。它產生三種不同類型事件: 值,錯誤,完成的信號。對這三個定義事件處理函數,就可以非同步的捕獲這些事件
- 每個stream有多個方法,調用時會基於原來的流返回一個新的流,原來的流不做修改,保證不可變性
- 數據流支持鏈式調用,你可以組合不同的函數來處理流,創建和過濾不同的流。甚至一個流或多個流可以作為另外一個流的輸入。你可以合併兩個數據流。你可以過濾一個數據流,從中獲取一個包含你感興趣的事件的數據流。你可以將來自一個數據流的值映射到另外一個數據流
觀察者模式(發布-訂閱模式)
例子:購房者和售房部之間的信息訂閱。購房者訂閱售房部的房價信息,售房部維護一張需要信息的客戶表,當有信息時,遍歷表給符合條件的購房者推送發布房屋信息。這裡,購房者擔任觀察者的角色,售房部是被觀察的角色,當售房部信息發生變化,則自動推送信息給購房者。
結論:流(售房部/rx.js的Observable)是被觀察的,某個函數訂閱流的某個事件(推送房價),該函數是觀察者(購房者/rx.js的Observer)。當流的某個事件產生了,對應的函數就會被執行。
迭代器模式
提供一種方法順序訪問一個聚合對象中的各個元素,而不需要暴露該對象的內部表示。最常見的就是JavaScript 中像 Array、Set 等這些內置的可迭代類型,可以通過 iterator 方法來獲取一個迭代對象,調用迭代對象的 next 方法將獲取一個元素對象。
var iterable = [1, 2];var iterator = iterable[Symbol.iterator]();iterator.next(); // => { value: "1", done: false}iterator.next(); // => { value: "2", done: false}iterator.next(); // => { value: undefined, done: true}
由上可知JavaScript 的 Iterator 只有一個 next 方法,這個 next 方法只會回傳這兩種結果。iterator通過調用next獲取值,是一種pull數據的形式。
與Promise的區別
- Promise本質上也是一個Observable,能使用fromPromise把Promise轉成Observable
- 但是Promise .then()只能返回一個值,Observable可以返回多個值
- Promise要麼resolve要麼reject,並且只響應一次。而Observable可以響應多次
- Promise不能取消,Observable可以調用unsubscribe()取消訂閱
解決的問題
- 同步和非同步的統一
- 可組合的數據變更過程
- 數據和視圖的精確綁定
- 條件變更之後的自動重新計算
核心概念
概述關係
Demo: 一個簡單的例子
Observable —— 被觀察者
RxJS是觀察者 + 迭代器模式的結合,Observable作為被觀察者,是一個值或事件的流集合。就像是一個序列,裡面的元素會隨著時間推送。
var observable = Rx.Observable// 通過create方法創建一個Observable// 回調函數會接受observer參數,也就是觀察者角色 .create(function(observer) { observer.next("hi"); observer.next("world"); setTimeout(() => { observer.next("這一段是非同步操作"); }, 30) })// 訂閱這個 observable// 只有在訂閱之後,才會在流Observable變化的時候,調用observer提供的方法,並通知他 // 訂閱之後也可以取消訂閱,調用unsubscribe()即可console.log("start")var subscription = observable.subscribe(function(value) { console.log(value);})console.log("end")setTimeOut(()=> { subscription.unsubscribe()}, 5000)// 程序會依次輸出"start""hi""world""end""這一段是非同步操作"
所以,Observable不同於觀察者模式中的被觀察者,他沒有一份需要維護訂閱者的清單,他只是一個函數。想要訂閱他只需要傳進回調函數observer就好。並且,Observable 可以同時處理同步和非同步操作!
同時,有很多創建Observable的方法,常用的如下:
Operator —— 操作符
操作Observable的函數就是操作符。他會接受傳入的Observable,但是會返回新的Observable。用map舉例說明。
Rx.Observable.of(2) .map(v => v * 2) .subscribe(v => console.log("output:" + v));// output:4
下面介紹幾個常用的操作符。具體的API可視化數據流動可參見寶石圖
Observer —— 觀察者
和迭代器模式一一對應,提供三個方法,next、error、complete
var Observer = { next(value) { /* 處理值*/ }, error(error) { /* 處理異常 */ }, complete() { /* 處理已完成態 */ }};next(): 接收Observable發出的值 (必傳)error(): 不同於迭代器裡面用try catch,Observer用error方法接收錯誤 (可選)complete(): 當沒有新的數據發出的時候,觸發操作 (可選)
Subject —— 觀察模式的實現,並繼承Observable
同一個Observable可以被多個observer訂閱。和addListener類似她們由Subject維護列表,Subject可以向多個Observer多路推送數值,是一類特殊的Observable。
- 每一個Subject都是一個Observable,可以訂閱他。從Observer的視角看,它並不能區分自己的執行環境是普通Observable的單路推送還是基於Subject的多路推送
- 每一個Subject也可以是Observer,因為他同樣由next、error、complete方法組成,調用next方法,Subject會給所有在他上面註冊的Observer多路推送當前的值
demo地址
// 創建一個Observable,一秒鐘輸出一個數字,只取三個就結束var source = Rx.Observable.interval(1000).take(3);// 定義兩個observer對象var observerA = { next: value => console.log("A next: " + value), error: error => console.log("A error: " + error), complete: () => console.log("A complete!")}var observerB = { next: value => console.log("B next: " + value), error: error => console.log("B error: " + error), complete: () => console.log("B complete!")}// 創建一個subject —— 特殊的Observablevar subject = new Rx.Subject()// observerA訂閱Subjectsubject.subscribe(observerA)// Subject又以observer的身份訂閱Observablesource.subscribe(subject);setTimeout(() => { subject.subscribe(observerB);}, 1000);// 輸出:// "A next: 0"// "A next: 1"// "B next: 1"// "A next: 2"// "B next: 2"// "A complete!"// "B complete!" A、B兩個observer互不影響,是獨立的
Scheduler —— 控制Observable的時間節點
控制一個Observable的訂閱開始執行和值送達的時機。RxJS 5中提供了4種scheduler。一般operator會有預設的scheduler。
- queue —— 遞歸的時候,queue會把遞歸阻塞,避免不必要的性能損耗
- asap —— as soon as possible,表現為setTimeout時間為0。多用於永不退訂的Observable,比如輪詢
- async —— 規定當前的Observable執行方式為非同步,用setInterval實現。
- animationFrame —— Window.requestAnimationFrame這個API實現,適合高頻率的UI動畫觸發
var observable = Rx.Observable.create(function (observer) { observer.next(1); observer.next(2); observer.next(3); observer.complete();});console.log("before subscribe");observable.observeOn(Rx.Scheduler.async) // 本來是同步的,變成了非同步.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log("Error: " + err); }, complete: () => { console.log("complete"); }});console.log("after subscribe");// "before subscribe"// "after subscribe"// 1// 2// 3// "complete"
應用場景
例子: Trello 、 Teambition等協作工作軟體
核心:視圖的狀態是一個時間軸上的若干流的疊加效果。
業務場景特點:
- 有很多數據,非常多關於數據的操作
- 展示的數據是多個數據組合而成,比如任務、對應owner、標籤等
- 同一個數據的更新,可能來自不同的發起方
- 新增的數據需要的數據處理規則應該和原來的相同
解決:
- 數據通過緩存和非同步方式獲取
- 把每個數據流管道組合起來,流的疊合就是最後的數據
- 獲取和訂閱放在一起,也就不需要知道數據的來源是哪裡了
- 現在和未來的數據merge之後通過相同的API處理,保證數據的規則相同
其他
仿照優酷視頻的小窗效果
拖拽效果
資料
- RxJS 5
- The introduction to Reactive Programming you've been missing
- 流動的數據——使用 RxJS 構造複雜單頁應用的數據邏輯
推薦閱讀: