|
@@ -6,19 +6,17 @@
|
|
|
* @package
|
|
|
*/
|
|
|
|
|
|
-class QueueClient {
|
|
|
-
|
|
|
+class QueueClient
|
|
|
+{
|
|
|
private static $queuedb;
|
|
|
|
|
|
- /**
|
|
|
- * 入列
|
|
|
- * @param string $key
|
|
|
- * @param array $value
|
|
|
- */
|
|
|
- public static function push($key, $value) {
|
|
|
+ public static function push($key, $value)
|
|
|
+ {
|
|
|
if (!C('queue.open')) {
|
|
|
- Logic('queue')->$key($value);return;
|
|
|
+ Logic('queue')->$key($value); //如果队列没打开,立即执行
|
|
|
+ return;
|
|
|
}
|
|
|
+
|
|
|
if (!is_object(self::$queuedb)) {
|
|
|
self::$queuedb = new QueueDB();
|
|
|
}
|
|
@@ -26,20 +24,22 @@ class QueueClient {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-class QueueServer {
|
|
|
-
|
|
|
+class QueueServer
|
|
|
+{
|
|
|
private $_queuedb;
|
|
|
|
|
|
public function __construct() {
|
|
|
$this->_queuedb = new QueueDB();
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 取出队列
|
|
|
- * @param unknown $key
|
|
|
- */
|
|
|
- public function pop($key,$time) {
|
|
|
- return unserialize($this->_queuedb->pop($key,$time));
|
|
|
+ public function pop($key,$time)
|
|
|
+ {
|
|
|
+ $result = $this->_queuedb->pop($key,$time);
|
|
|
+ if($result != null) {
|
|
|
+ return unserialize($result);
|
|
|
+ } else {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public function scan() {
|
|
@@ -47,44 +47,35 @@ class QueueServer {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-class QueueDB {
|
|
|
-
|
|
|
- //定义对象
|
|
|
+class QueueDB
|
|
|
+{
|
|
|
private $_redis;
|
|
|
|
|
|
- //存储前缀
|
|
|
private $_tb_prefix = 'QUEUE_TABLE_';
|
|
|
-
|
|
|
//存定义存储表的数量,系统会随机分配存储
|
|
|
private $_tb_num = 2;
|
|
|
|
|
|
- /**
|
|
|
- * 初始化
|
|
|
- */
|
|
|
- public function __construct() {
|
|
|
+ public function __construct()
|
|
|
+ {
|
|
|
if ( !extension_loaded('redis') ) {
|
|
|
throw_exception('redis failed to load');
|
|
|
}
|
|
|
$this->_redis = new Redis();
|
|
|
$this->_redis->connect(C('queue.host'),C('queue.port'),20);
|
|
|
+ $this->_redis->setOption(Redis::OPT_READ_TIMEOUT, 120);
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 入列
|
|
|
- * @param unknown $value
|
|
|
- */
|
|
|
- public function push($value) {
|
|
|
+ public function push($value)
|
|
|
+ {
|
|
|
try {
|
|
|
return $this->_redis->lPush($this->_tb_prefix.rand(1,$this->_tb_num),$value);
|
|
|
- }catch(Exception $e) {
|
|
|
+ } catch(Exception $e) {
|
|
|
throw_exception($e->getMessage());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 取得所有的list key(表)
|
|
|
- */
|
|
|
- public function scan() {
|
|
|
+ public function scan()
|
|
|
+ {
|
|
|
$list_key = array();
|
|
|
for($i=1;$i<=$this->_tb_num;$i++) {
|
|
|
$list_key[] = $this->_tb_prefix.$i;
|
|
@@ -92,23 +83,23 @@ class QueueDB {
|
|
|
return $list_key;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 出列
|
|
|
- * @param unknown $key
|
|
|
- */
|
|
|
- public function pop($key, $time) {
|
|
|
- try {
|
|
|
+ public function pop($key, $time)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
if ($result = $this->_redis->brPop($key,$time)) {
|
|
|
return $result[1];
|
|
|
}
|
|
|
} catch (Exception $e) {
|
|
|
- exit($e->getMessage());
|
|
|
+ $err = $e->getMessage();
|
|
|
+ $code = $e->getCode();
|
|
|
+ Log::record("QueueDB pop err: code={$code} err={$err}",Log::ERR);
|
|
|
+ return null;
|
|
|
}
|
|
|
+
|
|
|
+ return null;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 清空,暂时无用
|
|
|
- */
|
|
|
public function clear() {
|
|
|
$this->_redis->flushAll();
|
|
|
}
|