Swoole 源碼分析——Server之Worker(一)
來自專欄 Swoole
swManager_loop
函數 manager
進程管理
manager
進程開啟的時候,首先要調用onManagerStart
回調- 添加信號處理函數
swSignal_add
,SIGTERM
用於結束server
,只需要running
設置為 0,manager
會逐個殺死worker
進程;SIGUSR1
用於重載所有的worker
進程;SIGUSR2
用於重載所有的task_worker
進程;SIGIO
用於重啟已經關閉了的worker
進程;SIGALRM
用於檢測所有的超時請求; - 如果設置了
serv->manager_alarm
,那麼就是開啟了超時請求的監控,此時需要設置alarm
信號,讓manager
進程定時去檢查是否有超時的請求。 - 如果
running
為 1,就不斷while
循環,殺死或者啟動相應的worker
進程,如果running
為 0,那麼就關閉所有的worker
進程,調用onManagerStop
函數退出程序。 - 調用
wait
函數,監控已結束的worker
進程 - 如果
wait
函數返回異常,很有可能是被信號打斷。此時需要先檢查ManagerProcess.read_message
,如果是 1,那麼說明wait
函數被SIGIO
信號打斷,該信號由worker
進程發送,用於告知manager
進程該worker
進程即將關閉。此時,需要manager
進程重新開啟worker
進程。 - 如果
ManagerProcess.alarm
為 1,那麼說明wait
函數由SIGALRM
信號打斷,此時需要檢查超時的請求。serv->hooks[SW_SERVER_HOOK_MANAGER_TIMER]
也就是php_swoole_trace_check
是檢查慢請求的函數。 - 如果
ManagerProcess.reload_all_worker
為 1,那麼wait
函數由SIGUSR1
打斷,此時應該重啟所有的worker
進程 - 如果
ManagerProcess.reload_task_worker
為 1,那麼wait
函數由SIGUSR2
打斷,此時應該重啟所有的task_worker
進程 - 如果
wait
返回值正常,那麼就要從serv->workers
、serv->gs->task_workers
、serv->user_worker_map
中尋找退出的worker
進程。如果該進程是STOPPED
狀態,說明很有可能是調試狀態,此時不需要重啟,只需要調用tracer
函數
static void swManager_signal_handle(int sig){ switch (sig) { case SIGTERM: SwooleG.running = 0; break; /** * reload all workers */ case SIGUSR1: if (ManagerProcess.reloading == 0) { ManagerProcess.reloading = 1; ManagerProcess.reload_all_worker = 1; } break; /** * only reload task workers */ case SIGUSR2: if (ManagerProcess.reloading == 0) { ManagerProcess.reloading = 1; ManagerProcess.reload_task_worker = 1; } break; case SIGIO: ManagerProcess.read_message = 1; break; case SIGALRM: ManagerProcess.alarm = 1; break; default:#ifdef SIGRTMIN if (sig == SIGRTMIN) { swServer_reopen_log_file(SwooleG.serv); }#endif break; }}static int swManager_loop(swFactory *factory){ int pid, new_pid; int i; int reload_worker_i = 0; int reload_worker_num; int reload_init = 0; pid_t reload_worker_pid = 0; int status; SwooleG.use_signalfd = 0; SwooleG.use_timerfd = 0; memset(&ManagerProcess, 0, sizeof(ManagerProcess)); swServer *serv = factory->ptr; swWorker *reload_workers; if (serv->hooks[SW_SERVER_HOOK_MANAGER_START]) { swServer_call_hook(serv, SW_SERVER_HOOK_MANAGER_START, serv); } if (serv->onManagerStart) { serv->onManagerStart(serv); } reload_worker_num = serv->worker_num + serv->task_worker_num; reload_workers = sw_calloc(reload_worker_num, sizeof(swWorker)); if (reload_workers == NULL) { swError("malloc[reload_workers] failed"); return SW_ERR; } //for reload swSignal_add(SIGHUP, NULL); swSignal_add(SIGTERM, swManager_signal_handle); swSignal_add(SIGUSR1, swManager_signal_handle); swSignal_add(SIGUSR2, swManager_signal_handle); swSignal_add(SIGIO, swManager_signal_handle);#ifdef SIGRTMIN swSignal_add(SIGRTMIN, swManager_signal_handle);#endif //swSignal_add(SIGINT, swManager_signal_handle); if (serv->manager_alarm > 0) { alarm(serv->manager_alarm); swSignal_add(SIGALRM, swManager_signal_handle); } SwooleG.main_reactor = NULL; while (SwooleG.running > 0) { _wait: pid = wait(&status); if (ManagerProcess.read_message) { swWorkerStopMessage msg; while (swChannel_pop(serv->message_box, 
&msg, sizeof(msg)) > 0) { if (SwooleG.running == 0) { continue; } pid_t new_pid = swManager_spawn_worker(factory, msg.worker_id); if (new_pid > 0) { serv->workers[msg.worker_id].pid = new_pid; } } ManagerProcess.read_message = 0; } if (pid < 0) { if (ManagerProcess.alarm == 1) { ManagerProcess.alarm = 0; alarm(serv->manager_alarm); if (serv->hooks[SW_SERVER_HOOK_MANAGER_TIMER]) { swServer_call_hook(serv, SW_SERVER_HOOK_MANAGER_TIMER, serv); } } if (ManagerProcess.reloading == 0) { error: if (errno != EINTR) { swSysError("wait() failed."); } continue; } //reload task & event workers else if (ManagerProcess.reload_all_worker == 1) { swNotice("Server is reloading now."); if (reload_init == 0) { reload_init = 1; memcpy(reload_workers, serv->workers, sizeof(swWorker) * serv->worker_num); reload_worker_num = serv->worker_num; if (serv->task_worker_num > 0) { memcpy(reload_workers + serv->worker_num, serv->gs->task_workers.workers, sizeof(swWorker) * serv->task_worker_num); reload_worker_num += serv->task_worker_num; } ManagerProcess.reload_all_worker = 0; if (serv->reload_async) { for (i = 0; i < serv->worker_num; i++) { if (kill(reload_workers[i].pid, SIGTERM) < 0) { swSysError("kill(%d, SIGTERM) [%d] failed.", reload_workers[i].pid, i); } } reload_worker_i = serv->worker_num; } else { reload_worker_i = 0; } } goto kill_worker; } //only reload task workers else if (ManagerProcess.reload_task_worker == 1) { if (serv->task_worker_num == 0) { swWarn("cannot reload task workers, task workers is not started."); continue; } swNotice("Server is reloading now."); if (reload_init == 0) { memcpy(reload_workers, serv->gs->task_workers.workers, sizeof(swWorker) * serv->task_worker_num); reload_worker_num = serv->task_worker_num; reload_worker_i = 0; reload_init = 1; ManagerProcess.reload_task_worker = 0; } goto kill_worker; } else { goto error; } } if (SwooleG.running == 1) { //event workers for (i = 0; i < serv->worker_num; i++) { //compare PID if (pid != serv->workers[i].pid) { 
continue; } if (WIFSTOPPED(status) && serv->workers[i].tracer) { serv->workers[i].tracer(&serv->workers[i]); serv->workers[i].tracer = NULL; goto _wait; } //Check the process return code and signal swManager_check_exit_status(serv, i, pid, status); while (1) { new_pid = swManager_spawn_worker(factory, i); if (new_pid < 0) { usleep(100000); continue; } else { serv->workers[i].pid = new_pid; break; } } } swWorker *exit_worker; //task worker if (serv->gs->task_workers.map) { exit_worker = swHashMap_find_int(serv->gs->task_workers.map, pid); if (exit_worker != NULL) { if (WIFSTOPPED(status) && exit_worker->tracer) { exit_worker->tracer(exit_worker); exit_worker->tracer = NULL; goto _wait; } swManager_check_exit_status(serv, exit_worker->id, pid, status); swProcessPool_spawn(&serv->gs->task_workers, exit_worker); } } //user process if (serv->user_worker_map != NULL) { swManager_wait_user_worker(&serv->gs->event_workers, pid, status); } if (pid == reload_worker_pid) { reload_worker_i++; } } //reload worker kill_worker: if (ManagerProcess.reloading == 1) { //reload finish if (reload_worker_i >= reload_worker_num) { reload_worker_pid = reload_worker_i = reload_init = ManagerProcess.reloading = 0; continue; } reload_worker_pid = reload_workers[reload_worker_i].pid; if (kill(reload_worker_pid, SIGTERM) < 0) { if (errno == ECHILD) { reload_worker_i++; goto kill_worker; } swSysError("kill(%d, SIGTERM) [%d] failed.", reload_workers[reload_worker_i].pid, reload_worker_i); } } } sw_free(reload_workers); swSignal_none(); //kill all child process for (i = 0; i < serv->worker_num; i++) { swTrace("[Manager]kill worker processor"); kill(serv->workers[i].pid, SIGTERM); } //kill and wait task process if (serv->task_worker_num > 0) { swProcessPool_shutdown(&serv->gs->task_workers); } //wait child process for (i = 0; i < serv->worker_num; i++) { if (swWaitpid(serv->workers[i].pid, &status, 0) < 0) { swSysError("waitpid(%d) failed.", serv->workers[i].pid); } } //kill all user process if 
(serv->user_worker_map) { swManager_kill_user_worker(serv); } if (serv->onManagerStop) { serv->onManagerStop(serv); } return SW_OK;}void php_swoole_trace_check(void *arg){ swServer *serv = (swServer *) arg; uint8_t timeout = serv->request_slowlog_timeout; int count = serv->worker_num + serv->task_worker_num; int i = serv->trace_event_worker ? 0 : serv->worker_num; swWorker *worker; for (; i < count; i++) { worker = swServer_get_worker(serv, i); swTraceLog(SW_TRACE_SERVER, "trace request, worker#%d, pid=%d. request_time=%d.", i, worker->pid, worker->request_time); if (!(worker->request_time > 0 && worker->traced == 0 && serv->gs->now - worker->request_time >= timeout)) { continue; } if (ptrace(PTRACE_ATTACH, worker->pid, 0, 0) < 0) { swSysError("failed to ptrace(ATTACH, %d) worker#%d,", worker->pid, worker->id); continue; } worker->tracer = trace_request; worker->traced = 1; }}static void swManager_check_exit_status(swServer *serv, int worker_id, pid_t pid, int status){ if (status != 0) { swWarn("worker#%d abnormal exit, status=%d, signal=%d", worker_id, WEXITSTATUS(status), WTERMSIG(status)); if (serv->onWorkerError != NULL) { serv->onWorkerError(serv, worker_id, pid, WEXITSTATUS(status), WTERMSIG(status)); } }}
swWorker_loop
函數 worker
事件循環
worker
進程的事件循環和reactor
線程類似,都是創建reactor
對象,然後調用SwooleG.main_reactor->wait
函數進行事件循環,不同的是worker
進程監控的是pipe_worker
這個socket
。- 如果
worker
的dispatch_mode
是stream
,reactor
還要監聽serv->stream_fd
,以便可以更加高效的消費reactor
線程發送的數據。 - swServer_worker_init
函數用於初始化worker
進程,swWorker_onStart
用於調用回調函數,swWorker_onStop
用於停止worker
進程
/**
 * Entry point of an event worker process.
 *
 * Initialises the worker, creates SwooleG.main_reactor, registers the
 * worker's pipe (and, in stream dispatch mode, serv->stream_fd) for read
 * events, runs the worker-start callback and then blocks in the reactor's
 * wait() until the reactor loop ends. Afterwards the pipe buffers are
 * cleaned and the worker-stop callback is run.
 *
 * @param factory    factory whose ptr member is the swServer
 * @param worker_id  id of this worker; stored in SwooleWG.id
 * @return SW_OK after the reactor loop has ended, SW_ERR if worker
 *         initialisation or reactor allocation/creation fails
 */
int swWorker_loop(swFactory *factory, int worker_id)
{
    swServer *serv = factory->ptr;

#ifndef SW_WORKER_USE_SIGNALFD
    SwooleG.use_signalfd = 0;
#elif defined(HAVE_SIGNALFD)
    SwooleG.use_signalfd = 1;
#endif
    //timerfd
#ifdef HAVE_TIMERFD
    SwooleG.use_timerfd = 1;
#endif

    //worker_id
    SwooleWG.id = worker_id;
    SwooleG.pid = getpid();

    swWorker *worker = swServer_get_worker(serv, worker_id);
    // swServer_worker_init() returns SW_ERR when the input buffers cannot be
    // created; do not start the event loop in that case.
    if (swServer_worker_init(serv, worker) == SW_ERR)
    {
        return SW_ERR;
    }

    SwooleG.main_reactor = sw_malloc(sizeof(swReactor));
    if (SwooleG.main_reactor == NULL)
    {
        swError("[Worker] malloc for reactor failed.");
        return SW_ERR;
    }

    if (swReactor_create(SwooleG.main_reactor, SW_REACTOR_MAXEVENTS) < 0)
    {
        swError("[Worker] create worker_reactor failed.");
        return SW_ERR;
    }

    worker->status = SW_WORKER_IDLE;

    int pipe_worker = worker->pipe_worker;

    swSetNonBlock(pipe_worker);
    SwooleG.main_reactor->ptr = serv;
    SwooleG.main_reactor->add(SwooleG.main_reactor, pipe_worker, SW_FD_PIPE | SW_EVENT_READ);
    SwooleG.main_reactor->setHandle(SwooleG.main_reactor, SW_FD_PIPE, swWorker_onPipeReceive);
    SwooleG.main_reactor->setHandle(SwooleG.main_reactor, SW_FD_WRITE, swReactor_onWrite);

    /**
     * set pipe buffer size
     */
    int i;
    swConnection *pipe_socket;
    for (i = 0; i < serv->worker_num + serv->task_worker_num; i++)
    {
        // Note: `worker` is reused as the loop cursor from here on.
        worker = swServer_get_worker(serv, i);
        pipe_socket = swReactor_get(SwooleG.main_reactor, worker->pipe_master);
        pipe_socket->buffer_size = SW_MAX_INT;
        pipe_socket = swReactor_get(SwooleG.main_reactor, worker->pipe_worker);
        pipe_socket->buffer_size = SW_MAX_INT;
    }

    if (serv->dispatch_mode == SW_DISPATCH_STREAM)
    {
        SwooleG.main_reactor->add(SwooleG.main_reactor, serv->stream_fd, SW_FD_LISTEN | SW_EVENT_READ);
        SwooleG.main_reactor->setHandle(SwooleG.main_reactor, SW_FD_LISTEN, swWorker_onStreamAccept);
        SwooleG.main_reactor->setHandle(SwooleG.main_reactor, SW_FD_STREAM, swWorker_onStreamRead);
        swStream_set_protocol(&serv->stream_protocol);
        serv->stream_protocol.package_max_length = SW_MAX_INT;
        serv->stream_protocol.onPackage = swWorker_onStreamPackage;
        serv->buffer_pool = swLinkedList_new(0, NULL);
    }

    swWorker_onStart(serv);

#ifdef HAVE_SIGNALFD
    if (SwooleG.use_signalfd)
    {
        swSignalfd_setup(SwooleG.main_reactor);
    }
#endif
    //main loop
    SwooleG.main_reactor->wait(SwooleG.main_reactor, NULL);
    //clear pipe buffer
    swWorker_clean();
    //worker shutdown
    swWorker_onStop(serv);
    return SW_OK;
}
swServer_worker_init
初始化函數
- 與
reactor
線程一樣,首先如果設置了CPU
親和度的話,要將worker
進程綁定到特定的CPU
上,指定CPU
的方法仍然是SwooleWG.id % serv->cpu_affinity_available_num
,這樣可以保證對應的reactor
線程和worker
進程在同一個CPU
核上。 - swWorker_signal_init
用於設置worker
進程的信號處理函數:SIGTERM
信號用於關閉當前worker
進程;SIGALRM
代表定時任務。 - buffer_input
用於存儲來源於reactor
線程發送的數據,是一個serv->reactor_num + serv->dgram_port_num
大小的數組。
void swWorker_signal_init(void){ swSignal_clear(); /** * use user settings */ SwooleG.use_signalfd = SwooleG.enable_signalfd; swSignal_add(SIGHUP, NULL); swSignal_add(SIGPIPE, NULL); swSignal_add(SIGUSR1, NULL); swSignal_add(SIGUSR2, NULL); swSignal_add(SIGTERM, swWorker_signal_handler); swSignal_add(SIGALRM, swSystemTimer_signal_handler); //for test swSignal_add(SIGVTALRM, swWorker_signal_handler);#ifdef SIGRTMIN swSignal_add(SIGRTMIN, swWorker_signal_handler);#endif}int swServer_worker_init(swServer *serv, swWorker *worker){#ifdef HAVE_CPU_AFFINITY if (serv->open_cpu_affinity) { cpu_set_t cpu_set; CPU_ZERO(&cpu_set); if (serv->cpu_affinity_available_num) { CPU_SET(serv->cpu_affinity_available[SwooleWG.id % serv->cpu_affinity_available_num], &cpu_set); } else { CPU_SET(SwooleWG.id % SW_CPU_NUM, &cpu_set); }#ifdef __FreeBSD__ if (cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_PID, -1, sizeof(cpu_set), &cpu_set) < 0)#else if (sched_setaffinity(getpid(), sizeof(cpu_set), &cpu_set) < 0)#endif { swSysError("sched_setaffinity() failed."); } }#endif //signal init swWorker_signal_init(); SwooleWG.buffer_input = swServer_create_worker_buffer(serv); if (!SwooleWG.buffer_input) { return SW_ERR; } if (serv->max_request < 1) { SwooleWG.run_always = 1; } else { SwooleWG.max_request = serv->max_request; if (SwooleWG.max_request > 10) { int n = swoole_system_random(1, SwooleWG.max_request / 2); if (n > 0) { SwooleWG.max_request += n; } } } worker->start_time = serv->gs->now; worker->request_time = 0; worker->request_count = 0; return SW_OK;}swString** swServer_create_worker_buffer(swServer *serv){ int i; int buffer_num; if (serv->factory_mode == SW_MODE_SINGLE) { buffer_num = 1; } else { buffer_num = serv->reactor_num + serv->dgram_port_num; } swString **buffers = sw_malloc(sizeof(swString*) * buffer_num); if (buffers == NULL) { swError("malloc for worker buffer_input failed."); return NULL; } for (i = 0; i < buffer_num; i++) { buffers[i] = swString_new(SW_BUFFER_SIZE_BIG); if 
(buffers[i] == NULL) { swError("worker buffer_input init failed."); return NULL; } } return buffers;}
推薦閱讀:
※為什麼世界互聯網大會在烏鎮開?
※國產手機信號最好的是哪個品牌?
※HTTPS的優點與缺點 網站優化裡面HTTPS有著什麼用處