|
@@ -28,6 +28,10 @@ function all_channels() {
|
|
return ['refill'];
|
|
return ['refill'];
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+function strbool($value) {
|
|
|
|
+ return $value ? 'true' : 'false';
|
|
|
|
+}
|
|
|
|
+
|
|
function handle_error($level, $message, $file, $line)
|
|
function handle_error($level, $message, $file, $line)
|
|
{
|
|
{
|
|
if($level == E_NOTICE) return;
|
|
if($level == E_NOTICE) return;
|
|
@@ -40,44 +44,47 @@ function handle_error($level, $message, $file, $line)
|
|
Log::record($trace,Log::ERR);
|
|
Log::record($trace,Log::ERR);
|
|
}
|
|
}
|
|
|
|
|
|
-function sub_message(&$waiting_quit,&$sub_redis,$channels)
|
|
|
|
|
|
+function subscribe_message(&$quit, &$redis, $channels)
|
|
{
|
|
{
|
|
- $sub_redis = new Swoole\Coroutine\Redis();
|
|
|
|
- while (!$waiting_quit)
|
|
|
|
|
|
+ $redis = new Swoole\Coroutine\Redis();
|
|
|
|
+ while (!$quit)
|
|
{
|
|
{
|
|
try
|
|
try
|
|
{
|
|
{
|
|
- if(!$sub_redis->connected) {
|
|
|
|
- $ret = $sub_redis->connect(C('coroutine.redis_host'), C('coroutine.redis_port'));
|
|
|
|
- Log::record("sub_message redis connected = {$sub_redis->connected}",Log::DEBUG);
|
|
|
|
|
|
+ Log::record("subscribe_message start quit=" . strbool($quit),Log::DEBUG);
|
|
|
|
+ if(!$redis->connected) {
|
|
|
|
+ $ret = $redis->connect(C('coroutine.redis_host'), C('coroutine.redis_port'));
|
|
|
|
+ Log::record("subscribe_message redis connected = {$redis->connected}",Log::DEBUG);
|
|
}
|
|
}
|
|
else {
|
|
else {
|
|
$ret = true;
|
|
$ret = true;
|
|
}
|
|
}
|
|
|
|
|
|
if(!$ret) {
|
|
if(!$ret) {
|
|
- Log::record("sub_message cannot connet redis.",Log::DEBUG);
|
|
|
|
- $sub_redis->close();
|
|
|
|
|
|
+ Log::record("subscribe_message cannot connet redis.",Log::DEBUG);
|
|
|
|
+ $redis->close();
|
|
Swoole\Coroutine::sleep(1);
|
|
Swoole\Coroutine::sleep(1);
|
|
}
|
|
}
|
|
- elseif($sub_redis->subscribe($channels))
|
|
|
|
|
|
+ elseif($redis->subscribe($channels))
|
|
{
|
|
{
|
|
- while ($msg = $sub_redis->recv())
|
|
|
|
|
|
+ while ($msg = $redis->recv())
|
|
{
|
|
{
|
|
[$sub_type, $channel, $content] = $msg;
|
|
[$sub_type, $channel, $content] = $msg;
|
|
$content = unserialize($content);
|
|
$content = unserialize($content);
|
|
$type = $content['type'];
|
|
$type = $content['type'];
|
|
|
|
|
|
if($channel != 'refill') continue;
|
|
if($channel != 'refill') continue;
|
|
|
|
+ if($quit) break;
|
|
|
|
+
|
|
if($type == 'channel' || $type == 'merchant') {
|
|
if($type == 'channel' || $type == 'merchant') {
|
|
- Log::record("sub_message recv mgs:{$sub_type}-{$channel}-{$type}",Log::DEBUG);
|
|
|
|
|
|
+ Log::record("subscribe_message recv mgs:{$sub_type}-{$channel}-{$type}",Log::DEBUG);
|
|
refill\RefillFactory::instance()->load();
|
|
refill\RefillFactory::instance()->load();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- Log::record("sub_message redis recv timeout",Log::DEBUG);
|
|
|
|
|
|
+ Log::record("subscribe_message redis recv timeout",Log::DEBUG);
|
|
}
|
|
}
|
|
else {
|
|
else {
|
|
- Log::record("sub_message subscribe error",Log::ERR);
|
|
|
|
|
|
+ Log::record("subscribe_message subscribe error",Log::ERR);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
catch (Exception $ex)
|
|
catch (Exception $ex)
|
|
@@ -86,25 +93,26 @@ function sub_message(&$waiting_quit,&$sub_redis,$channels)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- $waiting_quit = true;
|
|
|
|
- Log::record("sub_message quit",Log::DEBUG);
|
|
|
|
|
|
+ Log::record("subscribe_message quit =" . strbool($quit),Log::DEBUG);
|
|
|
|
+ $quit = true;
|
|
}
|
|
}
|
|
|
|
|
|
-function ratio_update(&$waiting_quit)
|
|
|
|
|
|
+function ratio_update(&$quit)
|
|
{
|
|
{
|
|
//每分钟的第0秒 将上一分钟的数据,写入HD5File。
|
|
//每分钟的第0秒 将上一分钟的数据,写入HD5File。
|
|
//每分钟的第2秒 统计之前的数据
|
|
//每分钟的第2秒 统计之前的数据
|
|
//每分钟的第3秒触发更新 成功率统计数据
|
|
//每分钟的第3秒触发更新 成功率统计数据
|
|
$cur_min = 0;
|
|
$cur_min = 0;
|
|
- while (!$waiting_quit)
|
|
|
|
|
|
+ while (!$quit)
|
|
{
|
|
{
|
|
- Log::record("ratio_update start calc",Log::DEBUG);
|
|
|
|
|
|
+ Log::record("ratio_update start calc quit =" . strbool($quit),Log::DEBUG);
|
|
|
|
|
|
for ($i = 0; $i < 61; $i++) {
|
|
for ($i = 0; $i < 61; $i++) {
|
|
$cur_time = time();
|
|
$cur_time = time();
|
|
$time_sec = $cur_time;
|
|
$time_sec = $cur_time;
|
|
$next_min = $time_sec - $time_sec % 60;
|
|
$next_min = $time_sec - $time_sec % 60;
|
|
|
|
|
|
|
|
+ if($quit) break;
|
|
if($next_min > $cur_min && $time_sec % 60 == 3) {
|
|
if($next_min > $cur_min && $time_sec % 60 == 3) {
|
|
$cur_min = $next_min;
|
|
$cur_min = $next_min;
|
|
break;
|
|
break;
|
|
@@ -112,13 +120,13 @@ function ratio_update(&$waiting_quit)
|
|
Swoole\Coroutine::sleep(1);
|
|
Swoole\Coroutine::sleep(1);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- if(!$waiting_quit) {
|
|
|
|
|
|
+ if(!$quit) {
|
|
refill\RefillFactory::instance()->UpdateRatio();
|
|
refill\RefillFactory::instance()->UpdateRatio();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- $waiting_quit = true;
|
|
|
|
- Log::record("ratio_update quit",Log::DEBUG);
|
|
|
|
|
|
+ Log::record("ratio_update quit =". strbool($quit),Log::DEBUG);
|
|
|
|
+ $quit = true;
|
|
}
|
|
}
|
|
|
|
|
|
$workers = [];
|
|
$workers = [];
|
|
@@ -128,23 +136,26 @@ for ($i = 0; $i < $process_count;$i++)
|
|
{
|
|
{
|
|
Base::run_util();
|
|
Base::run_util();
|
|
set_error_handler('handle_error');
|
|
set_error_handler('handle_error');
|
|
- $waiting_quit = false;
|
|
|
|
|
|
+ $sub_quit = false;
|
|
$sub_redis = null;
|
|
$sub_redis = null;
|
|
- go(function () use (&$waiting_quit,&$sub_redis) {
|
|
|
|
- sub_message($waiting_quit,$sub_redis,['refill']);
|
|
|
|
|
|
+ go(function () use (&$sub_quit,&$sub_redis) {
|
|
|
|
+ subscribe_message($sub_quit,$sub_redis,['refill']);
|
|
});
|
|
});
|
|
- go(function () use (&$waiting_quit) {
|
|
|
|
- ratio_update($waiting_quit);
|
|
|
|
|
|
+
|
|
|
|
+ $ratio_quit = false;
|
|
|
|
+ go(function () use (&$ratio_quit) {
|
|
|
|
+ ratio_update($ratio_quit);
|
|
});
|
|
});
|
|
|
|
|
|
$looper = new processor(true);
|
|
$looper = new processor(true);
|
|
- Swoole\Process::signal(SIGTERM, function($signal_num) use ($worker,&$waiting_quit,$sub_redis,$looper)
|
|
|
|
|
|
+ Swoole\Process::signal(SIGTERM, function($signal_num) use ($worker,&$sub_quit,$sub_redis,&$ratio_quit,$looper)
|
|
{
|
|
{
|
|
Log::record("signal call SIGTERM begin = $signal_num, #{$worker->pid}",Log::DEBUG);
|
|
Log::record("signal call SIGTERM begin = $signal_num, #{$worker->pid}",Log::DEBUG);
|
|
|
|
|
|
set_error_handler(null);
|
|
set_error_handler(null);
|
|
try {
|
|
try {
|
|
- $waiting_quit = true;
|
|
|
|
|
|
+ $sub_quit = true;
|
|
|
|
+ $ratio_quit = true;
|
|
$sub_redis->close();
|
|
$sub_redis->close();
|
|
}
|
|
}
|
|
catch(Exception $ex) {
|
|
catch(Exception $ex) {
|