_tb_prefix = $queue_name; $this->_redis = new Redis(); $this->_redis->connect(C('queue.host'), C('queue.port'), 20); $this->_redis->setOption(Redis::OPT_READ_TIMEOUT, 10); } public function rpush($value) { try { return $this->_redis->rPush($this->_tb_prefix.rand(1,$this->_tb_num),$value); } catch(Exception $e) { throw_exception($e->getMessage()); } } public function lpush($value) { try { return $this->_redis->lPush($this->_tb_prefix.rand(1,$this->_tb_num),$value); } catch(Exception $e) { throw_exception($e->getMessage()); } } public function scan() { $list_key = []; for($i=1;$i<=$this->_tb_num;$i++) { $list_key[] = $this->_tb_prefix.$i; } return $list_key; } public function rpop($key, $time) { $result = $this->_redis->brPop($key, $time); if ($result) { return $result[1]; } else { return null; } } public function lpop($key, $time) { $result = $this->_redis->blPop($key, $time); if ($result) { return $result[1]; } else { return null; } } public function clear() { $this->_redis->flushAll(); } }

/**
 * Queue processing.
 *
 * Producer side: serializes a key/value pair and pushes it onto the queue
 * backend handed to the constructor.
 *
 * @package
 */
class IClient
{
    /** @var object Queue backend; only its lpush($value) method is used here. */
    private $mQueuedb;

    /**
     * @param object $queueDb Queue backend exposing lpush($value).
     */
    public function __construct($queueDb)
    {
        $this->mQueuedb = $queueDb;
    }

    /**
     * Enqueue one message.
     *
     * The message is stored as serialize([$key => $value]).
     *
     * @param mixed $key   Array key of the stored pair.
     * @param mixed $value Payload stored under $key.
     * @return mixed Whatever the backend's lpush() returns.
     */
    public function push($key, $value)
    {
        $payload = serialize([$key => $value]);

        return $this->mQueuedb->lpush($payload);
    }
}

/**
 * Consumer side: pops raw entries from the queue backend and unserializes them.
 */
class IServer
{
    /** @var object Queue backend; rpop(), lpop() and scan() are called on it. */
    private $_queuedb;

    /**
     * @param object $queueDb Queue backend exposing rpop(), lpop() and scan().
     */
    public function __construct($queueDb)
    {
        $this->_queuedb = $queueDb;
    }

    /**
     * Pop one message via the backend's rpop().
     *
     * @param mixed $key  Queue key(s), forwarded to the backend unchanged.
     * @param mixed $time Timeout, forwarded to the backend unchanged.
     * @return mixed The unserialized message, or false when nothing was popped.
     */
    public function pop($key, $time)
    {
        return $this->_decode($this->_queuedb->rpop($key, $time));
    }

    /**
     * Pop one message via the backend's lpop().
     *
     * @param mixed $key  Queue key(s), forwarded to the backend unchanged.
     * @param mixed $time Timeout, forwarded to the backend unchanged.
     * @return mixed The unserialized message, or false when nothing was popped.
     */
    public function lpop($key, $time)
    {
        return $this->_decode($this->_queuedb->lpop($key, $time));
    }

    /**
     * @return mixed The backend's scan() result (the queue keys to poll).
     */
    public function scan()
    {
        return $this->_queuedb->scan();
    }

    /**
     * Turn a raw backend result into a message.
     *
     * @param mixed $raw Value returned by the backend pop call.
     * @return mixed unserialize($raw), or false when $raw is loosely equal to null.
     */
    private function _decode($raw)
    {
        // Loose comparison is kept on purpose: it matches the original
        // `!= null` check, so '' and '0' are also treated as "no message".
        if ($raw == null) {
            return false;
        }

        // NOTE(review): unserialize() can instantiate any class named in the
        // payload; confirm that only trusted producers can write to the queue.
        return unserialize($raw);
    }
}

/**
 * Base class for a long-running queue worker.
 *
 * Subclasses implement handle(); run() polls the server in a loop and passes
 * every pop result to handle() until a termination signal has been seen.
 */
abstract class ILooper
{
    /** @var bool Set to true by sig_handler(); checked once per loop iteration. */
    private $_stop = false;

    /** @var object Message source; scan() and pop() are called on it. */
    private $mServer;

    /**
     * @param object $server Message source exposing scan() and pop($queues, $timeout).
     */
    protected function __construct($server)
    {
        $this->mServer = $server;
    }

    /**
     * Drop one active output buffer (if any) and install the signal handlers.
     *
     * @return void
     */
    public function prepare()
    {
        if (ob_get_level()) {
            ob_end_clean();
        }

        // Register a closure instead of [$this, 'sig_handler']: the closure is
        // bound to this class's scope, so the private handler stays callable
        // regardless of which code is executing when the signal is dispatched.
        $handler = function ($signo) {
            $this->sig_handler($signo);
        };

        foreach ([SIGINT, SIGHUP, SIGQUIT, SIGTERM] as $signal) {
            pcntl_signal($signal, $handler);
        }
    }

    /**
     * Process one pop result.
     *
     * run() forwards the pop() result as-is, so $msg is whatever the server
     * returned, including its "nothing popped" value (false for IServer).
     *
     * @param mixed $msg Result of the server's pop() call.
     * @return mixed
     */
    abstract protected function handle($msg);

    /**
     * Main loop: pop with a 1-unit timeout, handle, log timing; repeat until
     * the stop flag is set. Exceptions are logged and the loop continues.
     *
     * @return void
     */
    public function run()
    {
        $queues = $this->mServer->scan();

        // Guarded so run() keeps working where the pcntl extension is absent.
        $canDispatch = function_exists('pcntl_signal_dispatch');

        while (true) {
            try {
                // Nothing visible in this file delivers pending signals, and
                // prepare() replaced the default handlers, so without this call
                // the stop flag may never be set. Harmless if ticks or async
                // signals are also enabled elsewhere.
                if ($canDispatch) {
                    pcntl_signal_dispatch();
                }

                if ($this->_stop) {
                    break;
                }

                $content = $this->mServer->pop($queues, 1);

                perfor_clear();
                perfor_start();
                $this->handle($content);
                perfor_end('Handle Request');

                $info = perfor_log();
                Log::record("{$info} \r\n\r\n", Log::DEBUG);
            } catch (Exception $e) {
                $err = $e->getMessage();
                $code = $e->getCode();
                Log::record("QueueDB pop err: code={$code} err={$err}", Log::ERR);
            }
        }
    }

    /**
     * Signal callback: logs the event and requests loop termination for
     * SIGINT, SIGHUP, SIGQUIT and SIGTERM.
     *
     * @param int $signo Signal number being delivered.
     * @return void
     */
    private function sig_handler($signo)
    {
        Log::record("queue quit at sig_handler.", Log::DEBUG);

        switch ($signo) {
            case SIGINT:
            case SIGHUP:
            case SIGQUIT:
            case SIGTERM:
                $this->_stop = true;
                break;
            default:
                break;
        }
    }
}