|
@@ -44,11 +44,19 @@ function handle_error($level, $message, $file, $line)
|
|
|
|
|
|
function sub_message($channels)
|
|
|
{
|
|
|
+ $redis = new Swoole\Coroutine\Redis();
|
|
|
+
|
|
|
while (true)
|
|
|
{
|
|
|
- try {
|
|
|
- $redis = new Swoole\Coroutine\Redis();
|
|
|
- $ret = $redis->connect(C('coroutine.redis_host'), C('coroutine.redis_port'));
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if(!$redis->connected) {
|
|
|
+ $ret = $redis->connect(C('coroutine.redis_host'), C('coroutine.redis_port'));
|
|
|
+ Log::record("sub_message redis connected = {$redis->connected}",Log::DEBUG);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ $ret = true;
|
|
|
+ }
|
|
|
|
|
|
if(!$ret) {
|
|
|
Log::record("sub_message cannot connet redis.",Log::DEBUG);
|
|
@@ -57,28 +65,22 @@ function sub_message($channels)
|
|
|
}
|
|
|
elseif($redis->subscribe($channels))
|
|
|
{
|
|
|
- Log::record("sub_message starting recv",Log::DEBUG);
|
|
|
while ($msg = $redis->recv())
|
|
|
{
|
|
|
[$sub_type, $channel, $content] = $msg;
|
|
|
- Log::record("sub_message recv mgs:{$sub_type}-{$channel}-{$content}",Log::DEBUG);
|
|
|
-
|
|
|
$content = unserialize($content);
|
|
|
$type = $content['type'];
|
|
|
- Log::record("sub_message recv mgs:{$sub_type}-{$channel}-{$type}",Log::DEBUG);
|
|
|
|
|
|
if($channel != 'refill') continue;
|
|
|
-
|
|
|
if($type == 'channel' || $type == 'merchant') {
|
|
|
- Log::record("recv publish message reload refill",Log::DEBUG);
|
|
|
+ Log::record("sub_message recv mgs:{$sub_type}-{$channel}-{$type}",Log::DEBUG);
|
|
|
refill\RefillFactory::instance()->load();
|
|
|
}
|
|
|
}
|
|
|
- $redis->close();
|
|
|
- Log::record("sub_message redis close",Log::DEBUG);
|
|
|
+ Log::record("sub_message redis recv timeout",Log::DEBUG);
|
|
|
}
|
|
|
else {
|
|
|
- Log::record("subscribe publish message error",Log::ERR);
|
|
|
+ Log::record("sub_message subscribe error",Log::ERR);
|
|
|
}
|
|
|
}
|
|
|
catch (Exception $ex)
|
|
@@ -88,38 +90,37 @@ function sub_message($channels)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+//Co\run(function()
|
|
|
+//{
|
|
|
+// Base::run_util();
|
|
|
+// set_error_handler('handle_error');
|
|
|
+// go(function () {
|
|
|
+// sub_message(['refill']);
|
|
|
+// });
|
|
|
+// $looper = new processor(true);
|
|
|
+// $looper->run();
|
|
|
+//});
|
|
|
+
|
|
|
+for ($i = 0; $i < $process_count;$i++)
|
|
|
+{
|
|
|
+ $process = new Swoole\Process(function(Swoole\Process $worker)
|
|
|
+ {
|
|
|
+ Base::run_util();
|
|
|
+ set_error_handler('handle_error');
|
|
|
+ go(function () {
|
|
|
+ sub_message(['refill']);
|
|
|
+ });
|
|
|
+
|
|
|
+ $looper = new processor(true);
|
|
|
+ $looper->run();
|
|
|
+
|
|
|
+ }, false, false, true);
|
|
|
|
|
|
-Co\run(function()
|
|
|
+ $process->start();
|
|
|
+}
|
|
|
+
|
|
|
+for ($i = 0; $i < $process_count;$i++)
|
|
|
{
|
|
|
- Base::run_util();
|
|
|
- set_error_handler('handle_error');
|
|
|
- go(function () {
|
|
|
- sub_message(['refill']);
|
|
|
- });
|
|
|
- $looper = new processor(true);
|
|
|
- $looper->run();
|
|
|
-});
|
|
|
-
|
|
|
-//for ($i = 0; $i < $process_count;$i++)
|
|
|
-//{
|
|
|
-// $process = new Swoole\Process(function(Swoole\Process $worker)
|
|
|
-// {
|
|
|
-// Base::run_util();
|
|
|
-// set_error_handler('handle_error');
|
|
|
-// go(function () {
|
|
|
-// sub_message(['refill']);
|
|
|
-// });
|
|
|
-//
|
|
|
-//// $looper = new processor(true);
|
|
|
-//// $looper->run();
|
|
|
-//
|
|
|
-// }, false, false, true);
|
|
|
-//
|
|
|
-// $process->start();
|
|
|
-//}
|
|
|
-
|
|
|
-//for ($i = 0; $i < $process_count;$i++)
|
|
|
-//{
|
|
|
-// $status = Swoole\Process::wait(true);
|
|
|
-// Log::record("Recycled #{$status['pid']}, code={$status['code']}, signal={$status['signal']}",Log::DEBUG);
|
|
|
-//}
|
|
|
+ $status = Swoole\Process::wait(true);
|
|
|
+ Log::record("Recycled #{$status['pid']}, code={$status['code']}, signal={$status['signal']}",Log::DEBUG);
|
|
|
+}
|