queue.php 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. <?php
  /**
   * Queue
   */
  // Abort immediately unless the InShopNC constant has been defined
  // (presumably by the application bootstrap -- confirm against the entry script).
  if (!defined('InShopNC')) {
      exit('Access Invalid!');
  }
  // The original author noted that the following line causes a bug, so it is kept disabled.
  //ini_set('default_socket_timeout', -1);
  /**
   * Signal handler that intentionally performs no action.
   *
   * Every signal number, SIGINT included, is accepted and ignored; the function
   * has no side effects and returns nothing.
   *
   * @param int $signo Number of the signal that was delivered.
   * @return void
   */
  function sig_handler($signo)
  {
      if ($signo == SIGINT) {
          // SIGINT: nothing to do.
          return;
      }

      // Any other signal: nothing to do either.
      return;
  }
  /**
   * Cron controller that runs the queue worker loop.
   *
   * Entries are popped through QueueServer and dispatched to the method of the
   * Logic('queue') object named by the entry's key. The loop keeps running until
   * SIGINT has been received.
   */
  class queueControl extends BaseCronControl
  {
      /**
       * Becomes true once SIGINT has been received; polled by indexOp().
       *
       * @var bool
       */
      private $_stop = false;

      /**
       * Worker entry point: consume queue entries until SIGINT requests a stop.
       *
       * @return void
       */
      public function indexOp()
      {
          // Discard the active output buffer, if there is one.
          if (ob_get_level()) ob_end_clean();

          // Register the instance handler so that SIGINT really sets $_stop.
          // Registering a handler that ignores the signal would leave the stop
          // check in the loop below unreachable.
          pcntl_signal(SIGINT, array($this, 'sig_handler'));

          $logic_queue = Logic('queue');
          $worker = new QueueServer();
          $queues = $worker->scan();

          while (true) {
              // Deliver pending signals, then honour a stop request between two jobs.
              pcntl_signal_dispatch();
              if ($this->_stop) {
                  exit;
              }

              // NOTE(review): 60 is presumably a blocking timeout in seconds -- confirm
              // against QueueServer::pop().
              $content = $worker->pop($queues, 60);

              // Loose comparison on purpose: false, null and an empty array all mean
              // "nothing to process".
              if ($content == false) continue;

              if (!is_array($content)) {
                  // Non-array result: run the model's checkActive() before polling again
                  // (presumably a connection keep-alive -- confirm in the Model class).
                  $model = Model();
                  $model->checkActive();
                  unset($model);
                  continue;
              }

              // The entry has the shape [method name => argument].
              $method = key($content);
              $arg = current($content);

              // An unknown or non-string method name would otherwise raise a fatal
              // error and terminate the whole worker; log it and keep consuming.
              if (!is_callable(array($logic_queue, $method))) {
                  $this->log('Unknown queue method: ' . var_export($method, true), false);
                  continue;
              }

              $result = $logic_queue->$method($arg);
              if (!$result['state']) {
                  $this->log($result['msg'], false);
              }
          }
      }

      /**
       * SIGINT handler: flags the worker loop to stop at its next iteration.
       *
       * Declared public so the callback registered with pcntl_signal() is callable
       * regardless of the scope from which pcntl resolves it.
       *
       * @param int $signo Number of the signal that was delivered.
       * @return void
       */
      public function sig_handler($signo)
      {
          if ($signo === SIGINT) {
              $this->_stop = true;
          }
      }
  }