Webrtc的多路事件分離器及網路API
說到Webrtc,第一反應是網路聊天,好的聊天必須建立在穩定、高效的網路模塊和多線程同步基礎上。多路事件分離器是Webrtc中的基礎模塊,是網路和多線程同步的一個主要部分。對多路事件分離器中的多路,一方面指的是要同時支持多個socket,另一方面Webrtc在處理socket的同時處理了喚醒事件,這是種和socket無關的事件,它和多線程相關,為此又涉及到了線程模型。
已有不少分析Webrtc線程模型/多路事件分離器的文章,這裡不想深入代碼,只說和app編程相關部分,有點像Webrtc向app提供的網路API。在說之前首先要知道多路事件分離器處理的兩類事件:喚醒事件和IO事件。 喚醒事件。喚醒事件和多線程相關。Invoke(內部其實是Send)、Send、Post是產生喚醒事件的三大源頭。喚醒事件往往用於同步。舉個例子,要把一個目錄打包成一個文件,打包須要點時間,為達到好的用戶體驗就要把打包操作放在後台,為此系統就存在兩個線程,主線程A和用於打包的工作者線程B。B在打包時發現須要密鑰,而密鑰只能在A產生,於是它就調用a->Invoke(...)。Invoke會向A的消息隊列壓入此個消息,然後向A發喚醒事件,並把自個阻塞。A收到喚醒事件後,獲取和該事件相關的消息數據,執行產生密鑰。密鑰函數執行結束後B解除阻塞,恢復運行。喚醒事件並不總是其它線程發來,有時是自個發出。尤其Post,向自個發Post可實現延遲調用。舉個例子,網路通信時發生網卡突然被禁用,於是網路模塊就去調用app向它註冊的鉤子函數。鉤子函數知道得釋放資源,但此時又不能釋放,因為鉤子函數執行完後網路模塊要繼續獲得控制權,還須要依賴這些資源。這時鉤子函數就可以在自個結尾處放個發向自已的Post,當網路模塊處理禁用結束後,所在線程就會調用Post投遞的消息,在那裡就可以安全釋放資源。
IO事件。IO事件和socket相關,指的就是在socket上發生的事件。在Windows,socket會觸發十種事件,Webrtc只處理當中5種,分別是FD_READ、FD_WRITE、FD_ACCEPT、FD_CONNECT、FD_CLOSE。一個多路事件分離器可同時處理多個socket。線程模型/多路事件分離器Thread派生於MessageQueue,PhysicalSocketServer派生於SocketServer。MessageQueue::Get是多路事件分離器入口,大部分功能是通過調用PhysicalSocketServer::Wait。SocketServer雖然有Socket字樣,但不僅能偵聽socket相關的IO事件,還包括喚醒事件。對IO事件,不僅偵聽還處理,喚醒事件則只是偵聽,至於處理是後面的Dispatch。
MessageQueue負責接收、存儲Message類型消息,即喚醒事件,它決定了所有事件(包括IO事件)的執行線程。SocketServer負責阻塞、偵聽、處理(IO)事件。 一個多路事件分離器由一個MessageQueue和一個PhysicalSocketServer組成,這2個組件輪流獲得控制權。MessageQueue最先獲得控制權,它會檢查自己的消息隊列,如果有需要立即處理的消息(喚醒事件)就馬上處理,如果沒有就把控制權交給PhysicalSocketServer。PhysicalSocketServer將偵聽所有位於其分發器列表(PhysicalSocketServer::dispatchers_)的IO、喚醒事件。如果有事件被觸發,PhysicalSocketServer將調用對應的Dispatcher的消息響應函數(OnPreEvent、OnEvent)。對IO事件,OnPreEvent、OnEvent就把它們處理了,喚醒事件則要等到後面的Dispatch。 如果在PhysicalSocketServer阻塞偵聽時MessageQueue接收到喚醒事件,MessageQueue將會調用PhysicalSocketServer::WakeUp觸發PhysicalSocketServer::signal_wakeup_以解除PhysicalSocketServer的阻塞狀態。並將PhysicalSocketServer::fWait_設置為false,這將導致PhysicalSocketServer退出偵聽循環重新將控制權交給MessageQueue。MessageQueue獲得控制權後將立即處理消息,在完成消息處理後再將控制權交給PhysicalSocketServer。 傳給Wait的cmsWait參數是0時,它實現了輪詢事件。即當前有事件時就處理,沒有會立即返回。SDLThread就用了這個功能。Wait中的參數process_io指示了此次是否要處理IO事件,true時要處理。不管process_io是何值,一定會偵聽喚醒事件。
一個事件如何同時偵聽多個socket SocketServer可以同時偵聽多個socket,可它們在WSAWaitForMultipleEvents只佔一個事件的位置,這是怎麼回事?int WSAEventSelect(n _In_ SOCKET s, // 需要關聯的SOCKETn _In_ WSAEVENT hEventObject, // 需要關聯的事件對象n _In_ long lNetworkEvents // 感興趣的網路事件,不同的事件可以用 | 合併, FD_ALL_EVENTS 代表所有的事件n);n
socket事件和WSAWaitForMultipleEvents事件不是一回事。以上代碼中,參數hEventObject是WSAWaitForMultipleEvents事件,即那個佔了位置的事件,而s是我們想偵聽的socket,要同時偵聽多少個socket就要調用多少次WSAEventSelect。接下來,一旦這些socket上發生事件都會觸發到這個hEventObject。
既然這些socket觸發的都是位置0,代碼怎麼知道是哪個socket觸發?WSAWaitForMultipleEvents執行結束後,發現是位置0觸發,就會調用WSAEnumNetworkEvents去判斷觸發的是哪個socket。在app使用多路事件分離器
多路事件分離器中有個Thread,實際使用時這個Thread往往就是主線程。也就是說,當socket收到數據後,處理數據都放在了主線程,這有助於實現單線程app,避免了潛在的同步問題。 app使用多路事件分離器分兩步,一是定義個從Thread派生的類,實現輪詢函數;二是把這個類掛向主線程,app輪詢時調用它的輪詢函數。步驟一:定義個從Thread派生的類,實現輪詢函數namespace rtc {nclass SDLThread : public Thread {npublic:n SDLThread() : ss_() { set_socketserver(&ss_); }n virtual ~SDLThread() { set_socketserver(NULL); }n void pump();nprivate:n PhysicalSocketServer ss_;n};nnvoid SDLThread::pump()n{n Message msg;n size_t max_msgs = std::max<size_t>(1, size());n for (; max_msgs > 0 && Get(&msg, 0, true); --max_msgs) {n Dispatch(&msg);n }n}n}n
SDLThread::pump()是輪詢函數。邏輯很簡單,以cmsWait=0、process_io=true去調用Get。此刻有IO事件時,Get就給處理了,有喚醒事件時,Get會返回TRUE,並把事件相關的消息數據填入msg,輪詢函數就用這msg調用Dispatch,處理這個喚醒事件。
步驟二:把SDLThread掛向主線程。app輪詢時調用它的pump函數。rtc::SDLThread sdl_thread_;nrtc::ThreadManager::Instance()->SetCurrentThread(&sdl_thread_);nnvoid pump()n{n ......n sdl_thread_.pump();n ......n}n
在app主線程入口處執行SetCurrentThread(&sdl_thread_),它把sdl_thread_掛向當前線程,即主線程。
假設pump()是app的輪詢函數,在那調用sdl_thread_.pump()就可以了。網路API一、初始化socketrtc::AsyncSocket* socket_;nsocket_ = sdl_thread_.socketserver()->CreateAsyncSocket(family, SOCK_STREAM);nsocket_->Connect(server_address_);n
初始化分兩步,一是創建,二是連接ip:port對。創建socket就是調用PhysicalSocketServer的CreateAsyncSocket。CreateAsyncSocket須要兩個參數,分別對應創建socket時須要的af、type。af可選ipv4(AF_INET)或ipv6(AF_INET6),type可選tcp(SOCK_STREAM)還是udp(SOCK_DGRAM)。
CreateAsyncSocket會調用SocketDispatcher::Initialize(),後者會執行ss_->Add(this)。ss_是要加入到的PhysicalSocketServer,this就是這個socket,Add會把socket加入到事件分離器的IO事件集合。ss_所在的MessageQueue就是處理該socket事件的線程。 Connect執行連接ip:port對。二、讀寫socket 讀寫就是向socket掛接相應的處理函數,然後編寫這些函數。socket_->SignalCloseEvent.connect(this, &tchat2::OnClose);nsocket_->SignalConnectEvent.connect(this, &tchat2::OnConnect);nsocket_->SignalReadEvent.connect(this, &tchat2::OnRead);n
socket異常斷開(像禁用網卡)會調用OnClose,連接成功會調用OnConnect,收到數據則是OnRead。這些函數的執行線程就是創建socket_時的MessageQueue。
三、關閉socketsocket_->Close();n
如果socket_正連接著,Close會先執行斷開。接下會從事件分離器的IO事件集合移除該socket_。
推薦閱讀:
※Kurento是否可以讓客戶端選擇不同的實時監控視頻?
※《Learning WebRTC中文版》試讀 + 簽名優惠版
※webRTC : HTML5 視頻 直播 技術
※WebRTC有前途嗎?