12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788 |
<?php
declare(strict_types=0);
/**
 * Queue
 */
// Refuse direct access: continue only when the InShopNC constant is defined.
defined('InShopNC') or exit('Access Invalid!');
// The line below causes a bug, so it is intentionally left disabled.
//ini_set('default_socket_timeout', -1);
/**
 * Queue consumer controller.
 *
 * Pops items from a QueueServer and dispatches each one to the method of the
 * same name on the object returned by Logic('queue').
 */
class queueControl
{
    /** @var bool Set to true by sig_handler() to make the consume loop exit. */
    private $_stop = false;

    /**
     * Runs the consume loop until a stop signal is handled or an Exception
     * is thrown while popping or processing an item.
     *
     * A popped item is only processed when it is an array; its first key is
     * used as the method name and its first value as that method's single
     * argument.
     *
     * @return void
     */
    public function indexOp()
    {
        // Discard the innermost output buffer if one is active.
        if (ob_get_level()) {
            ob_end_clean();
        }

        // Route the usual termination signals to sig_handler().
        $handler = [$this, 'sig_handler'];
        foreach ([SIGINT, SIGHUP, SIGQUIT, SIGTERM] as $signo) {
            pcntl_signal($signo, $handler);
        }

        $logic_queue = Logic('queue');
        $worker = new QueueServer();
        $queues = $worker->scan();

        while (true) {
            // Run any pending signal handlers, then honour a stop request.
            pcntl_signal_dispatch();
            if ($this->_stop) {
                break;
            }

            try {
                // NOTE(review): the second argument is presumably a timeout
                // in seconds -- confirm against QueueServer::pop().
                $content = $worker->pop($queues, 1);

                // Both samples are taken after the pop and before the job
                // runs, so "memory" in the log line is the pre-job usage.
                $start = microtime(true);
                $mem = memory_get_usage();

                if (!is_array($content)) {
                    continue;
                }

                $method = key($content);
                $arg = current($content);

                // $method comes from the queue payload. It is passed to
                // sprintf() as an argument rather than spliced into the
                // format string, so a '%' in the name cannot be parsed as a
                // conversion specifier.
                if (!method_exists($logic_queue, $method)) {
                    $msg = sprintf(
                        "method=%s not exist memory=%d time=%.6f\r\n\r\n",
                        $method,
                        $mem,
                        microtime(true) - $start
                    );
                    Log::record($msg, Log::DEBUG);
                    continue;
                }

                $result = $logic_queue->$method($arg);
                if (is_array($result)) {
                    Log::record($result['msg'] ?? '', Log::DEBUG);
                }

                $msg = sprintf(
                    "method=%s memory=%d time=%.6f\r\n\r\n",
                    $method,
                    $mem,
                    microtime(true) - $start
                );
                Log::record($msg, Log::DEBUG);
            } catch (Exception $e) {
                // Any exception ends the loop after it has been logged.
                $err = $e->getMessage();
                $code = $e->getCode();
                Log::record("code=$code err=$err", Log::ERR);
                break;
            }
        }
    }

    /**
     * Signal callback registered by indexOp().
     *
     * Logs the event and, for SIGINT, SIGHUP, SIGQUIT and SIGTERM, raises the
     * stop flag that indexOp() checks at the top of each iteration.
     *
     * @param int $signo Number of the signal being handled.
     * @return void
     */
    private function sig_handler($signo)
    {
        Log::record("queue quit at sig_handler.", Log::DEBUG);

        switch ($signo) {
            case SIGINT:
            case SIGHUP:
            case SIGQUIT:
            case SIGTERM:
                $this->_stop = true;
                break;
            default:
                break;
        }
    }
}
|