又見Rx——Rx via UniRx
緣起
在最早我大三的時候,我就看見了Rx這麼一個函數響應式編程的東西,並且激發了濃厚的興趣,不過那時候的源碼閱讀水平也不足,隨便翻了一翻UniRx的源碼就草草了事。
自己寫的基於池的事件系統其實是對自己的事件系統的擴展,對Rx的仿照,但是事實上一點也不好用,是我對Rx本身原理還了解得非常淺,以至於非常難以擴展。
(
基於池的事件系統擴展 | Unity之路)
就在最近,我又重新打開了UniRx 的源碼,就在現在,無論是JavaRx還是JsRx都受到了前端的追捧,一度成為被矚目的技術。
不過在遊戲開發當中,Rx就像守望先鋒那種數據與行為完全分離的ECS框架一樣,叫好不叫座(能這麼用嗎)實際工程開發當中使用到Rx的場景並不多(雖然uFrame中嵌入了UniRx),大多數人還是停留在「卧槽牛逼!還能這麼搞吶!」這樣的階段,但是我覺得總有一天,這個技術能在遊戲開發領域也大放異彩(Maybe)。
為什麼要Rx
我們先用一個簡單的例子,來引出Rx的必要性:
例如:我們的功能現在在使用Linq(打個比方)訪問一個資料庫,訪問的數據就直接顯示出來,這個時候資料庫突然出現了問題,我們無法拿到下一個數據,這個時候我們的程序就完全卡住了!
那這個時候我們就想到了,使用非同步的方式來做,所以我們現在大多數的網路數據訪問、載入等等操作都是使用了非同步的操作來進行的——為了不讓我們的程序卡頓。
但是我們都知道,非同步編程是非常複雜的,我們需要考慮到各種情況,特別是我們需要對這個一步操作進行處理的時候非常地麻煩,我們會寫大量的重複代碼,寫出來的代碼難以維護以及擴展。
就舉個最簡單的例子吧,我們需要編寫一個雙擊的功能,我們會如何寫?我們能夠拿到最簡單的介面就是onClick。
首先我們第一次click,起一個定時器,當下一次點擊的時候這個定時器還沒有到時間那麼就進行雙擊響應。
看起來很簡單?
那擴展到多次點擊響應?要起很多定時器嗎?
然後擴展到事件本身?多次同一個事件觸發之後才進行響應?
再考慮到效率本身,是否複雜度就成倍擴展了?
Rx本身就是為了解決這些問題而存在的
順便,在UniRx中的雙擊是這麼實現的:
//創建一個點擊事件流var clickStream = Observable.EveryUpdate() .Where(_ => Input.GetMouseButtonDown(0));//將點擊事件流中的事件以250毫秒緩存起來,//如果緩存池的數量大於等於2就觸發事件clickStream.Buffer(clickStream.Throttle(TimeSpan.FromMilliseconds(250))) .Where(xs => xs.Count >= 2) .Subscribe(xs => Debug.Log("DoubleClick Detected! Count:" + xs.Count));
寫法是不是和Linq很像?
什麼是Rx
Rx是Reactive Extension的簡單寫法,翻譯過來就是響應擴展。
不過在講Rx之前我們要先講一講Linq。
我們都知道Linq,人人都愛Linq,因為Linq通過函數式編程的形式(甚至可以用類似Query查詢的語法糖來將我們想要的數據拿到手)
但是在上一節中我們已經知道,Linq會被阻塞(資料庫的話可能是Linq to SQL),導致程序卡死。那如何解決這個問題?
二元性,這是Rx與Linq很關鍵的聯繫。Linq是通過從數據源拉數據來完成讀取的,而Rx則是告訴數據源,「有數據了就告訴我哦~」,等待數據源自己推數據來讀取的,說得再籠統一點,就是觀察者模式。
我們看看迭代器的介面:
public interface IEnumerator{ bool MoveNext(); object Current { get; } void Reset();//這個介面函數我們會忽略掉}
我們再來看看IObserver的介面
public interface IObserver<T>{ void OnCompleted(); void OnError(Exception error); void OnNext(T value);}
這兩個介面看似毫無聯繫,但是卻有著密不可分的聯繫。
Current -> void OnNext(T value)
Current是用於獲取當前數據的,直接從遠端拉,然後返回的一個數據。
而OnNext則是遠端推一個數據過來的時候進行的相應的處理。
MoveNext() ->OnCompleted()與OnError()
MoveNext是用於位移到下一個數據的,有可能有兩種結果,有下一個或者沒下一個。
如果沒有下一個就結束遍歷,對應到Rx則是OnComplete()
事實上還有第三種結果,拋出異常。
對應了Rx中的OnError。
仔細思考一下,這兩個過程是不是互相對應?
如果是的話,那已經離答案非常接近了。
以下兩份代碼的運行結果是完全一致的,唯一的區別就是,Rx是非阻塞的,迭代器是阻塞的。
Rx的源碼閱讀路線
微軟官方在描述Rx的時候也有說道,Rx有三個部分組成:
- Observable
- Linq
- Scheduler
所以我們接下來也會從這三個方面入手。
剛開始閱讀UniRx的時候會發現,進入了IObservable之后里面全是介面!實現到底在什麼地方!我剛開始閱讀的時候也是這種感覺,但是後來才發現,這就是實現類似於Linq方式編程的關鍵,具體是什麼我們在後面會講到。
如果覺得UniRx還是過於複雜的話,推薦去閱讀RxLua試試,會好理解很多,當然功能也沒有UniRx強大,只是能更快看到本質的原理。
(
bjornbytes/RxLua)
在看該文章的時候我希望讀者也能將UniRx下到自己的工程當中一邊看一遍理解。
(
UniRx - Reactive Extensions for Unity - Asset Store
)
我們從最簡單的代碼開始:
Observable.Range(1, 10) //創建1~10的數據源.Where((x) => x % 2 == 0)//篩選其中能夠被2整除的部分//監聽數據並進行處理.Subscribe((arg) =>{ Debug.Log(arg);});
大家或許會覺得這段代碼過於簡單,但是相信我,學會了簡單的,複雜的代碼也不過是舉一反三而已。
Observable觀察者模式
事實上並不存在一個實際的,名字叫做Observable或者Observer的類,存在的只有IObservable以IObserver兩個介面,一切實現了這兩個介面的東西都能被用上。不過恰恰是因為這個原因,導致代碼讀起來著不了力,因為我們太想去看基類的實現了(繼承的思想),但是不存在基類。
我們後面會講到,Rx的主要思想是包裝模式。
下面是這兩個介面:
public interface IObservable<T>{ IDisposable Subscribe(IObserver<T> observer);}public interface IObserver<T>{ void OnCompleted(); void OnError(Exception error); void OnNext(T value);}
需要引起注意的一點是,Subscribe中的返回值是IDisposable。
為什麼是IDisposable?因為觀察者模式中需要取消訂閱,當我們知道數據源不需要再向我們推送消息的時候我們需要主動銷毀事件,就例如按鈕銷毀之後我們要銷毀按鈕點擊事件一樣。
我們繼續看我們剛剛提到過我們要看的簡單源碼
首先通過Range來創建一個數據源(Observable)
實際上UniRx創建了一個名叫RangeObservable類的實例,該類繼承自OperatorObservableBase<int>,我們現在先不管這個基類,因為其中主要是對多線程的處理,我們現在只需要知道它實現了IObservable即可。
Range中有一個關鍵的函數:*SubscribeCore*,告訴我們該Observable以什麼方式被不同的監聽者監聽。我們在這裡可以直接把它當做是IObservable介面中的Subscribe函數。
protected override IDisposable SubscribeCore(IObserver<int> observer, IDisposable cancel){ observer = new Range(observer, cancel); if (scheduler == Scheduler.Immediate) { for (int i = 0; i < count; i++) { int v = start + i; observer.OnNext(v); } observer.OnCompleted(); return Disposable.Empty; } else { var i = 0; return scheduler.Schedule((Action self) => { if (i < count) { int v = start + i; observer.OnNext(v); i++; self(); } else { observer.OnCompleted(); } }); }}
剔除調度的成分,我們直接將其作為Immediate模式的調度器
也就是
for (int i = 0; i < count; i++){ int v = start + i; observer.OnNext(v);}observer.OnCompleted();return Disposable.Empty;
Observable直接會向Observer發送OnNext()以及OnComplete的結果。
當然讀過源碼都明白這個Observer是經過包裝的(RangeObserver的內部類Range),當發生錯誤,會直接調用Observer的OnError
事實上,這就是一個簡單的觀察者模式。
舉一反三,我們會發現Observable的類中大多數的方法都是通過這個方式來創建經過包裝的數據源並且將傳入的最基礎的Observer包裝成相應的功能更複雜的Observer。
事件的卸載
在Rx中IDisposable佔到了非常重要的一個角色,那就是卸載事件。與迭代器不同,沒下一個就完事兒了,觀察者模式永遠也不知道有沒有下一個,所以主動地卸載事件是非常重要的一個事情。我在之前講到行為隊列的時候也有講到事件泄漏的情形((
董宸:DSL在項目中的應用:用DSL高效組織遊戲情節))。
以至於,Rx的源碼當中存在大量關於Dispose的代碼。
在調用SubscribeCore中需要傳入一個ICancelable,並且需要返回一個IDisopsable。
SubscribeCore的函數原型:
protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
ICancelable的介面:
public interface ICancelable : IDisposable{ bool IsDisposed { get; }}
當然,並不是所有東西都需要實現,所以Rx通常會返回一個代表不需要Dispose的常量來代表不需要銷毀。
Linq操作
事實上,我們剛剛已經接觸了一部分包裝模式,只不過我們的數據源是直接創建出來的,那麼Linq操作又是如何實現的呢?
類似於.Where、.Amb等等操作是如何連起來讓整個數據流通過短短的幾個函數就可以被我們自由操作?
答案還是包裝模式。
我們進入我們找到Where的實現,實際上是創建了一個名叫WhereObservable類的實例,我們到裡面去看,與Range的實現大同小異。
它的SubscribeCore非常簡單:
protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel){ if (predicate != null) { return source.Subscribe(new Where(this, observer, cancel)); } else { return source.Subscribe(new Where_(this, observer, cancel)); }}
只是對Observer進行簡單包裝之後直接將其監聽到傳進來的數據源中。
所以我們只需要看Where這個Observer即可:
class Where : OperatorObserverBase<T, T> { readonly WhereObservable<T> parent; public Where(WhereObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel) { this.parent = parent; } public override void OnNext(T value) { var isPassed = false; try { isPassed = parent.predicate(value); } catch (Exception ex) { try { observer.OnError(ex); } finally { Dispose(); } return; } if (isPassed) { observer.OnNext(value); } } }
我們看到它的代碼非常簡單,OnNext的時候判斷一下條件,如果條件通過則調用其包裝好的Observer,否則就直接跳過,出錯就OnError。
舉一反三,我們可以通過這種方法來做其他各類的操作符,類似於Select、Amb等等等,方式完全一樣!只不過是包裝了傳入的Observer而已!
Schedule調度器
事實上,我們大多數的時候都不會注意到調度器的存在,因為它大多數情況下以默認值的形式存在,當我們想要更加靈活地使用Rx的時候也許就會用到它。
調度器的介面如下:
public interface IScheduler{ DateTimeOffset Now { get; } // Interface is changed from official Rx for avoid iOS AOT problem (state is dangerous). IDisposable Schedule(Action action); IDisposable Schedule(TimeSpan dueTime, Action action);}
其中Now代表的是當前時間,下面兩個方法一個是對傳入的方法進行調度,而另一個是傳入時間對行為進行調度。
查看引用之後我們會發現很多與時間相關的Observer與Observable都會採用調度器來進行實現,例如Range可以使用自定義的調度器來創建,我們可以隔幾秒鐘之後將一個值出隊,其默認調度器是ImmediateScheduler也就是直接調用。
代碼如下
//直接進行調度if (scheduler == Scheduler.Immediate){ for (int i = 0; i < count; i++) { int v = start + i; observer.OnNext(v); } observer.OnCompleted(); return Disposable.Empty;}//其他的調度方式,傳入的Action類似於行為隊列的實現else{ var i = 0; return scheduler.Schedule((Action self) => { if (i < count) { int v = start + i; observer.OnNext(v); i++; self(); } else { observer.OnCompleted(); } });}
一些操作符也用到了調度器,例如Timer、Delay等。
以下為Delay中的代碼片段:
public override void OnNext(T value){ var next = parent.scheduler.Now.Add(delay); var shouldRun = false; lock (gate) { queue.Enqueue(new Timestamped<T>(value, next)); shouldRun = ready && !active; active = true; } if (shouldRun) { cancelable.Disposable = parent.scheduler.Schedule(delay, DrainQueue); }}
當有新值到達的時候先放入隊列,通過調度器來調用出隊函數以達到Delay的目的。
調度器實際上也只是將行為放入隊列,然後在Thread中進行wait,並沒有使用協程,畢竟UniRx大部分代碼是從http://Rx.Net改過來的吧。
對接Unity功能
講到這裡,事實上或許連http://Rx.Net與UniRx的區別都沒有提到,但是最核心的內容已經講完了,UniRx中提供了大量的方法來幫助開發者在Unity中使用Rx,實際上我們只需要將Unity中的各個回調包裝成事件流便可以使用Rx了。
類似我們剛剛提及的雙擊功能:
//創建一個點擊事件流var clickStream = Observable.EveryUpdate() .Where(_ => Input.GetMouseButtonDown(0));
每一次Update被看做是一次事件,從中選出有點擊事件的幀,即可獲得點擊事件流。
接下來的使用方法便和其他任何一種平台的Rx都一樣了。
這部分的源碼全部都在UnityEngineBridge文件夾下,都非常好理解,有需要的同學也可以擴展出自己的事件流,非常容易。
未提及的部分
既然我們已經知道了Observable、Linq以及Scheduler是怎麼一回事,事實上我們自己也能去實現一個簡單的Rx了,但是事實上還有一些我們沒有閱讀的部分,大家可以繼續深入閱讀。
* UniRx在多線程中的情形
* UniRx在IOS上避免JIT發生而進行的重寫
* 在MVVM中使用Rx
推薦資料
Rx官網:(
ReactiveX)
Channel9上關於Rx介紹的講座(強烈推薦!)
Reactive Extension)
最後,還是一樣,我覺得我講的東西肯定會有疏漏,有大神看到的話還請多多指正了!
如果有在團隊中大規模使用Rx的團隊也希望能夠好好請教請教如何在團隊中推廣使用Rx,並且有項目中的最佳實踐能分享的話就更好了!
推薦閱讀:
TAG:FunctionalReactiveProgramming | UniRx | 源碼閱讀 |