subscriber.php 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: stanley-king
  5. * Date: 2017/2/24
  6. * Time: 下午5:04
  7. */
  8. namespace message;
  9. use Thread;
  10. use Log;
  11. use Redis;
  12. use Exception;
  13. use StatesHelper;
  14. use Mutex;
  15. class subscriber extends Thread
  16. {
  17. private $mConfig;
  18. private $mStates;
  19. private $mLock;
  20. public function __construct($states,$mutex)
  21. {
  22. $this->mConfig = C('redis');
  23. $this->mStates = $states;
  24. $this->mLock = $mutex;
  25. }
  26. function run()
  27. {
  28. @date_default_timezone_set('Asia/Shanghai');
  29. while (true)
  30. {
  31. try
  32. {
  33. $redis = new Redis; //多线程版本不能定义为类成员变量,可能和redis库的设计有关。
  34. $ret = $redis->pconnect($this->mConfig['master']['host'], $this->mConfig['master']['port']);
  35. $redis->setOption(Redis::OPT_READ_TIMEOUT, 3600);
  36. if($ret == false) {
  37. Log::record("redis 连接失败.",Log::ERR);
  38. } else {
  39. Log::record("redis 连接成功.",Log::DEBUG);
  40. }
  41. Log::record("Message thread start run....",Log::DEBUG);
  42. $redis->subscribe(all_channels(), 'handler_redis');
  43. Log::record("Message thread quit....",Log::DEBUG);
  44. }
  45. catch (Exception $ex)
  46. {
  47. Log::record("subscriber quit err={$ex->getMessage()} code={$ex->getCode()}");
  48. }
  49. }
  50. }
  51. protected function dispatch($channel,$msg)
  52. {
  53. Log::record("subscriber dispatch ch={$channel} msg={$msg}",Log::DEBUG);
  54. if(empty($msg)) return false;
  55. $msg = unserialize($msg);
  56. if($msg == false || !is_array($msg)) {
  57. return false;
  58. }
  59. if($channel == 'ch_index')
  60. {
  61. Mutex::lock($this->mLock);
  62. $ret = StatesHelper::onIndex($this->mStates,$msg);
  63. Mutex::unlock($this->mLock);
  64. return $ret;
  65. }
  66. elseif($channel == 'searcher') {
  67. Mutex::lock($this->mLock);
  68. $ret = StatesHelper::onSearcher($this->mStates,$msg);
  69. Mutex::unlock($this->mLock);
  70. return $ret;
  71. }
  72. elseif($channel == 'activity') {
  73. Mutex::lock($this->mLock);
  74. $ret = StatesHelper::onActivity($this->mStates,$msg);
  75. Mutex::unlock($this->mLock);
  76. return $ret;
  77. }
  78. elseif($channel == 'goods') {
  79. Mutex::lock($this->mLock);
  80. $ret = StatesHelper::onGoods($this->mStates,$msg);
  81. Mutex::unlock($this->mLock);
  82. return $ret;
  83. }
  84. else {
  85. return false;
  86. }
  87. }
  88. }