lpush($queue_name, serialize([$key => $value]));
        } else {
            if (!C('queue.open')) {
                Logic('queue')->$key($value); // Queue is not enabled: run the job immediately.
                return;
            }
            if (!is_object(self::$queuedb)) {
                self::$queuedb = new QueueDB();
            }
            return self::$queuedb->push(serialize([$key=>$value]));
        }
    }

    /**
     * Enqueue a job now, or register it for later execution.
     *
     * When $period_secs is greater than zero the job is handed to
     * Model('cron')->addCron() with 'exetime' set to time() + $period_secs;
     * otherwise it is forwarded unchanged to self::push().
     *
     * @param mixed $key         Job key; serialized together with $value as [$key => $value].
     * @param mixed $value       Job payload.
     * @param int   $period_secs Delay in seconds; 0 (the default) means push right away.
     * @return mixed Whatever addCron() or self::push() returns.
     */
    public static function async_push($key, $value, $period_secs = 0)
    {
        if($period_secs > 0) {
            $model_cron = Model('cron');
            return $model_cron->addCron(['params' => serialize([$key=>$value]),'type' => 8,'exeid' => 0,'exetime' => time() + $period_secs]);
        } else {
            return self::push($key,$value);
        }
    }
}

/**
 * Consumer-side wrapper around QueueDB: pops a serialized job and decodes it.
 */
class QueueServer
{
    /** @var QueueDB Backend every read is delegated to. */
    private $_queuedb;

    public function __construct()
    {
        $this->_queuedb = new QueueDB();
    }

    /**
     * Pop one entry and unserialize it.
     *
     * @param mixed $key  Forwarded as-is to QueueDB::pop().
     * @param mixed $time Forwarded as-is to QueueDB::pop().
     * @return mixed The unserialized payload, or false when QueueDB::pop()
     *               produced nothing (null or any value loosely equal to null).
     */
    public function pop($key, $time)
    {
        $payload = $this->_queuedb->pop($key, $time);
        // Loose comparison is intentional: it mirrors the original `!= null` test.
        if ($payload == null) {
            return false;
        }
        return unserialize($payload);
    }

    /**
     * List the queue names known to the backend.
     *
     * @return array Result of QueueDB::scan().
     */
    public function scan()
    {
        return $this->_queuedb->scan();
    }
}

/**
 * Redis list storage for the queue. Entries are spread over $_tb_num lists
 * named $_tb_prefix . 1 through $_tb_prefix . $_tb_num.
 */
class QueueDB
{
    /** @var Redis Connection opened in the constructor. */
    private $_redis;

    /** @var string Prefix shared by every queue list name. */
    private $_tb_prefix = 'QUEUE_TABLE_';

    // Number of storage tables (lists); each push picks one of them at random.
    private $_tb_num = 2;

    /**
     * Open the Redis connection configured under queue.host / queue.port.
     *
     * Reports through throw_exception() when the redis extension is missing
     * or when connect() signals failure by returning false.
     */
    public function __construct()
    {
        if ( !extension_loaded('redis') ) {
            throw_exception('redis failed to load');
        }
        $this->_redis = new Redis();
        // connect() can return false instead of throwing; without this check the
        // failure would only show up later, on the first lPush/brPop call.
        if (!$this->_redis->connect(C('queue.host'), C('queue.port'), 20)) {
            throw_exception('redis failed to connect');
        }
        $this->_redis->setOption(Redis::OPT_READ_TIMEOUT, 10);
    }

    /**
     * Push a value onto one of the queue lists, chosen with rand().
     *
     * @param string $value Value to store (callers in this file pass serialize() output).
     * @return mixed Result of Redis::lPush(); any Exception raised by the call
     *               is reported through throw_exception() with its message.
     */
    public function push($value)
    {
        try {
            $list_name = $this->_tb_prefix . rand(1, $this->_tb_num);
            return $this->_redis->lPush($list_name, $value);
        } catch(Exception $e) {
            throw_exception($e->getMessage());
        }
    }

    /**
     * Build the names of all queue lists.
     *
     * @return array List names from $_tb_prefix . 1 up to $_tb_prefix . $_tb_num.
     */
    public function scan()
    {
        $list_key = [];
        for ($i = 1; $i <= $this->_tb_num; $i++) {
            $list_key[] = $this->_tb_prefix . $i;
        }
        return $list_key;
    }

    /**
     * Blocking right-pop via Redis::brPop().
     *
     * NOTE(review): the constructor sets a 10 second read timeout; a $time
     * longer than that may surface as an exception from brPop rather than an
     * empty result - confirm against the callers' $time values.
     *
     * @param mixed $key  Passed straight to brPop().
     * @param mixed $time Passed straight to brPop() as its timeout argument.
     * @return mixed Element 1 of the brPop() reply (presumably the popped value,
     *               per phpredis' [key, value] reply shape), or null when the
     *               reply is empty/falsy.
     */
    public function pop($key, $time)
    {
        $reply = $this->_redis->brPop($key, $time);
        if (!$reply) {
            return null;
        }
        return $reply[1];
    }

    /**
     * Empty the store by calling Redis::flushAll().
     *
     * NOTE(review): flushAll() is not limited to the QUEUE_TABLE_* lists - it
     * removes every key on the Redis server. If this instance is shared with
     * other data, consider deleting only the names returned by scan().
     */
    public function clear()
    {
        $this->_redis->flushAll();
    }
}