
Swoole Source Code Analysis: Server Worker (Part 1)


The swManager_loop function: manager process management

  • When the manager process starts, it first invokes the onManagerStart callback.
  • It then registers signal handlers with swSignal_add: SIGTERM shuts the server down, which only requires setting running to 0, after which the manager kills the worker processes one by one; SIGUSR1 reloads all worker processes; SIGUSR2 reloads all task_worker processes; SIGIO respawns worker processes that have already exited; SIGALRM checks all requests for timeouts.
  • If serv->manager_alarm is set, request-timeout monitoring is enabled; an alarm signal is armed so that the manager process periodically checks for timed-out requests.
  • While running is 1, the manager keeps looping, killing or spawning the corresponding worker processes; once running becomes 0 it shuts down all worker processes, calls onManagerStop, and exits.
  • The wait function is called to monitor worker processes that have exited (a distilled sketch of this pattern follows the list):
    • If wait returns an error, it was most likely interrupted by a signal. The manager first checks ManagerProcess.read_message: if it is 1, wait was interrupted by SIGIO, which a worker process sends to tell the manager that it is about to exit. The manager then respawns that worker process.
    • If ManagerProcess.alarm is 1, wait was interrupted by SIGALRM, and timed-out requests must be checked. serv->hooks[SW_SERVER_HOOK_MANAGER_TIMER], i.e. php_swoole_trace_check, is the function that checks for slow requests.
    • If ManagerProcess.reload_all_worker is 1, wait was interrupted by SIGUSR1, and all worker processes should be restarted.
    • If ManagerProcess.reload_task_worker is 1, wait was interrupted by SIGUSR2, and all task_worker processes should be restarted.
    • If wait returns normally, the exited process is looked up in serv->workers, serv->gs->task_workers, and serv->user_worker. If that process is in the STOPPED state it is most likely being traced for debugging; in that case it is not restarted and only its tracer function is called.
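
To make that control flow easier to follow before reading the real source below, here is a minimal sketch of the same pattern in plain POSIX C. It is not Swoole code: names such as spawn_child and WORKER_NUM are invented for illustration. The handlers only set flags, the main loop blocks in wait(), and any child that exits is respawned.

/* Minimal sketch (not Swoole code): signal flags + wait() + respawn. */
#include <errno.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define WORKER_NUM 4

static volatile sig_atomic_t running = 1;
static volatile sig_atomic_t reload_all = 0;
static pid_t workers[WORKER_NUM];

static void signal_handle(int sig)
{
    /* like swManager_signal_handle: only record what happened */
    if (sig == SIGTERM)
    {
        running = 0;
    }
    else if (sig == SIGUSR1)
    {
        reload_all = 1;
    }
}

static pid_t spawn_child(int id)
{
    (void) id;
    pid_t pid = fork();
    if (pid == 0)
    {
        /* worker body: a real worker would run its event loop here */
        pause();
        _exit(0);
    }
    return pid;
}

int main(void)
{
    signal(SIGTERM, signal_handle);
    signal(SIGUSR1, signal_handle);

    for (int i = 0; i < WORKER_NUM; i++)
    {
        workers[i] = spawn_child(i);
    }

    while (running)
    {
        int status;
        pid_t pid = wait(&status);   /* blocks until a child exits or a signal arrives */

        if (pid < 0)
        {
            if (errno == EINTR && reload_all)
            {
                /* SIGUSR1: ask every worker to exit; each is respawned below */
                for (int i = 0; i < WORKER_NUM; i++)
                {
                    kill(workers[i], SIGTERM);
                }
                reload_all = 0;
            }
            continue;
        }

        /* a worker exited: find its slot and respawn it */
        for (int i = 0; i < WORKER_NUM; i++)
        {
            if (workers[i] == pid && running)
            {
                workers[i] = spawn_child(i);
            }
        }
    }

    /* shutdown: terminate and reap all workers */
    for (int i = 0; i < WORKER_NUM; i++)
    {
        kill(workers[i], SIGTERM);
        waitpid(workers[i], NULL, 0);
    }
    return 0;
}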

static void swManager_signal_handle(int sig)
{
    switch (sig)
    {
    case SIGTERM:
        SwooleG.running = 0;
        break;
        /**
         * reload all workers
         */
    case SIGUSR1:
        if (ManagerProcess.reloading == 0)
        {
            ManagerProcess.reloading = 1;
            ManagerProcess.reload_all_worker = 1;
        }
        break;
        /**
         * only reload task workers
         */
    case SIGUSR2:
        if (ManagerProcess.reloading == 0)
        {
            ManagerProcess.reloading = 1;
            ManagerProcess.reload_task_worker = 1;
        }
        break;
    case SIGIO:
        ManagerProcess.read_message = 1;
        break;
    case SIGALRM:
        ManagerProcess.alarm = 1;
        break;
    default:
#ifdef SIGRTMIN
        if (sig == SIGRTMIN)
        {
            swServer_reopen_log_file(SwooleG.serv);
        }
#endif
        break;
    }
}

static int swManager_loop(swFactory *factory)
{
    int pid, new_pid;
    int i;
    int reload_worker_i = 0;
    int reload_worker_num;
    int reload_init = 0;
    pid_t reload_worker_pid = 0;

    int status;

    SwooleG.use_signalfd = 0;
    SwooleG.use_timerfd = 0;

    memset(&ManagerProcess, 0, sizeof(ManagerProcess));

    swServer *serv = factory->ptr;
    swWorker *reload_workers;

    if (serv->hooks[SW_SERVER_HOOK_MANAGER_START])
    {
        swServer_call_hook(serv, SW_SERVER_HOOK_MANAGER_START, serv);
    }

    if (serv->onManagerStart)
    {
        serv->onManagerStart(serv);
    }

    reload_worker_num = serv->worker_num + serv->task_worker_num;
    reload_workers = sw_calloc(reload_worker_num, sizeof(swWorker));
    if (reload_workers == NULL)
    {
        swError("malloc[reload_workers] failed");
        return SW_ERR;
    }

    //for reload
    swSignal_add(SIGHUP, NULL);
    swSignal_add(SIGTERM, swManager_signal_handle);
    swSignal_add(SIGUSR1, swManager_signal_handle);
    swSignal_add(SIGUSR2, swManager_signal_handle);
    swSignal_add(SIGIO, swManager_signal_handle);
#ifdef SIGRTMIN
    swSignal_add(SIGRTMIN, swManager_signal_handle);
#endif
    //swSignal_add(SIGINT, swManager_signal_handle);

    if (serv->manager_alarm > 0)
    {
        alarm(serv->manager_alarm);
        swSignal_add(SIGALRM, swManager_signal_handle);
    }

    SwooleG.main_reactor = NULL;

    while (SwooleG.running > 0)
    {
        _wait: pid = wait(&status);

        if (ManagerProcess.read_message)
        {
            swWorkerStopMessage msg;
            while (swChannel_pop(serv->message_box, &msg, sizeof(msg)) > 0)
            {
                if (SwooleG.running == 0)
                {
                    continue;
                }
                pid_t new_pid = swManager_spawn_worker(factory, msg.worker_id);
                if (new_pid > 0)
                {
                    serv->workers[msg.worker_id].pid = new_pid;
                }
            }
            ManagerProcess.read_message = 0;
        }

        if (pid < 0)
        {
            if (ManagerProcess.alarm == 1)
            {
                ManagerProcess.alarm = 0;
                alarm(serv->manager_alarm);

                if (serv->hooks[SW_SERVER_HOOK_MANAGER_TIMER])
                {
                    swServer_call_hook(serv, SW_SERVER_HOOK_MANAGER_TIMER, serv);
                }
            }

            if (ManagerProcess.reloading == 0)
            {
                error: if (errno != EINTR)
                {
                    swSysError("wait() failed.");
                }
                continue;
            }
            //reload task & event workers
            else if (ManagerProcess.reload_all_worker == 1)
            {
                swNotice("Server is reloading now.");
                if (reload_init == 0)
                {
                    reload_init = 1;
                    memcpy(reload_workers, serv->workers, sizeof(swWorker) * serv->worker_num);
                    reload_worker_num = serv->worker_num;
                    if (serv->task_worker_num > 0)
                    {
                        memcpy(reload_workers + serv->worker_num, serv->gs->task_workers.workers,
                                sizeof(swWorker) * serv->task_worker_num);
                        reload_worker_num += serv->task_worker_num;
                    }

                    ManagerProcess.reload_all_worker = 0;
                    if (serv->reload_async)
                    {
                        for (i = 0; i < serv->worker_num; i++)
                        {
                            if (kill(reload_workers[i].pid, SIGTERM) < 0)
                            {
                                swSysError("kill(%d, SIGTERM) [%d] failed.", reload_workers[i].pid, i);
                            }
                        }
                        reload_worker_i = serv->worker_num;
                    }
                    else
                    {
                        reload_worker_i = 0;
                    }
                }
                goto kill_worker;
            }
            //only reload task workers
            else if (ManagerProcess.reload_task_worker == 1)
            {
                if (serv->task_worker_num == 0)
                {
                    swWarn("cannot reload task workers, task workers is not started.");
                    continue;
                }
                swNotice("Server is reloading now.");
                if (reload_init == 0)
                {
                    memcpy(reload_workers, serv->gs->task_workers.workers, sizeof(swWorker) * serv->task_worker_num);
                    reload_worker_num = serv->task_worker_num;
                    reload_worker_i = 0;
                    reload_init = 1;
                    ManagerProcess.reload_task_worker = 0;
                }
                goto kill_worker;
            }
            else
            {
                goto error;
            }
        }

        if (SwooleG.running == 1)
        {
            //event workers
            for (i = 0; i < serv->worker_num; i++)
            {
                //compare PID
                if (pid != serv->workers[i].pid)
                {
                    continue;
                }

                if (WIFSTOPPED(status) && serv->workers[i].tracer)
                {
                    serv->workers[i].tracer(&serv->workers[i]);
                    serv->workers[i].tracer = NULL;
                    goto _wait;
                }

                //Check the process return code and signal
                swManager_check_exit_status(serv, i, pid, status);

                while (1)
                {
                    new_pid = swManager_spawn_worker(factory, i);
                    if (new_pid < 0)
                    {
                        usleep(100000);
                        continue;
                    }
                    else
                    {
                        serv->workers[i].pid = new_pid;
                        break;
                    }
                }
            }

            swWorker *exit_worker;
            //task worker
            if (serv->gs->task_workers.map)
            {
                exit_worker = swHashMap_find_int(serv->gs->task_workers.map, pid);
                if (exit_worker != NULL)
                {
                    if (WIFSTOPPED(status) && exit_worker->tracer)
                    {
                        exit_worker->tracer(exit_worker);
                        exit_worker->tracer = NULL;
                        goto _wait;
                    }
                    swManager_check_exit_status(serv, exit_worker->id, pid, status);
                    swProcessPool_spawn(&serv->gs->task_workers, exit_worker);
                }
            }
            //user process
            if (serv->user_worker_map != NULL)
            {
                swManager_wait_user_worker(&serv->gs->event_workers, pid, status);
            }
            if (pid == reload_worker_pid)
            {
                reload_worker_i++;
            }
        }
        //reload worker
        kill_worker: if (ManagerProcess.reloading == 1)
        {
            //reload finish
            if (reload_worker_i >= reload_worker_num)
            {
                reload_worker_pid = reload_worker_i = reload_init = ManagerProcess.reloading = 0;
                continue;
            }
            reload_worker_pid = reload_workers[reload_worker_i].pid;
            if (kill(reload_worker_pid, SIGTERM) < 0)
            {
                if (errno == ECHILD)
                {
                    reload_worker_i++;
                    goto kill_worker;
                }
                swSysError("kill(%d, SIGTERM) [%d] failed.", reload_workers[reload_worker_i].pid, reload_worker_i);
            }
        }
    }

    sw_free(reload_workers);
    swSignal_none();
    //kill all child process
    for (i = 0; i < serv->worker_num; i++)
    {
        swTrace("[Manager]kill worker processor");
        kill(serv->workers[i].pid, SIGTERM);
    }
    //kill and wait task process
    if (serv->task_worker_num > 0)
    {
        swProcessPool_shutdown(&serv->gs->task_workers);
    }
    //wait child process
    for (i = 0; i < serv->worker_num; i++)
    {
        if (swWaitpid(serv->workers[i].pid, &status, 0) < 0)
        {
            swSysError("waitpid(%d) failed.", serv->workers[i].pid);
        }
    }
    //kill all user process
    if (serv->user_worker_map)
    {
        swManager_kill_user_worker(serv);
    }

    if (serv->onManagerStop)
    {
        serv->onManagerStop(serv);
    }

    return SW_OK;
}

void php_swoole_trace_check(void *arg)
{
    swServer *serv = (swServer *) arg;
    uint8_t timeout = serv->request_slowlog_timeout;
    int count = serv->worker_num + serv->task_worker_num;
    int i = serv->trace_event_worker ? 0 : serv->worker_num;

    swWorker *worker;

    for (; i < count; i++)
    {
        worker = swServer_get_worker(serv, i);
        swTraceLog(SW_TRACE_SERVER, "trace request, worker#%d, pid=%d. request_time=%d.", i, worker->pid, worker->request_time);
        if (!(worker->request_time > 0 && worker->traced == 0 && serv->gs->now - worker->request_time >= timeout))
        {
            continue;
        }
        if (ptrace(PTRACE_ATTACH, worker->pid, 0, 0) < 0)
        {
            swSysError("failed to ptrace(ATTACH, %d) worker#%d,", worker->pid, worker->id);
            continue;
        }
        worker->tracer = trace_request;
        worker->traced = 1;
    }
}

static void swManager_check_exit_status(swServer *serv, int worker_id, pid_t pid, int status)
{
    if (status != 0)
    {
        swWarn("worker#%d abnormal exit, status=%d, signal=%d", worker_id, WEXITSTATUS(status), WTERMSIG(status));
        if (serv->onWorkerError != NULL)
        {
            serv->onWorkerError(serv, worker_id, pid, WEXITSTATUS(status), WTERMSIG(status));
        }
    }
}

The swWorker_loop function: the worker event loop

  • The worker process's event loop is similar to a reactor thread's: it creates a reactor object and then calls SwooleG.main_reactor->wait to run the event loop. The difference is that the worker process monitors the pipe_worker socket (a bare-bones sketch of such a loop follows the list).
  • If dispatch_mode is stream, the reactor also listens on serv->stream_fd so that the data sent by the reactor threads can be consumed more efficiently.
  • swServer_worker_init initializes the worker process, swWorker_onStart invokes the start callback, and swWorker_onStop stops the worker process.
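
The following bare-bones sketch shows the shape of such a loop in plain Linux C. It is not Swoole code: worker_event_loop and pipe_worker_fd are invented names. One pipe descriptor is registered for read events and a blocking epoll_wait loop consumes whatever the other end writes.

/* Bare-bones sketch (not Swoole code): one pipe fd driven by epoll. */
#include <stdio.h>
#include <sys/epoll.h>
#include <unistd.h>

void worker_event_loop(int pipe_worker_fd)
{
    int epfd = epoll_create1(0);
    if (epfd < 0)
    {
        perror("epoll_create1");
        return;
    }

    struct epoll_event ev = { 0 };
    ev.events = EPOLLIN;                 /* readable: the master sent us data */
    ev.data.fd = pipe_worker_fd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, pipe_worker_fd, &ev);

    char buf[4096];
    for (;;)
    {
        struct epoll_event events[16];
        int n = epoll_wait(epfd, events, 16, -1);
        for (int i = 0; i < n; i++)
        {
            ssize_t len = read(events[i].data.fd, buf, sizeof(buf));
            if (len <= 0)
            {
                close(epfd);
                return;                  /* peer closed: leave the loop */
            }
            /* a real worker would dispatch the request here */
            printf("worker received %zd bytes\n", len);
        }
    }
}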

int swWorker_loop(swFactory *factory, int worker_id)
{
    swServer *serv = factory->ptr;

#ifndef SW_WORKER_USE_SIGNALFD
    SwooleG.use_signalfd = 0;
#elif defined(HAVE_SIGNALFD)
    SwooleG.use_signalfd = 1;
#endif
    //timerfd
#ifdef HAVE_TIMERFD
    SwooleG.use_timerfd = 1;
#endif

    //worker_id
    SwooleWG.id = worker_id;
    SwooleG.pid = getpid();

    swWorker *worker = swServer_get_worker(serv, worker_id);
    swServer_worker_init(serv, worker);

    SwooleG.main_reactor = sw_malloc(sizeof(swReactor));
    if (SwooleG.main_reactor == NULL)
    {
        swError("[Worker] malloc for reactor failed.");
        return SW_ERR;
    }

    if (swReactor_create(SwooleG.main_reactor, SW_REACTOR_MAXEVENTS) < 0)
    {
        swError("[Worker] create worker_reactor failed.");
        return SW_ERR;
    }

    worker->status = SW_WORKER_IDLE;

    int pipe_worker = worker->pipe_worker;

    swSetNonBlock(pipe_worker);
    SwooleG.main_reactor->ptr = serv;
    SwooleG.main_reactor->add(SwooleG.main_reactor, pipe_worker, SW_FD_PIPE | SW_EVENT_READ);
    SwooleG.main_reactor->setHandle(SwooleG.main_reactor, SW_FD_PIPE, swWorker_onPipeReceive);
    SwooleG.main_reactor->setHandle(SwooleG.main_reactor, SW_FD_WRITE, swReactor_onWrite);

    /**
     * set pipe buffer size
     */
    int i;
    swConnection *pipe_socket;
    for (i = 0; i < serv->worker_num + serv->task_worker_num; i++)
    {
        worker = swServer_get_worker(serv, i);
        pipe_socket = swReactor_get(SwooleG.main_reactor, worker->pipe_master);
        pipe_socket->buffer_size = SW_MAX_INT;
        pipe_socket = swReactor_get(SwooleG.main_reactor, worker->pipe_worker);
        pipe_socket->buffer_size = SW_MAX_INT;
    }

    if (serv->dispatch_mode == SW_DISPATCH_STREAM)
    {
        SwooleG.main_reactor->add(SwooleG.main_reactor, serv->stream_fd, SW_FD_LISTEN | SW_EVENT_READ);
        SwooleG.main_reactor->setHandle(SwooleG.main_reactor, SW_FD_LISTEN, swWorker_onStreamAccept);
        SwooleG.main_reactor->setHandle(SwooleG.main_reactor, SW_FD_STREAM, swWorker_onStreamRead);
        swStream_set_protocol(&serv->stream_protocol);
        serv->stream_protocol.package_max_length = SW_MAX_INT;
        serv->stream_protocol.onPackage = swWorker_onStreamPackage;
        serv->buffer_pool = swLinkedList_new(0, NULL);
    }

    swWorker_onStart(serv);

#ifdef HAVE_SIGNALFD
    if (SwooleG.use_signalfd)
    {
        swSignalfd_setup(SwooleG.main_reactor);
    }
#endif
    //main loop
    SwooleG.main_reactor->wait(SwooleG.main_reactor, NULL);
    //clear pipe buffer
    swWorker_clean();
    //worker shutdown
    swWorker_onStop(serv);

    return SW_OK;
}

The swServer_worker_init initialization function

  • As with the reactor threads, if CPU affinity is enabled the worker process is bound to a specific CPU. The core is again chosen as SwooleWG.id % serv->cpu_affinity_available_num, which keeps the corresponding reactor thread and worker process on the same CPU core (a standalone sketch of this binding follows the list).
  • swWorker_signal_init sets up the worker process's signal handlers: SIGTERM shuts down the current worker process, and SIGALRM drives timer tasks.
  • buffer_input stores the data sent by the reactor threads; it is an array of serv->reactor_num + serv->dgram_port_num buffers.
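
A standalone sketch of that binding, assuming Linux and an invented helper name bind_worker_to_cpu: pin the calling process to core worker_id % ncpu with sched_setaffinity (the real code additionally consults serv->cpu_affinity_available when it is set).

/* Sketch (not Swoole code): pin the calling worker to core worker_id % ncpu. */
#define _GNU_SOURCE
#include <sched.h>
#include <stdio.h>
#include <unistd.h>

int bind_worker_to_cpu(int worker_id)
{
    long ncpu = sysconf(_SC_NPROCESSORS_ONLN);  /* number of online cores */
    if (ncpu < 1)
    {
        return -1;
    }

    cpu_set_t cpu_set;
    CPU_ZERO(&cpu_set);
    CPU_SET(worker_id % ncpu, &cpu_set);        /* same modulo rule as Swoole */

    /* pid 0 means "the calling process" */
    if (sched_setaffinity(0, sizeof(cpu_set), &cpu_set) < 0)
    {
        perror("sched_setaffinity");
        return -1;
    }
    return 0;
}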

void swWorker_signal_init(void)
{
    swSignal_clear();
    /**
     * use user settings
     */
    SwooleG.use_signalfd = SwooleG.enable_signalfd;

    swSignal_add(SIGHUP, NULL);
    swSignal_add(SIGPIPE, NULL);
    swSignal_add(SIGUSR1, NULL);
    swSignal_add(SIGUSR2, NULL);
    swSignal_add(SIGTERM, swWorker_signal_handler);
    swSignal_add(SIGALRM, swSystemTimer_signal_handler);
    //for test
    swSignal_add(SIGVTALRM, swWorker_signal_handler);
#ifdef SIGRTMIN
    swSignal_add(SIGRTMIN, swWorker_signal_handler);
#endif
}

int swServer_worker_init(swServer *serv, swWorker *worker)
{
#ifdef HAVE_CPU_AFFINITY
    if (serv->open_cpu_affinity)
    {
        cpu_set_t cpu_set;
        CPU_ZERO(&cpu_set);

        if (serv->cpu_affinity_available_num)
        {
            CPU_SET(serv->cpu_affinity_available[SwooleWG.id % serv->cpu_affinity_available_num], &cpu_set);
        }
        else
        {
            CPU_SET(SwooleWG.id % SW_CPU_NUM, &cpu_set);
        }
#ifdef __FreeBSD__
        if (cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_PID, -1,
                               sizeof(cpu_set), &cpu_set) < 0)
#else
        if (sched_setaffinity(getpid(), sizeof(cpu_set), &cpu_set) < 0)
#endif
        {
            swSysError("sched_setaffinity() failed.");
        }
    }
#endif

    //signal init
    swWorker_signal_init();

    SwooleWG.buffer_input = swServer_create_worker_buffer(serv);
    if (!SwooleWG.buffer_input)
    {
        return SW_ERR;
    }

    if (serv->max_request < 1)
    {
        SwooleWG.run_always = 1;
    }
    else
    {
        SwooleWG.max_request = serv->max_request;
        if (SwooleWG.max_request > 10)
        {
            int n = swoole_system_random(1, SwooleWG.max_request / 2);
            if (n > 0)
            {
                SwooleWG.max_request += n;
            }
        }
    }

    worker->start_time = serv->gs->now;
    worker->request_time = 0;
    worker->request_count = 0;

    return SW_OK;
}

swString** swServer_create_worker_buffer(swServer *serv)
{
    int i;
    int buffer_num;

    if (serv->factory_mode == SW_MODE_SINGLE)
    {
        buffer_num = 1;
    }
    else
    {
        buffer_num = serv->reactor_num + serv->dgram_port_num;
    }

    swString **buffers = sw_malloc(sizeof(swString*) * buffer_num);
    if (buffers == NULL)
    {
        swError("malloc for worker buffer_input failed.");
        return NULL;
    }

    for (i = 0; i < buffer_num; i++)
    {
        buffers[i] = swString_new(SW_BUFFER_SIZE_BIG);
        if (buffers[i] == NULL)
        {
            swError("worker buffer_input init failed.");
            return NULL;
        }
    }

    return buffers;
}
