123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- <?php
- declare(strict_types=1);
- namespace queue;
- require_once(BASE_ROOT_PATH . '/helper/performance_helper.php');
- use Redis;
- use Exception;
- use Log;
/**
 * Redis-backed queue storage that spreads its entries over several lists.
 *
 * Each list key is the queue name followed by a shard number in the range
 * 1..$_tb_num. Pushes pick one shard at random; scan() returns every shard
 * key so a consumer can block on all of them in a single call.
 */
class IQueueDB
{
    /** Redis connection, opened in the constructor. */
    private $_redis;

    /** Queue name; used as the prefix of every shard key. */
    private $_tb_prefix;

    /** Number of storage lists; pushed values are distributed among them at random. */
    private $_tb_num = 3;

    /**
     * Connects to the Redis server configured under queue.host / queue.port.
     *
     * Failures are reported through the project helper throw_exception()
     * (presumably it throws; confirm against its definition).
     *
     * @param mixed $queue_name Queue name, used as the key prefix.
     */
    public function __construct($queue_name)
    {
        if (!extension_loaded('redis')) {
            throw_exception('redis failed to load');
        }

        $this->_tb_prefix = $queue_name;
        $this->_redis = new Redis();

        // Third argument is the connect timeout in seconds. phpredis versions
        // that do not throw on failure return false instead, which used to be
        // ignored and only surfaced later on the first command.
        if ($this->_redis->connect(C('queue.host'), C('queue.port'), 20) === false) {
            throw_exception('redis failed to connect');
        }

        $this->_redis->setOption(Redis::OPT_READ_TIMEOUT, 10);
    }

    /**
     * Appends a value to the tail of a randomly chosen shard list.
     *
     * @param mixed $value Payload to store.
     * @return mixed Result of Redis::rPush().
     */
    public function rpush($value)
    {
        try {
            return $this->_redis->rPush($this->randomKey(), $value);
        } catch (Exception $e) {
            throw_exception($e->getMessage());
        }
    }

    /**
     * Prepends a value to the head of a randomly chosen shard list.
     *
     * @param mixed $value Payload to store.
     * @return mixed Result of Redis::lPush().
     */
    public function lpush($value)
    {
        try {
            return $this->_redis->lPush($this->randomKey(), $value);
        } catch (Exception $e) {
            throw_exception($e->getMessage());
        }
    }

    /**
     * Lists the keys of all shard lists that belong to this queue.
     *
     * @return array Keys "<prefix>1" .. "<prefix>N".
     */
    public function scan()
    {
        $list_key = [];
        for ($i = 1; $i <= $this->_tb_num; $i++) {
            $list_key[] = $this->_tb_prefix . $i;
        }

        return $list_key;
    }

    /**
     * Blocking pop from the tail of the given list(s).
     *
     * Exceptions raised by the Redis client are not caught here.
     * NOTE(review): the constructor sets a 10 s read timeout; presumably
     * $time has to stay below that. Confirm.
     *
     * @param mixed $key  One key or an array of keys to wait on.
     * @param mixed $time Timeout passed to Redis::brPop().
     * @return mixed|null The popped value, or null when nothing was returned.
     */
    public function rpop($key, $time)
    {
        return $this->valueOf($this->_redis->brPop($key, $time));
    }

    /**
     * Blocking pop from the head of the given list(s).
     *
     * Exceptions raised by the Redis client are not caught here.
     *
     * @param mixed $key  One key or an array of keys to wait on.
     * @param mixed $time Timeout passed to Redis::blPop().
     * @return mixed|null The popped value, or null when nothing was returned.
     */
    public function lpop($key, $time)
    {
        return $this->valueOf($this->_redis->blPop($key, $time));
    }

    /**
     * Removes every entry of this queue.
     *
     * Only the queue's own shard lists are deleted. The previous flushAll()
     * call erased every key in every database of the Redis server, including
     * data that has nothing to do with this queue.
     */
    public function clear()
    {
        $this->_redis->del($this->scan());
    }

    /** Builds the key of a randomly selected shard list. */
    private function randomKey()
    {
        return $this->_tb_prefix . rand(1, $this->_tb_num);
    }

    /** Extracts the value from a blocking-pop reply; the value sits at index 1. */
    private function valueOf($reply)
    {
        return $reply ? $reply[1] : null;
    }
}
/**
 * Queue handling, producer side.
 *
 * Wraps a queue storage object and enqueues key/value messages on it.
 */
class IClient
{
    /** Storage object that receives the serialized messages. */
    private $mQueuedb;

    /**
     * @param mixed $queueDb Storage object exposing an lpush($value) method.
     */
    public function __construct($queueDb)
    {
        $this->mQueuedb = $queueDb;
    }

    /**
     * Enqueues a single message.
     *
     * The message is stored as a serialized one-element array that maps
     * $key to $value.
     *
     * @param mixed $key   Array key under which the value is stored.
     * @param mixed $value Message body.
     * @return mixed Whatever the storage object's lpush() returns.
     */
    public function push($key, $value)
    {
        $message = [$key => $value];
        $payload = serialize($message);

        return $this->mQueuedb->lpush($payload);
    }
}
/**
 * Consumer side of the queue: pops raw entries from a storage object and
 * turns them back into PHP values.
 *
 * NOTE(review): entries are decoded with unserialize(), which can instantiate
 * classes named in the payload. That is only safe while everything written to
 * the queue comes from trusted code.
 */
class IServer
{
    /** Storage object exposing rpop(), lpop() and scan(). */
    private $_queuedb;

    /**
     * @param mixed $queueDb Storage object to read from.
     */
    public function __construct($queueDb)
    {
        $this->_queuedb = $queueDb;
    }

    /**
     * Pops one entry via the storage object's rpop() and unserializes it.
     *
     * @param mixed $key  Key or keys to pop from.
     * @param mixed $time Timeout handed through to the storage object.
     * @return mixed The unserialized entry, or false when nothing was popped.
     */
    public function pop($key, $time)
    {
        $raw = $this->_queuedb->rpop($key, $time);

        // Loose comparison on purpose: an empty string counts as "nothing".
        return $raw == null ? false : unserialize($raw);
    }

    /**
     * Pops one entry via the storage object's lpop() and unserializes it.
     *
     * @param mixed $key  Key or keys to pop from.
     * @param mixed $time Timeout handed through to the storage object.
     * @return mixed The unserialized entry, or false when nothing was popped.
     */
    public function lpop($key, $time)
    {
        $raw = $this->_queuedb->lpop($key, $time);

        // Loose comparison on purpose: an empty string counts as "nothing".
        return $raw == null ? false : unserialize($raw);
    }

    /**
     * Returns the list keys reported by the storage object.
     *
     * @return mixed Result of the storage object's scan().
     */
    public function scan()
    {
        return $this->_queuedb->scan();
    }
}
/**
 * Base class for a long-running queue worker.
 *
 * Subclasses implement handle(); run() repeatedly pops an entry from the
 * server object and passes it to handle() until a stop signal is received.
 */
abstract class ILooper
{
    /** Set to true by the signal handler; checked once per loop iteration. */
    private $_stop = false;

    /** Server object exposing scan() and pop($keys, $time). */
    private $mServer;

    /**
     * @param mixed $server Server object the loop reads from.
     */
    protected function __construct($server)
    {
        $this->mServer = $server;
    }

    /**
     * Discards an active output buffer and installs the stop-signal handlers
     * for SIGINT, SIGHUP, SIGQUIT and SIGTERM.
     */
    public function prepare()
    {
        if (ob_get_level()) {
            ob_end_clean();
        }

        // A closure keeps the private handler callable no matter which scope
        // is active when the signal is eventually dispatched.
        $handler = function ($signo) {
            $this->sig_handler($signo);
        };

        foreach ([SIGINT, SIGHUP, SIGQUIT, SIGTERM] as $signal) {
            pcntl_signal($signal, $handler);
        }
    }

    /**
     * Processes one popped queue entry.
     *
     * @param mixed $msg Value returned by the server's pop(); run() passes it
     *                   on unfiltered, whatever pop() returned.
     */
    abstract protected function handle($msg);

    /**
     * Main loop: pop with a timeout of 1, handle, log timing, repeat.
     *
     * Exceptions (from popping or from handle()) are logged and the loop
     * continues. The loop ends once a handled signal has set the stop flag.
     */
    public function run()
    {
        $queues = $this->mServer->scan();

        // Handlers installed with pcntl_signal() only run when pending signals
        // are dispatched. Nothing else in this file does that, so without the
        // explicit dispatch below the stop flag might never be set.
        $dispatchSignals = function_exists('pcntl_signal_dispatch');

        while (true) {
            try {
                if ($dispatchSignals) {
                    pcntl_signal_dispatch();
                }
                if ($this->_stop) {
                    break;
                }

                $content = $this->mServer->pop($queues, 1);

                perfor_clear();
                perfor_start();
                $this->handle($content);
                perfor_end('Handle Request');

                $info = perfor_log();
                Log::record("{$info} \r\n\r\n", Log::DEBUG);
            } catch (Exception $e) {
                $err = $e->getMessage();
                $code = $e->getCode();
                Log::record("QueueDB pop err: code={$code} err={$err}", Log::ERR);
            }
        }
    }

    /**
     * Signal callback: logs the event and requests loop termination for the
     * handled signals.
     *
     * @param int $signo Number of the received signal.
     */
    private function sig_handler($signo)
    {
        Log::record("queue quit at sig_handler.", Log::DEBUG);

        switch ($signo) {
            case SIGINT:
            case SIGHUP:
            case SIGQUIT:
            case SIGTERM:
                $this->_stop = true;
                break;
            default:
                break;
        }
    }
}
|