伺服器端編程心得(八)——高性能伺服器架構設計總結——以flamigo伺服器代碼為例

這篇文章算是對這個系列的一個系統性地總結。我們將介紹伺服器的開發,並從多個方面探究如何開發一款高性能高並發的伺服器程序。

所謂高性能就是伺服器能流暢地處理各個客戶端的連接並盡量低延遲地應答客戶端的請求;所謂高並發,指的是伺服器可以同時支持多的客戶端連接,且這些客戶端在連接期間內會不斷與伺服器有數據來往。

這篇文章將從兩個方面來介紹,一個是伺服器的框架,即單個伺服器程序的代碼組織結構;另外一個是一組服務程序的如何組織與交互,即架構。注意:本文以下內容中的客戶端是相對概念,指的是連接到當前討論的服務程序的終端,所以這裡的客戶端既可能是我們傳統意義上的客戶端程序,也可能是連接該服務的其他伺服器程序。

一、框架篇

按上面介紹的思路,我們先從單個服務程序的組織結構開始介紹。

(一)、網路通信

既然是伺服器程序肯定會涉及到網路通信部分,那麼伺服器程序的網路通信模塊要解決哪些問題?

筆者認為至少要解決以下問題:

1. 如何檢測有新客戶端連接?

2. 如何接受客戶端連接?

3. 如何檢測客戶端是否有數據發來?

4.如何收取客戶端發來的數據?

5.如何檢測連接異常?發現連接異常之後,如何處理?

6.如何給客戶端發送數據?

7.如何在給客戶端發完數據後關閉連接?

稍微有點網路基礎的人,都能回答上面說的其中幾個問題,比如接收客戶端連接用socket API的accept函數,收取客戶端數據用recv函數,給客戶端發送數據用send函數,檢測客戶端是否有新連接和客戶端是否有新數據可以用IO multiplexing技術(IO復用)的select、poll、epoll等socket API。確實是這樣的,這些基礎的socket API構成了伺服器網路通信的地基,不管網路通信框架設計的如何巧妙,都是在這些基礎的socket API的基礎上構建的。但是如何巧妙地組織這些基礎的socket API,才是問題的關鍵。我們說伺服器很高效,支持高並發,實際上只是一個技術實現手段,不管怎樣從軟體開發的角度來講無非就是一個程序而已,所以,只要程序能最大可能地滿足「盡量減少等待」就是高效。也就是說高效不是「忙的忙死,閑的閑死」,而是大家都可以閑著,但是如果有活要干,大家盡量一起干,而不是一部分忙著依次做事情123456789,另外一部分閑在那裡無所事事。說的可能有點抽象,下面我們來舉一些例子具體來說明一下。

  • 比如默認recv函數如果沒有數據的時候,線程就會阻塞在那裡;
  • 默認send函數,如果tcp窗口不是足夠大,數據發不出去也會阻塞在那裡;
  • connect函數默認連接另外一端的時候,也會阻塞在那裡;
  • 又或者是給對端發送一份數據,需要等待對端回答,如果對方一直不應答,當前線程就阻塞在這裡。

以上都不是高效伺服器的開發思維方式,因為上面的例子都不滿足「盡量減少等待」的原則,為什麼一定要等待呢?有沒用一種方法,這些過程不需要等待,最好是不僅不需要等待,而且這些事情完成之後能通知我。這樣在這些本來用於等待的cpu時間片內,我就可以做一些其他的事情。有,也就是我們下文要討論的IO Multiplexing技術(IO復用技術)。

(二)、幾種IO復用機制的比較

目前windows系統支持select、WSAAsyncSelect、WSAEventSelect、完成埠(IOCP),linux系統支持select、poll、epoll。這裡我們不具體介紹每個具體的函數的用法,我們來討論一點深層次的東西,以上列舉的API函數可以分為兩個層次:

  • 層次一 select和poll
  • 層次二 WSAAsyncSelect、WSAEventSelect、完成埠(IOCP)、epoll

為什麼這麼分呢?先來介紹第一層次,select和poll函數本質上還是在一定時間內主動去查詢socket句柄(可能是一個也可能是多個)上是否有事件,比如可讀事件,可寫事件或者出錯事件,也就是說我們還是需要每隔一段時間內去主動去做這些檢測,如果在這段時間內檢測出一些事件來,我們這段時間就算沒白花,但是倘若這段時間內沒有事件呢?我們只能是做無用功了,說白了,還是在浪費時間,因為假如一個伺服器有多個連接,在cpu時間片有限的情況下,我們花費了一定的時間檢測了一部分socket連接,卻發現它們什麼事件都沒有,而在這段時間內我們卻有一些事情需要處理,那我們為什麼要花時間去做這個檢測呢?把這個時間用在做我們需要做的事情不好嗎?所以對於伺服器程序來說,要想高效,我們應該盡量避免花費時間主動去查詢一些socket是否有事件,而是等這些socket有事件的時候告訴我們去處理。這也就是層次二的各個函數做的事情,它們實際相當於變主動查詢是否有事件為當有事件時,系統會告訴我們,此時我們再去處理,也就是「好鋼用在刀刃」上了。只不過層次二的函數通知我們的方式是各不相同,比如WSAAsyncSelect是利用windows消息隊列的事件機制來通知我們設定的窗口過程函數,IOCP是利用GetQueuedCompletionStatus返回正確的狀態,epoll是epoll_wait函數返回而已。

比如connect函數連接另外一端,如果連接socket是非同步的,那麼connect雖然不能立刻連接完成,但是也是會立刻返回,無需等待,等連接完成之後,WSAAsyncSelect會返回FD_CONNECT事件告訴我們連接成功,epoll會產生EPOLLOUT事件,我們也能知道連接完成。甚至socket有數據可讀時,WSAAsyncSelect產生FD_READ事件,epoll產生EPOLLIN事件,等等。所以有了上面的討論,我們就可以得到網路通信檢測可讀可寫或者出錯事件的正確姿勢。這是我這裡提出的第二個原則:盡量減少做無用功的時間。這個在服務程序資源夠用的情況下可能體現不出來什麼優勢,但是如果有大量的任務要處理,個人覺得這個可能帶來無用

(三)、檢測網路事件的正確姿勢

根據上面的介紹,第一,為了避免無意義的等待時間,第二,不採用主動查詢各個socket的事件,而是採用等待操作系統通知我們有事件的狀態的策略。我們的socket都要設置成非同步的。在此基礎上我們回到欄目(一)中提到的七個問題:

1. 如何檢測有新客戶端連接?

2. 如何接受客戶端連接?

默認accept函數會阻塞在那裡,如果epoll檢測到偵聽socket上有EPOLLIN事件,或者WSAAsyncSelect檢測到有FD_ACCEPT事件,那麼就表明此時有新連接到來,這個時候調用accept函數,就不會阻塞了。當然產生的新socket你應該也設置成非阻塞的。這樣我們就能在新socket上收發數據了。

3. 如何檢測客戶端是否有數據發來?

4.如何收取客戶端發來的數據?

同理,我們也應該在socket上有可讀事件的時候才去收取數據,這樣我們調用recv或者read函數時不用等待,至於一次性收多少數據好呢?我們可以根據自己的需求來決定,甚至你可以在一個循環裡面反覆recv或者read,對於非阻塞模式的socket,如果沒有數據了,recv或者read也會立刻返回,錯誤碼EWOULDBLOCK會表明當前已經沒有數據了。示例:

bool CIUSocket::Recv() { int nRet = 0; while(true) { char buff[512]; nRet = ::recv(m_hSocket, buff, 512, 0); if(nRet == SOCKET_ERROR) //一旦出現錯誤就立刻關閉Socket { if (::WSAGetLastError() == WSAEWOULDBLOCK) break; else return false; } else if(nRet < 1) return false; m_strRecvBuf.append(buff, nRet); ::Sleep(1); } return true; }

5.如何檢測連接異常?發現連接異常之後,如何處理?

同樣當我們收到異常事件後例如EPOLLERR或關閉事件FD_CLOSE,我們就知道了有異常產生,我們對異常的處理一般就是關閉對應的socket。另外,如果send/recv或者read/write函數對一個socket進行操作時,如果返回0,那說明對端已經關閉了socket,此時這路連接也沒必要存在了,我們也可以關閉對應的socket。

6.如何給客戶端發送數據?

給客戶端發送數據,比收數據要稍微麻煩一點,也是需要講點技巧的。首先我們不能像檢測數據可讀一樣檢測數據可寫,因為如果檢測可寫的話,一般情況下只要對端正常收取數據,我們的socket就都是可寫的,如果我們設置監聽可寫事件,會導致頻繁地觸發可寫事件,但是我們此時並不一定有數據需要發送。所以正確的做法是:如果有數據要發送,則先嘗試著去發送,如果發送不了或者只發送出去部分,剩下的我們需要將其緩存起來,然後設置檢測該socket上可寫事件,下次可寫事件產生時,再繼續發送,如果還是不能完全發出去,則繼續設置偵聽可寫事件,如此往複,一直到所有數據都發出去為止。一旦所有數據都發出去以後,我們要移除偵聽可寫事件,避免無用的可寫事件通知。不知道你注意到沒有,如果某次只發出去部分數據,剩下的數據應該暫且存起來,這個時候我們就需要一個緩衝區來存放這部分數據,這個緩衝區我們稱為「發送緩衝區」。發送緩衝區不僅存放本次沒有發完的數據,還用來存放在發送過程中,上層又傳來的新的需要發送的數據。為了保證順序,新的數據應該追加在當前剩下的數據的後面,發送的時候從發送緩衝區的頭部開始發送。也就是說先來的先發送,後來的後發送。

7.如何在給客戶端發完數據後關閉連接?

這個問題比較難處理,因為這裡的「發送完」不一定是真正的發送完,我們調用send或者write函數即使成功,也只是向操作系統的協議棧裡面成功寫入數據,至於能否被發出去、何時被發出去很難判斷,發出去對方是否收到就更難判斷了。所以,我們目前只能簡單地認為send或者write返回我們發出數據的位元組數大小,我們就認為「發完數據」了。然後調用close等socket API關閉連接。關閉連接的話題,我們再單獨開一個小的標題來專門討論一下。

(四)被動關閉連接和主動關閉連接

在實際的應用中,被動關閉連接是由於我們檢測到了連接的異常事件,比如EPOLLERR,或者對端關閉連接,send或recv返回0,這個時候這路連接已經沒有存在必要的意義了,我們被迫關閉連接。

而主動關閉連接,是我們主動調用close/closesocket來關閉連接。比如客戶端給我們發送非法的數據,比如一些網路攻擊的嘗試性數據包。這個時候出於安全考慮,我們關閉socket連接。

(五)發送緩衝區和接收緩衝區

上面已經介紹了發送緩衝區了,並說明了其存在的意義。接收緩衝區也是一樣的道理,當收到數據以後,我們可以直接進行解包,但是這樣並不好,理由一:除非一些約定俗稱的協議格式,比如http協議,大多數伺服器的業務的協議都是不同的,也就是說一個數據包裡面的數據格式的解讀應該是業務層的事情,和網路通信層應該解耦,為了網路層更加通用,我們無法知道上層協議長成什麼樣子,因為不同的協議格式是不一樣的,它們與具體的業務有關。理由二:即使知道協議格式,我們在網路層進行解包處理對應的業務,如果這個業務處理比較耗時,比如讀取磁碟文件,或者連接資料庫進行賬號密碼驗證,那麼我們的網路線程會需要大量時間來處理這些任務,這樣其它網路事件可能沒法及時處理。鑒於以上二點,我們確實需要一個接收緩衝區,將收取到的數據放到該緩衝區裡面去,並由專門的業務線程或者業務邏輯去從接收緩衝區中取出數據,並解包處理業務。

說了這麼多,那發送緩衝區和接收緩衝區該設計成多大的容量?這是一個老生常談的問題了,因為我們經常遇到這樣的問題:預分配的內存太小不夠用,太大的話可能會造成浪費。怎麼辦呢?答案就是像string、vector一樣,設計出一個可以動態增長的緩衝區,按需分配,不夠還可以擴展。

需要特別注意的是,這裡說的發送緩衝區和接收緩衝區是每一個socket連接都存在一個。這是我們最常見的設計方案。

(六)協議的設計

除了一些通用的協議,如http、ftp協議以外,大多數伺服器協議都是根據業務制定的。協議設計好了,數據包的格式就根據協議來設置。我們知道tcp/ip協議是流式數據,所以流式數據就是像流水一樣,數據包與數據包之間沒有明顯的界限。比如A端給B端連續發了三個數據包,每個數據包都是50個位元組,B端可能先收到10個位元組,再收到140個位元組;或者先收到20個位元組,再收到20個位元組,再收到110個位元組;也可能一次性收到150個位元組。這150個位元組可以以任何位元組數目組合和次數被B收到。所以我們討論協議的設計第一個問題就是如何界定包的界線,也就是接收端如何知道每個包數據的大小。目前常用有如下三種方法:

  • 固定大小,這種方法就是假定每一個包的大小都是固定位元組數目,比如上文中討論的每個包大小都是50個位元組,接收端每收氣50個位元組就當成一個包;
  • 指定包結束符,比如以一個
    (換行符和回車符)結束,這樣對端只要收到這樣的結束符,就可以認為收到了一個包,接下來的數據是下一個包的內容;
  • 指定包的大小,這種方法結合了上述兩種方法,一般包頭是固定大小,包頭中有一個欄位指定包體或者整個大的大小,對端收到數據以後先解析包頭中的欄位得到包體或者整個包的大小,然後根據這個大小去界定數據的界線。

協議要討論的第二個問題是,設計協議的時候要盡量方便解包,也就是說協議的格式欄位應該盡量清晰明了。

協議要討論的第三個問題是,根據協議組裝的數據包應該盡量小,這樣有如下好處:第一、對於一些移動端設備來說,其數據處理能力和帶寬能力有限,小的數據不僅能加快處理速度,同時節省大量流量費用;第二、如果單個數據包足夠小的話,對頻繁進行網路通信的伺服器端來說,可以大大減小其帶寬壓力,其所在的系統也能使用更少的內存。試想:假如一個股票伺服器,如果一隻股票的數據包是100個位元組或者1000個位元組,那100隻股票和10000隻股票區別呢?

協議要討論的第二個問題是,對於數值類型,我們應該顯式地指定數值的長度,比如long型,如果在32位機器上是32位的4個位元組,但是如果在64位機器上,就變成了64位8個位元組了。這樣同樣是一個long型,發送方和接收方可能會用不同的長度去解碼。所以建議最好,在涉及到跨平台使用的協議最好顯式地指定協議中整型欄位的長度,比如int32,int64等等。下面是一個協議的介面的例子:

class BinaryReadStream { private: const char* const ptr; const size_t len; const char* cur; BinaryReadStream(const BinaryReadStream&); BinaryReadStream& operator=(const BinaryReadStream&); public: BinaryReadStream(const char* ptr, size_t len); virtual const char* GetData() const; virtual size_t GetSize() const; bool IsEmpty() const; bool ReadString(string* str, size_t maxlen, size_t& outlen); bool ReadCString(char* str, size_t strlen, size_t& len); bool ReadCCString(const char** str, size_t maxlen, size_t& outlen); bool ReadInt32(int32_t& i); bool ReadInt64(int64_t& i); bool ReadShort(short& i); bool ReadChar(char& c); size_t ReadAll(char* szBuffer, size_t iLen) const; bool IsEnd() const; const char* GetCurrent() const{ return cur; } public: bool ReadLength(size_t & len); bool ReadLengthWithoutOffset(size_t &headlen, size_t & outlen); }; class BinaryWriteStream { public: BinaryWriteStream(string* data); virtual const char* GetData() const; virtual size_t GetSize() const; bool WriteCString(const char* str, size_t len); bool WriteString(const string& str); bool WriteDouble(double value, bool isNULL = false); bool WriteInt64(int64_t value, bool isNULL = false); bool WriteInt32(int32_t i, bool isNULL = false); bool WriteShort(short i, bool isNULL = false); bool WriteChar(char c, bool isNULL = false); size_t GetCurrentPos() const{ return m_data->length(); } void Flush(); void Clear(); private: string* m_data; };

其中BinaryWriteStream是編碼協議的類,BinaryReadStream是解碼協議的類。可以按下面這種方式來編碼和解碼。

編碼:

std::string outbuf; BinaryWriteStream writeStream(&outbuf); writeStream.WriteInt32(msg_type_register); writeStream.WriteInt32(m_seq); writeStream.WriteString(retData); writeStream.Flush();

解碼:

BinaryReadStream readStream(strMsg.c_str(), strMsg.length()); int32_t cmd; if (!readStream.ReadInt32(cmd)) { return false; } //int seq; if (!readStream.ReadInt32(m_seq)) { return false; } std::string data; size_t datalength; if (!readStream.ReadString(&data, 0, datalength)) { return false; }

(七)、伺服器程序結構的組織

上面的六個標題,我們討論了很多具體的細節問題,現在是時候討論將這些細節組織起來了。根據我的個人經驗,目前主流的思想是one thread one loop的策略。通俗點說就是在一個線程的函數裡面不斷地循環依次做一些事情,這些事情包括檢測網路事件、解包數據產生業務邏輯。我們先從最簡單地來說,設定一些線程在一個循環裡面做網路通信相關的事情,偽碼如下:

while(退出標誌) { //IO復用技術檢測socket可讀事件、出錯事件 //(如果有數據要發送,則也檢測可寫事件) //如果有可讀事件,對於偵聽socket則接收新連接; //對於普通socket則收取該socket上的數據,收取的數據存入對應的接收緩衝區,如果出錯則關閉連接; //如果有數據要發送,有可寫事件,則發送數據 //如果有出錯事件,關閉該連接 }

另外設定一些線程去處理接收到的數據,並解包處理業務邏輯,這些線程可以認為是業務線程了,偽碼如下:

//從接收緩衝區中取出數據解包,分解成不同的業務來處理

上面的結構是目前最通用的伺服器邏輯結構,但是能不能再簡化一下或者說再綜合一下呢?我們試試,你想過這樣的問題沒有:假如現在的機器有兩個cpu,我們的網路線程數量是2個,業務邏輯線程也是2個,這樣可能存在的情況就是:業務線程運行的時候,網路線程並沒有運行,它們必須等待,如果是這樣的話,幹嘛要多建兩個線程呢?除了程序結構上可能稍微清楚一點,對程序性能沒有任何實質性提高,而且白白浪費cpu時間片在線程上下文切換上。所以,我們可以將網路線程與業務邏輯線程合併,合併後的偽碼看起來是這樣子的:

while(退出標誌) { //IO復用技術檢測socket可讀事件、出錯事件 //(如果有數據要發送,則也檢測可寫事件) //如果有可讀事件,對於偵聽socket則接收新連接; //對於普通socket則收取該socket上的數據,收取的數據存入對應的接收緩衝區,如果出錯則關閉連接; //如果有數據要發送,有可寫事件,則發送數據 //如果有出錯事件,關閉該連接 //從接收緩衝區中取出數據解包,分解成不同的業務來處理 }

你沒看錯,其實就是簡單的合併,合併之後和不僅可以達到原來合併前的效果,而且在沒有網路IO事件的時候,可以及時處理我們想處理的一些業務邏輯,並且減少了不必要的線程上下文切換時間。

我們再更進一步,甚至我們可以在這個while循環增加其它的一些任務的處理,比如程序的邏輯任務隊列、定時器事件等等,偽碼如下:

while(退出標誌) { //定時器事件處理 //IO復用技術檢測socket可讀事件、出錯事件 //(如果有數據要發送,則也檢測可寫事件) //如果有可讀事件,對於偵聽socket則接收新連接; //對於普通socket則收取該socket上的數據,收取的數據存入對應的接收緩衝區,如果出錯則關閉連接; //如果有數據要發送,有可寫事件,則發送數據 //如果有出錯事件,關閉該連接 //從接收緩衝區中取出數據解包,分解成不同的業務來處理 //程序自定義任務1 //程序自定義任務2 }

注意:之所以將定時器事件的處理放在網路IO事件的檢測之前,是因為避免定時器事件過期時間太長。假如放在後面的話,可能前面的處理耗費了一點時間,等到處理定時器事件時,時間間隔已經過去了不少時間。雖然這樣處理,也沒法保證定時器事件百分百精確,但是能盡量保證。

說了這麼多,我們來以flamingo的伺服器程序的網路框架設計為例來驗證上述介紹的理論。flamingo的網路框架是基於陳碩的muduo庫,改成C++11的版本,並修改了一些bug。在此感謝原作者陳碩。flamingo的源碼可以在這裡下載:github.com/baloonwj/fla,打不開github的可以移步csdn:download.csdn.net/detai

上文介紹的核心線程函數的while循環位於eventloop.cpp中:

void EventLoop::loop() { assert(!looping_); assertInLoopThread(); looping_ = true; quit_ = false; // FIXME: what if someone calls quit() before loop() ? LOG_TRACE << "EventLoop " << this << " start looping"; while (!quit_) { activeChannels_.clear(); pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); ++iteration_; if (Logger::logLevel() <= Logger::TRACE) { printActiveChannels(); } // TODO sort channel by priority eventHandling_ = true; for (ChannelList::iterator it = activeChannels_.begin(); it != activeChannels_.end(); ++it) { currentActiveChannel_ = *it; currentActiveChannel_->handleEvent(pollReturnTime_); } currentActiveChannel_ = NULL; eventHandling_ = false; doPendingFunctors(); if (frameFunctor_) { frameFunctor_(); } } LOG_TRACE << "EventLoop " << this << " stop looping"; looping_ = false; }

poller_->poll利用epoll分離網路事件,然後接著處理分離出來的網路事件,每一個客戶端socket對應一個連接,即一個TcpConnection和Channel通道對象。currentActiveChannel_->handleEvent(pollReturnTime_)根據是可讀、可寫、出錯事件來調用對應的處理函數,這些函數都是回調函數,程序初始化階段設置進來的:

void Channel::handleEvent(Timestamp receiveTime) { std::shared_ptr<void> guard; if (tied_) { guard = tie_.lock(); if (guard) { handleEventWithGuard(receiveTime); } } else { handleEventWithGuard(receiveTime); } } void Channel::handleEventWithGuard(Timestamp receiveTime) { eventHandling_ = true; LOG_TRACE << reventsToString(); if ((revents_ & POLLHUP) && !(revents_ & POLLIN)) { if (logHup_) { LOG_WARN << "Channel::handle_event() POLLHUP"; } if (closeCallback_) closeCallback_(); } if (revents_ & POLLNVAL) { LOG_WARN << "Channel::handle_event() POLLNVAL"; } if (revents_ & (POLLERR | POLLNVAL)) { if (errorCallback_) errorCallback_(); } if (revents_ & (POLLIN | POLLPRI | POLLRDHUP)) { //當是偵聽socket時,readCallback_指向Acceptor::handleRead //當是客戶端socket時,調用TcpConnection::handleRead if (readCallback_) readCallback_(receiveTime); } if (revents_ & POLLOUT) { //如果是連接狀態服的socket,則writeCallback_指向Connector::handleWrite() if (writeCallback_) writeCallback_(); } eventHandling_ = false; }

當然,這裡利用了Channel對象的「多態性」,如果是普通socket,可讀事件就會調用預先設置的回調函數;但是如果是偵聽socket,則調用Aceptor對象的handleRead()來接收新連接:

void Acceptor::handleRead() { loop_->assertInLoopThread(); InetAddress peerAddr; //FIXME loop until no more int connfd = acceptSocket_.accept(&peerAddr); if (connfd >= 0) { // string hostport = peerAddr.toIpPort(); // LOG_TRACE << "Accepts of " << hostport; //newConnectionCallback_實際指向TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) if (newConnectionCallback_) { newConnectionCallback_(connfd, peerAddr); } else { sockets::close(connfd); } } else { LOG_SYSERR << "in Acceptor::handleRead"; // Read the section named "The special problem of // accept()ing when you cant" in libevs doc. // By Marc Lehmann, author of livev. if (errno == EMFILE) { ::close(idleFd_); idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL); ::close(idleFd_); idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC); } } }

主循環裡面的業務邏輯處理對應:

doPendingFunctors(); if (frameFunctor_) { frameFunctor_(); }

void EventLoop::doPendingFunctors() { std::vector<Functor> functors; callingPendingFunctors_ = true; { std::unique_lock<std::mutex> lock(mutex_); functors.swap(pendingFunctors_); } for (size_t i = 0; i < functors.size(); ++i) { functors[i](); } callingPendingFunctors_ = false; }

這裡增加業務邏輯是增加執行任務的函數指針的,增加的任務保存在成員變數pendingFunctors_中,這個變數是一個函數指針數組(vector對象),執行的時候,調用每個函數就可以了。上面的代碼先利用一個棧變數將成員變數pendingFunctors_裡面的函數指針換過來,接下來對這個棧變數進行操作就可以了,這樣減少了鎖的粒度。因為成員變數pendingFunctors_在增加任務的時候,也會被用到,設計到多個線程操作,所以要加鎖,增加任務的地方是:

void EventLoop::queueInLoop(const Functor& cb) { { std::unique_lock<std::mutex> lock(mutex_); pendingFunctors_.push_back(cb); } if (!isInLoopThread() || callingPendingFunctors_) { wakeup(); } }

而frameFunctor_就更簡單了,就是通過設置一個函數指針就可以了。當然這裡有個技巧性的東西,即增加任務的時候,為了能夠立即執行,使用喚醒機制,通過往一個fd裡面寫入簡單的幾個位元組,來喚醒epoll,使其立刻返回,因為此時沒有其它的socke有事件,這樣接下來就執行剛才添加的任務了。

我們看一下數據收取的邏輯:

void TcpConnection::handleRead(Timestamp receiveTime) { loop_->assertInLoopThread(); int savedErrno = 0; ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); if (n > 0) { //messageCallback_指向CTcpSession::OnRead(const std::shared_ptr<TcpConnection>& conn, Buffer* pBuffer, Timestamp receiveTime) messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); } else if (n == 0) { handleClose(); } else { errno = savedErrno; LOG_SYSERR << "TcpConnection::handleRead"; handleError(); } }

將收到的數據放到接收緩衝區裡面,將來我們來解包:

void ClientSession::OnRead(const std::shared_ptr<TcpConnection>& conn, Buffer* pBuffer, Timestamp receivTime) { while (true) { //不夠一個包頭大小 if (pBuffer->readableBytes() < (size_t)sizeof(msg)) { LOG_INFO << "buffer is not enough for a package header, pBuffer->readableBytes()=" << pBuffer->readableBytes() << ", sizeof(msg)=" << sizeof(msg); return; } //不夠一個整包大小 msg header; memcpy(&header, pBuffer->peek(), sizeof(msg)); if (pBuffer->readableBytes() < (size_t)header.packagesize + sizeof(msg)) return; pBuffer->retrieve(sizeof(msg)); std::string inbuf; inbuf.append(pBuffer->peek(), header.packagesize); pBuffer->retrieve(header.packagesize); if (!Process(conn, inbuf.c_str(), inbuf.length())) { LOG_WARN << "Process error, close TcpConnection"; conn->forceClose(); } }// end while-loop }

先判斷接收緩衝區裡面的數據是否夠一個包頭大小,如果夠再判斷夠不夠包頭指定的包體大小,如果還是夠的話,接著在Process函數裡面處理該包。

再看看發送數據的邏輯:

void TcpConnection::sendInLoop(const void* data, size_t len) { loop_->assertInLoopThread(); ssize_t nwrote = 0; size_t remaining = len; bool faultError = false; if (state_ == kDisconnected) { LOG_WARN << "disconnected, give up writing"; return; } // if no thing in output queue, try writing directly if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) { nwrote = sockets::write(channel_->fd(), data, len); if (nwrote >= 0) { remaining = len - nwrote; if (remaining == 0 && writeCompleteCallback_) { loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this())); } } else // nwrote < 0 { nwrote = 0; if (errno != EWOULDBLOCK) { LOG_SYSERR << "TcpConnection::sendInLoop"; if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others? { faultError = true; } } } } assert(remaining <= len); if (!faultError && remaining > 0) { size_t oldLen = outputBuffer_.readableBytes(); if (oldLen + remaining >= highWaterMark_ && oldLen < highWaterMark_ && highWaterMarkCallback_) { loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining)); } outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining); if (!channel_->isWriting()) { channel_->enableWriting(); } } }

如果剩餘的數據remaining大於則調用channel_->enableWriting();開始監聽可寫事件,可寫事件處理如下:

void TcpConnection::handleWrite() { loop_->assertInLoopThread(); if (channel_->isWriting()) { ssize_t n = sockets::write(channel_->fd(), outputBuffer_.peek(), outputBuffer_.readableBytes()); if (n > 0) { outputBuffer_.retrieve(n); if (outputBuffer_.readableBytes() == 0) { channel_->disableWriting(); if (writeCompleteCallback_) { loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this())); } if (state_ == kDisconnecting) { shutdownInLoop(); } } } else { LOG_SYSERR << "TcpConnection::handleWrite"; // if (state_ == kDisconnecting) // { // shutdownInLoop(); // } } } else { LOG_TRACE << "Connection fd = " << channel_->fd() << " is down, no more writing"; } }

如果發送完數據以後調用channel_->disableWriting();移除監聽可寫事件。

很多讀者可能一直想問,文中不是說解包數據並處理邏輯是業務代碼而非網路通信的代碼,你這裡貌似都混在一起了,其實沒有,這裡實際的業務代碼處理都是框架曾提供的回調函數裡面處理的,具體怎麼處理,由框架使用者——業務層自己定義。

總結起來,實際上就是一個線程函數里一個loop那麼點事情,不信你再看我曾經工作上的一個交易系統項目代碼:

void CEventDispatcher::Run() { m_bShouldRun = true; while(m_bShouldRun) { DispatchIOs(); SyncTime(); CheckTimer(); DispatchEvents(); } }

void CEpollReactor::DispatchIOs() { DWORD dwSelectTimeOut = SR_DEFAULT_EPOLL_TIMEOUT; if (HandleOtherTask()) { dwSelectTimeOut = 0; } struct epoll_event ev; CEventHandlerIdMap::iterator itor = m_mapEventHandlerId.begin(); for(; itor!=m_mapEventHandlerId.end(); itor++) { CEventHandler *pEventHandler = (CEventHandler *)(*itor).first; if(pEventHandler == NULL){ continue; } ev.data.ptr = pEventHandler; ev.events = 0; int nReadID, nWriteID; pEventHandler->GetIds(&nReadID, &nWriteID); if (nReadID > 0) { ev.events |= EPOLLIN; } if (nWriteID > 0) { ev.events |= EPOLLOUT; } epoll_ctl(m_fdEpoll, EPOLL_CTL_MOD, (*itor).second, &ev); } struct epoll_event events[EPOLL_MAX_EVENTS]; int nfds = epoll_wait(m_fdEpoll, events, EPOLL_MAX_EVENTS, dwSelectTimeOut/1000); for (int i=0; i<nfds; i++) { struct epoll_event &evref = events[i]; CEventHandler *pEventHandler = (CEventHandler *)evref.data.ptr; if ((evref.events|EPOLLIN)!=0 && m_mapEventHandlerId.find(pEventHandler)!=m_mapEventHandlerId.end()) { pEventHandler->HandleInput(); } if ((evref.events|EPOLLOUT)!=0 && m_mapEventHandlerId.find(pEventHandler)!=m_mapEventHandlerId.end()) { pEventHandler->HandleOutput(); } } }

void CEventDispatcher::DispatchEvents() { CEvent event; CSyncEvent *pSyncEvent; while(m_queueEvent.PeekEvent(event)) { int nRetval; if(event.pEventHandler != NULL) { nRetval = event.pEventHandler->HandleEvent(event.nEventID, event.dwParam, event.pParam); } else { nRetval = HandleEvent(event.nEventID, event.dwParam, event.pParam); } if(event.pAdd != NULL) //同步消息 { pSyncEvent=(CSyncEvent *)event.pAdd; pSyncEvent->nRetval = nRetval; pSyncEvent->sem.UnLock(); } } }

再看看蘑菇街開源的TeamTalk的源碼(代碼下載地址:github.com/baloonwj/Tea):

void CEventDispatch::StartDispatch(uint32_t wait_timeout) { fd_set read_set, write_set, excep_set; timeval timeout; timeout.tv_sec = 0; timeout.tv_usec = wait_timeout * 1000; // 10 millisecond if(running) return; running = true; while (running) { _CheckTimer(); _CheckLoop(); if (!m_read_set.fd_count && !m_write_set.fd_count && !m_excep_set.fd_count) { Sleep(MIN_TIMER_DURATION); continue; } m_lock.lock(); memcpy(&read_set, &m_read_set, sizeof(fd_set)); memcpy(&write_set, &m_write_set, sizeof(fd_set)); memcpy(&excep_set, &m_excep_set, sizeof(fd_set)); m_lock.unlock(); int nfds = select(0, &read_set, &write_set, &excep_set, &timeout); if (nfds == SOCKET_ERROR) { log("select failed, error code: %d", GetLastError()); Sleep(MIN_TIMER_DURATION); continue; // select again } if (nfds == 0) { continue; } for (u_int i = 0; i < read_set.fd_count; i++) { //log("select return read count=%d
", read_set.fd_count); SOCKET fd = read_set.fd_array[i]; CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd); if (pSocket) { pSocket->OnRead(); pSocket->ReleaseRef(); } } for (u_int i = 0; i < write_set.fd_count; i++) { //log("select return write count=%d
", write_set.fd_count); SOCKET fd = write_set.fd_array[i]; CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd); if (pSocket) { pSocket->OnWrite(); pSocket->ReleaseRef(); } } for (u_int i = 0; i < excep_set.fd_count; i++) { //log("select return exception count=%d
", excep_set.fd_count); SOCKET fd = excep_set.fd_array[i]; CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd); if (pSocket) { pSocket->OnClose(); pSocket->ReleaseRef(); } } } }

再看filezilla,一款ftp工具的伺服器端,它採用的是Windows的WSAAsyncSelect模型(代碼下載地址:github.com/baloonwj/fil):

//Processes event notifications sent by the sockets or the layers static LRESULT CALLBACK WindowProc(HWND hWnd, UINT message, WPARAM wParam, LPARAM lParam) { if (message>=WM_SOCKETEX_NOTIFY) { //Verify parameters ASSERT(hWnd); CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA); ASSERT(pWnd); if (!pWnd) return 0; if (message < static_cast<UINT>(WM_SOCKETEX_NOTIFY+pWnd->m_nWindowDataSize)) //Index is within socket storage { //Lookup socket and verify if its valid CAsyncSocketEx *pSocket=pWnd->m_pAsyncSocketExWindowData[message - WM_SOCKETEX_NOTIFY].m_pSocket; SOCKET hSocket = wParam; if (!pSocket) return 0; if (hSocket == INVALID_SOCKET) return 0; if (pSocket->m_SocketData.hSocket != hSocket) return 0; int nEvent = lParam & 0xFFFF; int nErrorCode = lParam >> 16; //Dispatch notification if (!pSocket->m_pFirstLayer) { //Dispatch to CAsyncSocketEx instance switch (nEvent) { case FD_READ: #ifndef NOSOCKETSTATES if (pSocket->GetState() == connecting && !nErrorCode) { pSocket->m_nPendingEvents |= FD_READ; break; } else if (pSocket->GetState() == attached) pSocket->SetState(connected); if (pSocket->GetState() != connected) break; // Ignore further FD_READ events after FD_CLOSE has been received if (pSocket->m_SocketData.onCloseCalled) break; #endif //NOSOCKETSTATES #ifndef NOSOCKETSTATES if (nErrorCode) pSocket->SetState(aborted); #endif //NOSOCKETSTATES if (pSocket->m_lEvent & FD_READ) { pSocket->OnReceive(nErrorCode); } break; case FD_FORCEREAD: //Forceread does not check if theres data waiting #ifndef NOSOCKETSTATES if (pSocket->GetState() == connecting && !nErrorCode) { pSocket->m_nPendingEvents |= FD_FORCEREAD; break; } else if (pSocket->GetState() == attached) pSocket->SetState(connected); if (pSocket->GetState() != connected) break; #endif //NOSOCKETSTATES if (pSocket->m_lEvent & FD_READ) { #ifndef NOSOCKETSTATES if (nErrorCode) pSocket->SetState(aborted); #endif //NOSOCKETSTATES pSocket->OnReceive(nErrorCode); } break; case FD_WRITE: #ifndef NOSOCKETSTATES if (pSocket->GetState() == connecting && !nErrorCode) { pSocket->m_nPendingEvents |= FD_WRITE; break; } else if (pSocket->GetState() == attached && !nErrorCode) pSocket->SetState(connected); if (pSocket->GetState() != connected) break; #endif //NOSOCKETSTATES if (pSocket->m_lEvent & FD_WRITE) { #ifndef NOSOCKETSTATES if (nErrorCode) pSocket->SetState(aborted); #endif //NOSOCKETSTATES pSocket->OnSend(nErrorCode); } break; case FD_CONNECT: #ifndef NOSOCKETSTATES if (pSocket->GetState() == connecting) { if (nErrorCode && pSocket->m_SocketData.nextAddr) { if (pSocket->TryNextProtocol()) break; } pSocket->SetState(connected); } else if (pSocket->GetState() == attached && !nErrorCode) pSocket->SetState(connected); #endif //NOSOCKETSTATES if (pSocket->m_lEvent & FD_CONNECT) pSocket->OnConnect(nErrorCode); #ifndef NOSOCKETSTATES if (!nErrorCode) { if ((pSocket->m_nPendingEvents&FD_READ) && pSocket->GetState() == connected) pSocket->OnReceive(0); if ((pSocket->m_nPendingEvents&FD_FORCEREAD) && pSocket->GetState() == connected) pSocket->OnReceive(0); if ((pSocket->m_nPendingEvents&FD_WRITE) && pSocket->GetState() == connected) pSocket->OnSend(0); } pSocket->m_nPendingEvents = 0; #endif break; case FD_ACCEPT: #ifndef NOSOCKETSTATES if (pSocket->GetState() != listening && pSocket->GetState() != attached) break; #endif //NOSOCKETSTATES if (pSocket->m_lEvent & FD_ACCEPT) pSocket->OnAccept(nErrorCode); break; case FD_CLOSE: #ifndef NOSOCKETSTATES if (pSocket->GetState() != connected && pSocket->GetState() != attached) break; // If there are still bytes left to read, call OnReceive instead of // OnClose and trigger a new OnClose DWORD nBytes = 0; if (!nErrorCode && pSocket->IOCtl(FIONREAD, &nBytes)) { if (nBytes > 0) { // Just repeat message. pSocket->ResendCloseNotify(); pSocket->m_SocketData.onCloseCalled = true; pSocket->OnReceive(WSAESHUTDOWN); break; } } pSocket->SetState(nErrorCode ? aborted : closed); #endif //NOSOCKETSTATES pSocket->OnClose(nErrorCode); break; } } else //Dispatch notification to the lowest layer { if (nEvent == FD_READ) { // Ignore further FD_READ events after FD_CLOSE has been received if (pSocket->m_SocketData.onCloseCalled) return 0; DWORD nBytes; if (!pSocket->IOCtl(FIONREAD, &nBytes)) nErrorCode = WSAGetLastError(); if (pSocket->m_pLastLayer) pSocket->m_pLastLayer->CallEvent(nEvent, nErrorCode); } else if (nEvent == FD_CLOSE) { // If there are still bytes left to read, call OnReceive instead of // OnClose and trigger a new OnClose DWORD nBytes = 0; if (!nErrorCode && pSocket->IOCtl(FIONREAD, &nBytes)) { if (nBytes > 0) { // Just repeat message. pSocket->ResendCloseNotify(); if (pSocket->m_pLastLayer) pSocket->m_pLastLayer->CallEvent(FD_READ, 0); return 0; } } pSocket->m_SocketData.onCloseCalled = true; if (pSocket->m_pLastLayer) pSocket->m_pLastLayer->CallEvent(nEvent, nErrorCode); } else if (pSocket->m_pLastLayer) pSocket->m_pLastLayer->CallEvent(nEvent, nErrorCode); } } return 0; } else if (message == WM_USER) //Notification event sent by a layer { //Verify parameters, lookup socket and notification message //Verify parameters ASSERT(hWnd); CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA); ASSERT(pWnd); if (!pWnd) return 0; if (wParam >= static_cast<UINT>(pWnd->m_nWindowDataSize)) //Index is within socket storage { return 0; } CAsyncSocketEx *pSocket = pWnd->m_pAsyncSocketExWindowData[wParam].m_pSocket; CAsyncSocketExLayer::t_LayerNotifyMsg *pMsg = (CAsyncSocketExLayer::t_LayerNotifyMsg *)lParam; if (!pMsg || !pSocket || pSocket->m_SocketData.hSocket != pMsg->hSocket) { delete pMsg; return 0; } int nEvent=pMsg->lEvent&0xFFFF; int nErrorCode=pMsg->lEvent>>16; //Dispatch to layer if (pMsg->pLayer) pMsg->pLayer->CallEvent(nEvent, nErrorCode); else { //Dispatch to CAsyncSocketEx instance switch (nEvent) { case FD_READ: #ifndef NOSOCKETSTATES if (pSocket->GetState() == connecting && !nErrorCode) { pSocket->m_nPendingEvents |= FD_READ; break; } else if (pSocket->GetState() == attached && !nErrorCode) pSocket->SetState(connected); if (pSocket->GetState() != connected) break; #endif //NOSOCKETSTATES if (pSocket->m_lEvent & FD_READ) { #ifndef NOSOCKETSTATES if (nErrorCode) pSocket->SetState(aborted); #endif //NOSOCKETSTATES pSocket->OnReceive(nErrorCode); } break; case FD_FORCEREAD: //Forceread does not check if theres data waiting #ifndef NOSOCKETSTATES if (pSocket->GetState() == connecting && !nErrorCode) { pSocket->m_nPendingEvents |= FD_FORCEREAD; break; } else if (pSocket->GetState() == attached && !nErrorCode) pSocket->SetState(connected); if (pSocket->GetState() != connected) break; #endif //NOSOCKETSTATES if (pSocket->m_lEvent & FD_READ) { #ifndef NOSOCKETSTATES if (nErrorCode) pSocket->SetState(aborted); #endif //NOSOCKETSTATES pSocket->OnReceive(nErrorCode); } break; case FD_WRITE: #ifndef NOSOCKETSTATES if (pSocket->GetState() == connecting && !nErrorCode) { pSocket->m_nPendingEvents |= FD_WRITE; break; } else if (pSocket->GetState() == attached && !nErrorCode) pSocket->SetState(connected); if (pSocket->GetState() != connected) break; #endif //NOSOCKETSTATES if (pSocket->m_lEvent & FD_WRITE) { #ifndef NOSOCKETSTATES if (nErrorCode) pSocket->SetState(aborted); #endif //NOSOCKETSTATES pSocket->OnSend(nErrorCode); } break; case FD_CONNECT: #ifndef NOSOCKETSTATES if (pSocket->GetState() == connecting) pSocket->SetState(connected); else if (pSocket->GetState() == attached && !nErrorCode) pSocket->SetState(connected); #endif //NOSOCKETSTATES if (pSocket->m_lEvent & FD_CONNECT) pSocket->OnConnect(nErrorCode); #ifndef NOSOCKETSTATES if (!nErrorCode) { if (((pSocket->m_nPendingEvents&FD_READ) && pSocket->GetState() == connected) && (pSocket->m_lEvent & FD_READ)) pSocket->OnReceive(0); if (((pSocket->m_nPendingEvents&FD_FORCEREAD) && pSocket->GetState() == connected) && (pSocket->m_lEvent & FD_READ)) pSocket->OnReceive(0); if (((pSocket->m_nPendingEvents&FD_WRITE) && pSocket->GetState() == connected) && (pSocket->m_lEvent & FD_WRITE)) pSocket->OnSend(0); } pSocket->m_nPendingEvents = 0; #endif //NOSOCKETSTATES break; case FD_ACCEPT: #ifndef NOSOCKETSTATES if ((pSocket->GetState() == listening || pSocket->GetState() == attached) && (pSocket->m_lEvent & FD_ACCEPT)) #endif //NOSOCKETSTATES { pSocket->OnAccept(nErrorCode); } break; case FD_CLOSE: #ifndef NOSOCKETSTATES if ((pSocket->GetState() == connected || pSocket->GetState() == attached) && (pSocket->m_lEvent & FD_CLOSE)) { pSocket->SetState(nErrorCode?aborted:closed); #else { #endif //NOSOCKETSTATES pSocket->OnClose(nErrorCode); } break; } } delete pMsg; return 0; } else if (message == WM_USER+1) { // WSAAsyncGetHostByName reply // Verify parameters ASSERT(hWnd); CAsyncSocketExHelperWindow *pWnd = (CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA); ASSERT(pWnd); if (!pWnd) return 0; CAsyncSocketEx *pSocket = NULL; for (int i = 0; i < pWnd->m_nWindowDataSize; ++i) { pSocket = pWnd->m_pAsyncSocketExWindowData[i].m_pSocket; if (pSocket && pSocket->m_hAsyncGetHostByNameHandle && pSocket->m_hAsyncGetHostByNameHandle == (HANDLE)wParam && pSocket->m_pAsyncGetHostByNameBuffer) break; } if (!pSocket || !pSocket->m_pAsyncGetHostByNameBuffer) return 0; int nErrorCode = lParam >> 16; if (nErrorCode) { pSocket->OnConnect(nErrorCode); return 0; } SOCKADDR_IN sockAddr{}; sockAddr.sin_family = AF_INET; sockAddr.sin_addr.s_addr = ((LPIN_ADDR)((LPHOSTENT)pSocket->m_pAsyncGetHostByNameBuffer)->h_addr)->s_addr; sockAddr.sin_port = htons(pSocket->m_nAsyncGetHostByNamePort); BOOL res = pSocket->Connect((SOCKADDR*)&sockAddr, sizeof(sockAddr)); delete [] pSocket->m_pAsyncGetHostByNameBuffer; pSocket->m_pAsyncGetHostByNameBuffer = 0; pSocket->m_hAsyncGetHostByNameHandle = 0; if (!res) if (GetLastError() != WSAEWOULDBLOCK) pSocket->OnConnect(GetLastError()); return 0; } else if (message == WM_USER + 2) { //Verify parameters, lookup socket and notification message //Verify parameters if (!hWnd) return 0; CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA); if (!pWnd) return 0; if (wParam >= static_cast<UINT>(pWnd->m_nWindowDataSize)) //Index is within socket storage return 0; CAsyncSocketEx *pSocket = pWnd->m_pAsyncSocketExWindowData[wParam].m_pSocket; if (!pSocket) return 0; // Process pending callbacks std::list<t_callbackMsg> tmp; tmp.swap(pSocket->m_pendingCallbacks); pSocket->OnLayerCallback(tmp); for (auto & cb : tmp) { delete [] cb.str; } } else if (message == WM_TIMER) { if (wParam != 1) return 0; ASSERT(hWnd); CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA); ASSERT(pWnd && pWnd->m_pThreadData); if (!pWnd || !pWnd->m_pThreadData) return 0; if (pWnd->m_pThreadData->layerCloseNotify.empty()) { KillTimer(hWnd, 1); return 0; } CAsyncSocketEx* socket = pWnd->m_pThreadData->layerCloseNotify.front(); pWnd->m_pThreadData->layerCloseNotify.pop_front(); if (pWnd->m_pThreadData->layerCloseNotify.empty()) KillTimer(hWnd, 1); if (socket) PostMessage(hWnd, socket->m_SocketData.nSocketIndex + WM_SOCKETEX_NOTIFY, socket->m_SocketData.hSocket, FD_CLOSE); return 0; } return DefWindowProc(hWnd, message, wParam, lParam); }

關於單個服務程序的框架,我已經介紹完了,如果你能完全理解我要表達的意思,我相信你也能構建出一套高性能服務程序來。

二、架構篇

一個項目的伺服器端往往由很多服務組成,就算單個服務在性能上做到極致,支持的並發數量也是有限的,舉個簡單的例子,假如一個聊天伺服器,每個用戶的信息是1k,那對於一個8G的內存的機器,在不考慮其它的情況下8*1024*1024*1024 / 100 = 1024,實際有838萬,但實際這只是非常理想的情況。所以我們有時候需要需要某個服務部署多套,就單個服務的實現來講還是《框架篇》中介紹的。我們舉個例子:

這是蘑菇街TeamTalk的伺服器架構。MsgServer是聊天服務,可以部署多套,每個聊天伺服器啟動時都會告訴loginSever和routeSever自己的ip地址和埠號,當有用戶上下或者下線的時候,MsgServer也會告訴loginSever和routeSever自己上面最新的用戶數量和用戶id列表。現在一個用戶需要登錄,先連接loginServer,loginServer根據記錄的各個MsgServer上的用戶情況,返回一個最小負載的MsgServer的ip地址和埠號給客戶端,客戶端再利用這個ip地址和埠號去登錄MsgServer。當聊天時,位於A MsgServer上的用戶給另外一個用戶發送消息,如果該用戶不在同一個MsgServer上,MsgServer將消息轉發給RouteServer,RouteServer根據自己記錄的用戶id信息找到目標用戶所在的MsgServer並轉發給對應的MsgServer。

上面是分散式部署的一個例子。我們再來看另外一個例子,這個例子是單個服務的策略,實際伺服器在處理網路數據的時候,如果同時有多個socket上有數據要處理,可能會出現一直服務前幾個socket,直到前幾個socket處理完畢後再處理後面幾個socket的數據。這就相當於,你去飯店吃飯,大家都點了菜,但是有些桌子上一直在上菜,而有些桌子上一直沒有菜。這樣肯定不好,我們來看下如何避免這種現象:

int CFtdEngine::HandlePackage(CFTDCPackage *pFTDCPackage, CFTDCSession *pSession) { //NET_IO_LOG0("CFtdEngine::HandlePackage
"); FTDC_PACKAGE_DEBUG(pFTDCPackage); if (pFTDCPackage->GetTID() != FTD_TID_ReqUserLogin) { if (!IsSessionLogin(pSession->GetSessionID())) { SendErrorRsp(pFTDCPackage, pSession, 1, "客戶未登錄"); return 0; } } CalcFlux(pSession, pFTDCPackage->Length()); //統計流量 REPORT_EVENT(LOG_DEBUG, "Front/Fgateway", "登錄請求%0x", pFTDCPackage->GetTID()); int nRet = 0; switch(pFTDCPackage->GetTID()) { case FTD_TID_ReqUserLogin: ///huwp:20070608:檢查過高版本的API將被禁止登錄 if (pFTDCPackage->GetVersion()>FTD_VERSION) { SendErrorRsp(pFTDCPackage, pSession, 1, "Too High FTD Version"); return 0; } nRet = OnReqUserLogin(pFTDCPackage, (CFTDCSession *)pSession); FTDRequestIndex.incValue(); break; case FTD_TID_ReqCheckUserLogin: nRet = OnReqCheckUserLogin(pFTDCPackage, (CFTDCSession *)pSession); FTDRequestIndex.incValue(); break; case FTD_TID_ReqSubscribeTopic: nRet = OnReqSubscribeTopic(pFTDCPackage, (CFTDCSession *)pSession); FTDRequestIndex.incValue(); break; } return 0; }

當有某個socket上有數據可讀時,接著接收該socket上的數據,對接收到的數據進行解包,然後調用CalcFlux(pSession, pFTDCPackage->Length())進行流量統計:

void CFrontEngine::CalcFlux(CSession *pSession, const int nFlux) { TFrontSessionInfo *pSessionInfo = m_mapSessionInfo.Find(pSession->GetSessionID()); if (pSessionInfo != NULL) { //流量控制改為計數 pSessionInfo->nCommFlux ++; ///若流量超過規定,則掛起該會話的讀操作 if (pSessionInfo->nCommFlux >= pSessionInfo->nMaxCommFlux) { pSession->SuspendRead(true); } } }

該函數會先讓某個連接會話(Session)處理的包數量遞增,接著判斷是否超過最大包數量,則設置讀掛起標誌:

void CSession::SuspendRead(bool bSuspend) { m_bSuspendRead = bSuspend; }

這樣下次將會從檢測的socket列表中排除該socket:

void CEpollReactor::RegisterIO(CEventHandler *pEventHandler) { int nReadID, nWriteID; pEventHandler->GetIds(&nReadID, &nWriteID); if (nWriteID != 0 && nReadID ==0) { nReadID = nWriteID; } if (nReadID != 0) { m_mapEventHandlerId[pEventHandler] = nReadID; struct epoll_event ev; ev.data.ptr = pEventHandler; if(epoll_ctl(m_fdEpoll, EPOLL_CTL_ADD, nReadID, &ev) != 0) { perror("epoll_ctl EPOLL_CTL_ADD"); } } }

void CSession::GetIds(int *pReadId, int *pWriteId) { m_pChannelProtocol->GetIds(pReadId,pWriteId); if (m_bSuspendRead) { *pReadId = 0; } }

也就是說不再檢測該socket上是否有數據可讀。然後在定時器里1秒後重置該標誌,這樣這個socket上有數據的話又可以重新檢測到了:

const int SESSION_CHECK_TIMER_ID = 9; const int SESSION_CHECK_INTERVAL = 1000;

SetTimer(SESSION_CHECK_TIMER_ID, SESSION_CHECK_INTERVAL);

void CFrontEngine::OnTimer(int nIDEvent) { if (nIDEvent == SESSION_CHECK_TIMER_ID) { CSessionMap::iterator itor = m_mapSession.Begin(); while (!itor.IsEnd()) { TFrontSessionInfo *pFind = m_mapSessionInfo.Find((*itor)->GetSessionID()); if (pFind != NULL) { CheckSession(*itor, pFind); } itor++; } } } void CFrontEngine::CheckSession(CSession *pSession, TFrontSessionInfo *pSessionInfo) { ///重新開始計算流量 pSessionInfo->nCommFlux -= pSessionInfo->nMaxCommFlux; if (pSessionInfo->nCommFlux < 0) { pSessionInfo->nCommFlux = 0; } ///若流量超過規定,則掛起該會話的讀操作 pSession->SuspendRead(pSessionInfo->nCommFlux >= pSessionInfo->nMaxCommFlux);}

這就相當與飯店裡面先給某一桌客人上一些菜,讓他們先吃著,等上了一些菜之後不會再給這桌繼續上菜了,而是給其它空桌上菜,大家都吃上後,繼續回來給原先的桌子繼續上菜。實際上我們的飯店都是這麼做的。上面的例子是單服務流量控制的實現的一個非常好的思路,它保證了每個客戶端都能均衡地得到服務,而不是一些客戶端等很久才有響應。

另外加快伺服器處理速度的策略可能就是緩存了,緩存實際上是以空間換取時間的策略。對於一些反覆使用的,但是不經常改變的信息,如果從原始地點載入這些信息就比較耗時的數據(比如從磁碟中、從資料庫中),我們就可以使用緩存。所以時下像redis、leveldb、fastdb等各種內存資料庫大行其道。我在flamingo中用戶的基本信息都是緩存在聊天服務程序中的,而文件服務啟動時會去載入指定目錄裡面的所有程序名稱,這些文件的名稱都是md5,為該文件內容的md5。這樣當客戶端上傳了新文件請求時,如果其傳上來的文件md5已經位於緩存中,則表明該文件在伺服器上已經存在,這個時候伺服器就不必再接收該文件了,而是告訴客戶端文件已經上傳成功了。

說了這麼多,一般來說,一個伺服器的架構,往往更多取決於其具體的業務,我們要在結合當前的情況來實際去組織鋪排,沒有一套系統是萬能的。多思考,多實踐,多總結,相信很快你也能擁有很不錯的架構能力。

鑒於筆者能力和經驗有限,文中難免有錯漏之處,歡迎提意見。 交流QQ群:49114021

推薦閱讀:

伺服器編程心得(五)—— 如何編寫高性能日誌

TAG:伺服器編程 | 網路編程 | 後端技術 |