queuehandler.php 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. <?php
  2. declare(strict_types=0);
  3. //此行代码会导致bug
  4. //ini_set('default_socket_timeout', -1);
  5. class queuehandler
  6. {
  7. private $_stop = false;
  8. public function run()
  9. {
  10. global $config;
  11. $queue_name = $config['net_queue']['name'];
  12. $host = $config['net_queue']['host'];
  13. $port = $config['net_queue']['port'];
  14. if (ob_get_level()) ob_end_clean();
  15. pcntl_signal(SIGINT, [$this,'sig_handler']);
  16. pcntl_signal(SIGHUP, [$this,'sig_handler']);
  17. pcntl_signal(SIGQUIT, [$this,'sig_handler']);
  18. pcntl_signal(SIGTERM, [$this,'sig_handler']);
  19. $logic_queue = new queue_logic();
  20. $worker = new QueueServer($queue_name,$host,$port);
  21. $queues = $worker->scan();
  22. echo $queues;
  23. while (true)
  24. {
  25. pcntl_signal_dispatch();
  26. try
  27. {
  28. if ($this->_stop) break;
  29. $content = $worker->pop($queues, 1);
  30. if(is_array($content))
  31. {
  32. $method = key($content);
  33. $arg = current($content);
  34. $argstr = json_encode($arg,JSON_UNESCAPED_UNICODE);
  35. Log::record("method={$method} args={$argstr}",Log::DEBUG);
  36. $result = $logic_queue->$method($arg);
  37. if (!$result['state']) {
  38. Log::record("{$method} run error: {$result['msg']}",Log::DEBUG);
  39. }
  40. }
  41. }
  42. catch (Exception $e)
  43. {
  44. $err = $e->getMessage();
  45. $code = $e->getCode();
  46. Log::record("QueueDB pop err: code={$code} err={$err}",Log::DEBUG);
  47. break;
  48. }
  49. }
  50. }
  51. private function sig_handler($signo)
  52. {
  53. Log::record("queue quit at sig_handler.",Log::DEBUG);
  54. switch($signo) {
  55. case SIGINT:
  56. case SIGHUP:
  57. case SIGQUIT:
  58. case SIGTERM:
  59. $this->_stop = true;
  60. break;
  61. default:
  62. break;
  63. }
  64. }
  65. }