iqueue.php 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. <?php
  2. declare(strict_types=1);
  3. namespace queue;
  4. require_once(BASE_ROOT_PATH . '/helper/performance_helper.php');
  5. use Redis;
  6. use Exception;
  7. use Log;
  8. use refill\util;
  9. use Swoole;
  10. class IQueueDB
  11. {
  12. private $_redis;
  13. private $_queue_name;
  14. //存定义存储表的数量,系统会随机分配存储
  15. public function __construct($queue_name)
  16. {
  17. if ( !extension_loaded('redis') ) {
  18. throw_exception('redis failed to load');
  19. }
  20. $this->_queue_name = $queue_name;
  21. $this->_redis = new Redis();
  22. $this->connect();
  23. }
  24. public function connect(): bool
  25. {
  26. if ($this->_redis->isConnected()) {
  27. return true;
  28. }
  29. else {
  30. $this->close();
  31. $ret = $this->_redis->connect(C('queue.host'), C('queue.port'));
  32. Log::record("IQueueDB::connect ret = {$ret}", Log::DEBUG);
  33. return $this->_redis->isConnected();
  34. }
  35. }
  36. public function close()
  37. {
  38. if ($this->_redis->isConnected()) {
  39. $this->_redis->close();
  40. } else {
  41. Log::record("redis has closed",Log::DEBUG);
  42. }
  43. }
  44. public function rpush($value)
  45. {
  46. try
  47. {
  48. if($this->connect()) {
  49. $ret = $this->_redis->rPush($this->_queue_name, $value);
  50. return $ret;
  51. } else {
  52. Log::record("IQueueDB::rpush connect=false", Log::DEBUG);
  53. return false;
  54. }
  55. }
  56. catch(Exception $e) {
  57. Log::record("IQueueDB::rpush " . $e->getMessage(),Log::ERR);
  58. return false;
  59. }
  60. }
  61. public function lpush($value)
  62. {
  63. try
  64. {
  65. if ($this->connect()) {
  66. $ret = $this->_redis->lPush($this->_queue_name, $value);
  67. return $ret;
  68. } else {
  69. Log::record("IQueueDB::lpush connect=false", Log::DEBUG);
  70. return false;
  71. }
  72. }
  73. catch(Exception $e) {
  74. Log::record("IQueueDB::lpush " . $e->getMessage(),Log::ERR);
  75. return false;
  76. }
  77. }
  78. public function scan()
  79. {
  80. $list_key[] = $this->_queue_name;
  81. return $list_key;
  82. }
  83. public function rpop()
  84. {
  85. $key = $this->_queue_name;
  86. $result = $this->_redis->rPop($key);
  87. return $result;
  88. }
  89. public function lpop()
  90. {
  91. $key = $this->_queue_name;
  92. $result = $this->_redis->lPop($key);
  93. return $result;
  94. }
  95. public function brpop($key, $time)
  96. {
  97. $result = $this->_redis->brPop($key, $time);
  98. if ($result) {
  99. return $result[1];
  100. } else {
  101. return null;
  102. }
  103. }
  104. public function blpop($key, $time)
  105. {
  106. $result = $this->_redis->blPop($key, $time);
  107. if ($result) {
  108. return $result[1];
  109. } else {
  110. return null;
  111. }
  112. }
  113. public function clear() {
  114. $this->_redis->flushAll();
  115. }
  116. }
  117. /**
  118. * 队列处理
  119. *
  120. *
  121. * @package
  122. */
  123. class IClient
  124. {
  125. private $mQueuedb;
  126. public function __construct($queueDb)
  127. {
  128. $this->mQueuedb = $queueDb;
  129. }
  130. public function push($key, $value)
  131. {
  132. return $this->mQueuedb->lpush(serialize([$key=>$value]));
  133. }
  134. }
  135. class IServer
  136. {
  137. private $_queuedb;
  138. public function __construct($queueDb) {
  139. $this->_queuedb = $queueDb;
  140. }
  141. public function connect() : bool
  142. {
  143. return $this->_queuedb->connect();
  144. }
  145. public function rpop()
  146. {
  147. $result = $this->_queuedb->rpop();
  148. if($result != null) {
  149. return unserialize($result);
  150. } else {
  151. return false;
  152. }
  153. }
  154. public function lpop()
  155. {
  156. $result = $this->_queuedb->lpop();
  157. if($result != null) {
  158. return unserialize($result);
  159. } else {
  160. return false;
  161. }
  162. }
  163. public function brpop($key,$time)
  164. {
  165. $result = $this->_queuedb->brpop($key,$time);
  166. if($result != null) {
  167. return unserialize($result);
  168. } else {
  169. return false;
  170. }
  171. }
  172. public function blpop($key,$time)
  173. {
  174. $result = $this->_queuedb->lpop($key,$time);
  175. if($result != null) {
  176. return unserialize($result);
  177. } else {
  178. return false;
  179. }
  180. }
  181. public function scan() {
  182. return $this->_queuedb->scan();
  183. }
  184. public function stop()
  185. {
  186. $this->_queuedb->close();
  187. }
  188. }
  189. abstract class ILooper
  190. {
  191. private $_stop = false;
  192. private $_pause = 0;
  193. private $_cid = 0;
  194. private $mServer;
  195. const MAX_COROUTINE = 500;
  196. protected function __construct($server)
  197. {
  198. $this->mServer = $server;
  199. }
  200. public function prepare()
  201. {
  202. if (ob_get_level()) ob_end_clean();
  203. pcntl_signal(SIGINT, [$this,'sig_handler']);
  204. pcntl_signal(SIGHUP, [$this,'sig_handler']);
  205. pcntl_signal(SIGQUIT, [$this,'sig_handler']);
  206. pcntl_signal(SIGTERM, [$this,'sig_handler']);
  207. }
  208. abstract protected function handle($msg);
  209. public function pause()
  210. {
  211. $this->_pause = 1;
  212. do{
  213. Swoole\Coroutine::sleep(1);
  214. } while($this->_pause == 1);
  215. Log::record("runloop cid={$this->_cid} pause={$this->_pause}",Log::DEBUG);
  216. $this->wait();
  217. }
  218. public function resume()
  219. {
  220. $exists = Swoole\Coroutine::exists($this->_cid);
  221. Log::record("runloop resume cid={$this->_cid} exists={$exists} pause={$this->_pause}",Log::DEBUG);
  222. if($this->_pause == 2) {
  223. $this->_pause = 0;
  224. Swoole\Coroutine::resume($this->_cid);
  225. }
  226. }
  227. public function stop()
  228. {
  229. Log::record(__FUNCTION__,Log::DEBUG);
  230. $this->_stop = true;
  231. $this->wait();
  232. }
  233. private function wait()
  234. {
  235. do {
  236. $res = Swoole\Coroutine::stats();
  237. $num = $res['coroutine_num'];
  238. if($num > 10) {
  239. Swoole\Coroutine::sleep(1);
  240. }
  241. Log::record("wait coroutine quit num = {$num}",Log::DEBUG);
  242. } while($num > 10);
  243. do
  244. {
  245. $count = 0;
  246. $coros = Swoole\Coroutine::list();
  247. foreach ($coros as $cid)
  248. {
  249. Log::record("sub coroutine cid={$cid}",Log::DEBUG);
  250. $pcid = Swoole\Coroutine::getPcid($cid);
  251. if($pcid == $this->_cid) {
  252. $count += 1;
  253. }
  254. }
  255. if($count > 0) {
  256. Swoole\Coroutine::sleep(1);
  257. }
  258. Log::record("wait sub coroutine quit num = {$count}",Log::DEBUG);
  259. }
  260. while($count > 0);
  261. Log::record("wait subcoroutine quit all",Log::DEBUG);
  262. }
  263. public function run()
  264. {
  265. refill\RefillFactory::instance();
  266. $this->_cid = Swoole\Coroutine::getCid();
  267. Log::record("runloop cid={$this->_cid}",Log::DEBUG);
  268. $queues = $this->mServer->scan();
  269. while (true)
  270. {
  271. try
  272. {
  273. if($this->_pause == 1) {
  274. $this->_pause = 2;
  275. Swoole\Coroutine::suspend();
  276. }
  277. if ($this->_stop) break;
  278. perfor_clear();
  279. if(defined('USE_COROUTINE') && USE_COROUTINE)
  280. {
  281. $res = Swoole\Coroutine::stats();
  282. $num = $res['coroutine_num'];
  283. $mem = memory_get_usage();
  284. if($num < ILooper::MAX_COROUTINE)
  285. {
  286. if($this->mServer->connect() == false) {
  287. Swoole\Coroutine::sleep(1);
  288. Log::record("Processor redis disconnect.",Log::ERR);
  289. continue;
  290. }
  291. if(defined('COROUTINE_HOOK_TCP') && COROUTINE_HOOK_TCP)
  292. {
  293. $content = $this->mServer->brpop($queues,1);
  294. if(empty($content)) {
  295. continue;
  296. }
  297. }
  298. else
  299. {
  300. $content = $this->mServer->rpop();
  301. if(empty($content)) {
  302. Swoole\Coroutine::sleep(1);
  303. continue;
  304. }
  305. }
  306. if($this->_stop)
  307. {
  308. foreach ($content as $key => $params) {
  309. util::push_queue($key, $params);
  310. }
  311. }
  312. else
  313. {
  314. go(function () use ($content, $num, $mem) {
  315. $start = microtime(true);
  316. Log::record("BeginGoFunction coroutin_num={$num} memory={$mem}", Log::DEBUG);
  317. $method = $this->handle($content);
  318. $use_time = microtime(true) - $start;
  319. $msg = sprintf("EndGoFunction coroutin_num={$num} memory={$mem} request_time=%.6f method={$method}", $use_time);
  320. Log::record($msg, Log::DEBUG);
  321. });
  322. }
  323. }
  324. else {
  325. Swoole\Coroutine::sleep(0.1);
  326. }
  327. }
  328. else
  329. {
  330. if($this->mServer->connect() == false) {
  331. sleep(1);
  332. continue;
  333. }
  334. $content = $this->mServer->pop($queues,1);
  335. if(empty($content)) continue;
  336. if($this->_stop)
  337. {
  338. foreach ($content as $key => $params) {
  339. DispatcherClient::instance()->push($key, $params);
  340. }
  341. }
  342. else
  343. {
  344. perfor_clear();
  345. perfor_start();
  346. $this->handle($content);
  347. perfor_end('Handle Request');
  348. $info = perfor_log();
  349. Log::record("{$info} \r\n",Log::DEBUG);
  350. }
  351. }
  352. }
  353. catch (Exception $e)
  354. {
  355. $err = $e->getMessage();
  356. $code = $e->getCode();
  357. Log::record("QueueDB pop err: code={$code} err={$err}",Log::ERR);
  358. }
  359. }
  360. Log::record("ILooper Run quit.", Log::DEBUG);
  361. }
  362. private function sig_handler($signo)
  363. {
  364. Log::record("queue quit at sig_handler.",Log::DEBUG);
  365. switch($signo) {
  366. case SIGINT:
  367. case SIGHUP:
  368. case SIGQUIT:
  369. case SIGTERM:
  370. $this->_stop = true;
  371. break;
  372. default:
  373. break;
  374. }
  375. }
  376. }