stanley-king před 4 roky
rodič
revize
6689294d4a
1 změnil soubory, kde provedl 90 přidání a 0 odebrání
  1. 90 0
      queue/control/queue.php

+ 90 - 0
queue/control/queue.php

@@ -0,0 +1,90 @@
+<?php
+/**
+ * 队列
+*
+*
+*
+*
+*/
+defined('InShopNC') or exit('Access Invalid!');
+
+//此行代码会导致bug
+//ini_set('default_socket_timeout', -1);
+
+class queueControl extends BaseCronControl
+{
+    private $_stop = false;
+
+    public function indexOp()
+    {
+        if (ob_get_level()) ob_end_clean();
+
+        pcntl_signal(SIGINT,  [$this,'sig_handler']);
+        pcntl_signal(SIGHUP,  [$this,'sig_handler']);
+        pcntl_signal(SIGQUIT, [$this,'sig_handler']);
+        pcntl_signal(SIGTERM, [$this,'sig_handler']);
+
+        $logic_queue = Logic('queue');
+
+        $worker = new QueueServer();
+        $queues = $worker->scan();
+
+        $empty_times = 0;
+        while (true)
+        {
+            pcntl_signal_dispatch();
+            try
+            {
+                if ($this->_stop) break;
+
+                $content = $worker->pop($queues, 1);
+                if(is_array($content))
+                {
+                    $method = key($content);
+                    $arg = current($content);
+
+                    $argx = json_encode($arg,JSON_UNESCAPED_UNICODE);
+
+                    Log::record("method={$method} args={$argx}",Log::DEBUG);
+                    $result = $logic_queue->$method($arg);
+                    if (!$result['state']) {
+                        $this->log($result['msg'],false);
+                    }
+                    $empty_times = 0;
+                }
+                else
+                {
+                    $empty_times ++;
+                    if($empty_times > 600) {
+                        $model = Model();
+                        $model->checkActive();
+                        unset($model);
+                        $empty_times = 0;
+                    }
+                }
+            }
+            catch (Exception $e)
+            {
+                $err = $e->getMessage();
+                $code = $e->getCode();
+                Log::record("QueueDB pop err: code={$code} err={$err}",Log::ERR);
+                break;
+            }
+        }
+    }
+
+    private function sig_handler($signo)
+    {
+        Log::record("queue quit at sig_handler.",Log::DEBUG);
+        switch($signo) {
+            case SIGINT:
+            case SIGHUP:
+            case SIGQUIT:
+            case SIGTERM:
+                $this->_stop = true;
+                break;
+            default:
+                break;
+        }
+    }
+}