標籤:

伺服器端編程心得(二)—— Reactor模式

最近一直在看游雙的《高性能linux伺服器編程》一書,下載鏈接: download.csdn.net/detai

書上是這麼介紹Reactor模式的:

按照這個思路,我寫個簡單的練習:

/** *@desc: 用reactor模式練習伺服器程序,main.cpp *@author: zhangyl *@date: 2016.11.23 */ #include <iostream> #include <string.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> //for htonl() and htons() #include <unistd.h> #include <fcntl.h> #include <sys/epoll.h> #include <signal.h> //for signal() #include <pthread.h> #include <semaphore.h> #include <list> #include <errno.h> #include <time.h> #include <sstream> #include <iomanip> //for std::setw()/setfill() #include <stdlib.h> #define WORKER_THREAD_NUM 5 #define min(a, b) ((a <= b) ? (a) : (b)) int g_epollfd = 0; bool g_bStop = false; int g_listenfd = 0; pthread_t g_acceptthreadid = 0; pthread_t g_threadid[WORKER_THREAD_NUM] = { 0 }; pthread_cond_t g_acceptcond; pthread_mutex_t g_acceptmutex; pthread_cond_t g_cond /*= PTHREAD_COND_INITIALIZER*/; pthread_mutex_t g_mutex /*= PTHREAD_MUTEX_INITIALIZER*/; pthread_mutex_t g_clientmutex; std::list<int> g_listClients; void prog_exit(int signo) { ::signal(SIGINT, SIG_IGN); ::signal(SIGKILL, SIG_IGN); ::signal(SIGTERM, SIG_IGN); std::cout << "program recv signal " << signo << " to exit." << std::endl; g_bStop = true; ::epoll_ctl(g_epollfd, EPOLL_CTL_DEL, g_listenfd, NULL); //TODO: 是否需要先調用shutdown()一下? ::shutdown(g_listenfd, SHUT_RDWR); ::close(g_listenfd); ::close(g_epollfd); ::pthread_cond_destroy(&g_acceptcond); ::pthread_mutex_destroy(&g_acceptmutex); ::pthread_cond_destroy(&g_cond); ::pthread_mutex_destroy(&g_mutex); ::pthread_mutex_destroy(&g_clientmutex); } bool create_server_listener(const char* ip, short port) { g_listenfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); if (g_listenfd == -1) return false; int on = 1; ::setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on)); ::setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEPORT, (char *)&on, sizeof(on)); struct sockaddr_in servaddr; memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = inet_addr(ip); servaddr.sin_port = htons(port); if (::bind(g_listenfd, (sockaddr *)&servaddr, sizeof(servaddr)) == -1) return false; if (::listen(g_listenfd, 50) == -1) return false; g_epollfd = ::epoll_create(1); if (g_epollfd == -1) return false; struct epoll_event e; memset(&e, 0, sizeof(e)); e.events = EPOLLIN | EPOLLRDHUP; e.data.fd = g_listenfd; if (::epoll_ctl(g_epollfd, EPOLL_CTL_ADD, g_listenfd, &e) == -1) return false; return true; } void release_client(int clientfd) { if (::epoll_ctl(g_epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1) std::cout << "release client socket failed as call epoll_ctl failed" << std::endl; ::close(clientfd); } void* accept_thread_func(void* arg) { while (!g_bStop) { ::pthread_mutex_lock(&g_acceptmutex); ::pthread_cond_wait(&g_acceptcond, &g_acceptmutex); //::pthread_mutex_lock(&g_acceptmutex); //std::cout << "run loop in accept_thread_func" << std::endl; struct sockaddr_in clientaddr; socklen_t addrlen; int newfd = ::accept(g_listenfd, (struct sockaddr *)&clientaddr, &addrlen); ::pthread_mutex_unlock(&g_acceptmutex); if (newfd == -1) continue; std::cout << "new client connected: " << ::inet_ntoa(clientaddr.sin_addr) << ":" << ::ntohs(clientaddr.sin_port) << std::endl; //將新socket設置為non-blocking int oldflag = ::fcntl(newfd, F_GETFL, 0); int newflag = oldflag | O_NONBLOCK; if (::fcntl(newfd, F_SETFL, newflag) == -1) { std::cout << "fcntl error, oldflag =" << oldflag << ", newflag = " << newflag << std::endl; continue; } struct epoll_event e; memset(&e, 0, sizeof(e)); e.events = EPOLLIN | EPOLLRDHUP | EPOLLET; e.data.fd = newfd; if (::epoll_ctl(g_epollfd, EPOLL_CTL_ADD, newfd, &e) == -1) { std::cout << "epoll_ctl error, fd =" << newfd << std::endl; } } return NULL; } void* worker_thread_func(void* arg) { while (!g_bStop) { int clientfd; ::pthread_mutex_lock(&g_clientmutex); while (g_listClients.empty()) ::pthread_cond_wait(&g_cond, &g_clientmutex); clientfd = g_listClients.front(); g_listClients.pop_front(); pthread_mutex_unlock(&g_clientmutex); //gdb調試時不能實時刷新標準輸出,用這個函數刷新標準輸出,使信息在屏幕上實時顯示出來 std::cout << std::endl; std::string strclientmsg; char buff[256]; bool bError = false; while (true) { memset(buff, 0, sizeof(buff)); int nRecv = ::recv(clientfd, buff, 256, 0); if (nRecv == -1) { if (errno == EWOULDBLOCK) break; else { std::cout << "recv error, client disconnected, fd = " << clientfd << std::endl; release_client(clientfd); bError = true; break; } } //對端關閉了socket,這端也關閉。 else if (nRecv == 0) { std::cout << "peer closed, client disconnected, fd = " << clientfd << std::endl; release_client(clientfd); bError = true; break; } strclientmsg += buff; } //出錯了,就不要再繼續往下執行了 if (bError) continue; std::cout << "client msg: " << strclientmsg; //將消息加上時間標籤後發回 time_t now = time(NULL); struct tm* nowstr = localtime(&now); std::ostringstream ostimestr; ostimestr << "[" << nowstr->tm_year + 1900 << "-" << std::setw(2) << std::setfill(0) << nowstr->tm_mon + 1 << "-" << std::setw(2) << std::setfill(0) << nowstr->tm_mday << " " << std::setw(2) << std::setfill(0) << nowstr->tm_hour << ":" << std::setw(2) << std::setfill(0) << nowstr->tm_min << ":" << std::setw(2) << std::setfill(0) << nowstr->tm_sec << "]server reply: "; strclientmsg.insert(0, ostimestr.str()); while (true) { int nSent = ::send(clientfd, strclientmsg.c_str(), strclientmsg.length(), 0); if (nSent == -1) { if (errno == EWOULDBLOCK) { ::sleep(10); continue; } else { std::cout << "send error, fd = " << clientfd << std::endl; release_client(clientfd); break; } } std::cout << "send: " << strclientmsg; strclientmsg.erase(0, nSent); if (strclientmsg.empty()) break; } } return NULL; } void daemon_run() { int pid; signal(SIGCHLD, SIG_IGN); //1)在父進程中,fork返回新創建子進程的進程ID; //2)在子進程中,fork返回0; //3)如果出現錯誤,fork返回一個負值; pid = fork(); if (pid < 0) { std:: cout << "fork error" << std::endl; exit(-1); } //父進程退出,子進程獨立運行 else if (pid > 0) { exit(0); } //之前parent和child運行在同一個session里,parent是會話(session)的領頭進程, //parent進程作為會話的領頭進程,如果exit結束執行的話,那麼子進程會成為孤兒進程,並被init收養。 //執行setsid()之後,child將重新獲得一個新的會話(session)id。 //這時parent退出之後,將不會影響到child了。 setsid(); int fd; fd = open("/dev/null", O_RDWR, 0); if (fd != -1) { dup2(fd, STDIN_FILENO); dup2(fd, STDOUT_FILENO); dup2(fd, STDERR_FILENO); } if (fd > 2) close(fd); } int main(int argc, char* argv[]) { short port = 0; int ch; bool bdaemon = false; while ((ch = getopt(argc, argv, "p:d")) != -1) { switch (ch) { case d: bdaemon = true; break; case p: port = atol(optarg); break; } } if (bdaemon) daemon_run(); if (port == 0) port = 12345; if (!create_server_listener("0.0.0.0", port)) { std::cout << "Unable to create listen server: ip=0.0.0.0, port=" << port << "." << std::endl; return -1; } //設置信號處理 signal(SIGCHLD, SIG_DFL); signal(SIGPIPE, SIG_IGN); signal(SIGINT, prog_exit); signal(SIGKILL, prog_exit); signal(SIGTERM, prog_exit); ::pthread_cond_init(&g_acceptcond, NULL); ::pthread_mutex_init(&g_acceptmutex, NULL); ::pthread_cond_init(&g_cond, NULL); ::pthread_mutex_init(&g_mutex, NULL); ::pthread_mutex_init(&g_clientmutex, NULL); ::pthread_create(&g_acceptthreadid, NULL, accept_thread_func, NULL); //啟動工作線程 for (int i = 0; i < WORKER_THREAD_NUM; ++i) { ::pthread_create(&g_threadid[i], NULL, worker_thread_func, NULL); } while (!g_bStop) { struct epoll_event ev[1024]; int n = ::epoll_wait(g_epollfd, ev, 1024, 10); if (n == 0) continue; else if (n < 0) { std::cout << "epoll_wait error" << std::endl; continue; } int m = min(n, 1024); for (int i = 0; i < m; ++i) { //通知接收連接線程接收新連接 if (ev[i].data.fd == g_listenfd) pthread_cond_signal(&g_acceptcond); //通知普通工作線程接收數據 else { pthread_mutex_lock(&g_clientmutex); g_listClients.push_back(ev[i].data.fd); pthread_mutex_unlock(&g_clientmutex); pthread_cond_signal(&g_cond); //std::cout << "signal" << std::endl; } } } return 0; }

程序的功能一個簡單的echo服務:客戶端連接上伺服器之後,給伺服器發送信息,伺服器加上時間戳等信息後返回給客戶端。使用到的知識點有:

1. 條件變數

2.epoll的邊緣觸發模式

程序的大致框架是:

1. 主線程只負責監聽偵聽socket上是否有新連接,如果有新連接到來,交給一個叫accept的工作線程去接收新連接,並將新連接socket綁定到主線程使用epollfd上去。

2. 主線程如果偵聽到客戶端的socket上有可讀事件,則通知另外五個工作線程去接收處理客戶端發來的數據,並將數據加上時間戳後發回給客戶端。

3. 可以通過傳遞-p port來設置程序的監聽埠號;可以通過傳遞-d來使程序以daemon模式運行在後台。這也是標準linux daemon模式的書寫方法。

程序難點和需要注意的地方是:

1. 條件變數為了防止虛假喚醒,一定要在一個循環裡面調用pthread_cond_wait()函數,我在worker_thread_func()中使用了:

while (g_listClients.empty()) ::pthread_cond_wait(&g_cond, &g_clientmutex);

在accept_thread_func()函數裡面我沒有使用循環,這樣會有問題嗎?

2. 使用條件變數pthread_cond_wait()函數的時候一定要先獲得與該條件變數相關的mutex,即像下面這樣的結構:

mutex_lock(...); while (condition is true) ::pthread_cond_wait(...); //這裡可以有其他代碼... mutex_unlock(...); //這裡可以有其他代碼...

因為pthread_cond_wait()如果阻塞的話,它解鎖相關mutex和阻塞當前線程這兩個動作加在一起是原子的。

3. 作為伺服器端程序最好對偵聽socket調用setsocketopt()設置SO_REUSEADDR和SO_REUSEPORT兩個標誌,因為服務程序有時候會需要重啟(比如調試的時候就會不斷重啟),如果不設置這兩個標誌的話,綁定埠時就會調用失敗。因為一個埠使用後,即使不再使用,因為四次揮手該埠處於TIME_WAIT狀態,有大約2min的MSL(Maximum Segment Lifetime,最大存活期)。這2min內,該埠是不能被重複使用的。你的伺服器程序上次使用了這個埠號,接著重啟,因為這個緣故,你再次綁定這個埠就會失敗(bind函數調用失敗)。要不你就每次重啟時需要等待2min後再試(這在頻繁重啟程序調試是難以接收的),或者設置這種SO_REUSEADDR和SO_REUSEPORT立即回收埠使用。

其實,SO_REUSEADDR在windows上和Unix平台上還有些細微的區別,我在libevent源碼中看到這樣的描述:

int evutil_make_listen_socket_reuseable(evutil_socket_t sock) { #ifndef WIN32 int one = 1; /* REUSEADDR on Unix means, "dont hang on to this address after the * listener is closed." On Windows, though, it means "dont keep other * processes from binding to this address while were using it. */ return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void*) &one, (ev_socklen_t)sizeof(one)); #else return 0; #endif }

注意注釋部分,在Unix平台上設置這個選項意味著,任意進程可以復用該地址;而在windows,不要阻止其他進程復用該地址。也就是在在Unix平台上,如果不設置這個選項,任意進程在一定時間內,不能bind該地址;在windows平台上,在一定時間內,其他進程不能bind該地址,而本進程卻可以再次bind該地址。

4. epoll_wait對新連接socket使用的是邊緣觸發模式EPOLLET(edge trigger),而不是默認的水平觸發模式(level trigger)。因為如果採取水平觸發模式的話,主線程檢測到某個客戶端socket數據可讀時,通知工作線程去收取該socket上的數據,這個時候主線程繼續循環,只要在工作線程沒有將該socket上數據全部收完,或者在工作線程收取數據的過程中,客戶端有新數據到來,主線程會繼續發通知(通過pthread_cond_signal())函數,再次通知工作線程收取數據。這樣會可能導致多個工作線程同時調用recv函數收取該客戶端socket上的數據,這樣產生的結果將會導致數據錯亂。

相反,採取邊緣觸發模式,只有等某個工作線程將那個客戶端socket上數據全部收取完畢,主線程的epoll_wait才可能會再次觸發來通知工作線程繼續收取那個客戶端socket新來的數據。

5. 代碼中有這樣一行:

//gdb調試時不能實時刷新標準輸出,用這個函數刷新標準輸出,使信息在屏幕上實時顯示出來 std::cout << std::endl;

如果不加上這一行,正常運行伺服器程序,程序中要列印到控制台的信息都會列印出來,但是如果用gdb調試狀態下,程序的所有輸出就不顯示了。我不知道這是不是gdb的一個bug,所以這裡加上std::endl來輸出一個換行符並flush標準輸出,讓輸出顯示出來。(std::endl不僅是輸出一個換行符而且是同時刷新輸出,相當於fflush()函數)。

程序我部署起來了,你可以使用linux的nc命令或自己寫程序連接伺服器來查看程序效果,當然也可以使用telnet命令,方法:

linux:

nc 120.55.94.78 12345

telnet 120.55.94.78 12345

然後就可以給伺服器自由發送數據了,伺服器會給你發送的信息加上時間戳返回給你。效果如圖:

另外我將這個代碼改寫了成純C++11版本,使用CMake編譯,為了支持編譯必須加上這-std=c++11:

CMakeLists.txt代碼如下:

cmake_minimum_required(VERSION 2.8) PROJECT(myreactorserver) AUX_SOURCE_DIRECTORY(./ SRC_LIST) SET(EXECUTABLE_OUTPUT_PATH ./) ADD_DEFINITIONS(-g -W -Wall -Wno-deprecated -DLINUX -D_REENTRANT -D_FILE_OFFSET_BITS=64 -DAC_HAS_INFO -DAC_HAS_WARNING -DAC_HAS_ERROR -DAC_HAS_CRITICAL -DTIXML_USE_STL -DHAVE_CXX_STDHEADERS ${CMAKE_CXX_FLAGS} -std=c++11) INCLUDE_DIRECTORIES( ./ ) LINK_DIRECTORIES( ./ ) set( main.cpp myreator.cpp ) ADD_EXECUTABLE(myreactorserver ${SRC_LIST}) TARGET_LINK_LIBRARIES(myreactorserver pthread)

myreactor.h文件內容:

/** *@desc: myreactor頭文件, myreactor.h *@author: zhangyl *@date: 2016.12.03 */ #ifndef __MYREACTOR_H__ #define __MYREACTOR_H__ #include <list> #include <memory> #include <thread> #include <mutex> #include <condition_variable> #define WORKER_THREAD_NUM 5 class CMyReactor { public: CMyReactor(); ~CMyReactor(); bool init(const char* ip, short nport); bool uninit(); bool close_client(int clientfd); static void* main_loop(void* p); private: //no copyable CMyReactor(const CMyReactor& rhs); CMyReactor& operator = (const CMyReactor& rhs); bool create_server_listener(const char* ip, short port); static void accept_thread_proc(CMyReactor* pReatcor); static void worker_thread_proc(CMyReactor* pReatcor); private: //C11語法可以在這裡初始化 int m_listenfd = 0; int m_epollfd = 0; bool m_bStop = false; std::shared_ptr<std::thread> m_acceptthread; std::shared_ptr<std::thread> m_workerthreads[WORKER_THREAD_NUM]; std::condition_variable m_acceptcond; std::mutex m_acceptmutex; std::condition_variable m_workercond ; std::mutex m_workermutex; std::list<int> m_listClients; }; #endif //!__MYREACTOR_H__

myreactor.cpp文件內容:

/** *@desc: myreactor實現文件, myreactor.cpp *@author: zhangyl *@date: 2016.12.03 */ #include "myreactor.h" #include <iostream> #include <string.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> //for htonl() and htons() #include <fcntl.h> #include <sys/epoll.h> #include <list> #include <errno.h> #include <time.h> #include <sstream> #include <iomanip> //for std::setw()/setfill() #include <unistd.h> #define min(a, b) ((a <= b) ? (a) : (b)) CMyReactor::CMyReactor() { //m_listenfd = 0; //m_epollfd = 0; //m_bStop = false; } CMyReactor::~CMyReactor() { } bool CMyReactor::init(const char* ip, short nport) { if (!create_server_listener(ip, nport)) { std::cout << "Unable to bind: " << ip << ":" << nport << "." << std::endl; return false; } std::cout << "main thread id = " << std::this_thread::get_id() << std::endl; //啟動接收新連接的線程 m_acceptthread.reset(new std::thread(CMyReactor::accept_thread_proc, this)); //啟動工作線程 for (auto& t : m_workerthreads) { t.reset(new std::thread(CMyReactor::worker_thread_proc, this)); } return true; } bool CMyReactor::uninit() { m_bStop = true; m_acceptcond.notify_one(); m_workercond.notify_all(); m_acceptthread->join(); for (auto& t : m_workerthreads) { t->join(); } ::epoll_ctl(m_epollfd, EPOLL_CTL_DEL, m_listenfd, NULL); //TODO: 是否需要先調用shutdown()一下? ::shutdown(m_listenfd, SHUT_RDWR); ::close(m_listenfd); ::close(m_epollfd); return true; } bool CMyReactor::close_client(int clientfd) { if (::epoll_ctl(m_epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1) { std::cout << "close client socket failed as call epoll_ctl failed" << std::endl; //return false; } ::close(clientfd); return true; } void* CMyReactor::main_loop(void* p) { std::cout << "main thread id = " << std::this_thread::get_id() << std::endl; CMyReactor* pReatcor = static_cast<CMyReactor*>(p); while (!pReatcor->m_bStop) { struct epoll_event ev[1024]; int n = ::epoll_wait(pReatcor->m_epollfd, ev, 1024, 10); if (n == 0) continue; else if (n < 0) { std::cout << "epoll_wait error" << std::endl; continue; } int m = min(n, 1024); for (int i = 0; i < m; ++i) { //通知接收連接線程接收新連接 if (ev[i].data.fd == pReatcor->m_listenfd) pReatcor->m_acceptcond.notify_one(); //通知普通工作線程接收數據 else { { std::unique_lock<std::mutex> guard(pReatcor->m_workermutex); pReatcor->m_listClients.push_back(ev[i].data.fd); } pReatcor->m_workercond.notify_one(); //std::cout << "signal" << std::endl; }// end if }// end for-loop }// end while std::cout << "main loop exit ..." << std::endl; return NULL; } void CMyReactor::accept_thread_proc(CMyReactor* pReatcor) { std::cout << "accept thread, thread id = " << std::this_thread::get_id() << std::endl; while (true) { int newfd; struct sockaddr_in clientaddr; socklen_t addrlen; { std::unique_lock<std::mutex> guard(pReatcor->m_acceptmutex); pReatcor->m_acceptcond.wait(guard); if (pReatcor->m_bStop) break; //std::cout << "run loop in accept_thread_proc" << std::endl; newfd = ::accept(pReatcor->m_listenfd, (struct sockaddr *)&clientaddr, &addrlen); } if (newfd == -1) continue; std::cout << "new client connected: " << ::inet_ntoa(clientaddr.sin_addr) << ":" << ::ntohs(clientaddr.sin_port) << std::endl; //將新socket設置為non-blocking int oldflag = ::fcntl(newfd, F_GETFL, 0); int newflag = oldflag | O_NONBLOCK; if (::fcntl(newfd, F_SETFL, newflag) == -1) { std::cout << "fcntl error, oldflag =" << oldflag << ", newflag = " << newflag << std::endl; continue; } struct epoll_event e; memset(&e, 0, sizeof(e)); e.events = EPOLLIN | EPOLLRDHUP | EPOLLET; e.data.fd = newfd; if (::epoll_ctl(pReatcor->m_epollfd, EPOLL_CTL_ADD, newfd, &e) == -1) { std::cout << "epoll_ctl error, fd =" << newfd << std::endl; } } std::cout << "accept thread exit ..." << std::endl; } void CMyReactor::worker_thread_proc(CMyReactor* pReatcor) { std::cout << "new worker thread, thread id = " << std::this_thread::get_id() << std::endl; while (true) { int clientfd; { std::unique_lock<std::mutex> guard(pReatcor->m_workermutex); while (pReatcor->m_listClients.empty()) { if (pReatcor->m_bStop) { std::cout << "worker thread exit ..." << std::endl; return; } pReatcor->m_workercond.wait(guard); } clientfd = pReatcor->m_listClients.front(); pReatcor->m_listClients.pop_front(); } //gdb調試時不能實時刷新標準輸出,用這個函數刷新標準輸出,使信息在屏幕上實時顯示出來 std::cout << std::endl; std::string strclientmsg; char buff[256]; bool bError = false; while (true) { memset(buff, 0, sizeof(buff)); int nRecv = ::recv(clientfd, buff, 256, 0); if (nRecv == -1) { if (errno == EWOULDBLOCK) break; else { std::cout << "recv error, client disconnected, fd = " << clientfd << std::endl; pReatcor->close_client(clientfd); bError = true; break; } } //對端關閉了socket,這端也關閉。 else if (nRecv == 0) { std::cout << "peer closed, client disconnected, fd = " << clientfd << std::endl; pReatcor->close_client(clientfd); bError = true; break; } strclientmsg += buff; } //出錯了,就不要再繼續往下執行了 if (bError) continue; std::cout << "client msg: " << strclientmsg; //將消息加上時間標籤後發回 time_t now = time(NULL); struct tm* nowstr = localtime(&now); std::ostringstream ostimestr; ostimestr << "[" << nowstr->tm_year + 1900 << "-" << std::setw(2) << std::setfill(0) << nowstr->tm_mon + 1 << "-" << std::setw(2) << std::setfill(0) << nowstr->tm_mday << " " << std::setw(2) << std::setfill(0) << nowstr->tm_hour << ":" << std::setw(2) << std::setfill(0) << nowstr->tm_min << ":" << std::setw(2) << std::setfill(0) << nowstr->tm_sec << "]server reply: "; strclientmsg.insert(0, ostimestr.str()); while (true) { int nSent = ::send(clientfd, strclientmsg.c_str(), strclientmsg.length(), 0); if (nSent == -1) { if (errno == EWOULDBLOCK) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); continue; } else { std::cout << "send error, fd = " << clientfd << std::endl; pReatcor->close_client(clientfd); break; } } std::cout << "send: " << strclientmsg; strclientmsg.erase(0, nSent); if (strclientmsg.empty()) break; } } } bool CMyReactor::create_server_listener(const char* ip, short port) { m_listenfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); if (m_listenfd == -1) return false; int on = 1; ::setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on)); ::setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEPORT, (char *)&on, sizeof(on)); struct sockaddr_in servaddr; memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = inet_addr(ip); servaddr.sin_port = htons(port); if (::bind(m_listenfd, (sockaddr *)&servaddr, sizeof(servaddr)) == -1) return false; if (::listen(m_listenfd, 50) == -1) return false; m_epollfd = ::epoll_create(1); if (m_epollfd == -1) return false; struct epoll_event e; memset(&e, 0, sizeof(e)); e.events = EPOLLIN | EPOLLRDHUP; e.data.fd = m_listenfd; if (::epoll_ctl(m_epollfd, EPOLL_CTL_ADD, m_listenfd, &e) == -1) return false; return true; }

main.cpp文件內容:

/** *@desc: 用reactor模式練習伺服器程序 *@author: zhangyl *@date: 2016.12.03 */ #include <iostream> #include <signal.h> //for signal() #include<unistd.h> #include <stdlib.h> //for exit() #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include "myreactor.h" CMyReactor g_reator; void prog_exit(int signo) { std::cout << "program recv signal " << signo << " to exit." << std::endl; g_reator.uninit(); } void daemon_run() { int pid; signal(SIGCHLD, SIG_IGN); //1)在父進程中,fork返回新創建子進程的進程ID; //2)在子進程中,fork返回0; //3)如果出現錯誤,fork返回一個負值; pid = fork(); if (pid < 0) { std:: cout << "fork error" << std::endl; exit(-1); } //父進程退出,子進程獨立運行 else if (pid > 0) { exit(0); } //之前parent和child運行在同一個session里,parent是會話(session)的領頭進程, //parent進程作為會話的領頭進程,如果exit結束執行的話,那麼子進程會成為孤兒進程,並被init收養。 //執行setsid()之後,child將重新獲得一個新的會話(session)id。 //這時parent退出之後,將不會影響到child了。 setsid(); int fd; fd = open("/dev/null", O_RDWR, 0); if (fd != -1) { dup2(fd, STDIN_FILENO); dup2(fd, STDOUT_FILENO); dup2(fd, STDERR_FILENO); } if (fd > 2) close(fd); } int main(int argc, char* argv[]) { //設置信號處理 signal(SIGCHLD, SIG_DFL); signal(SIGPIPE, SIG_IGN); signal(SIGINT, prog_exit); signal(SIGKILL, prog_exit); signal(SIGTERM, prog_exit); short port = 0; int ch; bool bdaemon = false; while ((ch = getopt(argc, argv, "p:d")) != -1) { switch (ch) { case d: bdaemon = true; break; case p: port = atol(optarg); break; } } if (bdaemon) daemon_run(); if (port == 0) port = 12345; if (!g_reator.init("0.0.0.0", 12345)) return -1; g_reator.main_loop(&g_reator); return 0; }

完整實例代碼下載地址:

普通版本:pan.baidu.com/s/1o82Mkn

C++11版本:pan.baidu.com/s/1dEJdri


推薦閱讀:

深挖NUMA
讓計算更智慧 | 聯想發布全新ThinkSystem、ThinkAgile品牌把握AI風口
loki設計構想

TAG:伺服器架構 |