stanley-king il y a 3 ans
Parent
commit
62810c626e
2 fichiers modifiés avec 61 ajouts et 14 suppressions
  1. 60 13
      helper/queue/iqueue.php
  2. 1 1
      rdispatcher/codispatcher.php

+ 60 - 13
helper/queue/iqueue.php

@@ -42,6 +42,7 @@ class IQueueDB
                 return true;
             }
             else {
+                $this->close();
                 $ret = $this->_redis->connect(C('coroutine.redis_host'), C('coroutine.redis_port'));
                 Log::record("IQueueDB::connect ret = {$ret}", Log::DEBUG);
                 return $this->_redis->connected;
@@ -51,6 +52,7 @@ class IQueueDB
             return true;
         }
         else {
+            $this->close();
             $ret = $this->_redis->connect(C('queue.host'), C('queue.port'));
             Log::record("IQueueDB::connect ret = {$ret}", Log::DEBUG);
             return $this->_redis->isConnected();
@@ -115,9 +117,9 @@ class IQueueDB
         return $list_key;
     }
 
-    public function rpop($key, $time)
+    public function rpop($key)
     {
-        $result = $this->_redis->brPop($key, $time);
+        $result = $this->_redis->rPop($key);
 
         $tmp = serialize($result);
         Log::record("IQueueDB::rpop={$tmp}",Log::DEBUG);
@@ -128,9 +130,9 @@ class IQueueDB
             return null;
         }
     }
-    public function lpop($key, $time)
+    public function lpop($key)
     {
-        $result = $this->_redis->blPop($key, $time);
+        $result = $this->_redis->lPop($key);
 
         $tmp = serialize($result);
         Log::record("IQueueDB::lpop={$tmp}",Log::DEBUG);
@@ -142,6 +144,33 @@ class IQueueDB
         }
     }
 
+    public function brpop($key, $time)
+    {
+        $result = $this->_redis->brPop($key, $time);
+
+        $tmp = serialize($result);
+        Log::record("IQueueDB::brPop={$tmp}",Log::DEBUG);
+
+        if ($result) {
+            return $result[1];
+        } else {
+            return null;
+        }
+    }
+    public function blpop($key, $time)
+    {
+        $result = $this->_redis->blPop($key, $time);
+
+        $tmp = serialize($result);
+        Log::record("IQueueDB::blPop={$tmp}",Log::DEBUG);
+
+        if ($result) {
+            return $result[1];
+        } else {
+            return null;
+        }
+    }
+
     public function clear() {
         $this->_redis->flushAll();
     }
@@ -181,16 +210,35 @@ class IServer
         return $this->_queuedb->connect();
     }
 
-    public function pop($key,$time)
+    public function rpop($key)
+    {
+        $result = $this->_queuedb->rpop($key);
+        if($result != null) {
+            return unserialize($result);
+        } else {
+            return false;
+        }
+    }
+    public function lpop($key)
+    {
+        $result = $this->_queuedb->lpop($key);
+        if($result != null) {
+            return unserialize($result);
+        } else {
+            return false;
+        }
+    }
+
+    public function brpop($key,$time)
     {
-        $result = $this->_queuedb->rpop($key,$time);
+        $result = $this->_queuedb->brpop($key,$time);
         if($result != null) {
             return unserialize($result);
         } else {
             return false;
         }
     }
-    public function lpop($key,$time)
+    public function blpop($key,$time)
     {
         $result = $this->_queuedb->lpop($key,$time);
         if($result != null) {
@@ -263,12 +311,11 @@ abstract class ILooper
                             continue;
                         }
 
-                        $content = $this->mServer->pop($queues,1);
-                        if(empty($content)) continue;
-
-                        $msg = json_encode($content);
-                        Log::record("queuemessage:{$msg}",Log::DEBUG);
-
+                        $content = $this->mServer->rpop($queues);
+                        if(empty($content)) {
+                            Swoole\Coroutine::sleep(1);
+                            continue;
+                        }
 
                         if($this->_stop)
                         {

+ 1 - 1
rdispatcher/codispatcher.php

@@ -126,7 +126,7 @@ for ($i = 0; $i < $process_count;$i++)
             subscribe_message($sub_quit,$sub_redis,['refill']);
         });
 
-        $looper = new processor(true);
+        $looper = new processor(false);
         go(function () use ($looper) {
             $looper->run();
         });