RxJS 入門

RxJS是什麼?

官方:RxJS 是 Reactive Extensions for JavaScript 的縮寫,起源於 Reactive Extensions,是一個基於可觀測數據流在非同步編程應用中的庫。RxJS 是 Reactive Extensions 在 JavaScript 上的實現。

一般說到RxJS,都會講他是基於流的響應式的結合觀察者和迭代器模式的一種庫。所以下面會從這幾個關鍵詞來講。

前置知識點

響應式編程(RP —— Reactive Programming)

響應式編程是一種面向數據流和變化傳播的編程範式。在編程語言中很方便地表達靜態或動態的數據流,而相關的計算模型會自動將變化的值通過數據流進行傳播。—— wikipedia

  • 響應式編程是使用非同步數據流進行編程。常見的非同步數據流包含Event buses。用包含這些事件在內的任何東西創建數據流(Data stream),監聽他並作出響應。
  • 只關注業務邏輯互相依賴的事件而不是實現細節
  • 適用於大量和數據有關的事件交互,特別是高實時性要求

以點擊按鈕這一個事件流舉例:

多次點擊按鈕事件流舉例:

  • 一個流就是一個不間斷的按照時間排序的序列。它產生三種不同類型事件: 值,錯誤,完成的信號。對這三個定義事件處理函數,就可以非同步的捕獲這些事件
  • 每個stream有多個方法,調用時會基於原來的流返回一個新的流,原來的流不做修改,保證不可變性
  • 數據流支持鏈式調用,你可以組合不同的函數來處理流,創建和過濾不同的流。甚至一個流或多個流可以作為另外一個流的輸入。你可以合併兩個數據流。你可以過濾一個數據流,從中獲取一個包含你感興趣的事件的數據流。你可以將來自一個數據流的值映射到另外一個數據流

觀察者模式(發布-訂閱模式)

例子:購房者和售房部之間的信息訂閱。購房者訂閱售房部的房價信息,售房部維護一張需要信息的客戶表,當有信息時,遍歷表給符合條件的購房者推送發布房屋信息。這裡,購房者擔任觀察者的角色,售房部是被觀察的角色,當售房部信息發生變化,則自動推送信息給購房者。

結論:流(售房部/rx.js的Observable)是被觀察的,某個函數訂閱流的某個事件(推送房價),該函數是觀察者(購房者/rx.js的Observer)。當流的某個事件產生了,對應的函數就會被執行。

迭代器模式

提供一種方法順序訪問一個聚合對象中的各個元素,而不需要暴露該對象的內部表示。最常見的就是JavaScript 中像 Array、Set 等這些內置的可迭代類型,可以通過 iterator 方法來獲取一個迭代對象,調用迭代對象的 next 方法將獲取一個元素對象。

var iterable = [1, 2];var iterator = iterable[Symbol.iterator]();iterator.next(); // => { value: "1", done: false}iterator.next(); // => { value: "2", done: false}iterator.next(); // => { value: undefined, done: true}

由上可知JavaScript 的 Iterator 只有一個 next 方法,這個 next 方法只會回傳這兩種結果。iterator通過調用next獲取值,是一種pull數據的形式。

與Promise的區別

  1. Promise本質上也是一個Observable,能使用fromPromise把Promise轉成Observable
  2. 但是Promise .then()只能返回一個值,Observable可以返回多個值
  3. Promise要麼resolve要麼reject,並且只響應一次。而Observable可以響應多次
  4. Promise不能取消,Observable可以調用unsubscribe()取消訂閱

解決的問題

  • 同步和非同步的統一
  • 可組合的數據變更過程
  • 數據和視圖的精確綁定
  • 條件變更之後的自動重新計算

核心概念

概述關係

Demo: 一個簡單的例子

Observable —— 被觀察者

RxJS是觀察者 + 迭代器模式的結合,Observable作為被觀察者,是一個值或事件的流集合。就像是一個序列,裡面的元素會隨著時間推送。

var observable = Rx.Observable// 通過create方法創建一個Observable// 回調函數會接受observer參數,也就是觀察者角色 .create(function(observer) { observer.next("hi"); observer.next("world"); setTimeout(() => { observer.next("這一段是非同步操作"); }, 30) })// 訂閱這個 observable// 只有在訂閱之後,才會在流Observable變化的時候,調用observer提供的方法,並通知他 // 訂閱之後也可以取消訂閱,調用unsubscribe()即可console.log("start")var subscription = observable.subscribe(function(value) { console.log(value);})console.log("end")setTimeOut(()=> { subscription.unsubscribe()}, 5000)// 程序會依次輸出"start""hi""world""end""這一段是非同步操作"

所以,Observable不同於觀察者模式中的被觀察者,他沒有一份需要維護訂閱者的清單,他只是一個函數。想要訂閱他只需要傳進回調函數observer就好。並且,Observable 可以同時處理同步和非同步操作!

同時,有很多創建Observable的方法,常用的如下:

Operator —— 操作符

操作Observable的函數就是操作符。他會接受傳入的Observable,但是會返回新的Observable。用map舉例說明。

Rx.Observable.of(2) .map(v => v * 2) .subscribe(v => console.log("output:" + v));// output:4

下面介紹幾個常用的操作符。具體的API可視化數據流動可參見寶石圖

Observer —— 觀察者

和迭代器模式一一對應,提供三個方法,next、error、complete

var Observer = { next(value) { /* 處理值*/ }, error(error) { /* 處理異常 */ }, complete() { /* 處理已完成態 */ }};next(): 接收Observable發出的值 必傳error(): 不同於迭代器裡面用try catchObserver用error方法接收錯誤 可選complete(): 當沒有新的數據發出的時候觸發操作 可選

Subject —— 觀察模式的實現,並繼承Observable

同一個Observable可以被多個observer訂閱。和addListener類似她們由Subject維護列表,Subject可以向多個Observer多路推送數值,是一類特殊的Observable。

  • 每一個Subject都是一個Observable,可以訂閱他。從Observer的視角看,它並不能區分自己的執行環境是普通Observable的單路推送還是基於Subject的多路推送
  • 每一個Subject也可以是Observer,因為他同樣由next、error、complete方法組成,調用next方法,Subject會給所有在他上面註冊的Observer多路推送當前的值

demo地址

// 創建一個Observable,一秒鐘輸出一個數字,只取三個就結束var source = Rx.Observable.interval(1000).take(3);// 定義兩個observer對象var observerA = { next: value => console.log("A next: " + value), error: error => console.log("A error: " + error), complete: () => console.log("A complete!")}var observerB = { next: value => console.log("B next: " + value), error: error => console.log("B error: " + error), complete: () => console.log("B complete!")}// 創建一個subject —— 特殊的Observablevar subject = new Rx.Subject()// observerA訂閱Subjectsubject.subscribe(observerA)// Subject又以observer的身份訂閱Observablesource.subscribe(subject);setTimeout(() => { subject.subscribe(observerB);}, 1000);// 輸出:// "A next: 0"// "A next: 1"// "B next: 1"// "A next: 2"// "B next: 2"// "A complete!"// "B complete!" AB兩個observer互不影響是獨立的

Scheduler —— 控制Observable的時間節點

控制一個Observable的訂閱開始執行和值送達的時機。RxJS 5中提供了4種scheduler。一般operator會有預設的scheduler。

  • queue —— 遞歸的時候,queue會把遞歸阻塞,避免不必要的性能損耗
  • asap —— as soon as possible,表現為setTimeout時間為0。多用於永不退訂的Observable,比如輪詢
  • async —— 規定當前的Observable執行方式為非同步,用setInterval實現。
  • animationFrame —— Window.requestAnimationFrame這個API實現,適合高頻率的UI動畫觸發

var observable = Rx.Observable.create(function (observer) { observer.next(1); observer.next(2); observer.next(3); observer.complete();});console.log("before subscribe");observable.observeOn(Rx.Scheduler.async) // 本來是同步的,變成了非同步.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log("Error: " + err); }, complete: () => { console.log("complete"); }});console.log("after subscribe");// "before subscribe"// "after subscribe"// 1// 2// 3// "complete"

應用場景

例子: Trello 、 Teambition等協作工作軟體

核心:視圖的狀態是一個時間軸上的若干流的疊加效果。

業務場景特點:

  1. 有很多數據,非常多關於數據的操作
  2. 展示的數據是多個數據組合而成,比如任務、對應owner、標籤等
  3. 同一個數據的更新,可能來自不同的發起方
  4. 新增的數據需要的數據處理規則應該和原來的相同

解決:

  1. 數據通過緩存和非同步方式獲取
  2. 把每個數據流管道組合起來,流的疊合就是最後的數據
  3. 獲取和訂閱放在一起,也就不需要知道數據的來源是哪裡了
  4. 現在和未來的數據merge之後通過相同的API處理,保證數據的規則相同

其他

仿照優酷視頻的小窗效果

拖拽效果

資料

  • RxJS 5
  • The introduction to Reactive Programming you've been missing
  • 流動的數據——使用 RxJS 構造複雜單頁應用的數據邏輯

推薦閱讀:

TAG:前端开发 | RxJS | 入门指南 |