iqueue.php 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. <?php
  2. declare(strict_types=1);
  3. namespace queue;
  4. require_once(BASE_ROOT_PATH . '/helper/performance_helper.php');
  5. use Redis;
  6. use Exception;
  7. use Log;
  /**
   * Redis-backed storage layer for the queue.
   *
   * Wraps either the phpredis client (`Redis`) or Swoole's coroutine client
   * (`\Swoole\Coroutine\Redis`), selected by the `$comode` constructor flag,
   * and exposes the list push/pop operations the queue classes rely on.
   */
  class IQueueDB
  {
      /** @var Redis|\Swoole\Coroutine\Redis Underlying client, created in the constructor. */
      private $_redis;

      /** @var mixed Name of the Redis list that rpush()/lpush() write to. */
      private $_queue_name;

      // True when the Swoole coroutine client is used instead of phpredis.
      // NOTE(review): the original comment on this property read "defines the
      // number of storage tables; the system allocates storage randomly", which
      // does not describe this flag and looks stale.
      private $_comode;

      /**
       * Creates the client and attempts an initial connection.
       *
       * The connection result is not checked here; callers can invoke
       * connect() again later to retry.
       *
       * @param mixed $queue_name Redis list key used by rpush()/lpush()/scan().
       * @param bool  $comode     Use \Swoole\Coroutine\Redis when truthy, phpredis otherwise.
       */
      public function __construct($queue_name, $comode = false)
      {
          if (!extension_loaded('redis')) {
              throw_exception('redis failed to load');
          }

          $this->_queue_name = $queue_name;
          $this->_comode = $comode;

          // Only instantiate here; connect() owns the host/port lookup, so the
          // connection is attempted once instead of twice.
          $this->_redis = $this->_comode ? new \Swoole\Coroutine\Redis() : new Redis();
          $this->connect();
      }

      /**
       * Ensures the client is connected, connecting if necessary.
       *
       * @return bool Whether the client reports a live connection after the call.
       */
      public function connect(): bool
      {
          if ($this->_comode) {
              if ($this->_redis->connected) {
                  return true;
              }
              $ret = $this->_redis->connect(C('coroutine.redis_host'), C('coroutine.redis_port'));
              Log::record("Swoole\Coroutine\Redis connect ret = {$ret}", Log::DEBUG);

              return $this->_redis->connected;
          }

          if ($this->_redis->isConnected()) {
              return true;
          }
          // Third argument is the connect timeout handed to phpredis.
          $ret = $this->_redis->connect(C('queue.host'), C('queue.port'), 20);
          Log::record("Redis connect ret = {$ret}", Log::DEBUG);

          return $this->_redis->isConnected();
      }

      /**
       * Closes the connection if it is open.
       *
       * In phpredis mode an already-closed connection is logged at DEBUG level;
       * in coroutine mode it is silently ignored.
       */
      public function close()
      {
          if ($this->_comode) {
              if ($this->_redis->connected) {
                  $this->_redis->close();
              }
              return;
          }

          if ($this->_redis->isConnected()) {
              $this->_redis->close();
          } else {
              Log::record("redis has closed", Log::DEBUG);
          }
      }

      /**
       * Appends a value to the tail of the queue list.
       *
       * @param mixed $value Payload handed to rPush unchanged.
       * @return mixed The client's rPush result, or false when an Exception was caught and logged.
       */
      public function rpush($value)
      {
          try {
              return $this->_redis->rPush($this->_queue_name, $value);
          } catch (Exception $e) {
              Log::record("IQueueDB::rpush " . $e->getMessage(), Log::ERR);
              return false;
          }
      }

      /**
       * Prepends a value to the head of the queue list.
       *
       * @param mixed $value Payload handed to lPush unchanged.
       * @return mixed The client's lPush result, or false when an Exception was caught and logged.
       */
      public function lpush($value)
      {
          try {
              return $this->_redis->lPush($this->_queue_name, $value);
          } catch (Exception $e) {
              Log::record("IQueueDB::lpush " . $e->getMessage(), Log::ERR);
              return false;
          }
      }

      /**
       * Lists the keys this instance serves.
       *
       * @return array Single-element list containing the configured queue name.
       */
      public function scan()
      {
          return [$this->_queue_name];
      }

      /**
       * Blocking pop from the tail of one or more lists (brPop).
       *
       * Exceptions raised by the client are not caught here.
       *
       * @param mixed $key  Key or list of keys forwarded to brPop.
       * @param mixed $time Timeout forwarded to brPop.
       * @return mixed Element [1] of the brPop reply, or null when the reply is empty.
       */
      public function rpop($key, $time)
      {
          $reply = $this->_redis->brPop($key, $time);

          return $reply ? $reply[1] : null;
      }

      /**
       * Blocking pop from the head of one or more lists (blPop).
       *
       * Exceptions raised by the client are not caught here.
       *
       * @param mixed $key  Key or list of keys forwarded to blPop.
       * @param mixed $time Timeout forwarded to blPop.
       * @return mixed Element [1] of the blPop reply, or null when the reply is empty.
       */
      public function lpop($key, $time)
      {
          $reply = $this->_redis->blPop($key, $time);

          return $reply ? $reply[1] : null;
      }

      /**
       * Flushes the Redis server.
       *
       * NOTE(review): flushAll() removes every key on the server, not only this
       * queue's list. Behaviour is kept as-is because callers may rely on it;
       * confirm whether deleting just the queue key is what is actually wanted.
       */
      public function clear()
      {
          $this->_redis->flushAll();
      }
  }
  /**
   * Queue processing: producer side.
   *
   * Wraps a queue backend and pushes single-entry messages onto it.
   *
   * @package queue
   */
  class IClient
  {
      /** @var object Backend that exposes lpush(). */
      private $mQueuedb;

      /**
       * @param object $queueDb Queue backend used for every push.
       */
      public function __construct($queueDb)
      {
          $this->mQueuedb = $queueDb;
      }

      /**
       * Serializes a one-entry map of $key to $value and pushes it to the head of the queue.
       *
       * @param mixed $key   Array key of the message entry.
       * @param mixed $value Value stored under that key.
       * @return mixed Whatever the backend's lpush() returns.
       */
      public function push($key, $value)
      {
          $message = [$key => $value];
          $payload = serialize($message);

          return $this->mQueuedb->lpush($payload);
      }
  }
  /**
   * Consumer-side facade over a queue backend.
   *
   * Pops serialized messages from the backend and returns them decoded.
   */
  class IServer
  {
      /** @var object Backend that exposes connect(), rpop(), lpop(), scan() and close(). */
      private $_queuedb;

      /**
       * @param object $queueDb Queue backend all calls are delegated to.
       */
      public function __construct($queueDb)
      {
          $this->_queuedb = $queueDb;
      }

      /**
       * Delegates to the backend's connect().
       *
       * @return bool The backend's connect() result.
       */
      public function connect(): bool
      {
          return $this->_queuedb->connect();
      }

      /**
       * Pops one message via the backend's rpop() and decodes it.
       *
       * @param mixed $key  Forwarded to the backend's rpop().
       * @param mixed $time Forwarded to the backend's rpop().
       * @return mixed The unserialized message, or false when nothing usable was popped.
       */
      public function pop($key, $time)
      {
          return $this->decodePayload($this->_queuedb->rpop($key, $time));
      }

      /**
       * Pops one message via the backend's lpop() and decodes it.
       *
       * @param mixed $key  Forwarded to the backend's lpop().
       * @param mixed $time Forwarded to the backend's lpop().
       * @return mixed The unserialized message, or false when nothing usable was popped.
       */
      public function lpop($key, $time)
      {
          return $this->decodePayload($this->_queuedb->lpop($key, $time));
      }

      /**
       * @return mixed The backend's scan() result.
       */
      public function scan()
      {
          return $this->_queuedb->scan();
      }

      /**
       * Closes the backend connection.
       */
      public function stop()
      {
          $this->_queuedb->close();
      }

      /**
       * Shared decoding step for pop() and lpop().
       *
       * @param mixed $payload Raw value returned by the backend.
       * @return mixed The unserialized value, or false for a missing, empty or non-string payload.
       */
      private function decodePayload($payload)
      {
          // Explicit guard instead of the former loose `!= null` check: only a
          // non-empty string can be a serialized message.
          if (!is_string($payload) || $payload === '') {
              return false;
          }

          // SECURITY NOTE(review): unserialize() can instantiate arbitrary classes.
          // This is only safe while every producer writing to the queue is trusted;
          // consider the allowed_classes option or a JSON payload otherwise.
          return unserialize($payload);
      }
  }
  /**
   * Base class for queue workers.
   *
   * run() repeatedly pops messages from the injected server and passes each one
   * to handle(), either inline or inside a Swoole coroutine when the
   * USE_COROUTINE constant is defined and truthy. The loop ends once the stop
   * flag is set by stop() or by one of the signals registered in prepare().
   */
  abstract class ILooper
  {
      /** @var bool Set to true to make run() leave its loop. */
      private $_stop = false;

      /** @var object Message source exposing scan(), connect() and pop(); presumably an IServer. */
      private $mServer;

      /** Upper bound on live coroutines before run() stops fetching new messages. */
      const MAX_COROUTINE = 1000;

      /**
       * @param object $server Message source used by run().
       */
      protected function __construct($server)
      {
          $this->mServer = $server;
      }

      /**
       * Discards the active output buffer (if any) and installs the stop-signal handlers.
       */
      public function prepare()
      {
          if (ob_get_level()) {
              ob_end_clean();
          }

          // A closure keeps its class scope, so the private handler stays callable
          // no matter which scope is executing when the signal is dispatched.
          $handler = function ($signo) {
              $this->sig_handler($signo);
          };
          foreach ([SIGINT, SIGHUP, SIGQUIT, SIGTERM] as $signal) {
              pcntl_signal($signal, $handler);
          }
      }

      /**
       * Processes one popped message.
       *
       * @param mixed $msg Decoded message as returned by the server's pop().
       * @return mixed In coroutine mode the return value is written to the log as `method=`.
       */
      abstract protected function handle($msg);

      /**
       * Requests the run() loop to terminate.
       */
      public function stop()
      {
          Log::record(__FUNCTION__, Log::DEBUG);
          $this->_stop = true;
      }

      /**
       * Main worker loop.
       *
       * Exceptions raised while fetching or (in blocking mode) handling a message
       * are logged and the loop continues.
       */
      public function run()
      {
          $queues = $this->mServer->scan();

          while (true) {
              try {
                  // Without an explicit dispatch the handlers installed by prepare()
                  // only run if async signals or ticks were enabled elsewhere.
                  $this->dispatchSignals();
                  if ($this->_stop) {
                      break;
                  }

                  perfor_clear();
                  if (defined('USE_COROUTINE') && USE_COROUTINE) {
                      $this->loopOnceCoroutine($queues);
                  } else {
                      $this->loopOnceBlocking($queues);
                  }
              } catch (Exception $e) {
                  $err = $e->getMessage();
                  $code = $e->getCode();
                  Log::record("QueueDB pop err: code={$code} err={$err}", Log::ERR);
              }
          }

          Log::record("ILooper Run quit.", Log::DEBUG);
      }

      /**
       * One loop iteration in coroutine mode: pop a message and handle it in a new coroutine.
       *
       * @param mixed $queues Keys to pop from, as returned by the server's scan().
       */
      private function loopOnceCoroutine($queues)
      {
          $stats = \Swoole\Coroutine::stats();
          $num = $stats['coroutine_num'];
          $mem = memory_get_usage();

          // Back off while the coroutine limit is reached.
          if (!($num < self::MAX_COROUTINE)) {
              \Swoole\Coroutine::sleep(0.1);
              return;
          }

          if (!$this->mServer->connect()) {
              \Swoole\Coroutine::sleep(1);
              Log::record("Processor redis disconnect.", Log::ERR);
              return;
          }

          $content = $this->mServer->pop($queues, 1);
          if (empty($content)) {
              return;
          }
          $msg = json_encode($content);
          Log::record("messge:{$msg}", Log::DEBUG);

          // A stop request may have arrived while pop() was blocking.
          $this->dispatchSignals();
          if ($this->_stop) {
              $this->requeue($content);
              return;
          }

          go(function () use ($content, $num, $mem) {
              $start = microtime(true);
              Log::record("BeginGoFunction coroutin_num={$num} memory={$mem}", Log::DEBUG);

              // The try/catch in run() does not cover code running inside this
              // coroutine, so handler failures have to be caught here.
              try {
                  $method = $this->handle($content);
              } catch (Exception $e) {
                  $err = $e->getMessage();
                  $code = $e->getCode();
                  Log::record("ILooper handle err: code={$code} err={$err}", Log::ERR);
                  return;
              }

              $use_time = microtime(true) - $start;
              // Values go in as arguments, not into the format string, so a "%"
              // in the handler's return value cannot break sprintf().
              $msg = sprintf(
                  "EndGoFunction coroutin_num=%s memory=%s request_time=%.6f method=%s",
                  $num,
                  $mem,
                  $use_time,
                  $method
              );
              Log::record($msg, Log::DEBUG);
          });
      }

      /**
       * One loop iteration in blocking mode: pop a message and handle it inline.
       *
       * @param mixed $queues Keys to pop from, as returned by the server's scan().
       */
      private function loopOnceBlocking($queues)
      {
          if (!$this->mServer->connect()) {
              sleep(1);
              return;
          }

          $content = $this->mServer->pop($queues, 1);
          if (empty($content)) {
              return;
          }

          // A stop request may have arrived while pop() was blocking.
          $this->dispatchSignals();
          if ($this->_stop) {
              $this->requeue($content);
              return;
          }

          perfor_clear();
          perfor_start();
          $this->handle($content);
          perfor_end('Handle Request');
          $info = perfor_log();
          Log::record("{$info} \r\n", Log::DEBUG);
      }

      /**
       * Pushes every entry of an already-popped message back through DispatcherClient.
       *
       * @param mixed $content Decoded message, iterated as key => params pairs.
       */
      private function requeue($content)
      {
          foreach ($content as $key => $params) {
              DispatcherClient::instance()->push($key, $params);
          }
      }

      /**
       * Runs handlers for pending signals when the pcntl extension provides the dispatcher.
       */
      private function dispatchSignals()
      {
          if (function_exists('pcntl_signal_dispatch')) {
              pcntl_signal_dispatch();
          }
      }

      /**
       * Signal callback: logs the event and raises the stop flag for the registered signals.
       *
       * @param int $signo Number of the received signal.
       */
      private function sig_handler($signo)
      {
          Log::record("queue quit at sig_handler.", Log::DEBUG);

          if (in_array($signo, [SIGINT, SIGHUP, SIGQUIT, SIGTERM], true)) {
              $this->_stop = true;
          }
      }
  }