標籤:

Webrtc的多路事件分離器及網路API

  說到Webrtc,第一反應是網路聊天,好的聊天必須建立在穩定、高效的網路模塊和多線程同步基礎上。多路事件分離器是Webrtc中的基礎模塊,是網路和多線程同步的一個主要部分。對多路事件分離器中的多路,一方面指的是要同時支持多個socket,另一方面Webrtc在處理socket的同時處理了喚醒事件,這是種和socket無關的事件,它和多線程相關,為此又涉及到了線程模型。

  已有不少分析Webrtc線程模型/多路事件分離器的文章,這裡不想深入代碼,只說和app編程相關部分,有點像Webrtc向app提供的網路API。在說之前首先要知道多路事件分離器處理的兩類事件:喚醒事件和IO事件。

  喚醒事件。喚醒事件和多線程相關。Invoke(內部其實是Send)、Send、Post是產生喚醒事件的三大源頭。喚醒事件往往用於同步。舉個例子,要把一個目錄打包成一個文件,打包須要點時間,為達到好的用戶體驗就要把打包操作放在後台,為此系統就存在兩個線程,主線程A和用於打包的工作者線程B。B在打包時發現須要密鑰,而密鑰只能在A產生,於是它就調用a->Invoke(...)。Invoke會向A的消息隊列壓入此個消息,然後向A發喚醒事件,並把自個阻塞。A收到喚醒事件後,獲取和該事件相關的消息數據,執行產生密鑰。密鑰函數執行結束後B解除阻塞,恢復運行。

  喚醒事件並不總是其它線程發來,有時是自個發出。尤其Post,向自個發Post可實現延遲調用。舉個例子,網路通信時發生網卡突然被禁用,於是網路模塊就去調用app向它註冊的鉤子函數。鉤子函數知道得釋放資源,但此時又不能釋放,因為鉤子函數執行完後網路模塊要繼續獲得控制權,還須要依賴這些資源。這時鉤子函數就可以在自個結尾處放個發向自已的Post,當網路模塊處理禁用結束後,所在線程就會調用Post投遞的消息,在那裡就可以安全釋放資源。

  IO事件。IO事件和socket相關,指的就是在socket上發生的事件。在Windows,socket會觸發十種事件,Webrtc只處理當中5種,分別是FD_READ、FD_WRITE、FD_ACCEPT、FD_CONNECT、FD_CLOSE。一個多路事件分離器可同時處理多個socket。

線程模型/多路事件分離器

  Thread派生於MessageQueue,PhysicalSocketServer派生於SocketServer。MessageQueue::Get是多路事件分離器入口,大部分功能是通過調用PhysicalSocketServer::Wait。SocketServer雖然有Socket字樣,但不僅能偵聽socket相關的IO事件,還包括喚醒事件。對IO事件,不僅偵聽還處理,喚醒事件則只是偵聽,至於處理是後面的Dispatch。

  MessageQueue負責接收、存儲Message類型消息,即喚醒事件,它決定了所有事件(包括IO事件)的執行線程。SocketServer負責阻塞、偵聽、處理(IO)事件。

  一個多路事件分離器由一個MessageQueue和一個PhysicalSocketServer組成,這2個組件輪流獲得控制權。MessageQueue最先獲得控制權,它會檢查自己的消息隊列,如果有需要立即處理的消息(喚醒事件)就馬上處理,如果沒有就把控制權交給PhysicalSocketServer。PhysicalSocketServer將偵聽所有位於其分發器列表(PhysicalSocketServer::dispatchers_)的IO、喚醒事件。如果有事件被觸發,PhysicalSocketServer將調用對應的Dispatcher的消息響應函數(OnPreEvent、OnEvent)。對IO事件,OnPreEvent、OnEvent就把它們處理了,喚醒事件則要等到後面的Dispatch。

  如果在PhysicalSocketServer阻塞偵聽時MessageQueue接收到喚醒事件,MessageQueue將會調用PhysicalSocketServer::WakeUp觸發PhysicalSocketServer::signal_wakeup_以解除PhysicalSocketServer的阻塞狀態。並將PhysicalSocketServer::fWait_設置為false,這將導致PhysicalSocketServer退出偵聽循環重新將控制權交給MessageQueue。MessageQueue獲得控制權後將立即處理消息,在完成消息處理後再將控制權交給PhysicalSocketServer。

  傳給Wait的cmsWait參數是0時,它實現了輪詢事件。即當前有事件時就處理,沒有會立即返回。SDLThread就用了這個功能。

  Wait中的參數process_io指示了此次是否要處理IO事件,true時要處理。不管process_io是何值,一定會偵聽喚醒事件。

一個事件如何同時偵聽多個socket

  SocketServer可以同時偵聽多個socket,可它們在WSAWaitForMultipleEvents只佔一個事件的位置,這是怎麼回事?

int WSAEventSelect(n _In_ SOCKET s, // 需要關聯的SOCKETn _In_ WSAEVENT hEventObject, // 需要關聯的事件對象n _In_ long lNetworkEvents // 感興趣的網路事件,不同的事件可以用 | 合併, FD_ALL_EVENTS 代表所有的事件n);n

  socket事件和WSAWaitForMultipleEvents事件不是一回事。以上代碼中,參數hEventObject是WSAWaitForMultipleEvents事件,即那個佔了位置的事件,而s是我們想偵聽的socket,要同時偵聽多少個socket就要調用多少次WSAEventSelect。接下來,一旦這些socket上發生事件都會觸發到這個hEventObject。

  既然這些socket觸發的都是位置0,代碼怎麼知道是哪個socket觸發?WSAWaitForMultipleEvents執行結束後,發現是位置0觸發,就會調用WSAEnumNetworkEvents去判斷觸發的是哪個socket。

在app使用多路事件分離器

  多路事件分離器中有個Thread,實際使用時這個Thread往往就是主線程。也就是說,當socket收到數據後,處理數據都放在了主線程,這有助於實現單線程app,避免了潛在的同步問題。

  app使用多路事件分離器分兩步,一是定義個從Thread派生的類,實現輪詢函數;二是把這個類掛向主線程,app輪詢時調用它的輪詢函數。

步驟一:定義個從Thread派生的類,實現輪詢函數

namespace rtc {nclass SDLThread : public Thread {npublic:n SDLThread() : ss_() { set_socketserver(&ss_); }n virtual ~SDLThread() { set_socketserver(NULL); }n void pump();nprivate:n PhysicalSocketServer ss_;n};nnvoid SDLThread::pump()n{n Message msg;n size_t max_msgs = std::max<size_t>(1, size());n for (; max_msgs > 0 && Get(&msg, 0, true); --max_msgs) {n Dispatch(&msg);n }n}n}n

  SDLThread::pump()是輪詢函數。邏輯很簡單,以cmsWait=0、process_io=true去調用Get。此刻有IO事件時,Get就給處理了,有喚醒事件時,Get會返回TRUE,並把事件相關的消息數據填入msg,輪詢函數就用這msg調用Dispatch,處理這個喚醒事件。

步驟二:把SDLThread掛向主線程。app輪詢時調用它的pump函數。

rtc::SDLThread sdl_thread_;nrtc::ThreadManager::Instance()->SetCurrentThread(&sdl_thread_);nnvoid pump()n{n ......n sdl_thread_.pump();n ......n}n

  在app主線程入口處執行SetCurrentThread(&sdl_thread_),它把sdl_thread_掛向當前線程,即主線程。

  假設pump()是app的輪詢函數,在那調用sdl_thread_.pump()就可以了。

網路API

一、初始化socket

rtc::AsyncSocket* socket_;nsocket_ = sdl_thread_.socketserver()->CreateAsyncSocket(family, SOCK_STREAM);nsocket_->Connect(server_address_);n

  初始化分兩步,一是創建,二是連接ip:port對。創建socket就是調用PhysicalSocketServer的CreateAsyncSocket。CreateAsyncSocket須要兩個參數,分別對應創建socket時須要的af、type。af可選ipv4(AF_INET)或ipv6(AF_INET6),type可選tcp(SOCK_STREAM)還是udp(SOCK_DGRAM)。

  CreateAsyncSocket會調用SocketDispatcher::Initialize(),後者會執行ss_->Add(this)。ss_是要加入到的PhysicalSocketServer,this就是這個socket,Add會把socket加入到事件分離器的IO事件集合。ss_所在的MessageQueue就是處理該socket事件的線程。

  Connect執行連接ip:port對。

二、讀寫socket

  讀寫就是向socket掛接相應的處理函數,然後編寫這些函數。

socket_->SignalCloseEvent.connect(this, &tchat2::OnClose);nsocket_->SignalConnectEvent.connect(this, &tchat2::OnConnect);nsocket_->SignalReadEvent.connect(this, &tchat2::OnRead);n

  socket異常斷開(像禁用網卡)會調用OnClose,連接成功會調用OnConnect,收到數據則是OnRead。這些函數的執行線程就是創建socket_時的MessageQueue。

三、關閉socket

socket_->Close();n

  如果socket_正連接著,Close會先執行斷開。接下會從事件分離器的IO事件集合移除該socket_。


推薦閱讀:

Kurento是否可以讓客戶端選擇不同的實時監控視頻?
《Learning WebRTC中文版》試讀 + 簽名優惠版
webRTC : HTML5 視頻 直播 技術
WebRTC有前途嗎?

TAG:WebRTC | CC |