stanley-king 3 yıl önce
ebeveyn
işleme
4da224773d

+ 5 - 1
docker/compose/xyzt/arw/docker-compose.yml

@@ -39,7 +39,7 @@ services:
       - /nfs/upload:/var/www/html/data/upload
       - /mnt/testlog:/var/www/html/data/log
     container_name: "panda-codispatcher"
-    command: [php,"/var/www/html/rdispatcher/coall.php","16"]
+    command: [php,"/var/www/html/rdispatcher/codispatcher.php","16"]
     deploy:
       resources:
         limits:
@@ -55,6 +55,10 @@ services:
       - /mnt/testlog:/var/www/html/data/log
     container_name: "panda-coall"
     command: [php,"/var/www/html/rdispatcher/coall.php","1"]
+    deploy:
+      resources:
+        limits:
+          cpus: '8'
 
   queuesrv:
     image: php-zts-debug:7.3.18

+ 4 - 2
helper/queue/iqueue.php

@@ -8,6 +8,7 @@ require_once(BASE_ROOT_PATH . '/helper/performance_helper.php');
 use Redis;
 use Exception;
 use Log;
+use refill\util;
 use Swoole;
 
 class IQueueDB
@@ -305,12 +306,13 @@ abstract class ILooper
                         if($this->_stop)
                         {
                             foreach ($content as $key => $params) {
-                                DispatcherClient::instance()->push($key, $params);
+                                util::push_queue($key, $params);
                             }
                         }
                         else
                         {
-                            go(function ()use ($content,$num,$mem) {
+                            go(function ()use ($content,$num,$mem)
+                            {
                                 $start = microtime(true);
                                 Log::record("BeginGoFunction coroutin_num={$num} memory={$mem}",Log::DEBUG);
 

+ 18 - 50
helper/refill/util.php

@@ -224,12 +224,7 @@ class util
     {
         try
         {
-            if(defined('USE_COROUTINE') && USE_COROUTINE && defined('COROUTINE_HOOK_TCP') && COROUTINE_HOOK_TCP) {
-                $ret = self::push_queue('add', $params);
-            } else {
-                $ret = queue\DispatcherClient::instance()->push('add', $params);
-            }
-
+            $ret = self::push_queue('add', $params);
             return $ret !== false;
         }
         catch (Exception $ex) {
@@ -241,11 +236,7 @@ class util
     {
         try
         {
-            if (defined('USE_COROUTINE') && USE_COROUTINE && defined('COROUTINE_HOOK_TCP') && COROUTINE_HOOK_TCP) {
-                $ret = self::push_queue('add_zero', $params);
-            } else {
-                $ret = queue\DispatcherClient::instance()->push('add_zero', $params);
-            }
+            $ret = self::push_queue('add_zero', $params);
             return $ret !== false;
         }
         catch (Exception $ex) {
@@ -257,11 +248,7 @@ class util
     {
         try
         {
-            if (defined('USE_COROUTINE') && USE_COROUTINE && defined('COROUTINE_HOOK_TCP') && COROUTINE_HOOK_TCP) {
-                $ret = self::push_queue('addthird', $params);
-            } else {
-                $ret = queue\DispatcherClient::instance()->push('addthird', $params);
-            }
+            $ret = self::push_queue('addthird', $params);
             return $ret !== false;
         }
         catch (Exception $ex) {
@@ -273,11 +260,7 @@ class util
     {
         try
         {
-            if (defined('USE_COROUTINE') && USE_COROUTINE && defined('COROUTINE_HOOK_TCP') && COROUTINE_HOOK_TCP) {
-                $ret = self::push_queue('notify', ['channel' => $chname, 'params' => $params]);
-            } else {
-                $ret = queue\DispatcherClient::instance()->push('notify', ['channel' => $chname, 'params' => $params]);
-            }
+            $ret = self::push_queue('notify', ['channel' => $chname, 'params' => $params]);
             return $ret !== false;
         }
         catch (Exception $ex) {
@@ -289,11 +272,7 @@ class util
     {
         try
         {
-            if (defined('USE_COROUTINE') && USE_COROUTINE && defined('COROUTINE_HOOK_TCP') && COROUTINE_HOOK_TCP) {
-                $ret = self::push_queue('notify_mechant', ['order_id' => $order_id, 'manual' => $manual]);
-            } else {
-                $ret = queue\DispatcherClient::instance()->push('notify_mechant', ['order_id' => $order_id, 'manual' => $manual]);
-            }
+            $ret = self::push_queue('notify_mechant', ['order_id' => $order_id, 'manual' => $manual]);
             return $ret !== false;
         }
         catch (Exception $ex) {
@@ -305,11 +284,7 @@ class util
     {
         try
         {
-            if (defined('USE_COROUTINE') && USE_COROUTINE && defined('COROUTINE_HOOK_TCP') && COROUTINE_HOOK_TCP) {
-                $ret = self::push_queue('query', ['order_id' => $order_id]);
-            } else {
-                $ret = queue\DispatcherClient::instance()->push('query', ['order_id' => $order_id]);
-            }
+            $ret = self::push_queue('query', ['order_id' => $order_id]);
             return $ret !== false;
         }
         catch (Exception $ex) {
@@ -320,11 +295,7 @@ class util
     public static function push_query_net($order_id)
     {
         try {
-            if (defined('USE_COROUTINE') && USE_COROUTINE && defined('COROUTINE_HOOK_TCP') && COROUTINE_HOOK_TCP) {
-                $ret = self::push_queue('query_net', ['order_id' => $order_id]);
-            } else {
-                $ret = queue\DispatcherClient::instance()->push('query_net', ['order_id' => $order_id]);
-            }
+            $ret = self::push_queue('query_net', ['order_id' => $order_id]);
             return $ret !== false;
         }
         catch (Exception $ex) {
@@ -336,11 +307,7 @@ class util
     {
         try
         {
-            if (defined('USE_COROUTINE') && USE_COROUTINE && defined('COROUTINE_HOOK_TCP') && COROUTINE_HOOK_TCP) {
-                $ret = self::push_queue('manual_success', ['order_id' => $order_id]);
-            } else {
-                $ret = queue\DispatcherClient::instance()->push('manual_success', ['order_id' => $order_id]);
-            }
+            $ret = self::push_queue('manual_success', ['order_id' => $order_id]);
             return $ret !== false;
         }
         catch (Exception $ex) {
@@ -352,11 +319,7 @@ class util
     {
         try
         {
-            if (defined('USE_COROUTINE') && USE_COROUTINE && defined('COROUTINE_HOOK_TCP') && COROUTINE_HOOK_TCP) {
-                $ret = self::push_queue('manual_cancel', ['order_id' => $order_id]);
-            } else {
-                $ret = queue\DispatcherClient::instance()->push('manual_cancel', ['order_id' => $order_id]);
-            }
+            $ret = self::push_queue('manual_cancel', ['order_id' => $order_id]);
             return $ret !== false;
         }
         catch (Exception $ex) {
@@ -364,11 +327,16 @@ class util
         }
     }
 
-    private static function push_queue($method, $value)
+    public static function push_queue($method, $value)
     {
-        $queue_name = 'QUEUE_DISPATCHER_CO';
-        $ins = Cache::getInstance('cacheredis');
-        return $ins->lpush($queue_name, serialize([$method => $value]));
+        if (defined('USE_COROUTINE') && USE_COROUTINE && defined('COROUTINE_HOOK_TCP') && COROUTINE_HOOK_TCP) {
+            $queue_name = 'QUEUE_DISPATCHER_CO';
+            $ins = Cache::getInstance('cacheredis');
+            return $ins->lpush($queue_name, serialize([$method => $value]));
+        }
+        else {
+            return queue\DispatcherClient::instance()->push($method,$value);
+        }
     }
 
     public static function dispatcher_queue_length()