ソースを参照

fix coqueue gracefull quit

stanley-king 4 年 前
コミット
749905698e

+ 12 - 1
docker/compose/stanley/docker-compose.yml

@@ -175,4 +175,15 @@ services:
     depends_on:
       - "redisrv"
       - "websrv"
-      - "searcher"
+      - "searcher"
+
+  async:
+    image: php-swool-redis:latest
+    volumes:
+      - ../../conf/etc/localtime:/etc/localtime:ro
+      - ../../conf/php/php-swoole-debug.ini:/usr/local/etc/php/php.ini
+      - ../../../:/var/www/html
+      - /Volumes/Transcend/upload:/var/www/html/data/upload
+      - /Users/stanley-king/work/PHPProject/shoplog:/var/www/html/data/log
+    container_name: "panda-async"
+    command: [php, "/var/www/html//test/examples/process/async_master.php"]

+ 28 - 0
helper/queue/iqueue.php

@@ -55,6 +55,21 @@ class IQueueDB
         }
     }
 
+    public function close()
+    {
+        if ($this->_comode) {
+            if ($this->_redis->connected) {
+                $this->_redis->close();
+                Log::record(__FUNCTION__ . " 1",Log::DEBUG);
+            }
+        } elseif ($this->_redis->isConnected()) {
+            $this->_redis->close();
+            Log::record(__FUNCTION__ . " 2",Log::DEBUG);
+        } else {
+            Log::record("redis has closed",Log::DEBUG);
+        }
+    }
+
     public function rpush($value)
     {
         try {
@@ -163,6 +178,11 @@ class IServer
     public function scan() {
         return $this->_queuedb->scan();
     }
+
+    public function stop()
+    {
+        $this->_queuedb->close();
+    }
 }
 
 abstract class ILooper
@@ -187,6 +207,13 @@ abstract class ILooper
 
     abstract protected function handle($msg);
 
+    public function stop()
+    {
+        Log::record(__FUNCTION__,Log::DEBUG);
+        $this->_stop = true;
+        $this->mServer->stop();
+    }
+
     public function run()
     {
         $queues = $this->mServer->scan();
@@ -245,6 +272,7 @@ abstract class ILooper
                 Log::record("QueueDB pop err: code={$code} err={$err}",Log::ERR);
             }
         }
+        Log::record("ILooper Run quit.", Log::DEBUG);
     }
 
     private function sig_handler($signo)

+ 61 - 79
rdispatcher/codispatcher.php

@@ -21,6 +21,8 @@ require_once(BASE_PATH . '/processor.php');
 require_once(BASE_PATH . '/coprocessor.php');
 require_once(BASE_PATH . '/proxy.php');
 
+Co::set(['hook_flags' => SWOOLE_HOOK_NATIVE_CURL]);
+
 if (empty($_SERVER['argv'][1])) exit('parameter error');
 $process_count = intval($_SERVER['argv'][1]);
 
@@ -40,21 +42,16 @@ function handle_error($level, $message, $file, $line)
     Log::record($trace,Log::ERR);
 }
 
-$waiting_quit = false;
-
-function sub_message($channels)
+function sub_message(&$waiting_quit,&$sub_redis,$channels)
 {
-    global $waiting_quit;
-
-    $redis = new Swoole\Coroutine\Redis();
-
+    $sub_redis = new Swoole\Coroutine\Redis();
     while (!$waiting_quit)
     {
         try
         {
-            if(!$redis->connected) {
-                $ret = $redis->connect(C('coroutine.redis_host'), C('coroutine.redis_port'));
-                Log::record("sub_message redis connected = {$redis->connected}",Log::DEBUG);
+            if(!$sub_redis->connected) {
+                $ret = $sub_redis->connect(C('coroutine.redis_host'), C('coroutine.redis_port'));
+                Log::record("sub_message redis connected = {$sub_redis->connected}",Log::DEBUG);
             }
             else {
                 $ret = true;
@@ -62,12 +59,12 @@ function sub_message($channels)
 
             if(!$ret) {
                 Log::record("sub_message cannot connet redis.",Log::DEBUG);
-                $redis->close();
+                $sub_redis->close();
                 Swoole\Coroutine::sleep(1);
             }
-            elseif($redis->subscribe($channels))
+            elseif($sub_redis->subscribe($channels))
             {
-                while ($msg = $redis->recv())
+                while ($msg = $sub_redis->recv())
                 {
                     [$sub_type, $channel, $content] = $msg;
                     $content = unserialize($content);
@@ -90,27 +87,18 @@ function sub_message($channels)
             Log::record($ex->getMessage(),Log::ERR);
         }
     }
+
+    $waiting_quit = true;
     Log::record("sub_message quit",Log::DEBUG);
 }
 
 //Co\run(function()
 //{
-//    Swoole\Process::signal(SIGINT, function($signal_num) {
-//        Log::record("signal call 1 = $signal_num",Log::DEBUG);
-//    });
-//    Swoole\Process::signal(SIGHUP, function($signal_num) {
-//        Log::record("signal call 1 = $signal_num",Log::DEBUG);
-//    });
-//    Swoole\Process::signal(SIGQUIT, function($signal_num) {
-//        Log::record("signal call 1 = $signal_num",Log::DEBUG);
-//    });
 //    Swoole\Process::signal(SIGTERM, function($signal_num) {
 //        Log::record("signal call 1 = $signal_num",Log::DEBUG);
 //
 //    });
-//    Swoole\Process::signal(SIGKILL, function($signal_num) {
-//        Log::record("signal call 1 = $signal_num",Log::DEBUG);
-//    });
+
 //
 //    Base::run_util();
 //    set_error_handler('handle_error');
@@ -121,76 +109,70 @@ function sub_message($channels)
 //    $looper->run();
 //});
 
-Swoole\Runtime::enableCoroutine(false);
+$workers = [];
 for ($i = 0; $i < $process_count;$i++)
 {
-    global $waiting_quit;
-
-    $process = new Swoole\Process(function(Swoole\Process $worker) use (&$waiting_quit)
+    $process = new Swoole\Process(function(Swoole\Process $worker)
     {
-        Co::set(['hook_flags' => SWOOLE_HOOK_NATIVE_CURL]);
-        Swoole\Process::signal(SIGINT, function($signal_num) use ($worker) {
-            Log::record("signal call 1 = $signal_num, #{$worker->pid}",Log::DEBUG);
-        });
-        Swoole\Process::signal(SIGHUP, function($signal_num) use ($worker) {
-            Log::record("signal call 2 = $signal_num, #{$worker->pid}",Log::DEBUG);
-        });
-        Swoole\Process::signal(SIGQUIT, function($signal_num) use ($worker) {
-            Log::record("signal call 3 = $signal_num, #{$worker->pid}",Log::DEBUG);
+        Base::run_util();
+        set_error_handler('handle_error');
+        $waiting_quit = false;
+        $sub_redis = null;
+        go(function () use (&$waiting_quit,&$sub_redis) {
+            sub_message($waiting_quit,$sub_redis,['refill']);
         });
-        Swoole\Process::signal(SIGTERM, function($signal_num) use ($worker,&$waiting_quit)
+
+        $looper = new processor(true);
+        Swoole\Process::signal(SIGTERM, function($signal_num) use ($worker,&$waiting_quit,$sub_redis,$looper)
         {
-            swoole_async_set(['enable_coroutine' => false]);
-            Log::record("signal call 4 = $signal_num, #{$worker->pid}",Log::DEBUG);
-            $waiting_quit = true;
-        });
-        Swoole\Process::signal(SIGKILL, function($signal_num) use ($worker) {
-            Log::record("signal call 5 = $signal_num, #{$worker->pid}",Log::DEBUG);
-        });
+            Log::record("signal call SIGTERM begin = $signal_num, #{$worker->pid}",Log::DEBUG);
 
-        Base::run_util();
-        set_error_handler('handle_error');
-        go(function () {
-            sub_message(['refill']);
+            set_error_handler(null);
+            try {
+                $waiting_quit = true;
+                $sub_redis->close();
+            }
+            catch(Exception $ex) {
+                Log::record($ex->getMessage(),Log::DEBUG);
+            }
+
+            $looper->stop();
+            do {
+                $res = Swoole\Coroutine::stats();
+                $num = $res['coroutine_num'];
+                if($num > 1) {
+                    Swoole\Coroutine::sleep(0.1);
+                }
+                Log::record("coroutine_num = {$num}",Log::DEBUG);
+            } while($num > 1);
         });
 
-//        $looper = new processor(true);
-//        $looper->run();
+        $looper->run();
 
     }, false, false, true);
 
     $pid = $process->start();
+    $workers[$pid] = $process;
 }
 
 
-Log::record("main process",Log::DEBUG);
-
-Swoole\Process::signal(SIGINT, function ($signo) {
-    Log::record("shutdown 1.", Log::DEBUG);
-});
-Swoole\Process::signal(SIGHUP, function ($signo) {
-    Log::record("shutdown 2.", Log::DEBUG);
-});
-Swoole\Process::signal(SIGQUIT, function ($signo) {
-    Log::record("shutdown 3.", Log::DEBUG);
-});
-Swoole\Process::signal(SIGTERM, function ($signo) {
-    Log::record("shutdown 4.", Log::DEBUG);
-});
-
-swoole_process::signal(SIGCHLD, function () {
-    Log::record("shutdown 5.", Log::DEBUG);
-});
-
-Swoole\Process::signal(SIGKILL, function ($signal_num) {
-    Log::record("shutdown 6.", Log::DEBUG);
-});
-
-for ($i = 0; $i < $process_count;$i++)
+Log::record("main process start wait sub process....",Log::DEBUG);
+while (true)
 {
-    Log::record("process {$i} start wait",Log::DEBUG);
-    $status = Swoole\Process::wait(true);
-    Log::record("Recycled #{$status['pid']}, code={$status['code']}, signal={$status['signal']}",Log::DEBUG);
+    if($status = Swoole\Process::wait(true)) {
+        Log::record("Sub process #{$status['pid']} quit, code={$status['code']}, signal={$status['signal']}",Log::DEBUG);
+    }
+    else
+    {
+        foreach ($workers as $pid => $worker)
+        {
+            Swoole\Process::kill($pid,SIGTERM);
+            if($status = Swoole\Process::wait(true)) {
+                Log::record("Graceful Recycled #{$status['pid']}, code={$status['code']}, signal={$status['signal']}",Log::DEBUG);
+            }
+        }
+        break;
+    }
 }
 Log::record("Quit all",Log::DEBUG);
 

+ 1 - 1
test/TestDispatcher.php

@@ -73,7 +73,7 @@ class TestDispatcher extends TestCase
     }
     public function testAddH()
     {
-        foreach (range(1,500,1) as $i) {
+        foreach (range(1,50000,1) as $i) {
             queue\DispatcherClient::instance()->push('add',$this->make_order());
         }
     }