stanley-king 3 yıl önce
ebeveyn
işleme
aba3f6d8d9

+ 0 - 8
core/framework/libraries/CoRefPool.php

@@ -39,9 +39,6 @@ abstract class CoRefPool
             $this->mUsingClients[$cid] = ['client' => $client, 'ref_count' => 1,'ifTransacting' => false];
         }
 
-//        $count = count($this->mUsingClients);
-//        Log::record("mysqli get cid={$cid} mUsingCount={$count}",Log::DEBUG);
-
         return $client;
     }
 
@@ -113,7 +110,6 @@ abstract class CoRefPool
 
     public function put($cid)
     {
-//        Log::record("CoRefPool put cid={$cid}",Log::DEBUG);
         if(array_key_exists($cid,$this->mUsingClients))
         {
             $this->mUsingClients[$cid]['ref_count'] -= 1;
@@ -129,11 +125,7 @@ abstract class CoRefPool
                 if($suspend_cid > 0) {
                     Co::resume($suspend_cid);
                 }
-//                Log::record("mysqli CoRefPool put cid={$cid} suspend_cid={$suspend_cid} refcount={$refcount}",Log::DEBUG);
             }
-//            else {
-//                Log::record("mysqli CoRefPool put cid={$cid} refcount={$refcount}",Log::DEBUG);
-//            }
         }
         else {
             $msg = __METHOD__ . " mysqli cannot find mysqli cid={$cid} client";

+ 21 - 0
helper/queue/iqueue.php

@@ -215,6 +215,9 @@ class IServer
 abstract class ILooper
 {
     private $_stop = false;
+    private $_pause = false;
+    private $_cid = 0;
+
     private $mServer;
     const MAX_COROUTINE = 500;
 
@@ -234,6 +237,19 @@ abstract class ILooper
 
     abstract protected function handle($msg);
 
+    public function pause()
+    {
+        $this->_pause = true;
+    }
+
+    public function resume()
+    {
+        $this->_pause = false;
+        if($this->_cid > 0) {
+            Co::resume($this->_cid);
+        }
+    }
+
     public function stop()
     {
         Log::record(__FUNCTION__,Log::DEBUG);
@@ -242,11 +258,15 @@ abstract class ILooper
 
     public function run()
     {
+        $this->_cid = Co::getPcid();
         $queues = $this->mServer->scan();
         while (true)
         {
             try
             {
+                if($this->_pause) {
+                    Co::suspend();
+                }
                 if ($this->_stop) break;
 
                 perfor_clear();
@@ -338,6 +358,7 @@ abstract class ILooper
                 Log::record("QueueDB pop err: code={$code} err={$err}",Log::ERR);
             }
         }
+
         Log::record("ILooper Run quit.", Log::DEBUG);
     }
 

+ 18 - 14
helper/refill/ProviderManager.php

@@ -17,17 +17,17 @@ use Exception;
 
 class ProviderManager
 {
-    const LowestQuality  = 1;
-    const HighestQuality = 5;
-
-    protected $mProviderNames;
+    protected $mOpenedProviderNames;
     protected $mAllQMapPTS;
     protected $mProviders;
     protected $mSpecTypes;
 
     public function __construct()
     {
-
+        $this->mOpenedProviderNames = [];
+        $this->mAllQMapPTS = [];
+        $this->mProviders = [];
+        $this->mSpecTypes = [];
     }
 
     public function getQPTA()
@@ -35,7 +35,7 @@ class ProviderManager
         return $this->mAllQMapPTS;
     }
 
-    private function map_cfg($cfgs,$refill_type)
+    private function map_cfg($cfgs,$refill_type,$channels)
     {
         $card_types = function ($stypes)
         {
@@ -52,12 +52,15 @@ class ProviderManager
         foreach ($cfgs as $item)
         {
             $name = $item['name'];
-            $cfg = $item['cfg'];
+            if(!array_key_exists($name,$channels) || $channels[$name]['opened'] == false) {
+                continue;
+            }
 
+            $cfg = $item['cfg'];
             $provider = $this->create_provider($name,$cfg,$refill_type);
             if($provider !== false) {
                 $this->mProviders[$name] = $provider;
-                $this->mProviderNames[] = $name;
+                $this->mOpenedProviderNames[] = $name;
             } else {
                 continue;
             }
@@ -123,18 +126,19 @@ class ProviderManager
                 include($file);
             }
 
-            $this->mProviderNames = [];
+            $this->mOpenedProviderNames = [];
             $this->mAllQMapPTS = [];
             $this->mProviders = [];
             $this->mSpecTypes = [];
 
+            $channels = $this->read_channel();
+
             global $config;
-            $this->map_cfg($config['phone_providers'],'RefillPhone');
-            $this->map_cfg($config['oil_providers'],'RefillOil');
-            $this->map_cfg($config['third_providers'],'RefillPhone');
-            $this->mProviderNames = array_unique($this->mProviderNames);
+            $this->map_cfg($config['phone_providers'],'RefillPhone',$channels);
+            $this->map_cfg($config['oil_providers'],'RefillOil',$channels);
+            $this->map_cfg($config['third_providers'],'RefillPhone',$channels);
+            $this->mOpenedProviderNames = array_unique($this->mOpenedProviderNames);
 
-            $channels = $this->read_channel();
             foreach ($channels as $item)
             {
                 $name = $item['name'];

+ 8 - 2
helper/refill/policy/chctl.php

@@ -5,6 +5,7 @@ namespace refill;
 
 use Log;
 use mtopcard;
+use algorithm;
 
 class chctl
 {
@@ -60,7 +61,7 @@ class chctl
         }
     }
 
-    public function load()
+    public function load($opened_names)
     {
         $this->mSpeedtable = [];
         foreach (self::$cache_names as $cache)
@@ -75,8 +76,13 @@ class chctl
 
             foreach ($cfgs as $items)
             {
-                foreach ($items as $item) {
+                foreach ($items as $item)
+                {
                     $name = $item['name'];
+                    if(!algorithm::binary_search($opened_names,$name)) {
+                        continue;
+                    }
+
                     $amount = $item['amount'];
                     $card_type = $item['type'];
 

+ 10 - 3
helper/refill/policy/merchant_price.php

@@ -2,6 +2,7 @@
 
 namespace refill;
 use mtopcard;
+use algorithm;
 
 class merchant_price
 {
@@ -10,10 +11,11 @@ class merchant_price
 
     public function __construct()
     {
-        $this->load();
+        $this->mPrices = [];
+        $this->mExtraPrices = [];
     }
 
-    public function load()
+    public function load($mchids)
     {
         $this->mPrices = [];
         $this->mExtraPrices = [];
@@ -30,11 +32,15 @@ class merchant_price
 
             foreach ($items as $item)
             {
+                $mchid = intval($item['mchid']);
+                if(!algorithm::binary_search($mchids,$mchid)) {
+                    continue;
+                }
+
                 $card_types = $item['card_types'];
                 $price = ncPriceFormat($item['price']);
                 $extra_price = ncPriceFormat($item['extra_price']);
 
-                $mchid = intval($item['mchid']);
                 $spec = intval($item['spec']);
                 $quality = intval($item['quality']);
 
@@ -46,6 +52,7 @@ class merchant_price
                 }
             }
         }
+
     }
 
     public function price($mchid,$card_type,$spec,$quality,$pcode)

+ 13 - 5
helper/refill/policy/mgroup.php

@@ -2,6 +2,7 @@
 //机构指定通道分配
 
 namespace refill;
+use algorithm;
 
 class mchannel_item
 {
@@ -116,14 +117,14 @@ class rgroup_ctl
 
     public function __construct()
     {
-        $this->load();
+        $this->mMch2Channel = [];
     }
 
-    public function load()
+    public function load($opened_names,$opened_merchants)
     {
         //加载所有的通道组
         $groups = $this->load_groups();
-        $this->load_merchant($groups);
+        $this->load_merchant($groups,$opened_names,$opened_merchants);
     }
 
     public function find_providers($mchid,$spec,$card_type,$quality)
@@ -152,13 +153,16 @@ class rgroup_ctl
         return $groups;
     }
 
-    private function load_merchant($groups)
+    private function load_merchant($groups,$opened_names,$opened_merchants)
     {
         $this->mMch2Channel = [];
         $mchitems = Model('')->table('merchant')->limit(1000)->select();
         foreach ($mchitems as $item)
         {
             $mchid = intval($item['mchid']);
+            if(!algorithm::binary_search($opened_merchants,$mchid)) {
+                continue;
+            }
 
             $ids_text = $item['group_ids'];
             if (empty($ids_text)) continue;
@@ -177,8 +181,12 @@ class rgroup_ctl
 
                     $group = $groups[$gid];
                     $chitems = $group->infos();
-                    foreach ($chitems as $text) {
+                    foreach ($chitems as $text)
+                    {
                         [$spec, $card_type, $quality, $chname] = $this->parase($text);
+                        if(!algorithm::binary_search($opened_names,$chname)) {
+                            continue;
+                        }
                         $mchchannel->add_channel($spec, $card_type, $quality, $chname);
                     }
 

+ 2 - 1
helper/refill/policy/rstorage.php

@@ -144,7 +144,8 @@ class rstorage
     private $mMerchants;
     public function __construct()
     {
-        $this->load();
+        $this->mSystems = [];
+        $this->mMerchants = [];
     }
 
     public function load()

+ 31 - 3
helper/refill/policy/xyz/policy.php

@@ -31,18 +31,46 @@ class policy extends ProviderManager implements IPolicy
     {
         parent::load();
 
-        $this->mChannelControl->load();
+        $opened_names = $this->mOpenedProviderNames;
+        sort($opened_names);
+        $opened_merchants = $this->opened_merchants();
+
+        $this->mChannelControl->load($opened_names);
         $this->mChannelControl->update_price($this);
 
         $this->mQuality->load();
-        $this->mPrices->load();
+        $this->mPrices->load($opened_merchants);
 
         $turn_name = 'oil_amount_lock_turn';
         $this->mAmountLockTurn = rkcache($turn_name);
         Log::record("AmountLockTurn = {$this->mAmountLockTurn}",Log::DEBUG);
 
         $this->mStorageLocker->load();
-        $this->mGroupCtl->load();
+        $this->mGroupCtl->load($opened_names,$opened_merchants);
+    }
+
+    private function opened_merchants()
+    {
+        $mchids = [];
+
+        $i = 0;
+        while (true)
+        {
+            $start = $i * 1000;
+            $items = Model()->table('merchant')->field('*')->where(['merchant_state' => 1])->order('mchid asc')->limit("{$start},1000")->select();
+            if(empty($items)) {
+                break;
+            }
+            $i++;
+
+            foreach ($items as $item) {
+                $mchids[] = intval($item['mchid']);
+            }
+        }
+
+        sort($mchids);
+
+        return $mchids;
     }
 
     public function find_providers(int $mchid, int $spec, int $card_type, int $org_quality, int $quality, $regin_no, $pcode, $order_time, $commit_times): array

+ 47 - 42
rdispatcher/coall.php

@@ -24,7 +24,6 @@ require_once(BASE_CORE_PATH . '/framework/libraries/CoMysqliPool.php');
 require_once(BASE_CORE_PATH . '/framework/libraries/CoPool.php');
 require_once(BASE_CORE_PATH . '/framework/libraries/CoRedisPool.php');
 
-//Co::set(['hook_flags' => SWOOLE_HOOK_NATIVE_CURL | SWOOLE_HOOK_SLEEP | SWOOLE_HOOK_TCP]);
 Co::set(['hook_flags' => SWOOLE_HOOK_ALL | SWOOLE_HOOK_NATIVE_CURL]);
 if (empty($_SERVER['argv'][1])) exit('parameter error');
 $process_count = intval($_SERVER['argv'][1]);
@@ -49,16 +48,18 @@ function handle_error($level, $message, $file, $line)
     Log::record($trace,Log::ERR);
 }
 
-function subscribe_message(&$quit, &$redis, $channels)
+function subscribe_message(&$quit, &$redis, $channels,$looper)
 {
-    $redis = new Swoole\Coroutine\Redis();
+    $redis = new Redis();
     while (!$quit)
     {
         try
         {
             Log::record("subscribe_message start quit=" . strbool($quit),Log::DEBUG);
-            if(!$redis->connected) {
+            if(!$redis->isConnected()) {
+                $redis->close();
                 $ret = $redis->connect(C('coroutine.redis_host'), C('coroutine.redis_port'));
+                $redis->setOption(Redis::OPT_READ_TIMEOUT, 3600);
             }
             else {
                 $ret = true;
@@ -66,46 +67,45 @@ function subscribe_message(&$quit, &$redis, $channels)
 
             if(!$ret) {
                 Log::record("subscribe_message cannot connet redis.",Log::DEBUG);
-                $redis->close();
+                sleep(1);
+                continue;
             }
-            elseif($redis->subscribe($channels))
+
+            $ret = $redis->subscribe($channels,function ($redis,$channel,$msg) use(&$quit,$looper)
             {
-                while ($msg = $redis->recv())
-                {
-                    [$sub_type, $channel, $content] = $msg;
-                    $content = json_decode($content,true);
-                    $type = $content['type'];
-
-                    if($channel != 'refill') continue;
-                    if($quit) break;
-
-                    if($type == 'channel' || $type == 'merchant') {
-                        refill\RefillFactory::instance()->load();
-                    }
-                    elseif($type == 'ratio') {
-                        $ins = Cache::getInstance('cacheredis');
-                        $val = $ins->get_org('channel_ratios');
-
-                        if(empty($val)) continue;
-                        $val = json_decode($val,true);
-                        if(empty($val)) continue;
-                        $ratios = $val['ratios'];
-                        if(empty($ratios)) continue;
-
-                        refill\RefillFactory::instance()->UpdateRatio($ratios);
-                    }
-                    else {
-                        Log::record("subscribe_message dont not handle mgs:{$sub_type}-{$channel}-{$type}",Log::DEBUG);
-                    }
+                Log::record("channel={$channel} msg={$msg}",Log::DEBUG);
+                $content = json_decode($msg,true);
+                $type = $content['type'];
+
+                if($channel != 'refill') return;
+                if($quit) return;
+
+                if($type == 'channel' || $type == 'merchant') {
+                    $looper->pause();
+                    refill\RefillFactory::instance()->load();
+                    $looper->resume();
                 }
-            }
-            else {
-                Log::record("subscribe_message subscribe error",Log::ERR);
-            }
+                elseif($type == 'ratio') {
+                    $ins = Cache::getInstance('cacheredis');
+                    $val = $ins->get_org('channel_ratios');
+
+                    if(empty($val)) return;
+                    $val = json_decode($val,true);
+                    if(empty($val)) return;
+                    $ratios = $val['ratios'];
+                    if(empty($ratios)) return;
+
+                    refill\RefillFactory::instance()->UpdateRatio($ratios);
+                }
+                else {
+                    Log::record("subscribe_message dont not handle mgs:{$channel}-{$type}",Log::DEBUG);
+                }
+            });
+            Log::record("subscribe ret={$ret}",Log::DEBUG);
         }
         catch (Exception $ex)
         {
-            Log::record($ex->getMessage(),Log::ERR);
+            Log::record("subscribe_message " . $ex->getMessage(), Log::ERR);
         }
     }
 
@@ -123,11 +123,12 @@ for ($i = 0; $i < $process_count;$i++)
 
         $sub_quit = false;
         $sub_redis = null;
-        go(function () use (&$sub_quit,&$sub_redis) {
-            subscribe_message($sub_quit,$sub_redis,['refill']);
+        $looper = new processor(false);
+
+        go(function () use (&$sub_quit,&$sub_redis,$looper) {
+            subscribe_message($sub_quit,$sub_redis,['refill'],$looper);
         });
 
-        $looper = new processor(false);
         go(function () use ($looper) {
             $looper->run();
         });
@@ -154,6 +155,7 @@ for ($i = 0; $i < $process_count;$i++)
                 }
                 Log::record("coroutine_num = {$num}",Log::DEBUG);
             } while($num > 1);
+
             CoRedisPool::instance()->stop();
             CoMysqliPool::instance()->stop();
         });
@@ -173,9 +175,12 @@ while (true)
     }
     else
     {
+        foreach ($workers as $pid => $worker) {
+            Swoole\Process::kill($pid, SIGTERM);
+        }
+
         foreach ($workers as $pid => $worker)
         {
-            Swoole\Process::kill($pid,SIGTERM);
             if($status = Swoole\Process::wait(true)) {
                 Log::record("Graceful Recycled #{$status['pid']}, code={$status['code']}, signal={$status['signal']}",Log::DEBUG);
             }

+ 5 - 0
test/TestRefillThird.php

@@ -362,4 +362,9 @@ class TestRefillThird extends TestCase
         $proxy = new refill_proxy("210fe406954220f56085997d6a4c5b80");
         $resp = $proxy->send($url, $params);
     }
+
+    public function testLoad()
+    {
+        refill\RefillFactory::instance()->load();
+    }
 }