queuehandler.php 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. <?php
  2. declare(strict_types=0);
  3. // The line below causes a bug, so it is left commented out.
  4. //ini_set('default_socket_timeout', -1);
  5. use think\facade\Log;
  /**
   * Long-running CLI worker that pops jobs from a network queue and dispatches
   * each job to the method of the same name on a queue_logic instance.
   *
   * Connection settings are read from the global $config['net_queue'] array
   * (keys: name, host, port). The loop ends when SIGINT, SIGHUP, SIGQUIT or
   * SIGTERM has been handled, or when an Exception is caught.
   */
  class queuehandler
  {
      /** @var bool Set to true by sig_handler() to ask run() to leave its loop. */
      private $_stop = false;

      /**
       * Runs the worker loop until a stop signal is handled or an Exception is caught.
       *
       * Each popped item is expected to be an array whose first key is the
       * queue_logic method name and whose first value is that method's argument.
       *
       * @return void
       */
      public function run()
      {
          global $config;

          $queue_name = $config['net_queue']['name'];
          $host = $config['net_queue']['host'];
          $port = $config['net_queue']['port'];

          // Drop any active output buffer so echoed output is not held back.
          if (ob_get_level()) {
              ob_end_clean();
          }

          // Every termination-style signal is routed to the same handler.
          foreach ([SIGINT, SIGHUP, SIGQUIT, SIGTERM] as $signal) {
              pcntl_signal($signal, [$this, 'sig_handler']);
          }

          $logic_queue = new queue_logic();
          $worker = new QueueServer($queue_name, $host, $port);
          $queues = $worker->scan();
          // NOTE(review): this echo and the `echo 2` below look like leftover
          // debug output — confirm nothing reads them before removing.
          echo $queues;

          while (true) {
              // Run the handlers for any signals received since the last pass.
              pcntl_signal_dispatch();

              try {
                  if ($this->_stop) {
                      break;
                  }

                  echo 2;
                  $content = $worker->pop($queues, 1);
                  if (!is_array($content)) {
                      // Nothing usable was popped; poll again.
                      continue;
                  }

                  $method = key($content);
                  $arg = current($content);
                  $argstr = json_encode($arg, JSON_UNESCAPED_UNICODE);
                  Log::record("method={$method} args={$argstr}", 'debug');

                  // The method name comes from the queue payload. Calling a missing
                  // or non-string name raises an Error, which the catch below does
                  // not handle, so reject it here and keep the worker alive.
                  if (!is_string($method) || !is_callable([$logic_queue, $method])) {
                      Log::record("{$method} run error: method is not callable on queue_logic", 'debug');
                      continue;
                  }

                  $result = $logic_queue->$method($arg);

                  // empty() and ?? tolerate a result without 'state'/'msg' keys
                  // (or a non-array result) without raising notices.
                  if (empty($result['state'])) {
                      $msg = $result['msg'] ?? '';
                      Log::record("{$method} run error: {$msg}", 'debug');
                  }
              } catch (Exception $e) {
                  $err = $e->getMessage();
                  $code = $e->getCode();
                  Log::record("QueueDB pop err: code={$code} err={$err}", 'debug');
                  break;
              }
          }
      }

      /**
       * Signal callback registered in run(): records the shutdown request and
       * raises the stop flag for the handled termination signals.
       *
       * @param int $signo Signal number passed by the pcntl extension.
       * @return void
       */
      private function sig_handler($signo)
      {
          // The level is the literal 'debug', matching every other Log::record()
          // call in this class.
          // NOTE(review): the original used Log::DEBUG, which the imported
          // think\facade\Log facade does not appear to define — confirm.
          Log::record("queue quit at sig_handler.", 'debug');

          if (in_array($signo, [SIGINT, SIGHUP, SIGQUIT, SIGTERM], true)) {
              $this->_stop = true;
          }
      }
  }