標籤:

Swoole-Demo(1) TCP服務端簡單實現

本專欄將會出一系列關於Swoole的各種demo. 由簡單到複雜,由單機到集群、分散式架構。

不多說了,開干吧。

  1. tcp 服務端簡單demo與client .

<?php /** * author : rookiejin <mrjnamei@gmail.com> * createTime : 2018/1/4 10:26 * description: tcp.php - swoole-demo * 該代碼是一份簡單的面向對象形式的 tcp 伺服器和客戶端通訊的demo * 功能:實現伺服器端tcp簡單demo */// 創建一個tcp伺服器$server = new swoole_server("127.0.0.1", 9501);/** * @var $server swoole_server * @var $fd int 文件描述符 */$server->on("connect", function($server , $fd){ echo "a client connected
" ;});/** * @var $server swoole_server * @var $fd int 文件描述符 * @var $from_id worker_id worker進程id * @var $data 接受的數據 */$server->on("receive", function($server , $fd , $from_id ,$data){ echo "#server received msg:" , $data , "
"; $server->send($fd , "i received");});/** * @var $server swoole_server * @var $fd 文件描述符 */$server->on("close",function($server, $fd){ echo "# client closed
";});// 啟動伺服器$server->start();

client.php

<?php$client = new swoole_client(SWOOLE_SOCK_TCP);if(!$client->connect("127.0.0.1", 9501, -1)){ exit("connect failed" . $client->errCode . "
");}$client->send("helloworld");echo $client->recv() , "
";$client->close();

2. 使用面向對象的方式來寫TCP伺服器.

<?php/** * author : rookiejin <mrjnamei@gmail.com> * createTime : 2018/1/4 10:26 * description: php_oop.php - swoole-demo * 該代碼是一份乾淨的tcp server 事件回調, * 沒有任何對事件回調的業務處理 . * 以該代碼為基準,後面的demo都在此基礎上修改 . */class Server { /** * @var swoole_server */ public $server ; /** * 配置項 * @var $config array */ public $config ; /** * @var Server */ public static $_worker ; /** * 存儲pid文件的位置 */ public $pidFile ; /** * worker 進程的數量 * @var $worker_num */ public $worker_num; /** * 當前進程的worker_id * @var $worker_id */ public $worker_id ; /** * task 進程數 + worker 進程數 = 總的服務進程 * 給其他的進程發送消息: * for($i = 0 ; $i < $count ; $i ++) { * if($i == $this->worker_id) continue;表示是該進程 * $this->server->sendMessage($i , $data); * } * task 進程的數量 * @var $task_num */ public $task_num ; /** * Server constructor. * * @param array $config */ public function __construct(array $config) { $this->server = new swoole_server($config [host] , $config [port]); $this->config = $config; $this->serverConfig(); self::$_worker = & $this; // 引用 } public function serverConfig() { $this->server->set($this->config[server]); } public function start() { // Server啟動在主進程的主線程回調此函數 $this->server->on("start",[$this , "onSwooleStart"]); // 此事件在Server正常結束時發生 $this->server->on("shutDown", [$this , "onSwooleShutDown"]); //事件在Worker進程/Task進程啟動時發生。這裡創建的對象可以在進程生命周期內使用。 $this->server->on("workerStart", [$this , "onSwooleWorkerStart"]); // 此事件在worker進程終止時發生。在此函數中可以回收worker進程申請的各類資源。 $this->server->on("workerStop",[$this, "onSwooleWorkerStop"]); // worker 向task_worker進程投遞任務觸發 $this->server->on("task", [$this, "onSwooleTask"]); // task_worker 返回值傳給worker進程時觸發 $this->server->on("finish",[$this , "onSwooleFinish"]); // 當工作進程收到由 sendMessage 發送的管道消息時會觸發onPipeMessage事件 $this->server->on("pipeMessage",[$this ,"onSwoolePipeMessage"]); // 當worker/task_worker進程發生異常後會在Manager進程內回調此函數 $this->server->on("workerError", [$this , "onSwooleWrokerError"]); // 當管理進程啟動時調用它,函數原型: $this->server->on("managerStart", [$this , "onSwooleManagerStart"]); // onManagerStop $this->server->on("managerStop", [$this , "onSwooleManagerStop"]); // 有新的連接進入時,在worker進程中回調。 $this->server->on("connect" , [$this ,onSwooleConnect]); // 接收到數據時回調此函數,發生在worker進程中 $this->server->on("receive", [$this, onSwooleReceive]); //CP客戶端連接關閉後,在worker進程中回調此函數。函數原型: $this->server->on("close", [$this ,"onSwooleClose"]); $this->server->start(); } /** * @warning 進程隔離 * 該步驟一般用於存儲進程的 master_pid 和 manager_pid 到文件中 * 本例子存儲的位置是 __DIR__ . "/tmp/" 下面 * 可以用 kill -15 master_pid 發送信號給進程關閉伺服器,並且觸發下面的onSwooleShutDown事件 * @param $server */ public function onSwooleStart($server) { $this->setProcessName(SwooleMaster); $debug = debug_backtrace(); $this->pidFile = __DIR__ . "/temp/" . str_replace("/" , "_" , $debug[count($debug) - 1] ["file"] . ".pid" ); $pid = [$server->master_pid , $server->manager_pid]; file_put_contents($this->pidFile , implode(",", $pid)); } /** * @param $server * 已關閉所有Reactor線程、HeartbeatCheck線程、UdpRecv線程 * 已關閉所有Worker進程、Task進程、User進程 * 已close所有TCP/UDP/UnixSocket監聽埠 * 已關閉主Reactor * @warning * 強制kill進程不會回調onShutdown,如kill -9 * 需要使用kill -15來發送SIGTREM信號到主進程才能按照正常的流程終止 * 在命令行中使用Ctrl+C中斷程序會立即停止,底層不會回調onShutdown */ public function onSwooleShutDown($server) { echo "shutdown
"; } /** * @warning 進程隔離 * 該函數具有進程隔離性 , * {$this} 對象從 swoole_server->start() 開始前設置的屬性全部繼承 * {$this} 對象在 onSwooleStart,onSwooleManagerStart中設置的對象屬於不同的進程中. * 因此這裡的pidFile雖然在onSwooleStart中設置了,但是是不同的進程,所以找不到該值. * @param swoole_server $server * @param int $worker_id */ public function onSwooleWorkerStart(swoole_server $server, int $worker_id) { if($this->isTaskProcess($server)) { $this->setProcessName(SwooleTask); } else{ $this->setProcessName(SwooleWorker); } $debug = debug_backtrace(); $this->pidFile = __DIR__ . "/temp/" . str_replace("/" , "_" , $debug[count($debug) - 1] ["file"] . ".pid" ); file_put_contents($this->pidFile , ",{$worker_id}" , FILE_APPEND); } public function onSwooleWorkerStop($server,$worker_id) { echo "#worker exited {$worker_id}
"; } /** * @warning 進程隔離 在task_worker進程內被調用 * worker進程可以使用swoole_server_task函數向task_worker進程投遞新的任務 * $task_id和$src_worker_id組合起來才是全局唯一的,不同的worker進程投遞的任務ID可能會有相同 * 函數執行時遇到致命錯誤退出,或者被外部進程強制kill,當前的任務會被丟棄,但不會影響其他正在排隊的Task * @param $server * @param $task_id 是任務ID 由swoole擴展內自動生成,用於區分不同的任務 * @param $src_worker_id 來自於哪個worker進程 * @param $data 是任務的內容 * @return mixed $data */ public function onSwooleTask($server , $task_id, $src_worker_id,$data) { return $data ; } public function onSwooleFinish() { } /** * 當工作進程收到由 sendMessage 發送的管道消息時會觸發onPipeMessage事件。worker/task進程都可能會觸發onPipeMessage事件。 * @param $server * @param $src_worker_id 消息來自哪個Worker進程 * @param $message 消息內容,可以是任意PHP類型 */ public function onSwoolePipeMessage($server , $src_worker_id,$message) { } /** * worker進程發送錯誤的錯誤處理回調 . * 記錄日誌等操作 * 此函數主要用於報警和監控,一旦發現Worker進程異常退出,那麼很有可能是遇到了致命錯誤或者進程CoreDump。通過記錄日誌或者發送報警的信息來提示開發者進行相應的處理。 * @param $server * @param $worker_id 是異常進程的編號 * @param $worker_pid 是異常進程的ID * @param $exit_code 退出的狀態碼,範圍是 1 ~255 * @param $signal 進程退出的信號 */ public function onSwooleWrokerError($server ,$worker_id,$worker_pid,$exit_code,$signal) { echo "#workerError:{$worker_id}
"; } /** * */ public function onSwooleManagerStart() { $this->setProcessName(SwooleManager); } /** * @param $server */ public function onSwooleManagerStop($server) { echo "#managerstop
"; } /** * 客戶端連接 * onConnect/onClose這2個回調發生在worker進程內,而不是主進程。 * UDP協議下只有onReceive事件,沒有onConnect/onClose事件 * @param $server * @param $fd * @param $reactorId */ public function onSwooleConnect($server ,$fd ,$reactorId) { echo "#connected
"; } /** * @param $server server對象 * @param $fd 文件描述符 * @param $reactorId reactor線程id */ public function onSwooleReceive($server,$fd,$reactorId) { echo "#received
"; } /** * 連接斷開,廣播業務需要從redis | memcached | 內存 中刪除該fd * @param $server * @param $fd * @param $reactorId */ public function onSwooleClose($server, $fd ,$reactorId) { echo "#swooleClosed
" ; } public function setProcessName($name) { if(function_exists(cli_set_process_title)) { @cli_set_process_title($name); } else{ @swoole_set_process_name($name); } } /** * 返回真說明該進程是task進程 * @param $server * @return bool */ public function isTaskProcess($server) { return $server->taskworker === true ; } /** * main 運行入口方法 */ public static function main() { self::$_worker->start(); }}$config = [server => [worker_num => 4 , "task_worker_num" => "20" , "dispatch_mode" => 3 ] , host => 0.0.0.0 , port => 9501];$server = new Server($config);Server::main() ;

  1. 本例子是註冊了swoole的基本事件監聽,回調沒有做,為下面的代碼先做一份鋪墊。
  2. 這裡主要要講的是swoole的啟動步驟.

* note1. $server = new Server ($config); 在new了一個單例Server類以後,將Server::$_worker代理到本身$this. 並且做好伺服器的配置.2. Server::main(); 該代碼為server註冊事件回調函數. 然後啟動 swoole_server::start();3. 啟動過程: 1). 先會開啟master進程. 觸發onSwooleStart事件, 可以獲取到master進程的pid和manager進程的pid. 該函數式在master進程執行的.在事件回調里實例化的任何對象只針對master進程有效. 2). 接著觸發onSwooleManagerStart. 在manager進程中執行. 該函數式在master進程執行的.在事件回調里實例化的任何對象只針對manager進程有效. 3). 接著觸發onSwooleWorkerStart. 該過程啟動worker與task_worker進程.步驟是並行的, 不分先後. worker進程與task_worker進程其實一樣,都屬於worker進程,具有進程隔離性,自己進程內 實例化的類只有在自己進程內部有用, worker_num + worker_task_num = 總的worker_id數量. 如果需要從worker進程向其他進程發送消息的話,可以這麼做: for($i = 0 ; $i < $worker_num + $task_worker_num ; $i ++) { if($i == $this->worker_id) continue; $this->server_sendMessage($i ,$message); } 4). 然後監聽connect與receive事件, 就屬於具體的業務範疇了.

下面帶來一個 tcp 聊天室的簡單案例,例子是用swoole_table存儲所有的鏈接信息. 功能可以實現群聊,單聊: 主要業務邏輯在onSwooleReceive回調中

<?php/** * author : rookiejin <mrjnamei@gmail.com> * createTime : 2018/1/4 10:26 * description: tcp_get_and_send.php - swoole-demo * 該代碼是一份簡單的面向對象形式的 tcp 伺服器和客戶端通訊的demo * 功能:單發. 群發. */class Server { /** * @var swoole_server */ public $server ; /** * 配置項 * @var $config array */ public $config ; /** * @var Server */ public static $_worker ; /** * 存儲pid文件的位置 */ public $pidFile ; /** * worker 進程的數量 * @var $worker_num */ public $worker_num; /** * 當前進程的worker_id * @var $worker_id */ public $worker_id ; /** * task 進程數 + worker 進程數 = 總的服務進程 * 給其他的進程發送消息: * for($i = 0 ; $i < $count ; $i ++) { * if($i == $this->worker_id) continue;表示是該進程 * $this->server->sendMessage($i , $data); * } * task 進程的數量 * @var $task_num */ public $task_num ; /** * @var $table swoole_table 內存表 */ public $table; /** * Server constructor. * * @param array $config */ public function __construct(array $config) { $this->server = new swoole_server($config [host] , $config [port]); $this->config = $config; $this->serverConfig(); $this->createTable(); self::$_worker = & $this; // 引用 } private function serverConfig() { $this->server->set($this->config[server]); } /** * 創建swoole_table */ private function createTable() { $this->table = new swoole_table( 65536 ); $this->table->column("fd",swoole_table::TYPE_INT , 8); $this->table->column("worker_id", swoole_table::TYPE_INT , 4); $this->table->column("name",swoole_table::TYPE_STRING,255); $this->table->create(); } public function start() { // Server啟動在主進程的主線程回調此函數 $this->server->on("start",[$this , "onSwooleStart"]); // 此事件在Server正常結束時發生 $this->server->on("shutDown", [$this , "onSwooleShutDown"]); //事件在Worker進程/Task進程啟動時發生。這裡創建的對象可以在進程生命周期內使用。 $this->server->on("workerStart", [$this , "onSwooleWorkerStart"]); // 此事件在worker進程終止時發生。在此函數中可以回收worker進程申請的各類資源。 $this->server->on("workerStop",[$this, "onSwooleWorkerStop"]); // worker 向task_worker進程投遞任務觸發 $this->server->on("task", [$this, "onSwooleTask"]); // task_worker 返回值傳給worker進程時觸發 $this->server->on("finish",[$this , "onSwooleFinish"]); // 當工作進程收到由 sendMessage 發送的管道消息時會觸發onPipeMessage事件 $this->server->on("pipeMessage",[$this ,"onSwoolePipeMessage"]); // 當worker/task_worker進程發生異常後會在Manager進程內回調此函數 $this->server->on("workerError", [$this , "onSwooleWrokerError"]); // 當管理進程啟動時調用它,函數原型: $this->server->on("managerStart", [$this , "onSwooleManagerStart"]); // onManagerStop $this->server->on("managerStop", [$this , "onSwooleManagerStop"]); // 有新的連接進入時,在worker進程中回調。 $this->server->on("connect" , [$this ,onSwooleConnect]); // 接收到數據時回調此函數,發生在worker進程中 $this->server->on("receive", [$this, onSwooleReceive]); //CP客戶端連接關閉後,在worker進程中回調此函數。函數原型: $this->server->on("close", [$this ,"onSwooleClose"]); $this->server->start(); } /** * @warning 進程隔離 * 該步驟一般用於存儲進程的 master_pid 和 manager_pid 到文件中 * 本例子存儲的位置是 __DIR__ . "/tmp/" 下面 * 可以用 kill -15 master_pid 發送信號給進程關閉伺服器,並且觸發下面的onSwooleShutDown事件 * @param $server */ public function onSwooleStart($server) { $this->setProcessName(SwooleMaster); $debug = debug_backtrace(); $this->pidFile = __DIR__ . "/temp/" . str_replace("/" , "_" , $debug[count($debug) - 1] ["file"] . ".pid" ); $pid = [$server->master_pid , $server->manager_pid]; file_put_contents($this->pidFile , implode(",", $pid)); } /** * @param $server * 已關閉所有Reactor線程、HeartbeatCheck線程、UdpRecv線程 * 已關閉所有Worker進程、Task進程、User進程 * 已close所有TCP/UDP/UnixSocket監聽埠 * 已關閉主Reactor * @warning * 強制kill進程不會回調onShutdown,如kill -9 * 需要使用kill -15來發送SIGTREM信號到主進程才能按照正常的流程終止 * 在命令行中使用Ctrl+C中斷程序會立即停止,底層不會回調onShutdown */ public function onSwooleShutDown($server) { echo "shutdown
"; } /** * @warning 進程隔離 * 該函數具有進程隔離性 , * {$this} 對象從 swoole_server->start() 開始前設置的屬性全部繼承 * {$this} 對象在 onSwooleStart,onSwooleManagerStart中設置的對象屬於不同的進程中. * 因此這裡的pidFile雖然在onSwooleStart中設置了,但是是不同的進程,所以找不到該值. * @param swoole_server $server * @param int $worker_id */ public function onSwooleWorkerStart(swoole_server $server, int $worker_id) { if($this->isTaskProcess($server)) { $this->setProcessName(SwooleTask); } else{ $this->setProcessName(SwooleWorker); } $debug = debug_backtrace(); $this->pidFile = __DIR__ . "/temp/" . str_replace("/" , "_" , $debug[count($debug) - 1] ["file"] . ".pid" ); file_put_contents($this->pidFile , ",{$worker_id}" , FILE_APPEND); } public function onSwooleWorkerStop($server,$worker_id) { echo "#worker exited {$worker_id}
"; } /** * @warning 進程隔離 在task_worker進程內被調用 * worker進程可以使用swoole_server_task函數向task_worker進程投遞新的任務 * $task_id和$src_worker_id組合起來才是全局唯一的,不同的worker進程投遞的任務ID可能會有相同 * 函數執行時遇到致命錯誤退出,或者被外部進程強制kill,當前的任務會被丟棄,但不會影響其他正在排隊的Task * @param $server * @param $task_id 是任務ID 由swoole擴展內自動生成,用於區分不同的任務 * @param $src_worker_id 來自於哪個worker進程 * @param $data 是任務的內容 * @return mixed $data */ public function onSwooleTask($server , $task_id, $src_worker_id,$data) { // todo } public function onSwooleFinish() { // todo } /** * 當工作進程收到由 sendMessage 發送的管道消息時會觸發onPipeMessage事件。worker/task進程都可能會觸發onPipeMessage事件。 * @param $server * @param $src_worker_id 消息來自哪個Worker進程 * @param $message 消息內容,可以是任意PHP類型 */ public function onSwoolePipeMessage($server , $src_worker_id,$message) { // todo } /** * worker進程發送錯誤的錯誤處理回調 . * 記錄日誌等操作 * 此函數主要用於報警和監控,一旦發現Worker進程異常退出,那麼很有可能是遇到了致命錯誤或者進程CoreDump。通過記錄日誌或者發送報警的信息來提示開發者進行相應的處理。 * @param $server * @param $worker_id 是異常進程的編號 * @param $worker_pid 是異常進程的ID * @param $exit_code 退出的狀態碼,範圍是 1 ~255 * @param $signal 進程退出的信號 */ public function onSwooleWrokerError($server ,$worker_id,$worker_pid,$exit_code,$signal) { echo "#workerError:{$worker_id}
"; } /** * */ public function onSwooleManagerStart() { $this->setProcessName(SwooleManager); } /** * @param $server */ public function onSwooleManagerStop($server) { echo "#managerstop
"; } /** * 客戶端連接 * onConnect/onClose這2個回調發生在worker進程內,而不是主進程。 * UDP協議下只有onReceive事件,沒有onConnect/onClose事件 * @param $server * @param $fd * @param $reactorId */ public function onSwooleConnect($server ,$fd ,$reactorId) { echo "#{$fd} has connected
"; $server->send($fd , "please input your name
"); } /** * @param $server server對象 * @param $fd 文件描述符 * @param $reactorId reactor線程id * @param $data 數據 */ public function onSwooleReceive($server,$fd,$reactorId, $data) { $data = json_decode($data, true); $exist = $this->table->exist($fd); $from = $data [from]; $to = $data [to]; $message = $data [message]; if(!$exist) { foreach ($this->table as $row) { if($row [name] == $from) { $server->send($fd , name already exists); return ; } } $this->table->set($fd , [name => $from , fd => $fd]); $server->send($fd , "welcome to join tcp chat room
"); } // 發送給其他人 . if($to == all) { $this->sendToAllExceptHim($server , $message , $fd); return ; } if(!empty($to) && !empty($message)) { $this->sendToOne($server ,$message ,$to); } return ; } private function sendToOne($server , $message , $name) { foreach ($this->table as $row) { if($row [name] == $name) { $server->send($row [fd] , $message); return ; } } } private function sendToAllExceptHim($server , $message, $fd) { foreach ($this->table as $row) { if($row[fd] == $fd) continue ; $server->send($row [fd] , $message); } } /** * 連接斷開,廣播業務需要從redis | memcached | 內存 中刪除該fd * @param $server * @param $fd * @param $reactorId */ public function onSwooleClose($server, $fd ,$reactorId) { $this->table->del($fd); } public function setProcessName($name) { if(function_exists(cli_set_process_title)) { @cli_set_process_title($name); } else{ @swoole_set_process_name($name); } } /** * 返回真說明該進程是task進程 * @param $server * @return bool */ public function isTaskProcess($server) { return $server->taskworker === true ; } /** * main 運行入口方法 */ public static function main() { self::$_worker->start(); }}$config = [server => [worker_num => 4 , "task_worker_num" => "20" , "dispatch_mode" => 3 ] , host => 0.0.0.0 , port => 9501];$server = new Server($config);Server::main() ;

如有錯誤,敬請糾正!

本代碼已託管到github: github.com/clearcodecn/

另外:clearcode.cn是本人正在建設中的一個社區,旨在講解伺服器端技術與原理,提倡開源精神,授人以魚不如授人以漁,由於時間不足,導致進展緩慢,希望有興趣的朋友一起加入.

QQ群: 139348611

轉載請申明來源!


推薦閱讀:

TAG:PHP | Swoole |