_tb_prefix = $queue_name;
        $this->_comode = $comode;

        // Pick the client implementation that matches the runtime mode:
        // the Swoole coroutine client when $comode is truthy, the Redis
        // extension client otherwise.
        if ($this->_comode) {
            $this->_redis = new \Swoole\Coroutine\Redis();
            $ret = $this->_redis->connect(C('coroutine.redis_host'), C('coroutine.redis_port'));
        } else {
            $this->_redis = new Redis();
            $ret = $this->_redis->connect(C('queue.host'), C('queue.port'), 20);
        }

        // connect() returns immediately when the client already reports a
        // live connection, and retries once otherwise.
        $this->connect();
    }

    /**
     * Make sure the underlying client is connected, reconnecting if needed.
     *
     * @return bool Connection state reported by the client after the attempt.
     */
    public function connect(): bool
    {
        if ($this->_comode) {
            if ($this->_redis->connected) return true;

            $ret = $this->_redis->connect(C('coroutine.redis_host'), C('coroutine.redis_port'));
            Log::record("Swoole\Coroutine\Redis connect ret = {$ret}", Log::DEBUG);
            return $this->_redis->connected;
        } else {
            if ($this->_redis->isConnected()) return true;

            $ret = $this->_redis->connect(C('queue.host'), C('queue.port'), 20);
            Log::record("Redis connect ret = {$ret}", Log::DEBUG);
            return $this->_redis->isConnected();
        }
    }

    /**
     * Append a value to the tail of one of the queue lists.
     *
     * The target list is `_tb_prefix` followed by a random number between 1
     * and `_tb_num` (`_tb_num` is assigned outside the visible part of this
     * class).
     *
     * @param mixed $value Payload handed to the client's rPush().
     * @return mixed Whatever the client's rPush() returns.
     */
    public function rpush($value)
    {
        try {
            return $this->_redis->rPush($this->_tb_prefix.rand(1,$this->_tb_num),$value);
        } catch(Exception $e) {
            // Only the message is forwarded to the throw_exception() helper.
            throw_exception($e->getMessage());
        }
    }

    /**
     * Insert a value at the head of one of the queue lists.
     *
     * The target list is chosen the same way as in rpush().
     *
     * @param mixed $value Payload handed to the client's lPush().
     * @return mixed Whatever the client's lPush() returns.
     */
    public function lpush($value)
    {
        try {
            return $this->_redis->lPush($this->_tb_prefix.rand(1,$this->_tb_num),$value);
        } catch(Exception $e) {
            // Only the message is forwarded to the throw_exception() helper.
            throw_exception($e->getMessage());
        }
    }

    /**
     * List the key of every queue list: `_tb_prefix . 1` … `_tb_prefix . _tb_num`.
     *
     * @return array List of key names (empty when `_tb_num` is below 1).
     */
    public function scan()
    {
        $list_key = [];
        for($i=1;$i<=$this->_tb_num;$i++) {
            $list_key[] = $this->_tb_prefix.$i;
        }
        return $list_key;
    }

    /**
     * Blocking pop from the tail of the given list(s) via brPop().
     *
     * @param mixed $key  Key or list of keys passed to brPop().
     * @param mixed $time Timeout passed to brPop().
     * @return mixed Element at index 1 of the reply, or null when the reply is empty/falsy.
     */
    public function rpop($key, $time)
    {
        $result = $this->_redis->brPop($key, $time);
        if ($result) {
            return $result[1];
        } else {
            return null;
        }
    }

    /**
     * Blocking pop from the head of the given list(s) via blPop().
     *
     * @param mixed $key  Key or list of keys passed to blPop().
     * @param mixed $time Timeout passed to blPop().
     * @return mixed Element at index 1 of the reply, or null when the reply is empty/falsy.
     */
    public function lpop($key, $time)
    {
        $result = $this->_redis->blPop($key, $time);
        if ($result) {
            return $result[1];
        } else {
            return null;
        }
    }

    /**
     * Call flushAll() on the connection.
     *
     * NOTE(review): flushAll() is not limited to the queue lists produced by
     * scan(); confirm that wiping the whole server is really intended.
     */
    public function clear()
    {
        $this->_redis->flushAll();
    }
}

/**
 * Queue processing.
 *
 * Producer side: wraps a queue DB object and pushes serialized messages.
 *
 * @package
 */
class IClient
{
    private $mQueuedb;

    /**
     * @param mixed $queueDb Queue DB object; must provide lpush().
     */
    public function __construct($queueDb)
    {
        $this->mQueuedb = $queueDb;
    }

    /**
     * Serialize a single-entry array [$key => $value] and push it with lpush().
     *
     * @param mixed $key   Array key of the message.
     * @param mixed $value Message payload.
     * @return mixed Whatever the queue DB's lpush() returns.
     */
    public function push($key, $value)
    {
        return $this->mQueuedb->lpush(serialize([$key=>$value]));
    }
}

/**
 * Consumer side: wraps a queue DB object and pops/unserializes messages.
 */
class IServer
{
    private $_queuedb;

    /**
     * @param mixed $queueDb Queue DB object; must provide connect(), rpop(), lpop() and scan().
     */
    public function __construct($queueDb)
    {
        $this->_queuedb = $queueDb;
    }

    /**
     * Delegate to the queue DB's connect().
     *
     * @return bool Result of the queue DB's connect().
     */
    public function connect() : bool
    {
        return $this->_queuedb->connect();
    }

    /**
     * Pop one message with the queue DB's rpop() and unserialize it.
     *
     * @param mixed $key  Key or list of keys to pop from.
     * @param mixed $time Timeout forwarded to rpop().
     * @return mixed The unserialized message, or false when nothing was popped.
     */
    public function pop($key,$time)
    {
        $result = $this->_queuedb->rpop($key,$time);
        // Loose comparison: any value that equals null under != (for example
        // an empty string) is also reported as "nothing popped".
        if($result != null) {
            // NOTE(review): unserialize() must only ever see trusted data;
            // confirm that only IClient::push() writes to these lists.
            return unserialize($result);
        } else {
            return false;
        }
    }

    /**
     * Pop one message with the queue DB's lpop() and unserialize it.
     *
     * @param mixed $key  Key or list of keys to pop from.
     * @param mixed $time Timeout forwarded to lpop().
     * @return mixed The unserialized message, or false when nothing was popped.
     */
    public function lpop($key,$time)
    {
        $result = $this->_queuedb->lpop($key,$time);
        // Same loose null check as pop().
        if($result != null) {
            // NOTE(review): same trust requirement as in pop().
            return unserialize($result);
        } else {
            return false;
        }
    }

    /**
     * Delegate to the queue DB's scan().
     *
     * @return mixed Result of the queue DB's scan().
     */
    public function scan()
    {
        return $this->_queuedb->scan();
    }
}

/**
 * Base class for a queue consumer loop.
 *
 * Subclasses implement handle(); run() keeps popping messages from the server
 * and passes each one to handle() until a termination signal sets the stop flag.
 */
abstract class ILooper
{
    private $_stop = false;
    private $mServer;

    // Upper bound on the coroutine count reported by Swoole before run()
    // stops popping and backs off.
    const MAX_COROUTINE = 1000;

    /**
     * @param mixed $server Server object; must provide scan(), connect() and pop().
     */
    protected function __construct($server)
    {
        $this->mServer = $server;
    }

    /**
     * Discard the active output buffer (if any) and install the handlers for
     * SIGINT, SIGHUP, SIGQUIT and SIGTERM.
     */
    public function prepare()
    {
        if (ob_get_level()) ob_end_clean();

        // sig_handler() is private. An array callable such as
        // [$this, 'sig_handler'] has its visibility checked when it is
        // invoked, against the scope of whatever code is running at that
        // moment, so it can be rejected while a subclass method is executing.
        // A closure created here stays bound to this class and can always
        // reach the private method.
        $onSignal = function ($signo) {
            $this->sig_handler($signo);
        };

        pcntl_signal(SIGINT, $onSignal);
        pcntl_signal(SIGHUP, $onSignal);
        pcntl_signal(SIGQUIT, $onSignal);
        pcntl_signal(SIGTERM, $onSignal);
    }

    /**
     * Process one message popped from the queue.
     *
     * @param mixed $msg Message as returned by the server's pop().
     */
    abstract protected function handle($msg);

    /**
     * Main loop: pop messages and hand them to handle() until stopped.
     *
     * When the USE_COROUTINE constant is defined and truthy, each message is
     * handled in its own coroutine and popping pauses while the coroutine
     * count is at or above MAX_COROUTINE. Otherwise messages are handled
     * inline and the perfor_* timing output is logged per message.
     * Exceptions raised inside an iteration are logged and the loop continues.
     */
    public function run()
    {
        $queues = $this->mServer->scan();

        while (true) {
            try {
                if ($this->_stop) break;

                if(defined('USE_COROUTINE') && USE_COROUTINE) {
                    $res = \Swoole\Coroutine::stats();
                    $num = $res['coroutine_num'];

                    if($num < ILooper::MAX_COROUTINE) {
                        if($this->mServer->connect() == false) {
                            \Swoole\Coroutine::sleep(1);
                            Log::record("Processor redis disconnect.",Log::ERR);
                            continue;
                        }

                        $content = $this->mServer->pop($queues,1);
                        if(empty($content)) continue;

                        go(function ()use ($content) {
                            $this->handle($content);
                        });
                    } else {
                        // Too many coroutines in flight: back off briefly.
                        \Swoole\Coroutine::sleep(0.1);
                    }
                } else {
                    if($this->mServer->connect() == false) {
                        sleep(1);
                        continue;
                    }

                    $content = $this->mServer->pop($queues,1);
                    if(empty($content)) continue;

                    perfor_clear();
                    perfor_start();
                    $this->handle($content);
                    perfor_end('Handle Request');
                    $info = perfor_log();

                    // The timing line is logged here, in the only branch that
                    // produces $info. It used to sit after the if/else, where
                    // the coroutine branch reached it with $info undefined.
                    Log::record("{$info} \r\n",Log::DEBUG);
                }
            } catch (Exception $e) {
                $err = $e->getMessage();
                $code = $e->getCode();
                Log::record("QueueDB pop err: code={$code} err={$err}",Log::ERR);
            }
        }
    }

    /**
     * Signal handler: logs the event and sets the stop flag for the signals
     * installed by prepare().
     *
     * @param int $signo Signal number delivered by pcntl.
     */
    private function sig_handler($signo)
    {
        // The message literal spans a line break exactly as in the original
        // source, so the logged text is unchanged.
        Log::record("queue quit 
at sig_handler.",Log::DEBUG);

        switch($signo) {
            case SIGINT:
            case SIGHUP:
            case SIGQUIT:
            case SIGTERM:
                $this->_stop = true;
                break;
            default:
                break;
        }
    }
}