mConfig = C('redis'); $this->mStates = $states; $this->mLock = $mutex; } function run() { @date_default_timezone_set('Asia/Shanghai'); while (true) { try { $redis = new Redis; //多线程版本不能定义为类成员变量,可能和redis库的设计有关。 $ret = $redis->pconnect($this->mConfig['master']['host'], $this->mConfig['master']['port']); $redis->setOption(Redis::OPT_READ_TIMEOUT, 3600); if($ret == false) { Log::record("redis 连接失败.",Log::ERR); } else { Log::record("redis 连接成功.",Log::DEBUG); } Log::record("Message thread start run....",Log::DEBUG); $redis->subscribe(all_channels(), 'handler_redis'); Log::record("Message thread quit....",Log::DEBUG); } catch (Exception $ex) { Log::record("subscriber quit err={$ex->getMessage()} code={$ex->getCode()}"); } } } protected function dispatch($channel,$msg) { Log::record("subscriber dispatch ch={$channel} msg={$msg}",Log::DEBUG); if(empty($msg)) return false; $msg = unserialize($msg); if($msg == false || !is_array($msg)) { return false; } if($channel == 'ch_index') { Mutex::lock($this->mLock); $ret = StatesHelper::onIndex($this->mStates,$msg); Mutex::unlock($this->mLock); return $ret; } elseif($channel == 'searcher') { Mutex::lock($this->mLock); $ret = StatesHelper::onSearcher($this->mStates,$msg); Mutex::unlock($this->mLock); return $ret; } elseif($channel == 'activity') { Mutex::lock($this->mLock); $ret = StatesHelper::onActivity($this->mStates,$msg); Mutex::unlock($this->mLock); return $ret; } elseif($channel == 'goods') { Mutex::lock($this->mLock); $ret = StatesHelper::onGoods($this->mStates,$msg); Mutex::unlock($this->mLock); return $ret; } else { return false; } } }