task_stream.php 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. <?php
  2. $serv = new swoole_server("127.0.0.1", 9501, SWOOLE_BASE);
  3. $serv->set(array(
  4. 'worker_num' => 1,
  5. 'task_worker_num' => 1,
  6. 'task_ipc_mode' => 4,
  7. // 'message_queue_key' => 0x70001001,
  8. ));
  9. $serv->on('Receive', function(swoole_server $serv, $fd, $reactor_id, $data) {
  10. //AsyncTask
  11. $data = trim($data);
  12. //$data = str_repeat('A', 8192*100);
  13. // if ($data == 'async')
  14. // if(false)
  15. if (true)
  16. {
  17. $task_id = $serv->task($data, 0);
  18. $serv->send($fd, "Dispath AsyncTask: id=$task_id\n");
  19. }
  20. //Sync Task
  21. else
  22. {
  23. $res = $serv->taskwait($data, 10);
  24. echo "Dispath SyncTask: result=".$res.PHP_EOL;
  25. }
  26. //$serv->send($fd, "OK\n");
  27. });
  28. $serv->on('Task', function (swoole_server $serv, $task_id, $reactor_id, $data) {
  29. echo "#{$serv->worker_id}\tonTask: [PID={$serv->worker_pid}]: task_id=$task_id, data_len=".strlen($data).".".PHP_EOL;
  30. $serv->finish($data);
  31. return $data;
  32. });
  33. $serv->on('Finish', function (swoole_server $serv, $task_id, $data) {
  34. echo "Task#$task_id finished, data_len=".strlen($data).PHP_EOL;
  35. });
  36. $serv->on('workerStart', function($serv, $worker_id) {
  37. global $argv;
  38. if ($serv->taskworker)
  39. {
  40. swoole_set_process_name("php {$argv[0]}: task_worker");
  41. }
  42. else
  43. {
  44. swoole_set_process_name("php {$argv[0]}: worker");
  45. }
  46. echo "Worker#$worker_id, pid=".posix_getpid()." start".PHP_EOL;
  47. });
  48. $serv->start();