Swoole 源碼分析——ReactorThread(一)

Swoole 源碼分析——ReactorThread(一)

來自專欄 Swoole

前言

經過 php_swoole_server_before_start 調用 swReactorThread_create 創建了 serv->reactor_threads 對象後,swServer_start 調用 swReactorThread_start 創建了 reactor 多線程。線程在建立之時,就會調用 swReactorThread_loop 函數開啟 reactor 事件循環。

swServer_master_onAccept 接受連接請求

  • swServer_start_proxy 設置了 main_reactor 監聽 socket 的事件回調函數。main_reactor 調用 wait 後,如果 listen_list 中的監聽 socket 上有新的 TCP 連接(connect)請求,reactor 就會調用 swServer_master_onAccept 函數
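  • accept4 與 accept 兩個函數唯一的區別在於最後的 flags 參數:accept4 可以在返回新 socket 的同時直接為它設置文件屬性(這裡是 SOCK_NONBLOCK | SOCK_CLOEXEC);沒有 accept4 時,則在 accept 成功後調用 swoole_fcntl_set_option(new_fd, 1, 1) 設置文件屬性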
  • 如果返回的文件描述符異常
    • 如果錯誤是 EAGAIN,說明此時沒有連接等待接受,那麼可以返回成功,繼續事件循環
    • 如果錯誤是 EINTR,說明 accept 被信號打斷,繼續調用 accept 即可
    • 如果錯誤是 EMFILE(本進程打開的文件描述符達到上限)或者 ENFILE(系統打開的文件總數達到上限),說明文件描述符已經耗盡,此時應調用 swServer_disable_accept 停止接受連接請求;其他錯誤只記錄錯誤日誌,然後返回
  • 設置 connect_notify 為 1,告知 reactor 線程需要通知 worker 接受新的連接
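  • 根據 new_fd 分配應處理它的 reactor 線程(SW_MODE_SINGLE 模式下固定為 0,否則為 new_fd % reactor_num),並向該 reactor 線程添加該文件描述符的監控。值得注意的是,這時只監聽寫事件,並不監聽讀事件:新連接的 socket 通常立即可寫,寫事件觸發後,由 reactor 線程的 swReactorThread_onWrite 根據 connect_notify 向 worker 進程發送 SW_EVENT_CONNECT 事件,之後才按配置開啟讀事件的監聽
  • swServer_connection_new 函數用於初始化 serv->connection_list[new_fd] 的屬性,它必須在 reactor->add 之前調用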

int swServer_master_onAccept(swReactor *reactor, swEvent *event){ swServer *serv = reactor->ptr; swReactor *sub_reactor; swSocketAddress client_addr; socklen_t client_addrlen = sizeof(client_addr); swListenPort *listen_host = serv->connection_list[event->fd].object; int new_fd = 0, reactor_id = 0, i; //SW_ACCEPT_AGAIN for (i = 0; i < SW_ACCEPT_MAX_COUNT; i++) {#ifdef HAVE_ACCEPT4 new_fd = accept4(event->fd, (struct sockaddr *) &client_addr, &client_addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);#else new_fd = accept(event->fd, (struct sockaddr *) &client_addr, &client_addrlen);#endif if (new_fd < 0) { switch (errno) { case EAGAIN: return SW_OK; case EINTR: continue; default: if (errno == EMFILE || errno == ENFILE) { swServer_disable_accept(reactor); reactor->disable_accept = 1; } swoole_error_log(SW_LOG_ERROR, SW_ERROR_SYSTEM_CALL_FAIL, "accept() failed. Error: %s[%d]", strerror(errno), errno); return SW_OK; } }#ifndef HAVE_ACCEPT4 else { swoole_fcntl_set_option(new_fd, 1, 1); }#endif swTrace("[Master] Accept new connection. maxfd=%d|reactor_id=%d|conn=%d", swServer_get_maxfd(serv), reactor->id, new_fd); //too many connection if (new_fd >= serv->max_connection) { swoole_error_log(SW_LOG_WARNING, SW_ERROR_SERVER_TOO_MANY_SOCKET, "Too many connections [now: %d].", new_fd); close(new_fd); return SW_OK; } if (serv->factory_mode == SW_MODE_SINGLE) { reactor_id = 0; } else { reactor_id = new_fd % serv->reactor_num; } //add to connection_list swConnection *conn = swServer_connection_new(serv, listen_host, new_fd, event->fd, reactor_id); memcpy(&conn->info.addr, &client_addr, sizeof(client_addr)); sub_reactor = &serv->reactor_threads[reactor_id].reactor; conn->socket_type = listen_host->type;#ifdef SW_USE_OPENSSL if (listen_host->ssl) { if (swSSL_create(conn, listen_host->ssl_context, 0) < 0) { bzero(conn, sizeof(swConnection)); close(new_fd); return SW_OK; } } else { conn->ssl = NULL; }#endif /* * [!!!] 
new_connection function must before reactor->add */ conn->connect_notify = 1; if (sub_reactor->add(sub_reactor, new_fd, SW_FD_TCP | SW_EVENT_WRITE) < 0) { bzero(conn, sizeof(swConnection)); close(new_fd); return SW_OK; }#ifdef SW_ACCEPT_AGAIN continue;#else break;#endif } return SW_OK;}

swServer_connection_new 創建新的連接對象

  • ls 是負責監聽連接的 swListenPort 對象,fd 是已建立連接的文件描述符,from_fd 是負責監聽連接的文件描述符,reactor_id 是分配給已連接的文件描述符的 reactor
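  • 如果 ls 設置了 open_tcp_nodelay,那麼就要為 fd 設置 TCP_NODELAY;如果設置了內核接收、發送緩衝區大小,就要分別設置 SO_RCVBUF、SO_SNDBUF(設置失敗只記錄日誌)
  • 設置 swConnection 的 fd、from_id、from_fd、connect_time、last_time 等參數
  • 在定義了 SW_REACTOR_USE_SESSION 時,加鎖後從 session_round 開始向後查找空閒的 session 槽位,為連接分配 session_id(24 位,溢出後從 1 重新開始)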

/*
 * Initialise serv->connection_list[fd] for a freshly accepted socket.
 *
 * Steps:
 *  - increments the accept counter and the server-wide and per-port
 *    connection counters;
 *  - raises the recorded max fd when needed;
 *  - zeroes the slot, then applies the listening port's TCP_NODELAY and
 *    kernel buffer-size options to the socket (failures are only logged);
 *  - fills in the connection's bookkeeping fields;
 *  - when SW_REACTOR_USE_SESSION is defined, claims a session slot and a
 *    24-bit session id while holding serv->gs->spinlock.
 *
 * @param serv        the server
 * @param ls          listening port the connection arrived on
 * @param fd          the accepted socket
 * @param from_fd     the listening socket
 * @param reactor_id  reactor thread chosen for fd; in SW_MODE_SINGLE the
 *                    current worker id (SwooleWG.id) is stored instead
 * @return the initialised slot &serv->connection_list[fd]
 */
static swConnection* swServer_connection_new(swServer *serv, swListenPort *ls, int fd, int from_fd, int reactor_id)
{
    serv->stats->accept_count++;
    sw_atomic_fetch_add(&serv->stats->connection_num, 1);
    sw_atomic_fetch_add(&ls->connection_num, 1);

    if (swServer_get_maxfd(serv) < fd)
    {
        swServer_set_maxfd(serv, fd);
    }

    swConnection *conn = &serv->connection_list[fd];
    bzero(conn, sizeof(swConnection));

    /* per-port socket options; a failing setsockopt() is logged, not fatal */
    if (ls->open_tcp_nodelay)
    {
        int enable = 1;
        if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) < 0)
        {
            swSysError("setsockopt(TCP_NODELAY) failed.");
        }
        conn->tcp_nodelay = 1;
    }
    if (ls->kernel_socket_recv_buffer_size > 0
            && setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &ls->kernel_socket_recv_buffer_size, sizeof(int)))
    {
        swSysError("setsockopt(SO_RCVBUF, %d) failed.", ls->kernel_socket_recv_buffer_size);
    }
    if (ls->kernel_socket_send_buffer_size > 0
            && setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &ls->kernel_socket_send_buffer_size, sizeof(int)) < 0)
    {
        swSysError("setsockopt(SO_SNDBUF, %d) failed.", ls->kernel_socket_send_buffer_size);
    }

    conn->fd = fd;
    if (serv->factory_mode == SW_MODE_SINGLE)
    {
        conn->from_id = SwooleWG.id;
    }
    else
    {
        conn->from_id = reactor_id;
    }
    conn->from_fd = (sw_atomic_t) from_fd;
    conn->connect_time = serv->gs->now;
    conn->last_time = serv->gs->now;
    conn->active = 1;
    conn->buffer_size = ls->socket_buffer_size;

#ifdef SW_REACTOR_SYNC_SEND
    if (serv->factory_mode != SW_MODE_THREAD && !ls->ssl)
    {
        conn->direct_send = 1;
    }
#endif

#ifdef SW_REACTOR_USE_SESSION
    sw_spinlock(&serv->gs->spinlock);
    uint32_t session_id = serv->gs->session_round;
    int attempts = 0;
    /* probe at most max_connection consecutive ids for a free slot */
    while (attempts < serv->max_connection)
    {
        attempts++;
        session_id++;
        /* SwooleGS->session_round is only 24 bits wide: wrap around to 1 */
        if (unlikely(session_id == 1 << 24))
        {
            session_id = 1;
        }
        swSession *slot = swServer_get_session(serv, session_id);
        if (slot->fd == 0)
        {
            slot->fd = fd;
            slot->id = session_id;
            slot->reactor_id = conn->from_id;
            break;
        }
    }
    serv->gs->session_round = session_id;
    sw_spinlock_release(&serv->gs->spinlock);
    conn->session_id = session_id;
#endif

    return conn;
}

swReactorThread_loop 事件循環

  • reactor 多線程在建立之時,就會調用 swReactorThread_loop 函數開啟 reactor 事件循環。
  • 從參數中獲取當前 reactor 線程的 id
  • 設置線程特有數據 SwooleTG。其中 factory_lock_target 與 factory_target_worker 用於後面向 worker 進程傳輸數據:當一次只能傳遞一部分數據時,下次傳輸需要鎖定對應的 worker 進程。
  • swServer_get_thread 用於利用 reactor_id 獲取對應的 swReactorThread 對象
  • 如果設置了 CPU_AFFINITY 選項(將 swoole 的 reactor 線程與對應的 worker 進程綁定到固定的一個核上,可以避免進程/線程運行時在多個核之間互相切換,提高 CPU Cache 的命中率),這時要通過 reactor_id 將當前線程綁定到對應的 CPU 核上(worker 進程以相同方式綁定,這樣就實現了 reactor 線程與對應的 worker 進程綁定到同一個核上)。
  • 如果開啟了 cpu_affinity_ignore 設置(接受一個數組作為參數,例如 array(0, 1) 表示不使用 CPU0, CPU1,專門空出來處理網路中斷。如果當前系統內核與網卡有多隊列特性,網路中斷會分布到多核,可以緩解網路中斷的壓力,這個時候不需要設置該選項),那麼就要從 serv->cpu_affinity_available 數組中挑選 CPU 進行綁定
  • swReactor_create 創建本線程的 reactor 對象,並設置 SW_FD_PIPE 的讀、寫事件回調函數 swReactorThread_onPipeReceive 與 swReactorThread_onPipeWrite(用於與 worker 進程通信),以及 SW_FD_CLOSE 的回調函數 swReactorThread_onClose
  • 如果 server 中存在 UDP(或 unix dgram)監聽埠,而且該監聽 socket 與 reactor_id 相對應(ls->sock % reactor_num == reactor_id),那麼向 reactor 對象添加該文件描述符進行監聽
  • swReactorThread_set_protocol 用於設置 TCP 與 UDP 的事件回調函數:swReactorThread_onPackage(UDP 數據包)、swReactorThread_onWrite(TCP 可寫)與 swReactorThread_onRead(TCP 可讀,用來接收客戶端傳輸的信息);並對每個非數據報(TCP)監聽埠調用 swPort_set_protocol,設置該埠的協議處理函數
  • 在 SW_MODE_PROCESS 模式下且定義了 SW_USE_RINGBUFFER 時,構造 pipe_read_list 存儲 pipe 文件描述符
  • 遍歷 serv->workers,找出與當前 reactor 相對應的 worker(worker_id % reactor_num == reactor_id),添加 pipe_master 文件描述符到 reactor 進行監控,設置 serv->connection_list[pipe_master] 的 in_buffer、from_id、fd 與 object(一個互斥鎖),並設置當前線程的 notify_pipe 與 pipe_read_list
  • 如果開啟了時間輪演算法(且 heartbeat_idle_time 大於 0),就要創建 reactor->timewheel 對象,計算 reactor->heartbeat_interval,並將原本為 NULL 的 onFinish 與 onTimeout 回調函數都設置為 swReactorThread_onReactorCompleted。

static int swReactorThread_loop(swThreadParam *param){ swServer *serv = SwooleG.serv; int ret; int reactor_id = param->pti; pthread_t thread_id = pthread_self(); SwooleTG.factory_lock_target = 0; SwooleTG.factory_target_worker = -1; SwooleTG.id = reactor_id; SwooleTG.type = SW_THREAD_REACTOR; SwooleTG.buffer_stack = swString_new(8192); if (SwooleTG.buffer_stack == NULL) { return SW_ERR; } swReactorThread *thread = swServer_get_thread(serv, reactor_id); swReactor *reactor = &thread->reactor; SwooleTG.reactor = reactor;#ifdef HAVE_CPU_AFFINITY //cpu affinity setting if (serv->open_cpu_affinity) { cpu_set_t cpu_set; CPU_ZERO(&cpu_set); if (serv->cpu_affinity_available_num) { CPU_SET(serv->cpu_affinity_available[reactor_id % serv->cpu_affinity_available_num], &cpu_set); } else { CPU_SET(reactor_id % SW_CPU_NUM, &cpu_set); } if (0 != pthread_setaffinity_np(thread_id, sizeof(cpu_set), &cpu_set)) { swSysError("pthread_setaffinity_np() failed."); } }#endif ret = swReactor_create(reactor, SW_REACTOR_MAXEVENTS); if (ret < 0) { return SW_ERR; } swSignal_none(); reactor->ptr = serv; reactor->id = reactor_id; reactor->thread = 1; reactor->socket_list = serv->connection_list; reactor->max_socket = serv->max_connection; reactor->onFinish = NULL; reactor->onTimeout = NULL; reactor->close = swReactorThread_close; reactor->setHandle(reactor, SW_FD_CLOSE, swReactorThread_onClose); reactor->setHandle(reactor, SW_FD_PIPE | SW_EVENT_READ, swReactorThread_onPipeReceive); reactor->setHandle(reactor, SW_FD_PIPE | SW_EVENT_WRITE, swReactorThread_onPipeWrite); //listen UDP if (serv->have_udp_sock == 1) { swListenPort *ls; LL_FOREACH(serv->listen_list, ls) { if (ls->type == SW_SOCK_UDP || ls->type == SW_SOCK_UDP6 || ls->type == SW_SOCK_UNIX_DGRAM) { if (ls->sock % serv->reactor_num != reactor_id) { continue; } if (ls->type == SW_SOCK_UDP) { serv->connection_list[ls->sock].info.addr.inet_v4.sin_port = htons(ls->port); } else { serv->connection_list[ls->sock].info.addr.inet_v6.sin6_port = 
htons(ls->port); } serv->connection_list[ls->sock].fd = ls->sock; serv->connection_list[ls->sock].socket_type = ls->type; serv->connection_list[ls->sock].object = ls; ls->thread_id = thread_id; reactor->add(reactor, ls->sock, SW_FD_UDP); } } } //set protocol function point swReactorThread_set_protocol(serv, reactor); int i = 0, pipe_fd;#ifdef SW_USE_RINGBUFFER int j = 0;#endif if (serv->factory_mode == SW_MODE_PROCESS) {#ifdef SW_USE_RINGBUFFER thread->pipe_read_list = sw_calloc(serv->reactor_pipe_num, sizeof(int)); if (thread->pipe_read_list == NULL) { swSysError("thread->buffer_pipe create failed"); return SW_ERR; }#endif for (i = 0; i < serv->worker_num; i++) { if (i % serv->reactor_num == reactor_id) { pipe_fd = serv->workers[i].pipe_master; //for request swBuffer *buffer = swBuffer_new(sizeof(swEventData)); if (!buffer) { swWarn("create buffer failed."); break; } serv->connection_list[pipe_fd].in_buffer = buffer; //for response swSetNonBlock(pipe_fd); reactor->add(reactor, pipe_fd, SW_FD_PIPE); if (thread->notify_pipe == 0) { thread->notify_pipe = serv->workers[i].pipe_worker; } /** * mapping reactor_id and worker pipe */ serv->connection_list[pipe_fd].from_id = reactor_id; serv->connection_list[pipe_fd].fd = pipe_fd; serv->connection_list[pipe_fd].object = sw_malloc(sizeof(swLock)); /** * create pipe lock */ if (serv->connection_list[pipe_fd].object == NULL) { swWarn("create pipe mutex lock failed."); break; } swMutex_create(serv->connection_list[pipe_fd].object, 0);#ifdef SW_USE_RINGBUFFER thread->pipe_read_list[j] = pipe_fd; j++;#endif } } }#ifdef SW_USE_TIMEWHEEL if (serv->heartbeat_idle_time > 0) { if (serv->heartbeat_idle_time < SW_TIMEWHEEL_SIZE) { reactor->timewheel = swTimeWheel_new(serv->heartbeat_idle_time); reactor->heartbeat_interval = 1; } else { reactor->timewheel = swTimeWheel_new(SW_TIMEWHEEL_SIZE); reactor->heartbeat_interval = serv->heartbeat_idle_time / SW_TIMEWHEEL_SIZE; } reactor->last_heartbeat_time = 0; if (reactor->timewheel == NULL) { 
swSysError("thread->timewheel create failed."); return SW_ERR; } reactor->timeout_msec = reactor->heartbeat_interval * 1000; reactor->onFinish = swReactorThread_onReactorCompleted; reactor->onTimeout = swReactorThread_onReactorCompleted; }#endif //wait other thread#ifdef HAVE_PTHREAD_BARRIER pthread_barrier_wait(&serv->barrier);#else SW_START_SLEEP;#endif //main loop reactor->wait(reactor, NULL); //shutdown reactor->free(reactor);#ifdef SW_USE_TIMEWHEEL if (reactor->timewheel) { swTimeWheel_free(reactor->timewheel); }#endif swString_free(SwooleTG.buffer_stack); pthread_exit(0); return SW_OK;}void swReactorThread_set_protocol(swServer *serv, swReactor *reactor){ //UDP Packet reactor->setHandle(reactor, SW_FD_UDP, swReactorThread_onPackage); //Write reactor->setHandle(reactor, SW_FD_TCP | SW_EVENT_WRITE, swReactorThread_onWrite); //Read reactor->setHandle(reactor, SW_FD_TCP | SW_EVENT_READ, swReactorThread_onRead); swListenPort *ls; //listen the all tcp port LL_FOREACH(serv->listen_list, ls) { if (swSocket_is_dgram(ls->type)) { continue; } swPort_set_protocol(ls); }}

swReactorThread_onWrite 寫事件回調

  • master 線程的 main_reactor 接受到新的連接後,會將相應 swConnection 的 connect_notify 設置為 1。這時 reactor 線程的首要任務並不是向客戶端發送數據,而是向 worker 進程發送 SW_EVENT_CONNECT 事件
    • 如果使用時間輪演算法,那麼就需要調用 swTimeWheel_add 將該 swConnection 對象添加到時間輪的監控中
    • 如果存在 onConnect 回調函數,就要調用 swServer_tcp_notify 函數向 worker 進程發送事件
    • 如果調用 onConnect 通知之後 out_buffer 緩衝區中已有數據,就直接跳轉去將這些數據發送給客戶端
    • 如果啟用了 enable_delay_receive 選項,那麼就要把當前連接的 socket 從 reactor 中刪除(並設置 listen_wait),等待服務端調用 $serv->confirm($fd) 對連接進行確認;否則就把 socket 的監聽事件改為可讀事件(不再監聽可寫),讀取客戶端發來的數據。
  • 如果心跳檢測或者時間輪演算法檢測到死連接,就會把 close_notify 設置為 1,這個時候就要通知 worker 進程處理關閉事件(SW_EVENT_CLOSE)
  • out_buffer 不為空,說明此時服務端有數據需要發給客戶端。數據會被存儲在 swBuffer 這個鏈表數據結構中,每個鏈表元素是一個數據塊,此時需要檢驗數據類型是 SW_CHUNK_CLOSE、SW_CHUNK_SENDFILE 還是其他普通數據。
  • swConnection_buffer_send 用於發送普通數據,這個函數會嘗試向 socket 發送一次數據,可能出現的情況有:
    • 全部發送成功:繼續循環,發送下一個 buffer
    • 發送部分數據:繼續循環,發送這一個 buffer 的剩餘數據
    • send_wait 為 1:跳出循環,等待下一次可寫就緒
    • 發生異常:繼續循環,重新發送
    • close_wait 為 1:連接已關閉,關閉這個 socket 文件描述符的監控
  • 如果 overflow 為 1,且 out_buffer 中待發送的數據量已低於 buffer_size,就將 overflow 重置為 0
  • 如果 high_watermark 為 1,說明此前 out_buffer 數據已達到高水位線,此時重新比較 out_buffer 數據大小,如果低於 buffer_low_watermark,就要通知 worker 進程調用 onBufferEmpty 回調函數。
  • 如果 out_buffer 為空,那麼重新設置 socket 文件描述符的 reactor 監聽事件,刪除寫就緒,只設置讀就緒。這個是水平觸發模式的必要步驟,避免無數據寫入時,頻繁地調用寫就緒回調函數。

/*
 * Write-ready handler for TCP connections in a reactor thread.
 *
 * The write event serves several purposes, checked in this order:
 *  1. connect_notify: the first event after the main reactor accepted the
 *     socket.
 *     - Adds the connection to the time wheel (SW_USE_TIMEWHEEL).
 *     - TLS connections only switch to read events.
 *     - Otherwise it sends SW_EVENT_CONNECT to the worker when onConnect is
 *       set. If out_buffer is non-empty after that call, it flushes the
 *       buffer (step 4).
 *     - Then it either removes the fd from the reactor (enable_delay_receive;
 *       sets listen_wait) or switches the fd to read events.
 *  2. close_notify: sends SW_EVENT_CLOSE to the worker. A TLS connection
 *     whose handshake is not complete is closed directly instead.
 *  3. disable_notify && close_force: closes the connection.
 *  4. Otherwise it flushes out_buffer chunk by chunk:
 *     - SW_CHUNK_CLOSE closes the connection;
 *     - SW_CHUNK_SENDFILE chunks go through swConnection_onSendfile;
 *     - other chunks go through swConnection_buffer_send.
 *     Afterwards it updates the overflow / high-watermark state (may notify
 *     SW_EVENT_BUFFER_EMPTY) and switches back to read-only events once
 *     out_buffer is empty.
 *
 * @return SW_ERR if the connection slot is missing or inactive. Otherwise it
 *         returns the result of the reactor call made, or SW_OK.
 */
static int swReactorThread_onWrite(swReactor *reactor, swEvent *ev)
{
    int ret;
    swServer *serv = SwooleG.serv;
    swBuffer_trunk *chunk;
    int fd = ev->fd;

    if (serv->factory_mode == SW_MODE_PROCESS)
    {
        assert(fd % serv->reactor_num == reactor->id);
        assert(fd % serv->reactor_num == SwooleTG.id);
    }

    swConnection *conn = swServer_connection_get(serv, fd);
    if (conn == NULL || conn->active == 0)
    {
        return SW_ERR;
    }

    swTraceLog(SW_TRACE_REACTOR, "fd=%d, conn->connect_notify=%d, conn->close_notify=%d, serv->disable_notify=%d, conn->close_force=%d",
            fd, conn->connect_notify, conn->close_notify, serv->disable_notify, conn->close_force);

    if (conn->connect_notify)
    {
        conn->connect_notify = 0;
#ifdef SW_USE_TIMEWHEEL
        if (reactor->timewheel)
        {
            swTimeWheel_add(reactor->timewheel, conn);
        }
#endif
#ifdef SW_USE_OPENSSL
        /* TLS connections skip the connect notification here and just start reading */
        if (conn->ssl)
        {
            return reactor->set(reactor, fd, SW_FD_TCP | SW_EVENT_READ);
        }
#endif
        //notify worker process
        if (serv->onConnect)
        {
            swServer_tcp_notify(serv, conn, SW_EVENT_CONNECT);
            if (!swBuffer_empty(conn->out_buffer))
            {
                goto _pop_chunk;
            }
        }
        //delay receive, wait resume command.
        if (serv->enable_delay_receive)
        {
            conn->listen_wait = 1;
            return reactor->del(reactor, fd);
        }
        return reactor->set(reactor, fd, SW_FD_TCP | SW_EVENT_READ);
    }
    else if (conn->close_notify)
    {
#ifdef SW_USE_OPENSSL
        if (conn->ssl && conn->ssl_state != SW_SSL_STATE_READY)
        {
            return swReactorThread_close(reactor, fd);
        }
#endif
        swServer_tcp_notify(serv, conn, SW_EVENT_CLOSE);
        conn->close_notify = 0;
        return SW_OK;
    }
    else if (serv->disable_notify && conn->close_force)
    {
        return swReactorThread_close(reactor, fd);
    }

    _pop_chunk:
    while (!swBuffer_empty(conn->out_buffer))
    {
        chunk = swBuffer_get_trunk(conn->out_buffer);
        if (chunk->type == SW_CHUNK_CLOSE)
        {
            reactor->close(reactor, fd);
            return SW_OK;
        }
        else if (chunk->type == SW_CHUNK_SENDFILE)
        {
            ret = swConnection_onSendfile(conn, chunk);
        }
        else
        {
            ret = swConnection_buffer_send(conn);
        }

        if (ret < 0)
        {
            if (conn->close_wait)
            {
                reactor->close(reactor, fd);
                return SW_OK;
            }
            else if (conn->send_wait)
            {
                /* socket would block: wait for the next write-ready event */
                break;
            }
        }
    }

    /*
     * swBuffer_empty() treats a NULL out_buffer as empty. Do the same here
     * instead of dereferencing it: a NULL buffer has nothing pending.
     */
    size_t pending = conn->out_buffer ? conn->out_buffer->length : 0;

    if (conn->overflow && pending < conn->buffer_size)
    {
        conn->overflow = 0;
    }

    if (serv->onBufferEmpty && conn->high_watermark)
    {
        swListenPort *port = swServer_get_port(serv, fd);
        if (pending <= port->buffer_low_watermark)
        {
            conn->high_watermark = 0;
            swServer_tcp_notify(serv, conn, SW_EVENT_BUFFER_EMPTY);
        }
    }

    //remove EPOLLOUT event
    if (!conn->removed && swBuffer_empty(conn->out_buffer))
    {
        reactor->set(reactor, fd, SW_FD_TCP | SW_EVENT_READ);
    }
    return SW_OK;
}

推薦閱讀:

Swoole 4.1 正式發布,包含多項重大更新
讓swoole websocket支持socket.io連接
《就是要你懂swoole》-Server(一)

TAG:源代碼 | Swoole | 科技 |