iqueue.php 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. <?php
  2. declare(strict_types=1);
  3. namespace queue;
  4. require_once(BASE_ROOT_PATH . '/helper/performance_helper.php');
  5. use Redis;
  6. use Exception;
  7. use Log;
  8. use Swoole;
  /**
   * Redis-list backed queue storage.
   *
   * Wraps either the phpredis client or, in coroutine mode, Swoole's coroutine
   * Redis client, and exposes the push / blocking-pop operations the queue
   * client and server classes build on.
   */
  class IQueueDB
  {
      /** @var Redis|Swoole\Coroutine\Redis Client instance, chosen in the constructor. */
      private $_redis;

      /** @var mixed Name of the Redis list that rpush()/lpush() write to. */
      private $_queue_name;

      /** @var bool True when the Swoole coroutine Redis client is used instead of phpredis. */
      private $_comode;

      /**
       * Creates the client for the requested mode and connects immediately.
       *
       * @param mixed $queue_name Redis list key used by the push methods.
       * @param bool  $comode     Use Swoole\Coroutine\Redis when true, phpredis otherwise.
       */
      public function __construct($queue_name,$comode = false)
      {
          if (!extension_loaded('redis')) {
              throw_exception('redis failed to load');
          }

          $this->_queue_name = $queue_name;
          $this->_comode = $comode;
          $this->_redis = $this->_comode ? new Swoole\Coroutine\Redis() : new Redis();

          $this->connect();
      }

      /**
       * Ensures the client is connected, connecting on demand.
       *
       * Coroutine mode reads host/port from the `coroutine.redis_*` settings,
       * the plain client from `queue.host` / `queue.port`.
       *
       * @return bool Connection state after the attempt.
       */
      public function connect(): bool
      {
          if ($this->_comode) {
              if ($this->_redis->connected) {
                  return true;
              }

              $ret = $this->_redis->connect(C('coroutine.redis_host'), C('coroutine.redis_port'));
              Log::record("IQueueDB::connect ret = {$ret}", Log::DEBUG);
              return $this->_redis->connected;
          }

          if ($this->_redis->isConnected()) {
              return true;
          }

          $ret = $this->_redis->connect(C('queue.host'), C('queue.port'));
          Log::record("IQueueDB::connect ret = {$ret}", Log::DEBUG);
          return $this->_redis->isConnected();
      }

      /**
       * Closes the connection if one is open.
       *
       * Only the non-coroutine path logs when there was nothing to close.
       */
      public function close()
      {
          if ($this->_comode) {
              if ($this->_redis->connected) {
                  $this->_redis->close();
              }
              return;
          }

          if ($this->_redis->isConnected()) {
              $this->_redis->close();
          } else {
              Log::record("redis has closed",Log::DEBUG);
          }
      }

      /**
       * Appends a value to the tail of the queue list.
       *
       * @param mixed $value Payload handed to Redis as-is.
       * @return mixed The client's rPush result, or false when the connection
       *               could not be established or an exception was raised.
       */
      public function rpush($value)
      {
          try {
              if (!$this->connect()) {
                  Log::record("IQueueDB::rpush connect=false", Log::DEBUG);
                  return false;
              }

              $ret = $this->_redis->rPush($this->_queue_name, $value);
              // The original logged this under "lpush", which made right-pushes
              // indistinguishable from left-pushes in the log.
              Log::record("IQueueDB::rpush ret={$ret}", Log::DEBUG);
              return $ret;
          } catch (Exception $e) {
              Log::record("IQueueDB::rpush " . $e->getMessage(),Log::ERR);
              return false;
          }
      }

      /**
       * Prepends a value to the head of the queue list.
       *
       * @param mixed $value Payload handed to Redis as-is.
       * @return mixed The client's lPush result, or false when the connection
       *               could not be established or an exception was raised.
       */
      public function lpush($value)
      {
          try {
              if (!$this->connect()) {
                  Log::record("IQueueDB::lpush connect=false", Log::DEBUG);
                  return false;
              }

              $ret = $this->_redis->lPush($this->_queue_name, $value);
              Log::record("IQueueDB::lpush ret={$ret}", Log::DEBUG);
              return $ret;
          } catch (Exception $e) {
              Log::record("IQueueDB::lpush " . $e->getMessage(),Log::ERR);
              return false;
          }
      }

      /**
       * Lists the queue keys a consumer should poll.
       *
       * @return array Single-element list holding this queue's name.
       */
      public function scan()
      {
          return [$this->_queue_name];
      }

      /**
       * Blocking pop from the tail of the given list(s).
       *
       * Unlike the push methods this neither connects on demand nor catches
       * exceptions; the caller is expected to do both.
       *
       * @param mixed $key  Key or keys passed straight to brPop.
       * @param mixed $time Timeout passed straight to brPop.
       * @return mixed Element at index 1 of the reply (presumably the popped
       *               value), or null when the reply is falsy.
       */
      public function rpop($key, $time)
      {
          return $this->unwrapPop('rpop', $this->_redis->brPop($key, $time));
      }

      /**
       * Blocking pop from the head of the given list(s).
       *
       * Unlike the push methods this neither connects on demand nor catches
       * exceptions; the caller is expected to do both.
       *
       * @param mixed $key  Key or keys passed straight to blPop.
       * @param mixed $time Timeout passed straight to blPop.
       * @return mixed Element at index 1 of the reply (presumably the popped
       *               value), or null when the reply is falsy.
       */
      public function lpop($key, $time)
      {
          return $this->unwrapPop('lpop', $this->_redis->blPop($key, $time));
      }

      /**
       * Issues flushAll() on the Redis connection.
       *
       * NOTE(review): flushAll() is not restricted to this queue's key; per the
       * phpredis documentation it removes every key on the server. Confirm that
       * is really the intent before calling this outside of tests.
       */
      public function clear() {
          $this->_redis->flushAll();
      }

      /**
       * Logs a blocking-pop reply and extracts the popped value from it.
       *
       * @param string $label  Operation name used in the log line.
       * @param mixed  $result Raw reply from brPop / blPop.
       * @return mixed Index 1 of the reply, or null when the reply is falsy.
       */
      private function unwrapPop(string $label, $result)
      {
          $tmp = serialize($result);
          Log::record("IQueueDB::{$label}={$tmp}",Log::DEBUG);

          return $result ? $result[1] : null;
      }
  }
  /**
   * Queue producer.
   *
   * Packs a key/value pair into a one-entry array, serializes it and hands it
   * to the queue backend's lpush().
   *
   * @package
   */
  class IClient
  {
      /** @var object Queue backend; must expose lpush($value). */
      private $mQueuedb;

      /**
       * @param object $queueDb Queue backend used for every push.
       */
      public function __construct($queueDb)
      {
          $this->mQueuedb = $queueDb;
      }

      /**
       * Enqueues one message.
       *
       * @param mixed $key   Array key of the message; used as an array offset.
       * @param mixed $value Message payload.
       * @return mixed Whatever the backend's lpush() returns.
       */
      public function push($key, $value)
      {
          $message = [$key => $value];
          $payload = serialize($message);

          return $this->mQueuedb->lpush($payload);
      }
  }
  /**
   * Queue consumer facade.
   *
   * Delegates to a queue backend and unserializes whatever the backend's
   * blocking pops return.
   */
  class IServer
  {
      /** @var object Queue backend; must expose connect(), rpop(), lpop(), scan() and close(). */
      private $_queuedb;

      /**
       * @param object $queueDb Queue backend to consume from.
       */
      public function __construct($queueDb)
      {
          $this->_queuedb = $queueDb;
      }

      /**
       * Asks the backend to (re)establish its connection.
       *
       * @return bool The backend's connect() result.
       */
      public function connect() : bool
      {
          return $this->_queuedb->connect();
      }

      /**
       * Pops one message via the backend's rpop().
       *
       * NOTE(review): the payload goes through unserialize(); this is only safe
       * while nothing untrusted can write to the queue.
       *
       * @param mixed $key  Passed through to the backend.
       * @param mixed $time Passed through to the backend.
       * @return mixed The unserialized payload, or false when the backend
       *               returned nothing (loosely equal to null).
       */
      public function pop($key,$time)
      {
          $payload = $this->_queuedb->rpop($key, $time);

          return $payload != null ? unserialize($payload) : false;
      }

      /**
       * Pops one message via the backend's lpop().
       *
       * NOTE(review): the payload goes through unserialize(); this is only safe
       * while nothing untrusted can write to the queue.
       *
       * @param mixed $key  Passed through to the backend.
       * @param mixed $time Passed through to the backend.
       * @return mixed The unserialized payload, or false when the backend
       *               returned nothing (loosely equal to null).
       */
      public function lpop($key,$time)
      {
          $payload = $this->_queuedb->lpop($key, $time);

          return $payload != null ? unserialize($payload) : false;
      }

      /**
       * @return mixed The backend's scan() result.
       */
      public function scan()
      {
          return $this->_queuedb->scan();
      }

      /**
       * Closes the backend connection.
       */
      public function stop()
      {
          $this->_queuedb->close();
      }
  }
  /**
   * Base class for a long-running queue consumer.
   *
   * run() polls the server for messages and passes each one to handle(),
   * either inline or, when USE_COROUTINE is defined and truthy, inside a
   * Swoole coroutine. The loop ends once stop() is called or one of the
   * signals installed by prepare() is delivered.
   */
  abstract class ILooper
  {
      /** Coroutine count at or above which run() stops popping and backs off. */
      const MAX_COROUTINE = 1000;

      /** @var bool Set by stop() or the signal handler; checked once per loop iteration. */
      private $_stop = false;

      /** @var object Message source; must expose scan(), connect() and pop(). */
      private $mServer;

      /**
       * @param object $server Message source polled by run().
       */
      protected function __construct($server)
      {
          $this->mServer = $server;
      }

      /**
       * Discards the active output buffer, if any, and installs handlers for
       * SIGINT, SIGHUP, SIGQUIT and SIGTERM that request a stop.
       */
      public function prepare()
      {
          if (ob_get_level()) {
              ob_end_clean();
          }

          // sig_handler() is private. Registering a closure created here keeps
          // it callable no matter which scope is executing when the signal is
          // dispatched; an array callable is re-checked against that scope.
          $handler = function ($signo) {
              $this->sig_handler($signo);
          };

          foreach ([SIGINT, SIGHUP, SIGQUIT, SIGTERM] as $signal) {
              pcntl_signal($signal, $handler);
          }
      }

      /**
       * Processes one dequeued message.
       *
       * @param mixed $msg Value returned by the server's pop().
       * @return mixed In coroutine mode the return value is interpolated into
       *               the "EndGoFunction" log line; otherwise it is ignored.
       */
      abstract protected function handle($msg);

      /**
       * Requests that run() exit at its next stop check.
       */
      public function stop()
      {
          Log::record(__FUNCTION__,Log::DEBUG);
          $this->_stop = true;
      }

      /**
       * Main consume loop; returns only after a stop has been requested.
       *
       * Exceptions raised inside an iteration are logged and the loop carries
       * on. A message popped after a stop was requested is pushed back through
       * DispatcherClient instead of being handled.
       */
      public function run()
      {
          $queues = $this->mServer->scan();
          // Nothing else in this class delivers pending signals, so without an
          // explicit dispatch the handlers from prepare() would only run if the
          // host script enabled async signals itself.
          $canDispatchSignals = function_exists('pcntl_signal_dispatch');

          while (true)
          {
              try
              {
                  if ($canDispatchSignals) {
                      pcntl_signal_dispatch();
                  }
                  if ($this->_stop) break;
                  perfor_clear();

                  if (defined('USE_COROUTINE') && USE_COROUTINE)
                  {
                      $res = Swoole\Coroutine::stats();
                      $num = $res['coroutine_num'];
                      $mem = memory_get_usage();
                      Log::record("IQueueDB::run coroutin_num={$num} memory={$mem}",Log::DEBUG);

                      if ($num >= ILooper::MAX_COROUTINE) {
                          // Too many coroutines in flight: wait before popping more work.
                          Swoole\Coroutine::sleep(0.1);
                          continue;
                      }

                      if (!$this->mServer->connect()) {
                          Swoole\Coroutine::sleep(1);
                          Log::record("Processor redis disconnect.",Log::ERR);
                          continue;
                      }

                      $content = $this->mServer->pop($queues,1);
                      if (empty($content)) continue;

                      $msg = json_encode($content);
                      Log::record("queuemessage:{$msg}",Log::DEBUG);

                      if ($this->_stop) {
                          $this->requeueMessages($content);
                          continue;
                      }

                      go(function () use ($content,$num,$mem) {
                          $start = microtime(true);
                          Log::record("BeginGoFunction coroutin_num={$num} memory={$mem}",Log::DEBUG);
                          $method = $this->handle($content);
                          $use_time = microtime(true) - $start;
                          $msg = sprintf("EndGoFunction coroutin_num={$num} memory={$mem} request_time=%.6f method={$method}",$use_time);
                          Log::record($msg,Log::DEBUG);
                      });
                  }
                  else
                  {
                      if (!$this->mServer->connect()) {
                          sleep(1);
                          continue;
                      }

                      $content = $this->mServer->pop($queues,1);
                      if (empty($content)) continue;

                      if ($this->_stop) {
                          $this->requeueMessages($content);
                          continue;
                      }

                      perfor_clear();
                      perfor_start();
                      $this->handle($content);
                      perfor_end('Handle Request');
                      $info = perfor_log();
                      Log::record("{$info} \r\n",Log::DEBUG);
                  }
              }
              catch (Exception $e)
              {
                  $err = $e->getMessage();
                  $code = $e->getCode();
                  Log::record("QueueDB pop err: code={$code} err={$err}",Log::ERR);
              }
          }

          Log::record("ILooper Run quit.", Log::DEBUG);
      }

      /**
       * Pushes every entry of an already-popped message back through
       * DispatcherClient so it is not lost when the loop is stopping.
       *
       * @param mixed $content Unserialized message; iterated as key => params.
       */
      private function requeueMessages($content)
      {
          foreach ($content as $key => $params) {
              DispatcherClient::instance()->push($key, $params);
          }
      }

      /**
       * Signal callback: any of the four installed signals requests a stop.
       *
       * @param int $signo Signal number delivered by pcntl.
       */
      private function sig_handler($signo)
      {
          Log::record("queue quit at sig_handler.",Log::DEBUG);
          switch($signo) {
              case SIGINT:
              case SIGHUP:
              case SIGQUIT:
              case SIGTERM:
                  $this->_stop = true;
                  break;
              default:
                  break;
          }
      }
  }