Explorar o código

add coroutine

stanley-king %!s(int64=4) %!d(string=hai) anos
pai
achega
145191778e

+ 4 - 5
core/framework/libraries/log.php

@@ -122,12 +122,11 @@ class Log
     private static function write($message, $level)
     {
         $now = @date('Y-m-d H:i:s', time());
-
-        define('USE_COROUTINE',true);
-
-
         if(defined('USE_COROUTINE') && USE_COROUTINE === true) {
-            $pid = Swoole\Coroutine::getCid();
+            $pid = getmypid();
+            $cid = Swoole\Coroutine::getCid();
+
+            $pid = "{$pid}-{$cid}";
         }
         else {
             $pid = posix_getpid();

+ 2 - 0
data/config/dev/base.ini.php

@@ -95,6 +95,8 @@ $config['redis']['slave']['host']     	= SLAVE_REDISHOST;
 $config['redis']['slave']['port']     	= 6379;
 $config['redis']['slave']['pconnect'] 	= 0;
 
+$config['coroutine']['redis_host'] = 'host.docker.internal';
+$config['coroutine']['redis_port'] = 6379;
 
 //$config['fullindexer']['open']      = false;
 //$config['fullindexer']['appname']   = '33hao';

+ 3 - 0
data/config/lingzh/base.ini.php

@@ -74,6 +74,9 @@ $config['redis']['slave']['host']     	= SLAVE_REDISHOST;
 $config['redis']['slave']['port']     	= 6379;
 $config['redis']['slave']['pconnect'] 	= 0;
 
+$config['coroutine']['redis_host'] = '172.16.110.28';
+$config['coroutine']['redis_port'] = 6379;
+
 //$config['fullindexer']['open']      = false;
 //$config['fullindexer']['appname']   = '33hao';
 

+ 3 - 0
data/config/xyz/base.ini.php

@@ -75,6 +75,9 @@ $config['redis']['slave']['host']     	= SLAVE_REDISHOST;
 $config['redis']['slave']['port']     	= 6379;
 $config['redis']['slave']['pconnect'] 	= 0;
 
+$config['coroutine']['redis_host'] = '172.26.105.125';
+$config['coroutine']['redis_port'] = 6379;
+
 
 //$config['fullindexer']['open']      = false;
 //$config['fullindexer']['appname']   = '33hao';

+ 12 - 0
data/logic/queue.logic.php

@@ -1419,4 +1419,16 @@ class queueLogic
             return callback(true, '成功放入通知队列', ['order_id' => $order_id]);
         }
     }
+
+    public function AysncAddDispatcher($params)
+    {
+        $method = $params['method'];
+        $input =  $params['params'];
+        if(empty($method || empty($method))) {
+            return callback(false, 'AysncAddDispatcher 参数为空');
+        } else {
+            $ret = queue\DispatcherClient::instance()->push($method,$input);
+            return callback($ret, "放入通知队列 ret = {$ret}",$params);
+        }
+    }
 }

+ 13 - 0
docker/compose/lz-worker/docker-compose.yml

@@ -12,6 +12,19 @@ services:
     container_name: "panda-dispatcher"
     command: [php,"/var/www/html/rdispatcher/dispatcher.php","20"]
 
+  cordsrv:
+    image: php-swool-redis:latest
+    volumes:
+      - ../../conf/etc/localtime:/etc/localtime:ro
+      - ../../../:/var/www/html
+      - /nfs/upload:/var/www/html/data/upload
+      - /mnt/shoplog:/var/www/html/data/log
+      - ../../conf/php/lz-php-swoole.ini:/usr/local/etc/php/php.ini
+    container_name: "panda-codispatcher"
+    command: [php,"/var/www/html/rdispatcher/codispatcher.php","8"]
+    depends_on:
+      - "redisrv"
+
   queuesrv:
     image: php-zts-debug:7.3.18
     volumes:

+ 15 - 0
docker/compose/stanley/docker-compose.yml

@@ -78,6 +78,21 @@ services:
     depends_on:
       - "redisrv"
 
+  cordsrv:
+    image: php-swool-redis:latest
+    volumes:
+      - ../../conf/etc/localtime:/etc/localtime:ro
+      - ../../../:/var/www/html
+      - /Volumes/Transcend/upload:/var/www/html/data/upload
+      - /Users/stanley-king/work/PHPProject/shoplog:/var/www/html/data/log
+      - ../../conf/php/php-swoole-debug.ini:/usr/local/etc/php/php.ini
+    links:
+      - redisrv
+    container_name: "panda-codispatcher"
+    command: [php,"/var/www/html/rdispatcher/codispatcher.php","5"]
+    depends_on:
+      - "redisrv"
+
   websrv:
       image: php-fpm:alpine
       ports:

+ 1 - 1
docker/conf/php/lz-php-swoole.ini

@@ -401,7 +401,7 @@ max_input_time = 60
 
 ; Maximum amount of memory a script may consume (128MB)
 ; http://php.net/memory-limit
-memory_limit = 128M
+memory_limit = 256M
 
 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 ; Error handling and logging ;

+ 31 - 11
helper/queue/iqueue.php

@@ -9,7 +9,6 @@ use Redis;
 use Exception;
 use Log;
 
-
 class IQueueDB
 {
     private $_redis;
@@ -27,7 +26,7 @@ class IQueueDB
 
         if($comode) {
             $this->_redis  = new \Swoole\Coroutine\Redis();
-            $ret = $this->_redis->connect('192.168.1.220',6379);
+            $ret = $this->_redis->connect(C('coroutine.redis_host'), C('coroutine.redis_port'));
             Log::record("Swoole\Coroutine\Redis connect ret = {$ret}",Log::DEBUG);
         }
         else {
@@ -145,6 +144,7 @@ abstract class ILooper
 {
     private $_stop = false;
     private $mServer;
+    const MAX_COROUTINE = 1000;
 
     protected function __construct($server)
     {
@@ -170,16 +170,36 @@ abstract class ILooper
             try
             {
                 if ($this->_stop) break;
-                $content = $this->mServer->pop($queues,1);
-                if(empty($content)) continue;
-
-                perfor_clear();
-                perfor_start();
-
-                $this->handle($content);
 
-                perfor_end('Handle Request');
-                $info = perfor_log();
+                if(defined('USE_COROUTINE') && USE_COROUTINE)
+                {
+                    $res = \Swoole\Coroutine::stats();
+                    $num = $res['coroutine_num'];
+                    Log::record("coroutine counts = {$num}",Log::DEBUG);
+
+                    if($num < ILooper::MAX_COROUTINE)
+                    {
+                        $content = $this->mServer->pop($queues,1);
+                        if(empty($content)) continue;
+
+                        go(function ()use ($content) {
+                            $this->handle($content);
+                        });
+                    }
+                    else {
+                        \Swoole\Coroutine::sleep(0.1);
+                    }
+                }
+                else {
+                    $content = $this->mServer->pop($queues,1);
+                    if(empty($content)) continue;
+
+                    perfor_clear();
+                    perfor_start();
+                    $this->handle($content);
+                    perfor_end('Handle Request');
+                    $info = perfor_log();
+                }
                 Log::record("{$info} \r\n",Log::DEBUG);
             }
             catch (Exception $e)

+ 0 - 5
helper/refill/LZRefillFactory.php

@@ -35,11 +35,6 @@ class LZRefillFactory extends RefillBase
         return self::$stInstance;
     }
 
-    private function load()
-    {
-        $this->mPolicy->load();
-    }
-
     private function __construct()
     {
         parent::__construct();

+ 1 - 1
helper/refill/ProviderManager.php

@@ -168,7 +168,7 @@ class ProviderManager
     public function find_providers(int $spec, int $card_type,int $quality): array
     {
         $qnames = $this->mSpecTypes[$quality] ?? [];
-        $this->debug();
+//        $this->debug();
 
         $key = "{$card_type}-{$spec}";
         Log::record("quality = {$quality} , key = {$key}",Log::DEBUG);

+ 5 - 0
helper/refill/RefillBase.php

@@ -39,6 +39,11 @@ class RefillBase
         return $this->mPolicy->goods();
     }
 
+    public function load()
+    {
+        $this->mPolicy->load();
+    }
+
     public function notify($chname, $input)
     {
         $caller = $this->mPolicy->getCaller($chname);

+ 0 - 5
helper/refill/XYZRefillFactory.php

@@ -37,11 +37,6 @@ class XYZRefillFactory extends RefillBase
         return self::$stInstance;
     }
 
-    private function load()
-    {
-        $this->mPolicy->load();
-    }
-
     private function __construct()
     {
         parent::__construct();

+ 3 - 0
helper/refill/api/test/baidu/RefillPhone.php

@@ -6,6 +6,7 @@ require_once(BASE_HELPER_RAPI_PATH . '/baidu/config.php');
 
 use refill;
 use mtopcard;
+use Log;
 
 class RefillPhone extends refill\IRefillPhone
 {
@@ -35,7 +36,9 @@ class RefillPhone extends refill\IRefillPhone
         $sign = $this->sign($params);
         $params['sign'] = $sign;
 
+        Log::record("start request",Log::DEBUG);
         $resp = http_request(config::ORDER_URL, $params , 'POST' , false);
+        Log::record("recv length = " . strlen($resp),Log::DEBUG);
         if ($resp === false) {
             return [false, '系统错误',true];
         } else {

+ 12 - 0
helper/refill/util.php

@@ -10,6 +10,7 @@ use mtopcard;
 use Log;
 use Exception;
 use Cache;
+use QueueClient;
 
 class util
 {
@@ -150,6 +151,17 @@ class util
         return true;
     }
 
+    public static function async_add($method,$params)
+    {
+        try {
+            QueueClient::async_push("AysncAddDispatcher",['method' => $method,'params'=> $params],1);
+            return true;
+        }
+        catch (Exception $ex) {
+            return false;
+        }
+    }
+
     public static function push_add($params)
     {
         try {

+ 71 - 55
rdispatcher/codispatcher.php

@@ -1,7 +1,7 @@
 <?php
 declare(strict_types=0);
 
-define('APP_ID', 'rdispatcher');
+define('APP_ID', 'cordispatcher');
 define('MOBILE_SERVER',true);
 define('USE_COROUTINE',true);
 define('SUPPORT_PTHREAD',false);
@@ -21,29 +21,10 @@ require_once(BASE_PATH . '/processor.php');
 require_once(BASE_PATH . '/coprocessor.php');
 require_once(BASE_PATH . '/proxy.php');
 
-//这个可以
-//Co::set(['hook_flags' => SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_TCP ^ SWOOLE_HOOK_UDP ^
-//    SWOOLE_HOOK_FILE ^ SWOOLE_HOOK_UNIX ^
-//    SWOOLE_HOOK_STREAM_FUNCTION ^
-//    SWOOLE_HOOK_BLOCKING_FUNCTION ^
-//    SWOOLE_HOOK_PROC ^
-//    SWOOLE_HOOK_SLEEP ^
-//    SWOOLE_HOOK_TLS ^
-//    SWOOLE_HOOK_SSL
-//]);
-
-//这样会导致,curl 协程部分不执行,收不到数据
-//Co::set(['hook_flags' => SWOOLE_HOOK_NATIVE_CURL | SWOOLE_HOOK_SOCKETS]);
-
-//这样会导致,redis任何阻塞函数不返回
-//Co::set(['hook_flags' => SWOOLE_HOOK_ALL]);
-
-//目前没发现问题
-//Co::set(['hook_flags' => SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_TCP]);
 Co::set(['hook_flags' => SWOOLE_HOOK_NATIVE_CURL]);
 
 if (empty($_SERVER['argv'][1])) exit('parameter error');
-$count = intval($_SERVER['argv'][1]);
+$process_count = intval($_SERVER['argv'][1]);
 
 function all_channels() {
     return ['refill'];
@@ -61,46 +42,81 @@ function handle_error($level, $message, $file, $line)
     Log::record($trace,Log::ERR);
 }
 
+function sub_message($channels)
+{
+    while (true)
+    {
+        try {
+            $redis = new Swoole\Coroutine\Redis();
+            $ret = $redis->connect(C('coroutine.redis_host'), C('coroutine.redis_port'));
+
+            if(!$ret) {
+                Log::record("sub_message cannot connet redis.",Log::DEBUG);
+                $redis->close();
+                Swoole\Coroutine::sleep(1);
+            }
+            elseif($redis->subscribe($channels))
+            {
+                Log::record("sub_message starting recv",Log::DEBUG);
+                while ($msg = $redis->recv())
+                {
+                    [$sub_type, $channel, $content] = $msg;
+                    Log::record("sub_message recv mgs:{$sub_type}-{$channel}-{$content}",Log::DEBUG);
+
+                    $content = unserialize($content);
+                    $type = $content['type'];
+                    Log::record("sub_message recv mgs:{$sub_type}-{$channel}-{$type}",Log::DEBUG);
+
+                    if($channel == 'refill' && $type == 'channel') {
+                        Log::record("recv publish message reload refill",Log::DEBUG);
+                        refill\RefillFactory::instance()->load();
+                    }
+                }
+                $redis->close();
+                Log::record("sub_message redis close",Log::DEBUG);
+            }
+            else {
+                Log::record("subscribe publish message error",Log::ERR);
+            }
+        }
+        catch (Exception $ex)
+        {
+            Log::record($ex->getMessage(),Log::ERR);
+        }
+    }
+}
 
-Co\run(function() {
-    Base::run_util();
-    set_error_handler('handle_error');
-    $looper = new coprocessor();
-    $looper->run();
-});
 
-//function work_proc()
+//Co\run(function()
 //{
 //    Base::run_util();
-//
-////    go(function () {
-////        $redis = new Swoole\Coroutine\Redis;
-////        $ret = $redis->connect('host.docker.internal', 6379);
-////        $key = 'test_cache_data';
-////        $ret = wkcache($key, ["hello world"]);
-////        $data = rkcache($key);
-////    });
-//
 //    set_error_handler('handle_error');
-//    $looper = new processor();
+//    go(function () {
+//        sub_message(['refill']);
+//    });
+//    $looper = new processor(true);
 //    $looper->run();
-//}
-//
-//Co\run(function() {
-//    Log::record("start run process",Log::DEBUG);
-//    work_proc();
 //});
 
-//swoole 和 pthreads 冲突
-//function corun()
-//{
-//    Co::set(['hook_flags' => SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_TCP]);
-//    Co\run(function() {
-//        Log::record("start run process",Log::DEBUG);
-//        work_proc();
-//    });
-//}
-////corun();
-//work_proc();
+for ($i = 0; $i < $process_count;$i++)
+{
+    $process = new Swoole\Process(function(Swoole\Process $worker)
+    {
+        Base::run_util();
+        set_error_handler('handle_error');
+        go(function () {
+            sub_message(['refill']);
+        });
+        $looper = new processor(true);
+        $looper->run();
+
+    }, false, false, true);
+
+    $process->start();
+}
 
-//event\util::fork_workerex('corun',$count);
+for ($i = 0; $i < $process_count;$i++)
+{
+    $status = Swoole\Process::wait(true);
+    Log::record("Recycled #{$status['pid']}, code={$status['code']}, signal={$status['signal']}",Log::DEBUG);
+}

+ 1 - 1
test/TestDispatcher.php

@@ -73,7 +73,7 @@ class TestDispatcher extends TestCase
     }
     public function testAddH()
     {
-        foreach (range(1,100,1) as $i) {
+        foreach (range(1,500,1) as $i) {
             queue\DispatcherClient::instance()->push('add',$this->make_order());
         }
     }

+ 4 - 1
test/coroutine/behavior/preemptive_timer.php

@@ -13,9 +13,12 @@ go(function (){
                 $exit = true;
             });
         }
+        else {
+            Swoole\Coroutine::sleep(1);
+        }
+
         if ($exit) {
             echo "cid ".Swoole\Coroutine::getCid()." break\n";
-            break;
         }
     }
     echo "cid ".Swoole\Coroutine::getCid()." exit\n";

+ 26 - 0
test/coroutine/behavior/preemptive_timerex.php

@@ -0,0 +1,26 @@
+<?php
+co::set(['enable_preemptive_scheduler' => true]);
+
+Co\run(function ()
+{
+    while (true)
+    {
+        $res = Swoole\Coroutine::stats();
+        $num = $res['coroutine_num'];
+        echo "Coroutine counts = {$num} \n";
+
+        if ($num < 1000)
+        {
+            go(function () {
+                echo "cid:" . Swoole\Coroutine::getCid() . " start\n";
+                Swoole\Coroutine::sleep(1);
+                echo "cid " . Swoole\Coroutine::getCid() . " end\n";
+            });
+        } else {
+//            Swoole\Coroutine::sleep(0.1);
+        }
+    }
+    echo "cid " . Swoole\Coroutine::getCid() . " exit\n";
+});
+echo "main end\n";
+

+ 13 - 13
test/coroutine/redis/sub.php

@@ -1,21 +1,21 @@
 <?php
 go(function () {
 	$redis = new Swoole\Coroutine\Redis();
-	$ret = $redis->connect('127.0.0.1', 6379);
-
-	$msg = $redis->subscribe(array("msg_1"));
-	while ($msg = $redis->recv()) 
+	$ret = $redis->connect('192.168.1.220', 6379);
+	$msg = $redis->subscribe(['refill']);
+	while (true)
 	{
+        $msg = $redis->recv(60);
 		var_dump($msg);
-	}
-});
+        $redis->close();
 
-go(function () {
-	$redis = new Swoole\Coroutine\Redis();
-	$redis->connect('127.0.0.1', 6379);
-	$msg = $redis->subscribe(array("msg_2"));
-	while ($msg = $redis->recv()) 
-	{
-		var_dump($msg);
+
+        if(empty($msg)) continue;
+		if(count($msg) !== 3) continue;
+		$channel = $msg[1];
+		$content = unserialize($msg[2]);
+
+		$type = $content['type'];
+		$val = $content['value'];
 	}
 });