mConfig = C('redis'); $this->mStates = $states; }

    /**
     * Subscriber loop: connects to the configured Redis master, subscribes to
     * every channel returned by all_channels() and forwards each received
     * message to handler_redis().
     *
     * The loop never exits on its own. When the connection cannot be
     * established, or when an Exception escapes the subscription, the method
     * waits a few seconds and then reconnects.
     *
     * @return void
     */
    public function run()
    {
        @date_default_timezone_set('Asia/Shanghai');

        // Pause (in seconds) before reconnecting after any failure.
        $retryDelay = 5;

        while (true) {
            try {
                // Created locally on every iteration instead of being stored as
                // a class member: per the original author, a member does not
                // work in the multi-threaded version, possibly because of how
                // the redis extension is designed.
                $redis = new Redis;
                $connected = $redis->pconnect(
                    $this->mConfig['master']['host'],
                    $this->mConfig['master']['port']
                );

                if (!$connected) {
                    Log::record("redis 连接失败.", Log::ERR);
                    // Do not subscribe on a client that never connected; back
                    // off first so a dead server cannot turn this loop into a
                    // tight reconnect/log spin.
                    sleep($retryDelay);
                    continue;
                }
                Log::record("redis 连接成功.", Log::DEBUG);

                // Long read timeout so the blocking subscribe call is not cut
                // short while the channels are idle.
                $redis->setOption(Redis::OPT_READ_TIMEOUT, 3600);

                Log::record("Message thread start run....", Log::DEBUG);
                $redis->subscribe(all_channels(), function ($redis, $chan, $msg) {
                    // NOTE(review): presumably Threaded::synchronized() from
                    // pthreads, serialising handler execution on this object --
                    // confirm against the class declaration.
                    $this->synchronized(function () use ($redis, $chan, $msg) {
                        handler_redis($redis, $chan, $msg);
                    });
                });
                Log::record("Message thread quit....", Log::DEBUG);
            } catch (Exception $ex) {
                Log::record("subscriber quit err={$ex->getMessage()} code={$ex->getCode()}");
                sleep($retryDelay);
            }
        }
    }

    /**
     * Decodes a JSON message and routes it to the StatesHelper handler that
     * matches the channel it arrived on.
     *
     * @param string $channel Channel name: 'ch_index', 'searcher', 'activity',
     *                        'goods' or 'refill'.
     * @param string $msg     JSON document that must decode to a non-empty array.
     *
     * @return bool True when the channel is recognised and its handler was
     *              invoked; false when the message is empty, is not valid JSON,
     *              does not decode to a non-empty array, or the channel is
     *              unknown.
     */
    public function dispatch($channel, $msg)
    {
        if (empty($msg)) {
            return false;
        }

        $msg = json_decode($msg, true);
        // Scalars, null (invalid JSON) and empty arrays are all rejected.
        if (!is_array($msg) || empty($msg)) {
            return false;
        }

        switch ($channel) {
            case 'ch_index':
                StatesHelper::onIndex($this->mStates, $msg);
                return true;

            case 'searcher':
                StatesHelper::onSearcher($this->mStates, $msg);
                return true;

            case 'activity':
                StatesHelper::onActivity($this->mStates, $msg);
                return true;

            case 'goods':
                StatesHelper::onGoods($this->mStates, $msg);
                return true;

            case 'refill':
                StatesHelper::onRefill($this->mStates, $msg);
                return true;

            default:
                return false;
        }
    }
}