server.php 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537
  1. <?php
  2. class G
  3. {
  4. static $index = 0;
  5. static $serv;
  6. static $config = array(
  7. //'reactor_num' => 16, // 线程数. 一般设置为CPU核数的1-4倍
  8. 'worker_num' => 2, // 工作进程数量. 设置为CPU的1-4倍最合理
  9. 'max_request' => 1000, // 防止 PHP 内存溢出, 一个工作进程处理 X 次任务后自动重启 (注: 0,不自动重启)
  10. 'max_conn' => 10000, // 最大连接数
  11. 'task_worker_num' => 1, // 任务工作进程数量
  12. // 'task_ipc_mode' => 2, // 设置 Task 进程与 Worker 进程之间通信的方式。
  13. 'task_max_request' => 0, // 防止 PHP 内存溢出
  14. //'task_tmpdir' => '/tmp',
  15. //'message_queue_key' => ftok(SYS_ROOT . 'queue.msg', 1),
  16. 'dispatch_mode' => 2,
  17. //'daemonize' => 1, // 设置守护进程模式
  18. 'backlog' => 128,
  19. //'log_file' => '/data/logs/swoole.log',
  20. 'heartbeat_check_interval' => 2, // 心跳检测间隔时长(秒)
  21. 'heartbeat_idle_time' => 3, // 连接最大允许空闲的时间
  22. //'open_eof_check' => 1,
  23. //'open_eof_split' => 1,
  24. //'package_eof' => "\r\r\n",
  25. //'open_cpu_affinity' => 1,
  26. 'socket_buffer_size' => 1024 * 1024 * 128,
  27. 'output_buffer_size' => 1024 * 1024 * 2,
  28. //'enable_delay_receive' => true,
  29. //'cpu_affinity_ignore' =>array(0,1)//如果你的网卡2个队列(或者没有多队列那么默认是cpu0来处理中断),并且绑定了core 0和core 1,那么可以通过这个设置避免swoole的线程或者进程绑定到这2个core,防止cpu0,1被耗光而造成的丢包
  30. );
  31. }
  32. if (isset($argv[1]) and $argv[1] == 'daemon') {
  33. G::$config['daemonize'] = true;
  34. } else {
  35. G::$config['daemonize'] = false;
  36. }
  37. //$mode = SWOOLE_BASE;
  38. $mode = SWOOLE_PROCESS;
  39. $serv = new swoole_server("0.0.0.0", 9501, $mode, SWOOLE_SOCK_TCP);
  40. $serv->listen('0.0.0.0', 9502, SWOOLE_SOCK_UDP);
  41. $serv->listen('::', 9503, SWOOLE_SOCK_TCP6);
  42. $serv->listen('::', 9504, SWOOLE_SOCK_UDP6);
  43. $process1 = new swoole_process(function ($worker) use ($serv) {
  44. global $argv;
  45. swoole_set_process_name("php {$argv[0]}: my_process1");
  46. swoole_timer_tick(2000, function ($interval) use ($worker, $serv) {
  47. echo "#{$worker->pid} child process timer $interval\n"; // 如果worker中没有定时器,则会输出 process timer xxx
  48. foreach ($serv->connections as $conn)
  49. {
  50. $serv->send($conn, "heartbeat\n");
  51. }
  52. });
  53. swoole_timer_tick(5000, function () use ($serv)
  54. {
  55. $serv->sendMessage("hello event worker", 0);
  56. $serv->sendMessage("hello task worker", 4);
  57. });
  58. }, false);
  59. //$serv->addprocess($process1);
  60. $process2 = new swoole_process(function ($worker) use ($serv) {
  61. global $argv;
  62. swoole_set_process_name("php {$argv[0]}: my_process2");
  63. swoole_timer_tick(2000, function ($interval) use ($worker, $serv) {
  64. echo "#{$worker->pid} child process timer $interval\n"; // 如果worker中没有定时器,则会输出 process timer xxx
  65. });
  66. }, false);
  67. //$serv->addprocess($process2);
  68. $serv->set(G::$config);
  69. $serv->set(['reactor_num' => 4]);
  70. /**
  71. * 使用类的静态属性,可以直接访问
  72. */
  73. G::$serv = $serv;
  74. function my_onStart(swoole_server $serv)
  75. {
  76. global $argv;
  77. swoole_set_process_name("php {$argv[0]}: master");
  78. my_log("Server: start.Swoole version is [".SWOOLE_VERSION."]");
  79. my_log("MasterPid={$serv->master_pid}|Manager_pid={$serv->manager_pid}");
  80. }
  81. function my_log($msg)
  82. {
  83. global $serv;
  84. if (empty($serv->worker_pid))
  85. {
  86. $serv->worker_pid = posix_getpid();
  87. }
  88. echo "#".$serv->worker_pid."\t[".date('H:i:s')."]\t".$msg.PHP_EOL;
  89. }
  90. function forkChildInWorker() {
  91. global $serv;
  92. echo "on worker start\n";
  93. $process = new swoole_process( function (swoole_process $worker) use ($serv) {
  94. // $serv = new swoole_server( "0.0.0.0", 9503 );
  95. // $serv->set(array(
  96. // 'worker_num' => 1
  97. // ));
  98. // $serv->on ( 'receive', function (swoole_server $serv, $fd, $reactor_id, $data) {
  99. // $serv->send ( $fd, "Swoole: " . $data );
  100. // $serv->close ( $fd );
  101. // });
  102. // $serv->start ();
  103. // swoole_event_add ($worker->pipe, function ($pipe) use ($worker) {
  104. // echo $worker->read()."\n";
  105. // });
  106. });
  107. $pid = $process->start();
  108. echo "Fork child process success. pid={$pid}\n";
  109. //保存子进程对象,这里如果不保存,那对象会被销毁,管道也会被关闭
  110. $serv->childprocess = $process;
  111. }
  112. function processRename(swoole_server $serv, $worker_id) {
  113. global $argv;
  114. if ( $serv->taskworker)
  115. {
  116. swoole_set_process_name("php {$argv[0]}: task");
  117. }
  118. else
  119. {
  120. swoole_set_process_name("php {$argv[0]}: worker");
  121. }
  122. // if ($worker_id == 0)
  123. // {
  124. // var_dump($serv->setting);
  125. // }
  126. my_log("WorkerStart: MasterPid={$serv->master_pid}|Manager_pid={$serv->manager_pid}|WorkerId={$serv->worker_id}|WorkerPid={$serv->worker_pid}");
  127. }
  128. function setTimerInWorker(swoole_server $serv, $worker_id) {
  129. if ($worker_id == 0) {
  130. echo "Start: ".microtime(true)."\n";
  131. //$serv->addtimer(3000);
  132. // $serv->addtimer(7000);
  133. //var_dump($serv->gettimer());
  134. }
  135. // $serv->after(2000, function(){
  136. // echo "Timeout: ".microtime(true)."\n";
  137. // });
  138. // $serv->after(5000, function(){
  139. // echo "Timeout: ".microtime(true)."\n";
  140. // global $serv;
  141. // $serv->deltimer(3000);
  142. // });
  143. }
  144. function my_onShutdown($serv)
  145. {
  146. echo "Server: onShutdown\n";
  147. }
  148. function my_onClose(swoole_server $serv, $fd, $reactor_id)
  149. {
  150. my_log("Client[$fd@$reactor_id]: fd=$fd is closed");
  151. var_dump($serv->getClientInfo($fd));
  152. }
  153. function my_onConnect(swoole_server $serv, $fd, $reactor_id)
  154. {
  155. //throw new Exception("hello world");
  156. // var_dump($serv->connection_info($fd));
  157. //var_dump($serv, $fd, $reactor_id);
  158. // echo "Worker#{$serv->worker_pid} Client[$fd@$reactor_id]: Connect.\n";
  159. $serv->after(2000, function() use ($serv, $fd) {
  160. $serv->confirm($fd);
  161. });
  162. my_log("Client: Connect --- {$fd}");
  163. }
  164. function timer_show($id)
  165. {
  166. my_log("Timer#$id");
  167. }
  168. function my_onWorkerExit(swoole_server $serv, $worker_id) {
  169. global $argv;
  170. }
  171. function my_onWorkerStart(swoole_server $serv, $worker_id)
  172. {
  173. processRename($serv, $worker_id);
  174. if (!$serv->taskworker)
  175. {
  176. swoole_process::signal(SIGUSR2, function($signo){
  177. echo "SIGNAL: $signo\n";
  178. });
  179. $serv->defer(function(){
  180. echo "defer call\n";
  181. });
  182. // $serv->tick(2000, function() use ($serv) {
  183. // echo "Worker-{$serv->worker_id} tick-2000\n";
  184. // });
  185. }
  186. else
  187. {
  188. // swoole_timer_after(2000, function() {
  189. // echo "after 2 secends.\n";
  190. // });
  191. // $serv->tick(1000, function ($id) use ($serv) {
  192. // if (G::$index > 10) {
  193. // $serv->after(2500, 'timer_show', 2);
  194. // G::$index = 0;
  195. // } else {
  196. // G::$index++;
  197. // }
  198. // timer_show($id);
  199. // });
  200. }
  201. //forkChildInWorker();
  202. // setTimerInWorker($serv, $worker_id);
  203. }
  204. function my_onWorkerStop($serv, $worker_id)
  205. {
  206. echo "WorkerStop[$worker_id]|pid=".$serv->worker_pid.".\n";
  207. }
  208. function my_onPacket($serv, $data, $clientInfo)
  209. {
  210. $serv->sendto($clientInfo['address'], $clientInfo['port'], "Server " . $data);
  211. var_dump($clientInfo);
  212. }
  213. function my_onReceive(swoole_server $serv, $fd, $reactor_id, $data)
  214. {
  215. my_log("Worker#{$serv->worker_pid} Client[$fd@$reactor_id]: received: $data");
  216. $cmd = trim($data);
  217. if($cmd == "reload")
  218. {
  219. $serv->reload();
  220. }
  221. elseif($cmd == "task")
  222. {
  223. $task_id = $serv->task("task ".$fd);
  224. echo "Dispath AsyncTask: id=$task_id\n";
  225. }
  226. elseif ($cmd == "taskclose")
  227. {
  228. $serv->task("close " . $fd);
  229. echo "close the connection in taskworker\n";
  230. }
  231. elseif ($cmd == "tasksend")
  232. {
  233. $serv->task("send " . $fd);
  234. }
  235. elseif ($cmd == "bigtask")
  236. {
  237. $serv->task(str_repeat('A', 8192*5));
  238. }
  239. elseif($cmd == "taskwait")
  240. {
  241. $result = $serv->taskwait("taskwait");
  242. if ($result) {
  243. $serv->send($fd, "taskwaitok");
  244. }
  245. echo "SyncTask: result=".var_export($result, true)."\n";
  246. }
  247. elseif($cmd == "taskWaitMulti")
  248. {
  249. $result = $serv->taskWaitMulti(array(
  250. str_repeat('A', 8192 * 5),
  251. str_repeat('B', 8192 * 6),
  252. str_repeat('C', 8192 * 8)
  253. ));
  254. if ($result)
  255. {
  256. $resp = "taskWaitMulti ok\n";
  257. foreach($result as $k => $v)
  258. {
  259. $resp .= "result[$k] length=".strlen($v)."\n";
  260. }
  261. $serv->send($fd, $resp);
  262. }
  263. else
  264. {
  265. $serv->send($fd, "taskWaitMulti error\n");
  266. }
  267. }
  268. elseif ($cmd == "hellotask")
  269. {
  270. $serv->task("hellotask");
  271. }
  272. elseif ($cmd == "taskcallback")
  273. {
  274. $serv->task("taskcallback", -1, function (swoole_server $serv, $task_id, $data)
  275. {
  276. echo "Task Callback: ";
  277. var_dump($task_id, $data);
  278. });
  279. }
  280. elseif ($cmd == "sendto")
  281. {
  282. $serv->sendto("127.0.0.1", 9999, "hello world");
  283. }
  284. elseif($cmd == "close")
  285. {
  286. $serv->send($fd, "close connection\n");
  287. $result = $serv->close($fd);
  288. }
  289. elseif($cmd == "info")
  290. {
  291. $info = $serv->connection_info(strval($fd), $reactor_id);
  292. var_dump($info["remote_ip"]);
  293. $serv->send($fd, 'Info: '.var_export($info, true).PHP_EOL);
  294. }
  295. elseif ($cmd == 'proxy')
  296. {
  297. $serv->send(1, "hello world\n");
  298. }
  299. elseif ($cmd == 'sleep')
  300. {
  301. sleep(10);
  302. }
  303. elseif ($cmd == 'foreach')
  304. {
  305. foreach($serv->connections as $fd)
  306. {
  307. echo "conn : $fd\n";
  308. }
  309. return;
  310. }
  311. elseif ($cmd == 'tick')
  312. {
  313. $serv->tick(2000, function ($id) {
  314. echo "tick #$id\n";
  315. });
  316. }
  317. elseif ($cmd == 'addtimer')
  318. {
  319. $serv->addtimer(3000);
  320. }
  321. elseif($cmd == "list")
  322. {
  323. $start_fd = 0;
  324. echo "broadcast\n";
  325. while(true)
  326. {
  327. $conn_list = $serv->connection_list($start_fd, 10);
  328. if (empty($conn_list))
  329. {
  330. echo "iterates finished\n";
  331. break;
  332. }
  333. $start_fd = end($conn_list);
  334. var_dump($conn_list);
  335. }
  336. }
  337. elseif($cmd == "list2")
  338. {
  339. foreach($serv->connections as $con)
  340. {
  341. var_dump($serv->connection_info($con));
  342. }
  343. }
  344. elseif($cmd == "stats")
  345. {
  346. $serv_stats = $serv->stats();
  347. $serv->send($fd, 'Stats: '.var_export($serv_stats, true)."\ncount=".count($serv->connections).PHP_EOL);
  348. }
  349. elseif($cmd == "broadcast")
  350. {
  351. broadcast($serv, $fd, "hello from $fd\n");
  352. }
  353. //这里故意调用一个不存在的函数
  354. elseif($cmd == "error")
  355. {
  356. hello_no_exists();
  357. }
  358. elseif($cmd == "exit")
  359. {
  360. exit("worker php exit.\n");
  361. }
  362. //关闭fd
  363. elseif(substr($cmd, 0, 5) == "close")
  364. {
  365. $close_fd = substr($cmd, 6);
  366. $serv->close($close_fd);
  367. }
  368. elseif($cmd == "shutdown")
  369. {
  370. $serv->shutdown();
  371. }
  372. elseif($cmd == "fatalerror")
  373. {
  374. require __DIR__.'/php/error.php';
  375. }
  376. elseif($cmd == 'defer')
  377. {
  378. $serv->defer(function() use ($fd, $serv) {
  379. $serv->close($fd);
  380. $serv->defer(function(){
  381. echo "deferd\n";
  382. });
  383. });
  384. $serv->send($fd, 'Swoole: '.$data, $reactor_id);
  385. }
  386. else
  387. {
  388. $serv->send($fd, 'Swoole: '.$data, $reactor_id);
  389. //$serv->close($fd);
  390. }
  391. //echo "Client:Data. fd=$fd|reactor_id=$reactor_id|data=$data";
  392. // $serv->after(
  393. // 800, function () {
  394. // echo "hello";
  395. // }
  396. // );
  397. //swoole_server_send($serv, $other_fd, "Server: $data", $other_reactor_id);
  398. }
  399. function my_onTask(swoole_server $serv, $task_id, $reactor_id, $data)
  400. {
  401. if ($data == 'taskwait')
  402. {
  403. $fd = str_replace('task-', '', $data);
  404. $serv->send($fd, "hello world");
  405. return array("task" => 'wait');
  406. }
  407. elseif ($data == 'taskcallback')
  408. {
  409. return array("task" => 'callback');
  410. }
  411. else
  412. {
  413. $cmd = explode(' ', $data);
  414. if ($cmd[0] == 'send')
  415. {
  416. $serv->send($cmd[1], str_repeat('A', 10000)."\n");
  417. }
  418. elseif ($cmd[0] == 'close')
  419. {
  420. $serv->close($cmd[1]);
  421. }
  422. else
  423. {
  424. echo "bigtask: length=".strlen($data)."\n";
  425. return $data;
  426. }
  427. // $serv->sendto('127.0.0.1', 9999, "hello world");
  428. //swoole_timer_after(1000, "test");
  429. // var_dump($data);
  430. // $serv->send($fd, str_repeat('A', 8192 * 2));
  431. // $serv->send($fd, str_repeat('B', 8192 * 2));
  432. // $serv->send($fd, str_repeat('C', 8192 * 2));
  433. // $serv->send($fd, str_repeat('D', 8192 * 2));
  434. return;
  435. }
  436. if ($data == "hellotask")
  437. {
  438. broadcast($serv, 0, "hellotask");
  439. }
  440. else
  441. {
  442. echo "AsyncTask[PID=".$serv->worker_pid."]: task_id=$task_id.".PHP_EOL;
  443. //eg: test-18
  444. return $data;
  445. }
  446. }
  447. function my_onFinish(swoole_server $serv, $task_id, $data)
  448. {
  449. list($str, $fd) = explode('-', $data);
  450. $serv->send($fd, 'taskok');
  451. var_dump($str, $fd);
  452. echo "AsyncTask Finish: result={$data}. PID=".$serv->worker_pid.PHP_EOL;
  453. }
  454. function my_onWorkerError(swoole_server $serv, $worker_id, $worker_pid, $exit_code, $signo)
  455. {
  456. echo "worker abnormal exit. WorkerId=$worker_id|Pid=$worker_pid|ExitCode=$exit_code|Signal=$signo\n";
  457. }
  458. function broadcast(swoole_server $serv, $fd = 0, $data = "hello")
  459. {
  460. $start_fd = 0;
  461. echo "broadcast\n";
  462. while(true)
  463. {
  464. $conn_list = $serv->connection_list($start_fd, 10);
  465. if($conn_list === false)
  466. {
  467. break;
  468. }
  469. $start_fd = end($conn_list);
  470. foreach($conn_list as $conn)
  471. {
  472. if($conn === $fd) continue;
  473. $ret1 = $serv->send($conn, $data);
  474. //var_dump($ret1);
  475. //$ret2 = $serv->close($conn);
  476. //var_dump($ret2);
  477. }
  478. }
  479. }
  480. $serv->on('PipeMessage', function($serv, $src_worker_id, $msg) {
  481. my_log("PipeMessage: Src={$src_worker_id},Msg=".trim($msg));
  482. if ($serv->taskworker)
  483. {
  484. $serv->sendMessage("hello user process",
  485. $src_worker_id);
  486. }
  487. });
  488. $serv->on('Start', 'my_onStart');
  489. $serv->on('Connect', 'my_onConnect');
  490. $serv->on('Receive', 'my_onReceive');
  491. $serv->on('Packet', 'my_onPacket');
  492. $serv->on('Close', 'my_onClose');
  493. $serv->on('Shutdown', 'my_onShutdown');
  494. $serv->on('WorkerStart', 'my_onWorkerStart');
  495. $serv->on('WorkerStop', 'my_onWorkerStop');
  496. $serv->on('Task', 'my_onTask');
  497. $serv->on('Finish', 'my_onFinish');
  498. $serv->on('WorkerError', 'my_onWorkerError');
  499. $serv->on('WorkerExit', 'my_onWorkerExit');
  500. $serv->on('ManagerStart', function($serv) {
  501. global $argv;
  502. swoole_set_process_name("php {$argv[0]}: manager");
  503. });
  504. $serv->start();