_queue_name = $queue_name; $this->_redis = new Redis(); $this->connect(); }

    /**
     * Make sure the Redis connection is open, reconnecting when it is not.
     *
     * @return bool Result of Redis::isConnected() after the attempt.
     */
    public function connect(): bool
    {
        if ($this->_redis->isConnected()) {
            return true;
        }

        // Not connected: close() only logs in this state, then a fresh
        // connection is attempted with the configured host and port.
        $this->close();
        $ret = $this->_redis->connect(C('queue.host'), C('queue.port'));
        Log::record("IQueueDB::connect ret = {$ret}", Log::DEBUG);

        return $this->_redis->isConnected();
    }

    /**
     * Close the Redis connection if it is open; otherwise just log.
     */
    public function close()
    {
        if ($this->_redis->isConnected()) {
            $this->_redis->close();
        } else {
            Log::record("redis has closed",Log::DEBUG);
        }
    }

    /**
     * Append a value to the tail (right side) of the queue list.
     *
     * @param mixed $value Payload handed to Redis::rPush().
     * @return mixed Whatever Redis::rPush() returns, or false when the
     *               connection is unavailable or an Exception was caught.
     */
    public function rpush($value)
    {
        try {
            if (!$this->connect()) {
                Log::record("IQueueDB::rpush connect=false", Log::DEBUG);
                return false;
            }

            return $this->_redis->rPush($this->_queue_name, $value);
        } catch (Exception $e) {
            Log::record("IQueueDB::rpush " . $e->getMessage(),Log::ERR);
            return false;
        }
    }

    /**
     * Prepend a value to the head (left side) of the queue list.
     *
     * @param mixed $value Payload handed to Redis::lPush().
     * @return mixed Whatever Redis::lPush() returns, or false when the
     *               connection is unavailable or an Exception was caught.
     */
    public function lpush($value)
    {
        try {
            if (!$this->connect()) {
                Log::record("IQueueDB::lpush connect=false", Log::DEBUG);
                return false;
            }

            return $this->_redis->lPush($this->_queue_name, $value);
        } catch (Exception $e) {
            Log::record("IQueueDB::lpush " . $e->getMessage(),Log::ERR);
            return false;
        }
    }

    /**
     * List the keys this queue reads from.
     *
     * @return array Single-element list holding the configured queue name.
     */
    public function scan()
    {
        return [$this->_queue_name];
    }

    /**
     * Non-blocking pop from the right side of the queue list.
     *
     * @return mixed Raw result of Redis::rPop().
     */
    public function rpop()
    {
        return $this->_redis->rPop($this->_queue_name);
    }

    /**
     * Non-blocking pop from the left side of the queue list.
     *
     * @return mixed Raw result of Redis::lPop().
     */
    public function lpop()
    {
        return $this->_redis->lPop($this->_queue_name);
    }

    /**
     * Blocking pop from the right side of the given key(s).
     *
     * @param mixed $key  Key or list of keys passed to Redis::brPop().
     * @param mixed $time Timeout passed to Redis::brPop().
     * @return mixed Element at index 1 of the reply, or null when the
     *               reply is empty.
     */
    public function brpop($key, $time)
    {
        $result = $this->_redis->brPop($key, $time);

        return $result ? $result[1] : null;
    }

    /**
     * Blocking pop from the left side of the given key(s).
     *
     * @param mixed $key  Key or list of keys passed to Redis::blPop().
     * @param mixed $time Timeout passed to Redis::blPop().
     * @return mixed Element at index 1 of the reply, or null when the
     *               reply is empty.
     */
    public function blpop($key, $time)
    {
        $result = $this->_redis->blPop($key, $time);

        return $result ? $result[1] : null;
    }

    /**
     * Flush the Redis server.
     *
     * NOTE(review): this issues flushAll(), which is not limited to this
     * queue's key; confirm that wiping everything is really intended.
     */
    public function clear()
    {
        $this->_redis->flushAll();
    }
}

/**
 * Queue processing: producer side.
 *
 * Serializes a single key => value pair and pushes it onto the left side
 * of the wrapped queue.
 */
class IClient
{
    /** @var object Queue backend; must provide lpush(). */
    private $mQueuedb;

    /**
     * @param object $queueDb Queue backend used for pushing.
     */
    public function __construct($queueDb)
    {
        $this->mQueuedb = $queueDb;
    }

    /**
     * Enqueue one message.
     *
     * @param mixed $key   Message key (array key of the serialized payload).
     * @param mixed $value Message body.
     * @return mixed Result of the backend's lpush().
     */
    public function push($key, $value)
    {
        return $this->mQueuedb->lpush(serialize([$key => $value]));
    }
}

/**
 * Queue processing: consumer side.
 *
 * Pops raw payloads from the wrapped queue backend and unserializes them.
 */
class IServer
{
    /** @var object Queue backend; must provide connect/close/scan and the pop methods. */
    private $_queuedb;

    /**
     * @param object $queueDb Queue backend used for popping.
     */
    public function __construct($queueDb)
    {
        $this->_queuedb = $queueDb;
    }

    /**
     * @return bool Result of the backend's connect().
     */
    public function connect() : bool
    {
        return $this->_queuedb->connect();
    }

    /**
     * Non-blocking right pop.
     *
     * @return mixed Unserialized message, or false when nothing was popped.
     */
    public function rpop()
    {
        return $this->decode($this->_queuedb->rpop());
    }

    /**
     * Non-blocking left pop.
     *
     * @return mixed Unserialized message, or false when nothing was popped.
     */
    public function lpop()
    {
        return $this->decode($this->_queuedb->lpop());
    }

    /**
     * Blocking right pop.
     *
     * @param mixed $key  Key or list of keys to pop from.
     * @param mixed $time Timeout forwarded to the backend.
     * @return mixed Unserialized message, or false when nothing was popped.
     */
    public function brpop($key,$time)
    {
        return $this->decode($this->_queuedb->brpop($key, $time));
    }

    /**
     * Blocking left pop.
     *
     * @param mixed $key  Key or list of keys to pop from.
     * @param mixed $time Timeout forwarded to the backend.
     * @return mixed Unserialized message, or false when nothing was popped.
     */
    public function blpop($key,$time)
    {
        // Must delegate to the backend's blocking blpop(); the non-blocking
        // lpop() takes no arguments and would ignore $key and $time.
        return $this->decode($this->_queuedb->blpop($key, $time));
    }

    /**
     * @return mixed Result of the backend's scan().
     */
    public function scan()
    {
        return $this->_queuedb->scan();
    }

    /**
     * Close the backend connection.
     */
    public function stop()
    {
        $this->_queuedb->close();
    }

    /**
     * Turn a raw popped payload into a message.
     *
     * The comparison is intentionally loose so that null and other
     * null-equivalent results (e.g. false) both count as "nothing popped".
     *
     * NOTE(review): unserialize() is applied to whatever is stored in the
     * queue; if anything untrusted can write to it, restrict the allowed
     * classes or switch to a data-only format.
     *
     * @param mixed $raw Value returned by the backend pop call.
     * @return mixed Unserialized payload, or false when $raw is empty.
     */
    private function decode($raw)
    {
        if ($raw != null) {
            return unserialize($raw);
        }

        return false;
    }
}

/**
 * Base class for a queue-consuming loop.
 *
 * run() repeatedly pops messages from the server and passes them to
 * handle(), either inside Swoole coroutines (when USE_COROUTINE is defined
 * and truthy) or inline in the calling context.
 */
abstract class ILooper
{
    /** @var bool Set to true to make run() leave its loop. */
    private $_stop = false;

    /** @var int Pause state: 0 = running normally, 1 = pause requested, 2 = paused. */
    private $_pause = 0;

    /** @var int Coroutine id recorded at the start of run(). */
    private $_cid = 0;

    /** @var object Message source; see the calls made in run(). */
    private $mServer;

    /** Upper bound on the coroutine count before run() stops popping new work. */
    const MAX_COROUTINE = 500;

    /**
     * @param object $server Message source providing scan/connect/rpop/brpop.
     */
    protected function __construct($server)
    {
        $this->mServer = $server;
    }

    /**
     * Drop the active output buffer (if any) and register the signal handler
     * for SIGINT, SIGHUP, SIGQUIT and SIGTERM.
     */
    public function prepare()
    {
        if (ob_get_level()) {
            ob_end_clean();
        }

        foreach ([SIGINT, SIGHUP, SIGQUIT, SIGTERM] as $signal) {
            pcntl_signal($signal, [$this, 'sig_handler']);
        }
    }

    /**
     * Process one popped message.
     *
     * @param mixed $msg Message as returned by the server's pop methods.
     * @return mixed In coroutine mode the return value is written to the
     *               log as "method".
     */
    abstract protected function handle($msg);

    /**
     * Ask the loop to pause and block until it has done so.
     *
     * Sets the state to 1, polls once per second until run() moves the state
     * away from 1, then waits for outstanding child coroutines via wait().
     * Does nothing (besides logging) when a pause is already pending/active.
     */
    public function pause()
    {
        if ($this->_pause != 0) {
            Log::record("subcoroutine pause state={$this->_pause} cannot pause.",Log::DEBUG);
            return;
        }

        $this->_pause = 1;
        do {
            Swoole\Coroutine::sleep(1);
        } while ($this->_pause == 1);

        $this->wait();
        Log::record("subcoroutine pause state={$this->_pause} success.",Log::DEBUG);
    }

    /**
     * Resume a paused loop: resets the state to 0 and resumes the coroutine
     * recorded by run(). The success line is logged regardless of state.
     */
    public function resume()
    {
        if ($this->_pause == 2) {
            $this->_pause = 0;
            Swoole\Coroutine::resume($this->_cid);
        }
        Log::record("subcoroutine resume success.",Log::DEBUG);
    }

    /**
     * Request loop termination and wait for outstanding child coroutines.
     */
    public function stop()
    {
        Log::record(__FUNCTION__,Log::DEBUG);
        $this->_stop = true;
        $this->wait();
    }

    /**
     * Block (with one-second coroutine sleeps) until in-flight work drains.
     *
     * Phase 1 polls until the coroutine count reported by
     * Swoole\Coroutine::stats() is at most 10. Phase 2 polls until no
     * coroutine listed by Swoole\Coroutine::list() has the run() coroutine
     * as its parent.
     */
    private function wait()
    {
        while (Swoole\Coroutine::stats()['coroutine_num'] > 10) {
            Swoole\Coroutine::sleep(1);
        }

        do {
            $count = 0;
            foreach (Swoole\Coroutine::list() as $cid) {
                if (Swoole\Coroutine::getPcid($cid) == $this->_cid) {
                    $count += 1;
                }
            }

            if ($count > 0) {
                Swoole\Coroutine::sleep(1);
            }
            Log::record("wait running subcoroutine count = {$count} quit.",Log::DEBUG);
        } while ($count > 0);

        Log::record("subcoroutine wait: quit all",Log::DEBUG);
    }

    /**
     * Main consume loop; returns once the stop flag is observed.
     *
     * Each iteration honours a pending pause request, then performs one
     * step in coroutine or blocking mode. Exceptions raised during an
     * iteration are logged and the loop continues.
     */
    public function run()
    {
        refill\RefillFactory::instance();
        $this->_cid = Swoole\Coroutine::getCid();
        $queues = $this->mServer->scan();

        while (true) {
            try {
                if ($this->_stop) {
                    break;
                }

                if ($this->_pause == 1) {
                    // Acknowledge the pause request and park this coroutine
                    // until resume() wakes it up again.
                    Log::record("subcoroutine runlooper pause.",Log::DEBUG);
                    $this->_pause = 2;
                    Swoole\Coroutine::suspend();
                    Log::record("subcoroutine runlooper resume success.",Log::DEBUG);
                }

                perfor_clear();
                if (defined('USE_COROUTINE') && USE_COROUTINE) {
                    $this->runCoroutineStep($queues);
                } else {
                    $this->runBlockingStep($queues);
                }
            } catch (Exception $e) {
                $err = $e->getMessage();
                $code = $e->getCode();
                Log::record("QueueDB pop err: code={$code} err={$err}",Log::ERR);
            }
        }

        Log::record("ILooper Run quit.", Log::DEBUG);
    }

    /**
     * One loop iteration in coroutine mode: pop a message and handle it in
     * a new coroutine, unless the coroutine cap has been reached.
     *
     * @param mixed $queues Keys returned by the server's scan().
     */
    private function runCoroutineStep($queues)
    {
        $res = Swoole\Coroutine::stats();
        $num = $res['coroutine_num'];
        $mem = memory_get_usage();

        if ($num >= ILooper::MAX_COROUTINE) {
            // Too many coroutines in flight: back off briefly.
            Swoole\Coroutine::sleep(0.1);
            return;
        }

        if ($this->mServer->connect() == false) {
            Swoole\Coroutine::sleep(1);
            Log::record("Processor redis disconnect.",Log::ERR);
            return;
        }

        if (defined('COROUTINE_HOOK_TCP') && COROUTINE_HOOK_TCP) {
            $content = $this->mServer->brpop($queues, 1);
            if (empty($content)) {
                return;
            }
        } else {
            $content = $this->mServer->rpop();
            if (empty($content)) {
                Swoole\Coroutine::sleep(1);
                return;
            }
        }

        if ($this->_stop) {
            // Stop was requested after the pop: hand the message back
            // instead of processing it.
            foreach ($content as $key => $params) {
                util::push_queue($key, $params);
            }
            return;
        }

        go(function () use ($content, $num, $mem) {
            $start = microtime(true);
            Log::record("BeginGoFunction coroutin_num={$num} memory={$mem}", Log::DEBUG);
            $method = $this->handle($content);
            $use_time = microtime(true) - $start;
            // Values are passed as sprintf arguments so that a '%' inside
            // $method cannot corrupt the format string.
            $msg = sprintf(
                "EndGoFunction coroutin_num=%s memory=%s request_time=%.6f method=%s",
                $num,
                $mem,
                $use_time,
                $method
            );
            Log::record($msg, Log::DEBUG);
        });
    }

    /**
     * One loop iteration in non-coroutine mode: pop a message with a
     * blocking right pop and handle it inline.
     *
     * @param mixed $queues Keys returned by the server's scan().
     */
    private function runBlockingStep($queues)
    {
        if ($this->mServer->connect() == false) {
            sleep(1);
            return;
        }

        // Blocking right pop with a one-second timeout, the same call the
        // coroutine path uses; IServer exposes no plain pop() method.
        $content = $this->mServer->brpop($queues, 1);
        if (empty($content)) {
            return;
        }

        if ($this->_stop) {
            // Stop was requested after the pop: hand the message back
            // instead of processing it.
            foreach ($content as $key => $params) {
                DispatcherClient::instance()->push($key, $params);
            }
            return;
        }

        perfor_clear();
        perfor_start();
        $this->handle($content);
        perfor_end('Handle Request');
        $info = perfor_log();
        Log::record("{$info} \r\n",Log::DEBUG);
    }

    /**
     * Signal callback registered in prepare(): flags the loop to stop on
     * SIGINT, SIGHUP, SIGQUIT and SIGTERM.
     *
     * NOTE(review): this method is private but is registered as an array
     * callable; confirm the signal dispatcher can invoke it in the target
     * runtime.
     *
     * @param int $signo Signal number delivered by pcntl.
     */
    private function sig_handler($signo)
    {
        Log::record("queue quit at sig_handler.",Log::DEBUG);
        switch ($signo) {
            case SIGINT:
            case SIGHUP:
            case SIGQUIT:
            case SIGTERM:
                $this->_stop = true;
                break;
            default:
                break;
        }
    }
}