123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- <?php
- /**
- * Created by PhpStorm.
- * User: stanley-king
- * Date: 2017/2/24
- * Time: 下午5:04
- */
- namespace message;
- use Thread;
- use Log;
- use Redis;
- use Exception;
- use StatesHelper;
- use Mutex;
/**
 * Background thread that subscribes to Redis pub/sub channels and keeps
 * reconnecting for as long as the thread lives.
 *
 * Decoded messages are routed by dispatch() to the matching StatesHelper
 * handler while the shared mutex is held.
 */
class subscriber extends Thread
{
    /** Seconds to wait before the next connection attempt after a failure. */
    const RECONNECT_DELAY = 1;

    /** @var mixed Value of C('redis'); run() reads ['master']['host'] and ['master']['port'] from it. */
    private $mConfig;

    /** @var mixed State container handed unchanged to every StatesHelper handler. */
    private $mStates;

    /** @var mixed Mutex handle passed to Mutex::lock() / Mutex::unlock(). */
    private $mLock;

    /**
     * @param mixed $states State container forwarded to the StatesHelper handlers.
     * @param mixed $mutex  Mutex handle guarding access to $states.
     */
    public function __construct($states, $mutex)
    {
        $this->mConfig = C('redis');
        $this->mStates = $states;
        $this->mLock = $mutex;
    }

    /**
     * Thread entry point: connect to the Redis master, subscribe to all
     * channels, and start over whenever the subscription ends or fails.
     *
     * This method never returns. A failed connect or an exception is logged
     * and followed by a short pause so a Redis outage does not turn the loop
     * into a busy spin.
     *
     * @return void
     */
    public function run()
    {
        date_default_timezone_set('Asia/Shanghai');

        while (true) {
            try {
                // Per the original author's note: in the multi-threaded build the client
                // must not be stored as a class member (possibly related to how the redis
                // extension is designed), so a fresh client is created on every attempt.
                $redis = new Redis;
                $connected = $redis->pconnect(
                    $this->mConfig['master']['host'],
                    $this->mConfig['master']['port']
                );

                if (!$connected) {
                    Log::record("redis 连接失败.", Log::ERR);
                    // Do not configure or subscribe on a dead client; wait and retry.
                    sleep(self::RECONNECT_DELAY);
                    continue;
                }

                Log::record("redis 连接成功.", Log::DEBUG);
                $redis->setOption(Redis::OPT_READ_TIMEOUT, 3600);

                Log::record("Message thread start run....", Log::DEBUG);
                // 'handler_redis' is passed as a callable name; it is defined outside this class.
                $redis->subscribe(all_channels(), 'handler_redis');
                Log::record("Message thread quit....", Log::DEBUG);
            } catch (Exception $ex) {
                Log::record("subscriber quit err={$ex->getMessage()} code={$ex->getCode()}", Log::ERR);
                // Back off before reconnecting so repeated failures do not flood the log.
                sleep(self::RECONNECT_DELAY);
            }
        }
    }

    /**
     * Decode a serialized channel payload and hand it to the StatesHelper
     * handler registered for that channel.
     *
     * The handler runs with the mutex locked. The mutex is released even when
     * the handler throws, and the exception is then rethrown to the caller.
     *
     * @param string $channel Channel name the payload arrived on.
     * @param string $msg     Payload in PHP serialize() format; must decode to a non-empty array.
     *
     * @return mixed The handler's return value, or false when the payload is
     *               empty or not a non-empty array, or the channel is unknown.
     *
     * @throws Exception Whatever the StatesHelper handler throws.
     */
    protected function dispatch($channel, $msg)
    {
        Log::record("subscriber dispatch ch={$channel} msg={$msg}", Log::DEBUG);

        if (empty($msg)) {
            return false;
        }

        // NOTE(review): unserialize() on data received from a pub/sub channel can
        // instantiate arbitrary classes if a publisher is untrusted. Consider moving
        // both ends of the channel to JSON; left as-is to stay wire compatible.
        $msg = unserialize($msg);
        if (!is_array($msg) || empty($msg)) {
            return false;
        }

        $handler = self::handlerFor($channel);
        if ($handler === null) {
            return false;
        }

        Mutex::lock($this->mLock);
        try {
            $ret = StatesHelper::$handler($this->mStates, $msg);
        } catch (Exception $ex) {
            // Release the lock before propagating, otherwise the mutex stays held forever.
            Mutex::unlock($this->mLock);
            throw $ex;
        }
        Mutex::unlock($this->mLock);

        return $ret;
    }

    /**
     * Map a channel name to the StatesHelper static method that handles it.
     *
     * @param string $channel Channel name.
     *
     * @return string|null Handler method name, or null for an unknown channel.
     */
    private static function handlerFor($channel)
    {
        switch ($channel) {
            case 'ch_index':
                return 'onIndex';
            case 'searcher':
                return 'onSearcher';
            case 'activity':
                return 'onActivity';
            case 'goods':
                return 'onGoods';
            default:
                return null;
        }
    }
}
|