_queue_name = $queue_name;
        $this->_comode = $comode;
        if ($this->_comode) {
            $this->_redis = new Swoole\Coroutine\Redis();
        } else {
            $this->_redis = new Redis();
        }
        $this->connect();
    }

    /**
     * Make sure the Redis client is connected, reconnecting when it is not.
     *
     * In coroutine mode the endpoint comes from the `coroutine.redis_host` /
     * `coroutine.redis_port` config keys; otherwise from `queue.host` /
     * `queue.port`.
     *
     * @return bool Connection state reported by the client after the attempt.
     */
    public function connect(): bool
    {
        if ($this->_comode) {
            if ($this->_redis->connected) {
                return true;
            }
            $this->close();
            $ret = $this->_redis->connect(C('coroutine.redis_host'), C('coroutine.redis_port'));
            Log::record("IQueueDB::connect ret = {$ret}", Log::DEBUG);
            return $this->_redis->connected;
        }

        if ($this->_redis->isConnected()) {
            return true;
        }
        $this->close();
        $ret = $this->_redis->connect(C('queue.host'), C('queue.port'));
        Log::record("IQueueDB::connect ret = {$ret}", Log::DEBUG);
        return $this->_redis->isConnected();
    }

    /**
     * Close the Redis connection if it is currently open.
     *
     * In non-coroutine mode an already-closed connection is only logged.
     */
    public function close()
    {
        if ($this->_comode) {
            if ($this->_redis->connected) {
                $this->_redis->close();
            }
        } elseif ($this->_redis->isConnected()) {
            $this->_redis->close();
        } else {
            Log::record("redis has closed",Log::DEBUG);
        }
    }

    /**
     * Append a value to the tail of the queue list.
     *
     * @param mixed $value Payload handed to rPush as-is.
     * @return mixed The rPush result, or false when the connection could not
     *               be established or an Exception was raised (both are logged).
     */
    public function rpush($value)
    {
        try {
            if ($this->connect()) {
                return $this->_redis->rPush($this->_queue_name, $value);
            }
            Log::record("IQueueDB::rpush connect=false", Log::DEBUG);
            return false;
        } catch (Exception $e) {
            Log::record("IQueueDB::rpush " . $e->getMessage(),Log::ERR);
            return false;
        }
    }

    /**
     * Prepend a value to the head of the queue list.
     *
     * @param mixed $value Payload handed to lPush as-is.
     * @return mixed The lPush result, or false when the connection could not
     *               be established or an Exception was raised (both are logged).
     */
    public function lpush($value)
    {
        try {
            if ($this->connect()) {
                return $this->_redis->lPush($this->_queue_name, $value);
            }
            Log::record("IQueueDB::lpush connect=false", Log::DEBUG);
            return false;
        } catch (Exception $e) {
            Log::record("IQueueDB::lpush " . $e->getMessage(),Log::ERR);
            return false;
        }
    }

    /**
     * List the keys this queue reads from.
     *
     * @return array Single-element list holding the configured queue name.
     */
    public function scan()
    {
        $list_key[] = $this->_queue_name;
        return $list_key;
    }

    /**
     * Non-blocking pop from the tail of the queue.
     *
     * Does not call connect() itself; callers are expected to have done so.
     *
     * @return mixed Whatever the client's rPop returns.
     */
    public function rpop()
    {
        $key = $this->_queue_name;
        $result = $this->_redis->rPop($key);
        return $result;
    }

    /**
     * Non-blocking pop from the head of the queue.
     *
     * Does not call connect() itself; callers are expected to have done so.
     *
     * @return mixed Whatever the client's lPop returns.
     */
    public function lpop()
    {
        $key = $this->_queue_name;
        $result = $this->_redis->lPop($key);
        return $result;
    }

    /**
     * Blocking pop from the tail of one or more lists.
     *
     * @param mixed $key  Key (or list of keys) forwarded to brPop.
     * @param mixed $time Timeout forwarded to brPop.
     * @return mixed Element 1 of the reply (the popped value), or null when
     *               the reply is empty/falsy.
     */
    public function brpop($key, $time)
    {
        $result = $this->_redis->brPop($key, $time);
        if ($result) {
            return $result[1];
        }
        return null;
    }

    /**
     * Blocking pop from the head of one or more lists.
     *
     * @param mixed $key  Key (or list of keys) forwarded to blPop.
     * @param mixed $time Timeout forwarded to blPop.
     * @return mixed Element 1 of the reply (the popped value), or null when
     *               the reply is empty/falsy.
     */
    public function blpop($key, $time)
    {
        $result = $this->_redis->blPop($key, $time);
        if ($result) {
            return $result[1];
        }
        return null;
    }

    /**
     * Flush the Redis server.
     *
     * WARNING: this issues flushAll, which is not limited to this queue's key;
     * per the Redis documentation FLUSHALL removes the keys of every database
     * on the server.
     */
    public function clear()
    {
        $this->_redis->flushAll();
    }
}

/**
 * Queue processing: producer side.
 *
 * Wraps a queue DB object and pushes serialized single-entry maps onto it.
 */
class IClient
{
    private $mQueuedb;

    /**
     * @param object $queueDb Queue storage; must provide lpush().
     */
    public function __construct($queueDb)
    {
        $this->mQueuedb = $queueDb;
    }

    /**
     * Enqueue one message.
     *
     * @param mixed $key   Message key (array key of the serialized payload).
     * @param mixed $value Message parameters.
     * @return mixed Result of the underlying lpush().
     */
    public function push($key, $value)
    {
        return $this->mQueuedb->lpush(serialize([$key=>$value]));
    }
}

/**
 * Queue processing: consumer side.
 *
 * Pops raw payloads from the queue DB object and unserializes them.
 */
class IServer
{
    private $_queuedb;

    /**
     * @param object $queueDb Queue storage providing connect/close/scan and the pop methods.
     */
    public function __construct($queueDb)
    {
        $this->_queuedb = $queueDb;
    }

    /**
     * @return bool Result of the queue DB's connect().
     */
    public function connect() : bool
    {
        return $this->_queuedb->connect();
    }

    /**
     * Non-blocking pop from the tail of the queue.
     *
     * @return mixed Unserialized message, or false when nothing was popped.
     */
    public function rpop()
    {
        return $this->decode($this->_queuedb->rpop());
    }

    /**
     * Non-blocking pop from the head of the queue.
     *
     * @return mixed Unserialized message, or false when nothing was popped.
     */
    public function lpop()
    {
        return $this->decode($this->_queuedb->lpop());
    }

    /**
     * Blocking pop from the tail of the given key(s).
     *
     * @param mixed $key  Key or list of keys.
     * @param mixed $time Timeout passed through to the queue DB.
     * @return mixed Unserialized message, or false when nothing was popped.
     */
    public function brpop($key,$time)
    {
        return $this->decode($this->_queuedb->brpop($key,$time));
    }

    /**
     * Blocking pop from the head of the given key(s).
     *
     * @param mixed $key  Key or list of keys.
     * @param mixed $time Timeout passed through to the queue DB.
     * @return mixed Unserialized message, or false when nothing was popped.
     */
    public function blpop($key,$time)
    {
        // Fixed: this used to call lpop($key, $time), which ignores both
        // arguments and performs a non-blocking pop instead of a blocking one.
        return $this->decode($this->_queuedb->blpop($key,$time));
    }

    /**
     * @return mixed Key list reported by the queue DB's scan().
     */
    public function scan()
    {
        return $this->_queuedb->scan();
    }

    /**
     * Close the underlying queue DB connection.
     */
    public function stop()
    {
        $this->_queuedb->close();
    }

    /**
     * Turn a raw popped payload into a message.
     *
     * The loose `!= null` comparison is intentional: it treats null, false
     * and '' alike as "nothing popped", exactly as the pop methods always did.
     *
     * SECURITY NOTE: unserialize() runs on whatever is stored in the queue.
     * If anything outside this application can write to that Redis key,
     * consider restricting `allowed_classes` or switching to JSON.
     *
     * @param mixed $result Raw value returned by the queue DB.
     * @return mixed Unserialized value, or false for an empty result.
     */
    private function decode($result)
    {
        if ($result != null) {
            return unserialize($result);
        }
        return false;
    }
}

/**
 * Base class for a queue worker loop.
 *
 * Subclasses implement handle(); run() keeps popping messages from the server
 * object and dispatches them until stop() is called or a termination signal
 * handler sets the stop flag.
 */
abstract class ILooper
{
    private $_stop = false;
    private $mServer;

    /** Upper bound on concurrently running coroutines before run() backs off. */
    const MAX_COROUTINE = 200;

    /**
     * @param object $server Queue server (connect/scan/rpop/brpop are used).
     */
    protected function __construct($server)
    {
        $this->mServer = $server;
    }

    /**
     * Discard one active output buffer (if any) and register the termination
     * signal handlers.
     *
     * NOTE(review): no pcntl_signal_dispatch()/pcntl_async_signals()/ticks
     * declaration is visible in this part of the file; confirm one exists
     * elsewhere, otherwise the handlers are never invoked.
     */
    public function prepare()
    {
        if (ob_get_level()) {
            ob_end_clean();
        }
        foreach ([SIGINT, SIGHUP, SIGQUIT, SIGTERM] as $signo) {
            pcntl_signal($signo, [$this,'sig_handler']);
        }
    }

    /**
     * Process one dequeued message.
     *
     * @param mixed $msg Unserialized message as returned by the server.
     * @return mixed In coroutine mode the return value is logged as `method`.
     */
    abstract protected function handle($msg);

    /**
     * Ask the loop to exit at its next iteration.
     */
    public function stop()
    {
        Log::record(__FUNCTION__,Log::DEBUG);
        $this->_stop = true;
    }

    /**
     * Main worker loop.
     *
     * Each iteration runs either the coroutine or the blocking variant,
     * depending on the USE_COROUTINE constant. Exceptions thrown by an
     * iteration are logged and the loop continues.
     */
    public function run()
    {
        $queues = $this->mServer->scan();
        while (true) {
            try {
                if ($this->_stop) {
                    break;
                }
                perfor_clear();
                if (defined('USE_COROUTINE') && USE_COROUTINE) {
                    $this->runCoroutineOnce($queues);
                } else {
                    $this->runBlockingOnce($queues);
                }
            } catch (Exception $e) {
                $err = $e->getMessage();
                $code = $e->getCode();
                Log::record("QueueDB pop err: code={$code} err={$err}",Log::ERR);
            }
        }
        Log::record("ILooper Run quit.", Log::DEBUG);
    }

    /**
     * One loop iteration in coroutine mode: pop a message and handle it in a
     * new coroutine, backing off while MAX_COROUTINE coroutines are running.
     *
     * @param mixed $queues Key list obtained from the server's scan().
     */
    private function runCoroutineOnce($queues)
    {
        $res = Swoole\Coroutine::stats();
        $num = $res['coroutine_num'];
        $mem = memory_get_usage();

        if ($num >= ILooper::MAX_COROUTINE) {
            Swoole\Coroutine::sleep(0.1);
            return;
        }

        if ($this->mServer->connect() == false) {
            Swoole\Coroutine::sleep(1);
            Log::record("Processor redis disconnect.",Log::ERR);
            return;
        }

        if (defined('COROUTINE_HOOK_TCP') && COROUTINE_HOOK_TCP) {
            $content = $this->mServer->brpop($queues,1);
            if (empty($content)) {
                return;
            }
        } else {
            $content = $this->mServer->rpop();
            if (empty($content)) {
                // Non-blocking pop: wait before polling again.
                Swoole\Coroutine::sleep(1);
                return;
            }
        }

        if ($this->_stop) {
            // Stop was requested after the pop: put the message back.
            foreach ($content as $key => $params) {
                util::push_queue($key, $params);
            }
            return;
        }

        go(function () use ($content, $num, $mem) {
            $start = microtime(true);
            Log::record("BeginGoFunction coroutin_num={$num} memory={$mem}",Log::DEBUG);
            $method = $this->handle($content);
            $use_time = microtime(true) - $start;
            // Values are passed as sprintf arguments so that a '%' inside
            // $method cannot be misread as a conversion specifier.
            $msg = sprintf(
                "EndGoFunction coroutin_num=%s memory=%s request_time=%.6f method=%s",
                $num,
                $mem,
                $use_time,
                $method
            );
            Log::record($msg,Log::DEBUG);
        });
    }

    /**
     * One loop iteration in blocking (non-coroutine) mode: pop a message with
     * a blocking read and handle it inline.
     *
     * @param mixed $queues Key list obtained from the server's scan().
     */
    private function runBlockingOnce($queues)
    {
        if ($this->mServer->connect() == false) {
            sleep(1);
            return;
        }

        // Fixed: this used to call pop(), which IServer does not define; the
        // resulting Error is not an Exception and so escaped run()'s catch.
        $content = $this->mServer->brpop($queues,1);
        if (empty($content)) {
            return;
        }

        if ($this->_stop) {
            // Stop was requested after the pop: put the message back.
            foreach ($content as $key => $params) {
                DispatcherClient::instance()->push($key, $params);
            }
            return;
        }

        perfor_clear();
        perfor_start();
        $this->handle($content);
        perfor_end('Handle Request');
        $info = perfor_log();
        Log::record("{$info} \r\n",Log::DEBUG);
    }

    /**
     * Signal handler: request loop shutdown on SIGINT/SIGHUP/SIGQUIT/SIGTERM.
     *
     * NOTE(review): this method is private but is registered as an array
     * callable in prepare(); confirm it is still callable from the scope in
     * which signals get dispatched, or make it public.
     *
     * @param int $signo Signal number delivered by pcntl.
     */
    private function sig_handler($signo)
    {
        Log::record("queue quit at sig_handler.",Log::DEBUG);
        switch ($signo) {
            case SIGINT:
            case SIGHUP:
            case SIGQUIT:
            case SIGTERM:
                $this->_stop = true;
                break;
            default:
                break;
        }
    }
}