queuehandler.php 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. <?php
  2. declare(strict_types=0);
  3. //此行代码会导致bug
  4. //ini_set('default_socket_timeout', -1);
  5. class queuehandler
  6. {
  7. private $_stop = false;
  8. public function run()
  9. {
  10. global $other_config;
  11. $queue_name = $other_config['net_queue']['name'];
  12. $host = $other_config['net_queue']['host'];
  13. $port = $other_config['net_queue']['port'];
  14. if (ob_get_level()) ob_end_clean();
  15. // pcntl_signal(SIGINT, [$this,'sig_handler']);
  16. // pcntl_signal(SIGHUP, [$this,'sig_handler']);
  17. // pcntl_signal(SIGQUIT, [$this,'sig_handler']);
  18. // pcntl_signal(SIGTERM, [$this,'sig_handler']);
  19. $logic_queue = new queue_logic();
  20. $worker = new QueueServer($queue_name,$host,$port);
  21. $queues = $worker->scan();
  22. while (true)
  23. {
  24. pcntl_signal_dispatch();
  25. try
  26. {
  27. if ($this->_stop) break;
  28. $content = $worker->pop($queues, 1);
  29. if(is_array($content))
  30. {
  31. $method = key($content);
  32. $arg = current($content);
  33. $argstr = json_encode($arg,JSON_UNESCAPED_UNICODE);
  34. Log::record("method={$method} args={$argstr}",Log::DEBUG);
  35. $result = $logic_queue->$method($arg);
  36. if ($result['code'] != 0) {
  37. Log::record("{$method} run error: {$result['msg']}",Log::ERR);
  38. }
  39. }
  40. }
  41. catch (Exception $e)
  42. {
  43. $err = $e->getMessage();
  44. $code = $e->getCode();
  45. Log::record("QueueDB pop err: code={$code} err={$err}",Log::DEBUG);
  46. break;
  47. }
  48. }
  49. }
  50. private function sig_handler($signo)
  51. {
  52. Log::record("queue quit at sig_handler.",Log::DEBUG);
  53. switch($signo) {
  54. case SIGINT:
  55. case SIGHUP:
  56. case SIGQUIT:
  57. case SIGTERM:
  58. $this->_stop = true;
  59. break;
  60. default:
  61. break;
  62. }
  63. }
  64. }