stanley-king 4 سال پیش
والد
کامیت
b25b26042f

+ 2 - 2
docker/compose/lz-28/docker-compose.yml

@@ -56,12 +56,12 @@ services:
        - "redisrv"
 
   rdsrv:
-    image: php-swool:latest
+    image: php-zts-debug:7.3.18
     volumes:
       - ../../conf/etc/localtime:/etc/localtime:ro
       - ../../../:/var/www/html
       - /nfs/upload:/var/www/html/data/upload
-      - ../../conf/php/lz-php-swoole.ini:/usr/local/etc/php/php.ini
+      - ../../conf/php/lz-php.ini:/usr/local/etc/php/php.ini
     container_name: "panda-dispatcher"
     command: [php,"/var/www/html/rdispatcher/dispatcher.php","20"]
     depends_on:

+ 2 - 2
docker/compose/stanley/docker-compose.yml

@@ -62,12 +62,12 @@ services:
        - "redisrv"
 
   rdsrv:
-    image: php-swool:latest
+    image: php-zts-debug:7.3.18
     volumes:
       - ../../conf/etc/localtime:/etc/localtime:ro
       - ../../../:/var/www/html
       - /Volumes/Transcend/upload:/var/www/html/data/upload
-      - ../../conf/php/php-swoole-debug.ini:/usr/local/etc/php/php.ini
+      - ../../conf/php/php-local-debug.ini:/usr/local/etc/php/php.ini
     links:
       - redisrv
     container_name: "panda-dispatcher"

+ 4 - 6
helper/queue/iqueue.php

@@ -143,11 +143,10 @@ abstract class ILooper
     public function prepare()
     {
         if (ob_get_level()) ob_end_clean();
-
-//        pcntl_signal(SIGINT,  [$this,'sig_handler']);
-//        pcntl_signal(SIGHUP,  [$this,'sig_handler']);
-//        pcntl_signal(SIGQUIT, [$this,'sig_handler']);
-//        pcntl_signal(SIGTERM, [$this,'sig_handler']);
+        pcntl_signal(SIGINT,  [$this,'sig_handler']);
+        pcntl_signal(SIGHUP,  [$this,'sig_handler']);
+        pcntl_signal(SIGQUIT, [$this,'sig_handler']);
+        pcntl_signal(SIGTERM, [$this,'sig_handler']);
     }
 
     abstract protected function handle($msg);
@@ -157,7 +156,6 @@ abstract class ILooper
         $queues = $this->mServer->scan();
         while (true)
         {
-//          pcntl_signal_dispatch();
             try
             {
                 if ($this->_stop) break;

+ 85 - 0
rdispatcher/codispatcher.php

@@ -0,0 +1,85 @@
+<?php
+declare(strict_types=0);
+
+define('APP_ID', 'rdispatcher');
+define('MOBILE_SERVER',true);
+define('SUPPORT_PTHREAD',false);
+
+define('BASE_ROOT_PATH',str_replace('/rdispatcher','',dirname(__FILE__)));
+define('BASE_PATH',BASE_ROOT_PATH . '/rdispatcher');
+
+require_once(BASE_ROOT_PATH . '/global.php');
+require_once(BASE_ROOT_PATH . '/fooder.php');
+require_once(BASE_HELPER_PATH . '/event_looper.php');
+require_once(BASE_HELPER_PATH . '/queue/rdispatcher.php');
+require_once(BASE_HELPER_PATH . '/refill/CoRefillFactory.php');
+require_once(BASE_HELPER_PATH . '/algorithm.php');
+
+require_once(BASE_PATH . '/processor.php');
+require_once(BASE_PATH . '/proxy.php');
+
+//这个可以
+//Co::set(['hook_flags' => SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_TCP ^ SWOOLE_HOOK_UDP ^
+//    SWOOLE_HOOK_FILE ^ SWOOLE_HOOK_UNIX ^
+//    SWOOLE_HOOK_STREAM_FUNCTION ^
+//    SWOOLE_HOOK_BLOCKING_FUNCTION ^
+//    SWOOLE_HOOK_PROC ^
+//    SWOOLE_HOOK_SLEEP ^
+//    SWOOLE_HOOK_TLS ^
+//    SWOOLE_HOOK_SSL
+//]);
+
+//这样会导致,curl 协程部分不执行,收不到数据
+//Co::set(['hook_flags' => SWOOLE_HOOK_NATIVE_CURL | SWOOLE_HOOK_SOCKETS]);
+
+//这样会导致,redis任何阻塞函数不返回
+//Co::set(['hook_flags' => SWOOLE_HOOK_ALL]);
+
+//目前没发现问题
+Co::set(['hook_flags' => SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_TCP ^ SWOOLE_HOOK_FILE]);
+
+if (empty($_SERVER['argv'][1])) exit('parameter error');
+$count = intval($_SERVER['argv'][1]);
+
+function all_channels() {
+    return ['refill'];
+}
+
+function handle_error($level, $message, $file, $line)
+{
+    if($level == E_NOTICE) return;
+    $trace = "handle_error: level={$level},msg={$message} file={$file},line={$line}\n";
+    $backtrace = debug_backtrace();
+    foreach ($backtrace as $item) {
+        $trace .= "{$item['file']}\t{$item['line']}\t{$item['function']}\n";
+    }
+
+    Log::record($trace,Log::ERR);
+}
+
+function work_proc()
+{
+    Base::run_util();
+    set_error_handler('handle_error');
+    $looper = new processor();
+    $looper->run();
+}
+
+Co\run(function() {
+    Log::record("start run process",Log::DEBUG);
+    work_proc();
+});
+
+//swoole 和 pthreads 冲突
+function corun()
+{
+    Co::set(['hook_flags' => SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_TCP]);
+    Co\run(function() {
+        Log::record("start run process",Log::DEBUG);
+        work_proc();
+    });
+}
+//corun();
+work_proc();
+
+//event\util::fork_workerex('corun',$count);

+ 46 - 0
rdispatcher/cpprocessor.php

@@ -0,0 +1,46 @@
+<?php
+
+require_once(BASE_CORE_PATH . '/framework/function/http.php');
+
+class processor extends queue\ILooper
+{
+    public function __construct()
+    {
+        parent::__construct(new queue\DispatcherServer());
+    }
+
+    protected function handle($msg)
+    {
+        if (empty($msg)) {
+            Log::record('empty body', Log::DEBUG);
+        }
+        else
+        {
+            foreach ($msg as $key => $params)
+            {
+                Log::record("start one", Log::DEBUG);
+                if (empty($params)) continue;
+
+                $method = strtolower($key);
+                $proxy = new proxy();
+                go(function () use ($proxy, $params, $method)
+                {
+                    try
+                    {
+                        if ($method == 'add') {
+                            $proxy->add($params);
+                        } elseif ($method == 'notify') {
+
+                        } elseif ($method == 'notify_mechant') {
+
+                        } else {
+                            Log::record("Error msg", Log::DEBUG);
+                        }
+                    } catch (Exception $x) {
+                        Log::record($x->getMessage(), Log::ERR);
+                    }
+                });
+            }
+        }
+    }
+}

+ 14 - 32
rdispatcher/dispatcher.php

@@ -3,7 +3,7 @@ declare(strict_types=0);
 
 define('APP_ID', 'rdispatcher');
 define('MOBILE_SERVER',true);
-define('SUPPORT_PTHREAD',false);
+define('SUPPORT_PTHREAD',true);
 
 define('BASE_ROOT_PATH',str_replace('/rdispatcher','',dirname(__FILE__)));
 define('BASE_PATH',BASE_ROOT_PATH . '/rdispatcher');
@@ -14,30 +14,13 @@ require_once(BASE_HELPER_PATH . '/event_looper.php');
 require_once(BASE_HELPER_PATH . '/queue/rdispatcher.php');
 require_once(BASE_HELPER_PATH . '/refill/CoRefillFactory.php');
 require_once(BASE_HELPER_PATH . '/algorithm.php');
+require_once(BASE_HELPER_PATH . '/message/msgstates.php');
+require_once(BASE_HELPER_PATH . '/message/msgutil.php');
+require_once(BASE_HELPER_PATH . '/message/subscriber.php');
 
 require_once(BASE_PATH . '/processor.php');
 require_once(BASE_PATH . '/proxy.php');
 
-//这个可以
-//Co::set(['hook_flags' => SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_TCP ^ SWOOLE_HOOK_UDP ^
-//    SWOOLE_HOOK_FILE ^ SWOOLE_HOOK_UNIX ^
-//    SWOOLE_HOOK_STREAM_FUNCTION ^
-//    SWOOLE_HOOK_BLOCKING_FUNCTION ^
-//    SWOOLE_HOOK_PROC ^
-//    SWOOLE_HOOK_SLEEP ^
-//    SWOOLE_HOOK_TLS ^
-//    SWOOLE_HOOK_SSL
-//]);
-
-//这样会导致,curl 协程部分不执行,收不到数据
-//Co::set(['hook_flags' => SWOOLE_HOOK_NATIVE_CURL | SWOOLE_HOOK_SOCKETS]);
-
-//这样会导致,redis任何阻塞函数不返回
-//Co::set(['hook_flags' => SWOOLE_HOOK_ALL]);
-
-//目前没发现问题
-Co::set(['hook_flags' => SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_TCP]);
-
 if (empty($_SERVER['argv'][1])) exit('parameter error');
 $count = intval($_SERVER['argv'][1]);
 
@@ -57,23 +40,22 @@ function handle_error($level, $message, $file, $line)
     Log::record($trace,Log::ERR);
 }
 
+$gMessageStates = null;
 function work_proc()
 {
     Base::run_util();
     set_error_handler('handle_error');
+
+    global $gMessageStates;
+    $gMessageStates = new MsgStates();
+    StatesHelper::init();
+    $listener = new message\subscriber($gMessageStates);
+    $listener->start();
+
     $looper = new processor();
     $looper->run();
 }
 
-//swoole 和 pthreads 冲突
-function corun()
-{
-    Log::record("start run process",Log::DEBUG);
-    Co\run(function() {
-        work_proc();
-    });
-}
-//corun();
-//work_proc();
+work_proc();
 
-event\util::fork_workerex('corun',$count);
+//event\util::fork_workerex('work_proc',$count);

+ 14 - 30
rdispatcher/processor.php

@@ -11,48 +11,32 @@ class processor extends queue\ILooper
 
     protected function handle($msg)
     {
-        if(empty($msg)) {
-            Log::record('empty body',Log::DEBUG);
+        if (empty($msg)) {
+            Log::record('empty body', Log::DEBUG);
         }
         else
         {
             foreach ($msg as $key => $params)
             {
                 Log::record("start one", Log::DEBUG);
-                if(empty($params)) continue;
+                if (empty($params)) continue;
 
-                try {
-                    $method = strtolower($key);
-                    $proxy = new proxy();
-                    if($method == 'add')
-                    {
+                $method = strtolower($key);
+                $proxy = new proxy();
+                try
+                {
+                    if ($method == 'add') {
                         $proxy->add($params);
+                    } elseif ($method == 'notify') {
 
+                    } elseif ($method == 'notify_mechant') {
 
-//                    go(function () use ($proxy,$params)
-//                    {
-//                        try {
-//                            $proxy->add($params);
-//                        }
-//                        catch (Exception $x) {
-//                            Log::record($x->getMessage(),Log::ERR);
-//                        }
-//                    });
+                    } else {
+                        Log::record("Error msg", Log::DEBUG);
                     }
-                    elseif($method == 'notify') {
-
-                    }
-                    elseif($method == 'notify_mechant') {
-
-                    }
-                    else {
-                        Log::record("Error msg",Log::DEBUG);
-                    }
-                }
-                catch (Exception $x) {
-                    Log::record($x->getMessage(),Log::ERR);
+                } catch (Exception $x) {
+                    Log::record($x->getMessage(), Log::ERR);
                 }
-
             }
         }
     }

+ 1 - 1
test/TestAccRefill.php

@@ -50,7 +50,7 @@ class TestAccRefill extends TestCase
     {
         $url = $this->mReqHost . "/racc/index.php";
 //        $url = 'https://www.baidu.com.cn';
-        for ($i = 0; $i < 1; $i++)
+        for ($i = 0; $i < 10; $i++)
         {
             $params = $this->make_order();
             $resp = $this->send_md5($url, $params);