فهرست منبع

增加多线程,订阅发布模型

stanley-king 8 سال پیش
والد
کامیت
42bd744095
9فایلهای تغییر یافته به همراه286 افزوده شده و 64 حذف شده
  1. 7 1
      admin/control/cache.php
  2. 12 0
      fcgi_run.php
  3. 0 1
      helper/fcgi_server.php
  4. 65 0
      helper/index_tab.php
  5. 67 0
      helper/message/msgutil.php
  6. 46 0
      helper/message/publisher.php
  7. 84 0
      helper/message/subscriber.php
  8. 1 60
      mobile/control/index.php
  9. 4 2
      test/redisTest.php

+ 7 - 1
admin/control/cache.php

@@ -5,8 +5,11 @@
 
 defined('InShopNC') or exit('Access Invalid!');
 
+require_once (BASE_ROOT_PATH . '/helper/message/publisher.php');
+
 class cacheControl extends SystemControl
 {
+    private $mPublisher;
     protected $cacheItems = array(
         'setting',          // 基本缓存
         'seo',              // SEO缓存
@@ -36,6 +39,7 @@ class cacheControl extends SystemControl
         parent::__construct();
         Language::read('cache');
         $this->cacher = Cache::getInstance('cacheredis');
+        $this->mPublisher = new message\publisher();
     }
 
     /**
@@ -147,7 +151,9 @@ class cacheControl extends SystemControl
             // 删除手机专题活动相关缓存
             if (in_array('specials',$todo)) {
                 $this->del_keys('mb_special*');
-                $this->inc_version("index_tab_version");
+                //$this->inc_version("index_tab_version");
+                $this->mPublisher->modify_index_tabs();
+
                 dkcache('web_code_123');
             }
 

+ 12 - 0
fcgi_run.php

@@ -9,5 +9,17 @@ require_once (BASE_ROOT_PATH . '/helper/session.php');
 require_once (BASE_ROOT_PATH . '/helper/img_helper.php');
 require_once (BASE_ROOT_PATH . '/helper/exceptionex.php');
 require_once (BASE_ROOT_PATH . '/helper/fcgi_server.php');
+require_once (BASE_ROOT_PATH . '/helper/message/msgutil.php');
+require_once (BASE_ROOT_PATH . '/helper/message/subscriber.php');
+require_once (BASE_ROOT_PATH . '/helper/index_tab.php');
+
+
+Base::mobile_init();
+
+$gMessageStates = new MsgStates();
+StatesHelper::init();
+$gMessageLock = Mutex::create();
+$listener = new message\subscriber($gMessageStates,$gMessageLock);
+$listener->start();
 
 fcgi_server::instance()->run_looper();

+ 0 - 1
helper/fcgi_server.php

@@ -73,7 +73,6 @@ class fcgi_server
         //载入敏感词词库
         DFAFilter::instance();
         require_once(BASE_ROOT_PATH.'/mobile/index.php');
-        Base::mobile_init();
 
         while(($ret = fcgi_accept()) >= 0)
         {

+ 65 - 0
helper/index_tab.php

@@ -0,0 +1,65 @@
+<?php
+/**
+ * Created by PhpStorm.
+ * User: stanley-king
+ * Date: 2017/2/27
+ * Time: 上午9:57
+ */
+
+function comp_tab($left,$right)
+{
+    $t_l = intval($left['sort']);
+    $t_r = intval($right['sort']);
+
+    if($t_l > $t_r) {
+        return 1;
+    } elseif($t_l == $t_r) {
+        return 0;
+    } else {
+        return -1;
+    }
+}
+
+class index_tab
+{
+    private static $stInstance = null;
+    private $mTabs;
+    const mb_home_tab_id = 123;
+
+    public function __construct()
+    {
+        $this->mTabs = [];
+    }
+    public static function instance() {
+        if(self::$stInstance == null) {
+            self::$stInstance = new index_tab();
+        }
+
+        return self::$stInstance;
+    }
+
+    public function tabs()
+    {
+        if(StatesHelper::fetch_state('tags')) {
+            $this->init();
+        }
+        return $this->mTabs;
+    }
+
+    private function init()
+    {
+        Log::record("init tags data.");
+
+        $this->mTabs = [];
+        $mod_webcode = Model('web_code');
+        $tabs = $mod_webcode->get_cache(self::mb_home_tab_id);
+        if($tabs == null || empty($tabs)) {
+            $tabs[] = array('special_id' => 0,'name' => '首页', 'sort' => 0);
+        }
+
+        uasort($tabs,'comp_tab');
+        foreach ($tabs as $key => $val) {
+            $this->mTabs[] = array('special_id' => $val['special_id'],'name' => $val['name']);
+        }
+    }
+}

+ 67 - 0
helper/message/msgutil.php

@@ -0,0 +1,67 @@
+<?php
+/**
+ * Created by PhpStorm.
+ * User: stanley-king
+ * Date: 2017/2/26
+ * Time: 下午7:36
+ */
+
+class MsgStates extends Stackable
+{
+    public function run() {
+    }
+}
+
+class StatesHelper
+{
+    static public function onIndex($states,$msg)
+    {
+        $type = $msg['type'];
+        if($type == 'tags') {
+            $states['tags'] = true;
+        }
+    }
+    static public function fetch_state($tag)
+    {
+        global $gMessageStates;
+        global $gMessageLock;
+
+        Mutex::lock($gMessageLock);
+        if(is_array($gMessageStates) && array_key_exists($tag,$gMessageStates))
+        {
+            $state = $gMessageStates[$tag];
+            if($state == true) {
+                $gMessageLock[$tag] = false;
+            }
+        }
+        else {
+            $gMessageLock[$tag] = false;
+            $state = true;
+        }
+        Mutex::unlock($gMessageLock);
+
+        return $state;
+    }
+
+    static public function init()
+    {
+        global $gMessageStates;
+        global $gMessageLock;
+
+        Mutex::lock($gMessageLock);
+        $gMessageStates['tags'] = true;
+        Mutex::unlock($gMessageLock);
+    }
+}
+
+function handler_redis($redis, $chan, $msg)
+{
+    Log::record("message: {$chan} -- {$msg}",Log::DEBUG);
+    $cur_trd = Thread::getCurrentThread();
+    $cur_trd->dispatch($chan,$msg);
+}
+
+function all_channels()
+{
+    return array('ch_index');
+}

+ 46 - 0
helper/message/publisher.php

@@ -0,0 +1,46 @@
+<?php
+/**
+ * Created by PhpStorm.
+ * User: stanley-king
+ * Date: 2017/2/27
+ * Time: 上午10:17
+ */
+
+namespace message;
+
+use Log;
+use Redis;
+
+class publisher
+{
+    private $mConfig;
+    private $mRedis;
+    private $mConnected;
+
+    public function __construct()
+    {
+        $this->mConfig = C('redis');
+        $this->mRedis = new Redis();
+        $ret = $this->mRedis->pconnect($this->mConfig['master']['host'], $this->mConfig['master']['port']);
+        if($ret == false) {
+            Log::record("redis 连接失败.",Log::ERR);
+            $this->mConnected = false;
+        } else {
+            Log::record("redis 连接成功.",Log::DEBUG);
+            $this->mConnected = true;
+        }
+    }
+    public function __destruct()
+    {
+        if($this->mConnected) {
+            $this->mRedis->close();
+        }
+    }
+
+    public function modify_index_tabs()
+    {
+        if($this->mConnected) {
+            $this->mRedis->publish('ch_index',serialize(array('type'=>'tags','value' => 0)));
+        }
+    }
+}

+ 84 - 0
helper/message/subscriber.php

@@ -0,0 +1,84 @@
+<?php
+
+/**
+ * Created by PhpStorm.
+ * User: stanley-king
+ * Date: 2017/2/24
+ * Time: 下午5:04
+ */
+
+
+namespace message;
+
+use Thread;
+use Log;
+use Redis;
+use Exception;
+use StatesHelper;
+use Mutex;
+
+class subscriber extends Thread
+{
+    private $mConfig;
+    private $mStates;
+    private $mLock;
+
+    public function __construct($states,$mutex)
+    {
+        $this->mConfig = C('redis');
+        $this->mStates = $states;
+        $this->mLock = $mutex;
+    }
+
+    function run()
+    {
+        @date_default_timezone_set('Asia/Shanghai');
+
+        while (true)
+        {
+            try
+            {
+                $redis  = new Redis; //多线程版本不能定义为类成员变量,可能和redis库的设计有关。
+                $ret = $redis->pconnect($this->mConfig['master']['host'], $this->mConfig['master']['port']);
+                $redis->setOption(Redis::OPT_READ_TIMEOUT, 600);
+
+
+                if($ret == false) {
+                    Log::record("redis 连接失败.",Log::ERR);
+                } else {
+                    Log::record("redis 连接成功.",Log::DEBUG);
+                }
+
+                Log::record("Message thread start run....",Log::DEBUG);
+
+                $redis->subscribe(all_channels(), 'handler_redis');
+                Log::record("Message thread quit....",Log::DEBUG);
+            } catch (Exception $ex) {
+                Log::record("subscriber quit err={$ex->getMessage()} code={$ex->getCode()}");
+            }
+        }
+    }
+
+    protected function dispatch($channel,$msg)
+    {
+        Log::record("ch={$channel} msg={$msg}",Log::DEBUG);
+
+        if(empty($msg)) return false;
+        $msg = unserialize($msg);
+        if($msg == false || !is_array($msg)) {
+            return false;
+        }
+
+        if($channel == 'ch_index')
+        {
+            Mutex::lock($this->mLock);
+            $ret = StatesHelper::onIndex($this->mStates,$msg);
+            Mutex::unlock($this->mLock);
+
+            return $ret;
+        }
+        else {
+            return false;
+        }
+    }
+}

+ 1 - 60
mobile/control/index.php

@@ -12,70 +12,11 @@ defined('InShopNC') or exit('Access Invalid!');
 
 
 require_once(BASE_ROOT_PATH . '/helper/goods_helper.php');
-require_once(BASE_ROOT_PATH . '/helper/activity/version_checker.php');
 require_once(BASE_ROOT_PATH . '/helper/special_helper.php');
+require_once(BASE_ROOT_PATH . '/helper/index_tab.php');
 require_once(BASE_ROOT_PATH . '/mobile/control/special.php');
 
 
-function comp_tab($left,$right)
-{
-    $t_l = intval($left['sort']);
-    $t_r = intval($right['sort']);
-
-    if($t_l > $t_r) {
-        return 1;
-    } elseif($t_l == $t_r) {
-        return 0;
-    } else {
-        return -1;
-    }
-}
-
-class index_tab
-{
-    static private $stInstance = null;
-    private $verchecker;
-    private $mTabs;
-    const mb_home_tab_id = 123;
-
-
-    static public function instance()
-    {
-        if(self::$stInstance == null) {
-            self::$stInstance = new index_tab();
-        }
-
-        if(self::$stInstance->verchecker->need_init()) {
-            self::$stInstance->init();
-        }
-
-        return self::$stInstance;
-    }
-
-    private function __construct()
-    {
-        $this->verchecker = new activity\version_checker('index_tab_version',300);
-    }
-
-    public function tabs() {
-        return $this->mTabs;
-    }
-
-    private function init()
-    {
-        $this->mTabs = [];
-        $mod_webcode = Model('web_code');
-        $tabs = $mod_webcode->get_cache(self::mb_home_tab_id);
-        if($tabs == null || empty($tabs)) {
-            $tabs[] = array('special_id' => 0,'name' => '首页', 'sort' => 0);
-        }
-
-        uasort($tabs,'comp_tab');
-        foreach ($tabs as $key => $val) {
-            $this->mTabs[] = array('special_id' => $val['special_id'],'name' => $val['name']);
-        }
-    }
-}
 
 class indexControl extends specialControl
 {

+ 4 - 2
test/redisTest.php

@@ -76,8 +76,10 @@ class redisTest extends PHPUnit_Framework_TestCase
     {
         $redis = new Redis();
         $redis->pconnect('127.0.0.1',6379);
-        $redis->publish('chan-1', 'hello, world 1!'); // send message to channel 1.
-        $redis->publish('chan-2', 'hello, world 2!'); // send message to channel 2.
+
+
+        $redis->publish('ch_index',serialize(array('type'=>'tags','value' => 0))); // send message to channel 1.
+        //$redis->publish('chan-2', 'hello, world 2!'); // send message to channel 2.
         $redis->close();
     }
     public function testSubscribe()