queue.php 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. <?php
/**
 * Queue
 *
 * Cron controller that runs as a worker consuming queued jobs.
 */
  9. defined('InShopNC') or exit('Access Invalid!');
  10. ini_set('default_socket_timeout', -1);
  11. class queueControl extends BaseCronControl
  12. {
  13. private $_stop = false;
  14. public function indexOp() {
  15. if (ob_get_level()) ob_end_clean();
  16. pcntl_signal(SIGINT, array(&$this,'sig_handler'));
  17. $logic_queue = Logic('queue');
  18. $worker = new QueueServer();
  19. $queues = $worker->scan();
  20. while (true) {
  21. pcntl_signal_dispatch();
  22. if ($this->_stop) {
  23. exit;
  24. }
  25. $content = $worker->pop($queues,3);
  26. if (is_array($content)) {
  27. $method = key($content);
  28. $arg = current($content);
  29. $result = $logic_queue->$method($arg);
  30. if (!$result['state']) {
  31. $this->log($result['msg'],false);
  32. }
  33. } else {
  34. $model = Model();
  35. $model->checkActive();
  36. unset($model);
  37. }
  38. }
  39. }
  40. private function sig_handler($signo) {
  41. switch($signo) {
  42. case SIGINT:
  43. $this->_stop = true;
  44. break;
  45. default:
  46. break;
  47. }
  48. }
  49. }