1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 |
- <?php
- declare(strict_types=0);
- //此行代码会导致bug
- //ini_set('default_socket_timeout', -1);
- class queuehandler
- {
- private $_stop = false;
- public function run()
- {
- global $config;
- $queue_name = $config['net_queue']['name'];
- $host = $config['net_queue']['host'];
- $port = $config['net_queue']['port'];
- if (ob_get_level()) ob_end_clean();
- pcntl_signal(SIGINT, [$this,'sig_handler']);
- pcntl_signal(SIGHUP, [$this,'sig_handler']);
- pcntl_signal(SIGQUIT, [$this,'sig_handler']);
- pcntl_signal(SIGTERM, [$this,'sig_handler']);
- $logic_queue = new queue_logic();
- $worker = new QueueServer($queue_name,$host,$port);
- $queues = $worker->scan();
- echo $queues;
- while (true)
- {
- pcntl_signal_dispatch();
- try
- {
- if ($this->_stop) break;
- $content = $worker->pop($queues, 1);
- if(is_array($content))
- {
- $method = key($content);
- $arg = current($content);
- $argstr = json_encode($arg,JSON_UNESCAPED_UNICODE);
- Log::record("method={$method} args={$argstr}",Log::DEBUG);
- $result = $logic_queue->$method($arg);
- if (!$result['state']) {
- Log::record("{$method} run error: {$result['msg']}",Log::DEBUG);
- }
- }
- }
- catch (Exception $e)
- {
- $err = $e->getMessage();
- $code = $e->getCode();
- Log::record("QueueDB pop err: code={$code} err={$err}",Log::DEBUG);
- break;
- }
- }
- }
- private function sig_handler($signo)
- {
- Log::record("queue quit at sig_handler.",Log::DEBUG);
- switch($signo) {
- case SIGINT:
- case SIGHUP:
- case SIGQUIT:
- case SIGTERM:
- $this->_stop = true;
- break;
- default:
- break;
- }
- }
- }
|