subscriber.php 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: stanley-king
  5. * Date: 2017/2/24
  6. * Time: 下午5:04
  7. */
  8. namespace message;
  9. use Thread;
  10. use Log;
  11. use Redis;
  12. use Exception;
  13. use StatesHelper;
  14. use Mutex;
  15. class subscriber extends Thread
  16. {
  17. private $mConfig;
  18. private $mStates;
  19. private $mLock;
  20. public function __construct($states,$mutex)
  21. {
  22. $this->mConfig = C('redis');
  23. $this->mStates = $states;
  24. $this->mLock = $mutex;
  25. }
  26. function run()
  27. {
  28. @date_default_timezone_set('Asia/Shanghai');
  29. while (true)
  30. {
  31. try
  32. {
  33. $redis = new Redis; //多线程版本不能定义为类成员变量,可能和redis库的设计有关。
  34. $ret = $redis->pconnect($this->mConfig['master']['host'], $this->mConfig['master']['port']);
  35. $redis->setOption(Redis::OPT_READ_TIMEOUT, 600);
  36. if($ret == false) {
  37. Log::record("redis 连接失败.",Log::ERR);
  38. } else {
  39. Log::record("redis 连接成功.",Log::DEBUG);
  40. }
  41. Log::record("Message thread start run....",Log::DEBUG);
  42. $redis->subscribe(all_channels(), 'handler_redis');
  43. Log::record("Message thread quit....",Log::DEBUG);
  44. } catch (Exception $ex) {
  45. Log::record("subscriber quit err={$ex->getMessage()} code={$ex->getCode()}");
  46. }
  47. }
  48. }
  49. protected function dispatch($channel,$msg)
  50. {
  51. Log::record("ch={$channel} msg={$msg}",Log::DEBUG);
  52. if(empty($msg)) return false;
  53. $msg = unserialize($msg);
  54. if($msg == false || !is_array($msg)) {
  55. return false;
  56. }
  57. if($channel == 'ch_index')
  58. {
  59. Mutex::lock($this->mLock);
  60. $ret = StatesHelper::onIndex($this->mStates,$msg);
  61. Mutex::unlock($this->mLock);
  62. return $ret;
  63. }
  64. else {
  65. return false;
  66. }
  67. }
  68. }