|
@@ -21,8 +21,6 @@ require_once(BASE_PATH . '/processor.php');
|
|
|
require_once(BASE_PATH . '/coprocessor.php');
|
|
|
require_once(BASE_PATH . '/proxy.php');
|
|
|
|
|
|
-Co::set(['hook_flags' => SWOOLE_HOOK_NATIVE_CURL]);
|
|
|
-
|
|
|
if (empty($_SERVER['argv'][1])) exit('parameter error');
|
|
|
$process_count = intval($_SERVER['argv'][1]);
|
|
|
|
|
@@ -42,13 +40,25 @@ function handle_error($level, $message, $file, $line)
|
|
|
Log::record($trace,Log::ERR);
|
|
|
}
|
|
|
|
|
|
+$waiting_quit = false;
|
|
|
+
|
|
|
function sub_message($channels)
|
|
|
{
|
|
|
- while (true)
|
|
|
+ global $waiting_quit;
|
|
|
+
|
|
|
+ $redis = new Swoole\Coroutine\Redis();
|
|
|
+
|
|
|
+ while (!$waiting_quit)
|
|
|
{
|
|
|
- try {
|
|
|
- $redis = new Swoole\Coroutine\Redis();
|
|
|
- $ret = $redis->connect(C('coroutine.redis_host'), C('coroutine.redis_port'));
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if(!$redis->connected) {
|
|
|
+ $ret = $redis->connect(C('coroutine.redis_host'), C('coroutine.redis_port'));
|
|
|
+ Log::record("sub_message redis connected = {$redis->connected}",Log::DEBUG);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ $ret = true;
|
|
|
+ }
|
|
|
|
|
|
if(!$ret) {
|
|
|
Log::record("sub_message cannot connet redis.",Log::DEBUG);
|
|
@@ -57,26 +67,22 @@ function sub_message($channels)
|
|
|
}
|
|
|
elseif($redis->subscribe($channels))
|
|
|
{
|
|
|
- Log::record("sub_message starting recv",Log::DEBUG);
|
|
|
while ($msg = $redis->recv())
|
|
|
{
|
|
|
[$sub_type, $channel, $content] = $msg;
|
|
|
- Log::record("sub_message recv mgs:{$sub_type}-{$channel}-{$content}",Log::DEBUG);
|
|
|
-
|
|
|
$content = unserialize($content);
|
|
|
$type = $content['type'];
|
|
|
- Log::record("sub_message recv mgs:{$sub_type}-{$channel}-{$type}",Log::DEBUG);
|
|
|
|
|
|
- if($channel == 'refill' && $type == 'channel') {
|
|
|
- Log::record("recv publish message reload refill",Log::DEBUG);
|
|
|
+ if($channel != 'refill') continue;
|
|
|
+ if($type == 'channel' || $type == 'merchant') {
|
|
|
+ Log::record("sub_message recv mgs:{$sub_type}-{$channel}-{$type}",Log::DEBUG);
|
|
|
refill\RefillFactory::instance()->load();
|
|
|
}
|
|
|
}
|
|
|
- $redis->close();
|
|
|
- Log::record("sub_message redis close",Log::DEBUG);
|
|
|
+ Log::record("sub_message redis recv timeout",Log::DEBUG);
|
|
|
}
|
|
|
else {
|
|
|
- Log::record("subscribe publish message error",Log::ERR);
|
|
|
+ Log::record("sub_message subscribe error",Log::ERR);
|
|
|
}
|
|
|
}
|
|
|
catch (Exception $ex)
|
|
@@ -84,11 +90,28 @@ function sub_message($channels)
|
|
|
Log::record($ex->getMessage(),Log::ERR);
|
|
|
}
|
|
|
}
|
|
|
+ Log::record("sub_message quit",Log::DEBUG);
|
|
|
}
|
|
|
|
|
|
-
|
|
|
//Co\run(function()
|
|
|
//{
|
|
|
+// Swoole\Process::signal(SIGINT, function($signal_num) {
|
|
|
+// Log::record("signal call 1 = $signal_num",Log::DEBUG);
|
|
|
+// });
|
|
|
+// Swoole\Process::signal(SIGHUP, function($signal_num) {
|
|
|
+// Log::record("signal call 1 = $signal_num",Log::DEBUG);
|
|
|
+// });
|
|
|
+// Swoole\Process::signal(SIGQUIT, function($signal_num) {
|
|
|
+// Log::record("signal call 1 = $signal_num",Log::DEBUG);
|
|
|
+// });
|
|
|
+// Swoole\Process::signal(SIGTERM, function($signal_num) {
|
|
|
+// Log::record("signal call 1 = $signal_num",Log::DEBUG);
|
|
|
+//
|
|
|
+// });
|
|
|
+// Swoole\Process::signal(SIGKILL, function($signal_num) {
|
|
|
+// Log::record("signal call 1 = $signal_num",Log::DEBUG);
|
|
|
+// });
|
|
|
+//
|
|
|
// Base::run_util();
|
|
|
// set_error_handler('handle_error');
|
|
|
// go(function () {
|
|
@@ -98,25 +121,77 @@ function sub_message($channels)
|
|
|
// $looper->run();
|
|
|
//});
|
|
|
|
|
|
+Swoole\Runtime::enableCoroutine(false);
|
|
|
for ($i = 0; $i < $process_count;$i++)
|
|
|
{
|
|
|
- $process = new Swoole\Process(function(Swoole\Process $worker)
|
|
|
+ global $waiting_quit;
|
|
|
+
|
|
|
+ $process = new Swoole\Process(function(Swoole\Process $worker) use (&$waiting_quit)
|
|
|
{
|
|
|
+ Co::set(['hook_flags' => SWOOLE_HOOK_NATIVE_CURL]);
|
|
|
+ Swoole\Process::signal(SIGINT, function($signal_num) use ($worker) {
|
|
|
+ Log::record("signal call 1 = $signal_num, #{$worker->pid}",Log::DEBUG);
|
|
|
+ });
|
|
|
+ Swoole\Process::signal(SIGHUP, function($signal_num) use ($worker) {
|
|
|
+ Log::record("signal call 2 = $signal_num, #{$worker->pid}",Log::DEBUG);
|
|
|
+ });
|
|
|
+ Swoole\Process::signal(SIGQUIT, function($signal_num) use ($worker) {
|
|
|
+ Log::record("signal call 3 = $signal_num, #{$worker->pid}",Log::DEBUG);
|
|
|
+ });
|
|
|
+ Swoole\Process::signal(SIGTERM, function($signal_num) use ($worker,&$waiting_quit)
|
|
|
+ {
|
|
|
+ swoole_async_set(['enable_coroutine' => false]);
|
|
|
+ Log::record("signal call 4 = $signal_num, #{$worker->pid}",Log::DEBUG);
|
|
|
+ $waiting_quit = true;
|
|
|
+ });
|
|
|
+ Swoole\Process::signal(SIGKILL, function($signal_num) use ($worker) {
|
|
|
+ Log::record("signal call 5 = $signal_num, #{$worker->pid}",Log::DEBUG);
|
|
|
+ });
|
|
|
+
|
|
|
Base::run_util();
|
|
|
set_error_handler('handle_error');
|
|
|
go(function () {
|
|
|
sub_message(['refill']);
|
|
|
});
|
|
|
- $looper = new processor(true);
|
|
|
- $looper->run();
|
|
|
+
|
|
|
+// $looper = new processor(true);
|
|
|
+// $looper->run();
|
|
|
|
|
|
}, false, false, true);
|
|
|
|
|
|
- $process->start();
|
|
|
+ $pid = $process->start();
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+Log::record("main process",Log::DEBUG);
|
|
|
+
|
|
|
+Swoole\Process::signal(SIGINT, function ($signo) {
|
|
|
+ Log::record("shutdown 1.", Log::DEBUG);
|
|
|
+});
|
|
|
+Swoole\Process::signal(SIGHUP, function ($signo) {
|
|
|
+ Log::record("shutdown 2.", Log::DEBUG);
|
|
|
+});
|
|
|
+Swoole\Process::signal(SIGQUIT, function ($signo) {
|
|
|
+ Log::record("shutdown 3.", Log::DEBUG);
|
|
|
+});
|
|
|
+Swoole\Process::signal(SIGTERM, function ($signo) {
|
|
|
+ Log::record("shutdown 4.", Log::DEBUG);
|
|
|
+});
|
|
|
+
|
|
|
+swoole_process::signal(SIGCHLD, function () {
|
|
|
+ Log::record("shutdown 5.", Log::DEBUG);
|
|
|
+});
|
|
|
+
|
|
|
+Swoole\Process::signal(SIGKILL, function ($signal_num) {
|
|
|
+ Log::record("shutdown 6.", Log::DEBUG);
|
|
|
+});
|
|
|
+
|
|
|
for ($i = 0; $i < $process_count;$i++)
|
|
|
{
|
|
|
+ Log::record("process {$i} start wait",Log::DEBUG);
|
|
|
$status = Swoole\Process::wait(true);
|
|
|
Log::record("Recycled #{$status['pid']}, code={$status['code']}, signal={$status['signal']}",Log::DEBUG);
|
|
|
-}
|
|
|
+}
|
|
|
+Log::record("Quit all",Log::DEBUG);
|
|
|
+
|
|
|
+
|