123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450 |
- <?php
- declare(strict_types=1);
- namespace queue;
- require_once(BASE_ROOT_PATH . '/helper/performance_helper.php');
- use Redis;
- use Exception;
- use Log;
- use Swoole;
/**
 * Thin wrapper around a phpredis connection that treats a single Redis list
 * as a queue.
 *
 * The server address is read from the C('queue.host') / C('queue.port')
 * configuration entries; the list key is the queue name handed to the
 * constructor.
 */
class IQueueDB
{
    private $_redis;
    private $_queue_name;

    /**
     * Creates the Redis client and makes a first connection attempt.
     *
     * @param mixed $queue_name Key of the Redis list used as the queue.
     */
    public function __construct($queue_name)
    {
        if (!extension_loaded('redis')) {
            throw_exception('redis failed to load');
        }

        $this->_redis = new Redis();
        $this->_queue_name = $queue_name;
        $this->connect();
    }

    /**
     * Makes sure the client is connected, reconnecting when it is not.
     *
     * @return bool True when the client reports a live connection afterwards.
     */
    public function connect(): bool
    {
        if (!$this->_redis->isConnected()) {
            // Goes through close() first, exactly as before; with the link
            // already down that call normally just writes a debug log line.
            $this->close();
            $ret = $this->_redis->connect(C('queue.host'), C('queue.port'));
            Log::record("IQueueDB::connect ret = {$ret}", Log::DEBUG);

            return $this->_redis->isConnected();
        }

        return true;
    }

    /**
     * Closes the connection when one is open; otherwise only logs.
     */
    public function close()
    {
        if (!$this->_redis->isConnected()) {
            Log::record("redis has closed",Log::DEBUG);
            return;
        }

        $this->_redis->close();
    }

    /**
     * Appends a value to the tail (right end) of the queue list.
     *
     * @param mixed $value Payload to store.
     * @return mixed Result of Redis::rPush, or false when the connection is
     *               unavailable or an exception was raised.
     */
    public function rpush($value)
    {
        return $this->pushTo('rpush', $value);
    }

    /**
     * Prepends a value to the head (left end) of the queue list.
     *
     * @param mixed $value Payload to store.
     * @return mixed Result of Redis::lPush, or false when the connection is
     *               unavailable or an exception was raised.
     */
    public function lpush($value)
    {
        return $this->pushTo('lpush', $value);
    }

    /**
     * Returns the list keys this queue reads from: always just its own name.
     *
     * @return array Single-element list containing the queue name.
     */
    public function scan()
    {
        return [$this->_queue_name];
    }

    /**
     * Pops one element from the tail of the queue list without blocking.
     *
     * @return mixed Whatever Redis::rPop returns for the queue key.
     */
    public function rpop()
    {
        return $this->_redis->rPop($this->_queue_name);
    }

    /**
     * Pops one element from the head of the queue list without blocking.
     *
     * @return mixed Whatever Redis::lPop returns for the queue key.
     */
    public function lpop()
    {
        return $this->_redis->lPop($this->_queue_name);
    }

    /**
     * Blocking pop from the tail of the given list(s).
     *
     * @param mixed $key  Key or keys forwarded to Redis::brPop.
     * @param mixed $time Timeout forwarded to Redis::brPop.
     * @return mixed The popped value (index 1 of the reply), or null when the
     *               reply is empty.
     */
    public function brpop($key, $time)
    {
        $reply = $this->_redis->brPop($key, $time);

        return $reply ? $reply[1] : null;
    }

    /**
     * Blocking pop from the head of the given list(s).
     *
     * @param mixed $key  Key or keys forwarded to Redis::blPop.
     * @param mixed $time Timeout forwarded to Redis::blPop.
     * @return mixed The popped value (index 1 of the reply), or null when the
     *               reply is empty.
     */
    public function blpop($key, $time)
    {
        $reply = $this->_redis->blPop($key, $time);

        return $reply ? $reply[1] : null;
    }

    /**
     * Issues FLUSHALL on the connection.
     *
     * NOTE(review): FLUSHALL removes every key on the Redis server, not only
     * this queue's list - confirm that callers really want that.
     */
    public function clear()
    {
        $this->_redis->flushAll();
    }

    /**
     * Shared body of rpush()/lpush(): connect, push, and turn failures into false.
     *
     * @param string $side  Either 'rpush' or 'lpush'; also used as the log label.
     * @param mixed  $value Payload to store.
     * @return mixed Result of the Redis push call, or false on failure.
     */
    private function pushTo(string $side, $value)
    {
        try {
            if (!$this->connect()) {
                Log::record("IQueueDB::{$side} connect=false", Log::DEBUG);
                return false;
            }

            return $side === 'rpush'
                ? $this->_redis->rPush($this->_queue_name, $value)
                : $this->_redis->lPush($this->_queue_name, $value);
        } catch (Exception $e) {
            Log::record("IQueueDB::{$side} " . $e->getMessage(),Log::ERR);
            return false;
        }
    }
}
/**
 * Queue producer.
 *
 * Wraps each message as a one-entry array [$key => $value], serializes it with
 * PHP's serialize() and pushes it onto the head of the underlying queue.
 */
class IClient
{
    private $mQueuedb;

    /**
     * @param mixed $queueDb Queue backend; only its lpush() method is used here.
     */
    public function __construct($queueDb)
    {
        $this->mQueuedb = $queueDb;
    }

    /**
     * Enqueues one message.
     *
     * @param mixed $key   Array key the value is stored under in the payload.
     * @param mixed $value Message body.
     * @return mixed Whatever the backend's lpush() returns.
     */
    public function push($key, $value)
    {
        $payload = serialize([$key => $value]);

        return $this->mQueuedb->lpush($payload);
    }
}
/**
 * Queue producer that encodes messages as JSON objects of the form
 * {"method": <key>, "params": <value>} and pushes them onto the head of the
 * underlying queue.
 */
class IJsonClient
{
    private $mQueuedb;

    /**
     * @param mixed $queueDb Queue backend; only its lpush() method is used here.
     */
    public function __construct($queueDb)
    {
        $this->mQueuedb = $queueDb;
    }

    /**
     * Enqueues one message.
     *
     * @param mixed $key   Stored under the "method" field.
     * @param mixed $value Stored under the "params" field.
     * @return mixed Whatever the backend's lpush() returns, or false when the
     *               message cannot be JSON-encoded (json_last_error_msg()
     *               then describes why). Nothing is enqueued in that case.
     */
    public function push($key, $value)
    {
        $payload = json_encode(['method' => $key, 'params' => $value]);

        // json_encode() yields false for unencodable input (e.g. invalid
        // UTF-8); pushing that would enqueue a message no consumer can decode.
        if ($payload === false) {
            return false;
        }

        return $this->mQueuedb->lpush($payload);
    }
}
/**
 * Queue consumer facade: pops raw payloads from the queue backend and decodes
 * them with unserialize(), i.e. it expects the format written by
 * IClient::push().
 */
class IServer
{
    private $_queuedb;

    /**
     * @param mixed $queueDb Queue backend providing connect/rpop/lpop/brpop/
     *                       blpop/scan/close.
     */
    public function __construct($queueDb)
    {
        $this->_queuedb = $queueDb;
    }

    /**
     * Ensures the backend is connected.
     *
     * @return bool Result of the backend's connect().
     */
    public function connect(): bool
    {
        return $this->_queuedb->connect();
    }

    /**
     * Non-blocking pop from the tail of the queue.
     *
     * @return mixed Decoded message, or false when nothing was popped.
     */
    public function rpop()
    {
        return $this->decode($this->_queuedb->rpop());
    }

    /**
     * Non-blocking pop from the head of the queue.
     *
     * @return mixed Decoded message, or false when nothing was popped.
     */
    public function lpop()
    {
        return $this->decode($this->_queuedb->lpop());
    }

    /**
     * Blocking pop from the tail of the given list(s).
     *
     * @param mixed $key  Key or keys to wait on.
     * @param mixed $time Timeout passed to the backend.
     * @return mixed Decoded message, or false when nothing was popped.
     */
    public function brpop($key, $time)
    {
        return $this->decode($this->_queuedb->brpop($key, $time));
    }

    /**
     * Blocking pop from the head of the given list(s).
     *
     * Previously this delegated to the backend's non-blocking lpop(), which
     * ignored both arguments; it now calls blpop() as the name promises.
     *
     * @param mixed $key  Key or keys to wait on.
     * @param mixed $time Timeout passed to the backend.
     * @return mixed Decoded message, or false when nothing was popped.
     */
    public function blpop($key, $time)
    {
        return $this->decode($this->_queuedb->blpop($key, $time));
    }

    /**
     * Lists the queue keys the backend serves.
     *
     * @return mixed Result of the backend's scan().
     */
    public function scan()
    {
        return $this->_queuedb->scan();
    }

    /**
     * Closes the backend connection.
     */
    public function stop()
    {
        $this->_queuedb->close();
    }

    /**
     * Turns a raw popped payload into the decoded message.
     *
     * @param mixed $raw Value returned by the backend pop call.
     * @return mixed Unserialized payload, or false for an empty result.
     */
    private function decode($raw)
    {
        // Loose comparison on purpose: null, false and '' all mean "no message".
        if ($raw == null) {
            return false;
        }

        // SECURITY NOTE(review): unserialize() can instantiate arbitrary
        // classes from the payload. This is only safe while every producer
        // writing to the queue is trusted; consider restricting
        // allowed_classes or moving to JSON.
        return unserialize($raw);
    }
}
/**
 * Base class for a long-running queue consumer loop.
 *
 * run() repeatedly pops messages from the server object and hands each one to
 * handle(), which subclasses implement. When the USE_COROUTINE constant is
 * defined and truthy every message is processed in its own Swoole coroutine
 * (at most MAX_COROUTINE at a time); otherwise messages are processed inline.
 * pause()/resume()/stop() let another coroutine control the loop.
 */
abstract class ILooper
{
    /** Upper bound on the coroutine count at which new messages are still fetched. */
    const MAX_COROUTINE = 300;

    private $_stop = false;
    private $_pause = 0; // 0 = running normally, 1 = pause requested, 2 = paused
    private $_cid = 0;   // coroutine id of the coroutine executing run()
    private $mServer;

    /**
     * @param mixed $server Queue server used by run(); must provide scan(),
     *                      connect(), rpop() and brpop().
     */
    protected function __construct($server)
    {
        $this->mServer = $server;
    }

    /**
     * Discards an active output buffer and installs handlers for SIGINT,
     * SIGHUP, SIGQUIT and SIGTERM that request the loop to stop.
     */
    public function prepare()
    {
        if (ob_get_level()) {
            ob_end_clean();
        }

        // Signal handlers are invoked by the dispatcher from whatever scope
        // happens to be executing, so an array callable pointing at the
        // private sig_handler() may not be accessible at that moment. A
        // closure created here stays bound to this object and class scope.
        $handler = function ($signo) {
            $this->sig_handler($signo);
        };
        foreach ([SIGINT, SIGHUP, SIGQUIT, SIGTERM] as $signal) {
            pcntl_signal($signal, $handler);
        }
    }

    /**
     * Processes one decoded queue message.
     *
     * @param mixed $msg Message as returned by the server's pop call.
     * @return mixed In coroutine mode the return value is written to the log
     *               as "method"; otherwise it is ignored.
     */
    abstract protected function handle($msg);

    /**
     * Asks the run loop to pause and blocks the calling coroutine until the
     * loop has suspended itself and its child coroutines have finished.
     */
    public function pause()
    {
        if ($this->_pause != 0) {
            Log::record("subcoroutine pause state={$this->_pause} cannot pause.", Log::DEBUG);
            return;
        }

        $this->_pause = 1;
        // run() flips the state to 2 right before it suspends itself.
        do {
            Swoole\Coroutine::sleep(1);
        } while ($this->_pause == 1);

        $this->wait();
        Log::record("subcoroutine pause state={$this->_pause} success.", Log::DEBUG);
    }

    /**
     * Resumes a loop that was paused via pause(); does nothing but log when
     * the loop is not in the paused state.
     */
    public function resume()
    {
        if ($this->_pause == 2) {
            $this->_pause = 0;
            Swoole\Coroutine::resume($this->_cid);
        }
        Log::record("subcoroutine resume success.", Log::DEBUG);
    }

    /**
     * Requests the loop to stop and waits for in-flight coroutines to finish.
     */
    public function stop()
    {
        Log::record(__FUNCTION__, Log::DEBUG);
        $this->_stop = true;
        $this->wait();
    }

    /**
     * Blocks until the process-wide coroutine count is at most 10 and no
     * coroutine whose parent is the run() coroutine is left, logging the
     * backtrace of every child still running once per second.
     */
    private function wait()
    {
        $coroutineCount = function () {
            $stats = Swoole\Coroutine::stats();
            return $stats['coroutine_num'];
        };

        // Phase 1: let the overall coroutine count drain to 10 or fewer.
        while ($coroutineCount() > 10) {
            Swoole\Coroutine::sleep(1);
        }

        // Phase 2: wait for the direct children of the run() coroutine.
        do {
            $count = 0;
            foreach (Swoole\Coroutine::list() as $cid) {
                if (Swoole\Coroutine::getPcid($cid) != $this->_cid) {
                    continue;
                }

                $count += 1;
                $time_secs = Swoole\Coroutine::getElapsed($cid);
                $backtrace = Swoole\Coroutine::getBackTrace($cid);
                $trace = "wait Coroutine quit elapsed cid={$cid} seconds={$time_secs}\n";
                foreach ($backtrace as $item) {
                    // Frames of internal functions carry no file/line entries.
                    $file = $item['file'] ?? '';
                    $line = $item['line'] ?? '';
                    $function = $item['function'] ?? '';
                    $trace .= "{$file}\t{$line}\t{$function}\n";
                }
                Log::record($trace, Log::DEBUG);
            }

            if ($count > 0) {
                Swoole\Coroutine::sleep(1);
            }
            Log::record("wait running subcoroutine count = {$count} quit.", Log::DEBUG);
        } while ($count > 0);

        $num = $coroutineCount();
        Log::record("subcoroutine wait: quit all. cur coroutine num={$num}", Log::DEBUG);
    }

    /**
     * Main consumer loop; returns once a stop has been requested.
     *
     * Exceptions raised while fetching or (in non-coroutine mode) handling a
     * message are logged and the loop continues.
     */
    public function run()
    {
        $this->_cid = Swoole\Coroutine::getCid();
        $queues = $this->mServer->scan();

        while (true) {
            try {
                if ($this->_stop) {
                    break;
                }

                if ($this->_pause == 1) {
                    Log::record("subcoroutine runlooper pause.", Log::DEBUG);
                    $this->_pause = 2;
                    // Parked here until resume() wakes this coroutine up.
                    Swoole\Coroutine::suspend();
                    Log::record("subcoroutine runlooper resume success.", Log::DEBUG);
                }

                perfor_clear();

                if (defined('USE_COROUTINE') && USE_COROUTINE) {
                    $this->runCoroutineStep($queues);
                } else {
                    $this->runBlockingStep($queues);
                }
            } catch (Exception $e) {
                $err = $e->getMessage();
                $code = $e->getCode();
                Log::record("QueueDB pop err: code={$code} err={$err}", Log::ERR);
            }
        }

        Log::record("ILooper Run quit.", Log::DEBUG);
    }

    /**
     * One loop iteration in coroutine mode: fetch a message and process it in
     * a new coroutine, unless the coroutine limit has been reached.
     *
     * @param mixed $queues Queue keys obtained from the server's scan().
     */
    private function runCoroutineStep($queues)
    {
        $stats = Swoole\Coroutine::stats();
        $num = $stats['coroutine_num'];
        $mem = memory_get_usage();

        if ($num >= self::MAX_COROUTINE) {
            // Too many handlers in flight; back off briefly.
            Swoole\Coroutine::sleep(0.1);
            return;
        }

        if (!$this->mServer->connect()) {
            Swoole\Coroutine::sleep(1);
            Log::record("Processor redis disconnect.", Log::ERR);
            return;
        }

        if (defined('COROUTINE_HOOK_TCP') && COROUTINE_HOOK_TCP) {
            // Blocking pop with a one-second timeout.
            $content = $this->mServer->brpop($queues, 1);
            if (empty($content)) {
                return;
            }
        } else {
            // Non-blocking pop, so poll with a short sleep when the queue is empty.
            $content = $this->mServer->rpop();
            if (empty($content)) {
                Swoole\Coroutine::sleep(0.1);
                return;
            }
        }

        go(function () use ($content, $num, $mem) {
            $start = microtime(true);
            Log::record("BeginGoFunction coroutin_num={$num} memory={$mem}", Log::DEBUG);

            try {
                $method = $this->handle($content);
            } catch (Exception $e) {
                // The try/catch in run() does not cover this coroutine, so a
                // handler failure is logged here instead of going uncaught.
                Log::record("Handle request err: code={$e->getCode()} err={$e->getMessage()}", Log::ERR);
                return;
            }

            $use_time = microtime(true) - $start;
            $stats = Swoole\Coroutine::stats();
            $num = $stats['coroutine_num'];
            // $method is passed as an argument so a '%' in it cannot be
            // misread as a format directive.
            $msg = sprintf(
                "EndGoFunction coroutin_num={$num} memory={$mem} request_time=%.6f method=%s",
                $use_time,
                $method
            );
            Log::record($msg, Log::DEBUG);
        });
    }

    /**
     * One loop iteration in non-coroutine mode: fetch a message and handle it
     * inline, or hand it back to the dispatcher when a stop was requested in
     * the meantime.
     *
     * @param mixed $queues Queue keys obtained from the server's scan().
     */
    private function runBlockingStep($queues)
    {
        if (!$this->mServer->connect()) {
            sleep(1);
            return;
        }

        // The original called pop($queues, 1), a method the server class in
        // this file does not define; brpop() has the matching (keys, timeout)
        // signature and is what the coroutine branch already uses.
        $content = $this->mServer->brpop($queues, 1);
        if (empty($content)) {
            return;
        }

        if ($this->_stop) {
            // Stop requested after the pop: re-dispatch the message instead
            // of processing it here.
            foreach ($content as $key => $params) {
                DispatcherClient::instance()->push($key, $params);
            }
            return;
        }

        perfor_clear();
        perfor_start();
        $this->handle($content);
        perfor_end('Handle Request');
        $info = perfor_log();
        Log::record("{$info} \r\n", Log::DEBUG);
    }

    /**
     * Signal callback: any of SIGINT/SIGHUP/SIGQUIT/SIGTERM requests a stop.
     *
     * @param int $signo Number of the received signal.
     */
    private function sig_handler($signo)
    {
        Log::record("queue quit at sig_handler.", Log::DEBUG);

        if (in_array($signo, [SIGINT, SIGHUP, SIGQUIT, SIGTERM], true)) {
            $this->_stop = true;
        }
    }
}
|