Swoole 源碼分析——ReactorBase
來自專欄 Swoole
前言
作為一個網路框架,最為核心的就是消息的接受與發送。高效的 reactor
模式一直是眾多網路框架的首要選擇,本節主要講解 swoole
中的 reactor
模塊。
UNP 學習筆記——IO 復用
Reactor
的數據結構
Reactor
的數據結構比較複雜,首先object
是具體Reactor
對象的首地址,ptr
是擁有Reactor
對象的類的指針,event_num
存放現有監控的fd
個數,max_event_num
存放允許持有的最大事件數目,flag
為標記位,id
用於存放對應reactor
的id
,running
用於標記該reactor
是否正在運行,一般是創建時會被置為 1,start
標記著reactor
是否已經被啟動,一般是進行wait
監控時被置為 1,once
標誌著reactor
是否是僅需要一次性監控,check_timer
標誌著是否要檢查定時任務singal_no
:每次reactor
由於fd
的就緒返回時,reactor
都會檢查這個singal_no
,如果這個值不為空,那麼就會調用相應的信號回調函數disable_accept
標誌著是否接受新的連接,這個只有主reactor
中才會設置為 0,其他reactor
線程不需要接受新的連接,只需要接受數據即可check_signalfd
標誌著是否需要檢查signalfd
thread
用於標記當前是使用reactor
多線程模式還是多進程模式,一般都會使用多線程模式timeout_msec
用於記錄每次reactor->wait
的超時max_socket
記錄著reactor
中最大的連接數,與max_connection
的值一致;socket_list
是reactor
多線程模式的監聽的socket
,與connection_list
保持一致;socket_array
是reactor
多進程模式中的監聽的fd
handle
是默認就緒的回調函數,write_handle
是寫就緒的回調函數,error_handle
包含錯誤就緒的回調函數timewheel
、heartbeat_interval
、last_heartbeat_time
是心跳檢測,專門剔除空閑連接last_malloc_trim_time
記錄了上次返還給系統的時間,swoole
會定期的通過malloc_trim
函數返回空閑的內存空間
struct _swReactor{ void *object; void *ptr; //reserve /** * last signal number */ int singal_no; uint32_t event_num; uint32_t max_event_num; uint32_t check_timer :1; uint32_t running :1; uint32_t start :1; uint32_t once :1; /** * disable accept new connection */ uint32_t disable_accept :1; uint32_t check_signalfd :1; /** * multi-thread reactor, cannot realloc sockets. */ uint32_t thread :1; /** * reactor->wait timeout (millisecond) or -1 */ int32_t timeout_msec; uint16_t id; //Reactor ID uint16_t flag; //flag uint32_t max_socket;#ifdef SW_USE_MALLOC_TRIM time_t last_malloc_trim_time;#endif#ifdef SW_USE_TIMEWHEEL swTimeWheel *timewheel; uint16_t heartbeat_interval; time_t last_heartbeat_time;#endif /** * for thread */ swConnection *socket_list; /** * for process */ swArray *socket_array; swReactor_handle handle[SW_MAX_FDTYPE]; //默認事件 swReactor_handle write_handle[SW_MAX_FDTYPE]; //擴展事件1(一般為寫事件) swReactor_handle error_handle[SW_MAX_FDTYPE]; //擴展事件2(一般為錯誤事件,如socket關閉) int (*add)(swReactor *, int fd, int fdtype); int (*set)(swReactor *, int fd, int fdtype); int (*del)(swReactor *, int fd); int (*wait)(swReactor *, struct timeval *); void (*free)(swReactor *); int (*setHandle)(swReactor *, int fdtype, swReactor_handle); swDefer_callback *defer_callback_list; swDefer_callback idle_task; swDefer_callback future_task; void (*onTimeout)(swReactor *); void (*onFinish)(swReactor *); void (*onBegin)(swReactor *); void (*enable_accept)(swReactor *); int (*can_exit)(swReactor *); int (*write)(swReactor *, int, void *, int); int (*close)(swReactor *, int); int (*defer)(swReactor *, swCallback, void *);};
reactor
的創建
reactor
的創建主要是調用swReactorEpoll_create
函數setHandle
函數是為監聽的fd
設置回調函數,包括讀就緒、寫就緒、錯誤onFinish
是每次調用epoll
函數返回後,處理具體邏輯後,最後調用的回調函數onTimeout
是每次調用epoll
函數超時後的回調函數write
函數是利用reactor
向socket
發送數據的介面defer
函數用於添加defer_callback_list
成員變數,這個成員變數是回調函數列表,epoll
函數超時和onFinish
都會循環defer_callback_list
裡面的回調函數socket_array
是監聽的fd
列表
int swReactor_create(swReactor *reactor, int max_event){ int ret; bzero(reactor, sizeof(swReactor));#ifdef HAVE_EPOLL ret = swReactorEpoll_create(reactor, max_event); reactor->running = 1; reactor->setHandle = swReactor_setHandle; reactor->onFinish = swReactor_onFinish; reactor->onTimeout = swReactor_onTimeout; reactor->write = swReactor_write; reactor->defer = swReactor_defer; reactor->close = swReactor_close; reactor->socket_array = swArray_new(1024, sizeof(swConnection)); if (!reactor->socket_array) { swWarn("create socket array failed."); return SW_ERR; } return ret;}
reactor
的函數
reactor
設置文件就緒回調函數 swReactor_setHandle
reactor
中設置的fd
由兩部分構成,一種是swFd_type
,標識著文件描述符的類型,一種是swEvent_type
標識著文件描述符感興趣的讀寫事件
enum swFd_type{ SW_FD_TCP = 0, //tcp socket SW_FD_LISTEN = 1, //server socket SW_FD_CLOSE = 2, //socket closed SW_FD_ERROR = 3, //socket error SW_FD_UDP = 4, //udp socket SW_FD_PIPE = 5, //pipe SW_FD_STREAM = 6, //stream socket SW_FD_WRITE = 7, //fd can write SW_FD_TIMER = 8, //timer fd SW_FD_AIO = 9, //linux native aio SW_FD_SIGNAL = 11, //signalfd SW_FD_DNS_RESOLVER = 12, //dns resolver SW_FD_INOTIFY = 13, //server socket SW_FD_USER = 15, //SW_FD_USER or SW_FD_USER+n: for custom event SW_FD_STREAM_CLIENT = 16, //swClient stream SW_FD_DGRAM_CLIENT = 17, //swClient dgram};enum swEvent_type{ SW_EVENT_DEAULT = 256, SW_EVENT_READ = 1u << 9, SW_EVENT_WRITE = 1u << 10, SW_EVENT_ERROR = 1u << 11, SW_EVENT_ONCE = 1u << 12,};
swReactor_fdtype
用於從文件描述符中提取swFd_type
,也就是文件描述符的類型:
static sw_inline int swReactor_fdtype(int fdtype){ return fdtype & (~SW_EVENT_READ) & (~SW_EVENT_WRITE) & (~SW_EVENT_ERROR);}
swReactor_event_read
、swReactor_event_write
、swReactor_event_error
這三個函數與swFd_type
正相反,是從文件描述符中提取讀寫事件
static sw_inline int swReactor_event_read(int fdtype){ return (fdtype < SW_EVENT_DEAULT) || (fdtype & SW_EVENT_READ);}static sw_inline int swReactor_event_write(int fdtype){ return fdtype & SW_EVENT_WRITE;}static sw_inline int swReactor_event_error(int fdtype){ return fdtype & SW_EVENT_ERROR;}
swReactor_setHandle
用於為文件描述符_fdtype
設定讀就緒、寫就緒的回調函數
int swReactor_setHandle(swReactor *reactor, int _fdtype, swReactor_handle handle){ int fdtype = swReactor_fdtype(_fdtype); if (fdtype >= SW_MAX_FDTYPE) { swWarn("fdtype > SW_MAX_FDTYPE[%d]", SW_MAX_FDTYPE); return SW_ERR; } if (swReactor_event_read(_fdtype)) { reactor->handle[fdtype] = handle; } else if (swReactor_event_write(_fdtype)) { reactor->write_handle[fdtype] = handle; } else if (swReactor_event_error(_fdtype)) { reactor->error_handle[fdtype] = handle; } else { swWarn("unknow fdtype"); return SW_ERR; } return SW_OK;}
reactor
添加 defer
函數
defer
函數會在每次事件循環結束或超時的時候調用swReactor_defer
函數會為defer_callback_list
添加新的回調函數
static int swReactor_defer(swReactor *reactor, swCallback callback, void *data){ swDefer_callback *cb = sw_malloc(sizeof(swDefer_callback)); if (!cb) { swWarn("malloc(%ld) failed.", sizeof(swDefer_callback)); return SW_ERR; } cb->callback = callback; cb->data = data; LL_APPEND(reactor->defer_callback_list, cb); return SW_OK;}
reactor
超時回調函數
epoll
在設置的時間內沒有返回的話,也會自動返回,這個時候就會調用超時回調函數:
static void swReactor_onTimeout(swReactor *reactor){ swReactor_onTimeout_and_Finish(reactor); if (reactor->disable_accept) { reactor->enable_accept(reactor); reactor->disable_accept = 0; }}
swReactor_onTimeout_and_Finish
函數用於在超時、finish
等情況下調用- 這個函數首先會檢查是否存在定時任務,如果有定時任務就會調用
swTimer_select
執行回調函數 - 接下來就要執行存儲在
defer_callback_list
的多個回調函數, 該list
是事先定義好的需要defer
執行的函數 idle_task
是EventLoop
中使用的每一輪事件循環結束時調用的函數。- 如果當前
reactor
當前在work
進程,那麼就要調用swWorker_try_to_exit
函數來判斷event_num
是不是為 0,如果為 0 ,那麼就置running
為0,停止等待事件就緒 - 如果當前
SwooleG.serv
為空,swReactor_empty
函數用於判斷當前reactor
是否還有事件在監聽,如果沒有,那麼就會設置running
為 0 - 判斷當前時間是否可以調用
malloc_trim
釋放空閑的內存,如果距離上次釋放內存的時間超過了SW_MALLOC_TRIM_INTERVAL
,就更新last_malloc_trim_time
並調用malloc_trim
static void swReactor_onTimeout_and_Finish(swReactor *reactor){ //check timer if (reactor->check_timer) { swTimer_select(&SwooleG.timer); } //defer callback swDefer_callback *cb, *tmp; swDefer_callback *defer_callback_list = reactor->defer_callback_list; reactor->defer_callback_list = NULL; LL_FOREACH(defer_callback_list, cb) { cb->callback(cb->data); } LL_FOREACH_SAFE(defer_callback_list, cb, tmp) { sw_free(cb); } //callback at the end if (reactor->idle_task.callback) { reactor->idle_task.callback(reactor->idle_task.data); }#ifdef SW_COROUTINE //coro timeout if (!swIsMaster()) { coro_handle_timeout(); }#endif //server worker swWorker *worker = SwooleWG.worker; if (worker != NULL) { if (SwooleWG.wait_exit == 1) { swWorker_try_to_exit(); } } //not server, the event loop is empty if (SwooleG.serv == NULL && swReactor_empty(reactor)) { reactor->running = 0; }#ifdef SW_USE_MALLOC_TRIM if (SwooleG.serv && reactor->last_malloc_trim_time < SwooleG.serv->gs->now - SW_MALLOC_TRIM_INTERVAL) { malloc_trim(SW_MALLOC_TRIM_PAD); reactor->last_malloc_trim_time = SwooleG.serv->gs->now; }#endif}
swReactor_empty
用來判斷當前的reactor
是否還有事件需要監聽- 可以從函數中可以看出來,如果定時任務
timer
裡面還有等待的任務,那麼就可以返回 false event_num
如果為 0,可以返回 true,結束事件循環- 對於協程來說,還要調用
can_exit
來判斷是否可以退出事件循環
int swReactor_empty(swReactor *reactor){ //timer if (SwooleG.timer.num > 0) { return SW_FALSE; } int empty = SW_FALSE; //thread pool if (SwooleAIO.init && reactor->event_num == 1 && SwooleAIO.task_num == 0) { empty = SW_TRUE; } //no event else if (reactor->event_num == 0) { empty = SW_TRUE; } //coroutine if (empty && reactor->can_exit && reactor->can_exit(reactor)) { empty = SW_TRUE; } return empty;}
reactor
事件循環結束函數
- 每次事件循環結束之後,都會調用
onFinish
函數 - 該函數主要函數調用
swReactor_onTimeout_and_Finish
,在此之前還會檢查在事件循環過程中是否有信號觸發
static void swReactor_onFinish(swReactor *reactor){ //check signal if (reactor->singal_no) { swSignal_callback(reactor->singal_no); reactor->singal_no = 0; } swReactor_onTimeout_and_Finish(reactor);}
reactor
事件循環關閉函數
- 當一個
socket
關閉的時候,會調用close
函數,對應的回調函數就是swReactor_close
- 該函數用於釋放
swConnection
內部申請的內存,並調用close
函數關閉連接
int swReactor_close(swReactor *reactor, int fd){ swConnection *socket = swReactor_get(reactor, fd); if (socket->out_buffer) { swBuffer_free(socket->out_buffer); } if (socket->in_buffer) { swBuffer_free(socket->in_buffer); } if (socket->websocket_buffer) { swString_free(socket->websocket_buffer); } bzero(socket, sizeof(swConnection)); socket->removed = 1; swTraceLog(SW_TRACE_CLOSE, "fd=%d.", fd); return close(fd);}
swReactor_get
用於從reactor
中根據文件描述符獲取對應swConnection
對象的場景,由於swoole
一般都會採用reactor
多線程模式,因此基本只會執行return &reactor->socket_list[fd];
這一句。socket_list
這個列表與connection_list
保持一致,是事先申請的大小為max_connection
的類型是swConnection
的數組socket_list
中的數據有一部分是已經建立連接的swConnection
的對象,有一部分僅僅是空的swConnection
,這個時候swConnection->fd
為 0
static sw_inline swConnection* swReactor_get(swReactor *reactor, int fd){ if (reactor->thread) { return &reactor->socket_list[fd]; } swConnection *socket = (swConnection*) swArray_alloc(reactor->socket_array, fd); if (socket == NULL) { return NULL; } if (!socket->active) { socket->fd = fd; } return socket;}
reactor
的數據寫入
- 如果想對一個
socket
寫入數據,並不能簡單的直接調用send
函數,因為這個函數可能被信號打斷(EINTR)、可能暫時不可用(EAGAIN)、可能只寫入了部分數據,也有可能寫入成功。因此,reactor
定義了一個函數專門處理寫數據這一邏輯 - 首先要利用
swReactor_get
取出對應的swConnection
對象 - 如果取出的對象
fd
是 0,說明這個fd
文件描述符事先並沒有在reactor
裡面進行監聽 - 如果這個
socket
的out_buffer
為空,那麼就先嘗試利用swConnection_send
函數調用send
函數,觀察是否可以直接把所有數據發送成功 - 如果返回
EINTR
,那麼說明被信號打斷了,重新發送即可 - 如果返回
EAGAIN
,那麼說明此時socket
暫時不可用,此時需要將fd
文件描述符的寫就緒狀態添加到reactor
中,然後將數據拷貝到out_buffer
中去 - 如果返回寫入的數據量小於
n
,說明只寫入了部分,此時需要把沒有寫入的部分拷貝到out_buffer
中去 - 如果
out_buffer
不為空,那麼說明此時socket
不可寫,那麼就要將數據拷貝到out_buffer
中去,等著reactor
監控到寫就緒之後,把out_buffer
發送出去。 - 如果此時
out_buffer
存儲空間不足,那麼就要swYield
讓進程休眠一段時間,等待fd
的寫就緒狀態
int swReactor_write(swReactor *reactor, int fd, void *buf, int n){ int ret; swConnection *socket = swReactor_get(reactor, fd); swBuffer *buffer = socket->out_buffer; if (socket->fd == 0) { socket->fd = fd; } if (socket->buffer_size == 0) { socket->buffer_size = SwooleG.socket_buffer_size; } if (socket->nonblock == 0) { swoole_fcntl_set_option(fd, 1, -1); socket->nonblock = 1; } if (n > socket->buffer_size) { swoole_error_log(SW_LOG_WARNING, SW_ERROR_PACKAGE_LENGTH_TOO_LARGE, "data is too large, cannot exceed buffer size."); return SW_ERR; } if (swBuffer_empty(buffer)) { if (socket->ssl_send) { goto do_buffer; } do_send: ret = swConnection_send(socket, buf, n, 0); if (ret > 0) { if (n == ret) { return ret; } else { buf += ret; n -= ret; goto do_buffer; } }#ifdef HAVE_KQUEUE else if (errno == EAGAIN || errno == ENOBUFS)#else else if (errno == EAGAIN)#endif { do_buffer: if (!socket->out_buffer) { buffer = swBuffer_new(sizeof(swEventData)); if (!buffer) { swWarn("create worker buffer failed."); return SW_ERR; } socket->out_buffer = buffer; } socket->events |= SW_EVENT_WRITE; if (socket->events & SW_EVENT_READ) { if (reactor->set(reactor, fd, socket->fdtype | socket->events) < 0) { swSysError("reactor->set(%d, SW_EVENT_WRITE) failed.", fd); } } else { if (reactor->add(reactor, fd, socket->fdtype | SW_EVENT_WRITE) < 0) { swSysError("reactor->add(%d, SW_EVENT_WRITE) failed.", fd); } } goto append_buffer; } else if (errno == EINTR) { goto do_send; } else { SwooleG.error = errno; return SW_ERR; } } else { append_buffer: if (buffer->length > socket->buffer_size) { if (socket->dontwait) { SwooleG.error = SW_ERROR_OUTPUT_BUFFER_OVERFLOW; return SW_ERR; } else { swoole_error_log(SW_LOG_WARNING, SW_ERROR_OUTPUT_BUFFER_OVERFLOW, "socket#%d output buffer overflow.", fd); swYield(); swSocket_wait(fd, SW_SOCKET_OVERFLOW_WAIT, SW_EVENT_WRITE); } } if (swBuffer_append(buffer, buf, n) < 0) { return SW_ERR; } } return SW_OK;}
推薦閱讀:
※網購不想上某寶了,還有什麼網站比較不錯的呢?
※HTML的Encode(轉碼)和Decode(解碼)
※掃地機器人有用嗎?
※8月22號 科技播報 科技大街