iqueue.php 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450
  1. <?php
  2. declare(strict_types=1);
  3. namespace queue;
  4. require_once(BASE_ROOT_PATH . '/helper/performance_helper.php');
  5. use Redis;
  6. use Exception;
  7. use Log;
  8. use Swoole;
  /**
   * Thin wrapper around a phpredis connection bound to a single list key
   * (the "queue name").
   *
   * Push operations (re)connect on demand and report failure by returning
   * false; pop operations talk to the connection directly and expect the
   * caller to have called connect() beforehand.
   */
  class IQueueDB
  {
      /** Underlying phpredis client. */
      private $_redis;
      /** Redis list key used by the push/pop helpers that take no key. */
      private $_queue_name;

      /**
       * @param mixed $queue_name Redis list key this instance operates on.
       */
      public function __construct($queue_name)
      {
          $redis_available = extension_loaded('redis');
          if (!$redis_available) {
              throw_exception('redis failed to load');
          }

          $this->_queue_name = $queue_name;
          $this->_redis = new Redis();
          $this->connect();
      }

      /**
       * Ensure the connection is open, connecting with the configured
       * queue.host / queue.port when it is not.
       *
       * @return bool Whether the client reports itself connected afterwards.
       */
      public function connect(): bool
      {
          if ($this->_redis->isConnected()) {
              return true;
          }

          // Not connected: run the normal close path first, then dial again.
          $this->close();
          $host = C('queue.host');
          $port = C('queue.port');
          $ret = $this->_redis->connect($host, $port);
          Log::record("IQueueDB::connect ret = {$ret}", Log::DEBUG);

          return $this->_redis->isConnected();
      }

      /**
       * Close the connection if it is open; otherwise only log that it was
       * already closed.
       */
      public function close()
      {
          if (!$this->_redis->isConnected()) {
              Log::record("redis has closed",Log::DEBUG);
              return;
          }

          $this->_redis->close();
      }

      /**
       * Append a value to the tail of the queue list.
       *
       * @param mixed $value Payload to push.
       * @return mixed Result of Redis::rPush, or false when the connection
       *               could not be established or an Exception was raised.
       */
      public function rpush($value)
      {
          try {
              if (!$this->connect()) {
                  Log::record("IQueueDB::rpush connect=false", Log::DEBUG);
                  return false;
              }

              return $this->_redis->rPush($this->_queue_name, $value);
          } catch (Exception $ex) {
              Log::record("IQueueDB::rpush " . $ex->getMessage(),Log::ERR);
              return false;
          }
      }

      /**
       * Prepend a value to the head of the queue list.
       *
       * @param mixed $value Payload to push.
       * @return mixed Result of Redis::lPush, or false when the connection
       *               could not be established or an Exception was raised.
       */
      public function lpush($value)
      {
          try {
              if (!$this->connect()) {
                  Log::record("IQueueDB::lpush connect=false", Log::DEBUG);
                  return false;
              }

              return $this->_redis->lPush($this->_queue_name, $value);
          } catch (Exception $ex) {
              Log::record("IQueueDB::lpush " . $ex->getMessage(),Log::ERR);
              return false;
          }
      }

      /**
       * List the keys this instance serves. Despite the name, no Redis SCAN
       * is issued: the result is always the single configured queue name.
       *
       * @return array One-element list containing the queue name.
       */
      public function scan()
      {
          return [$this->_queue_name];
      }

      /**
       * Pop one element from the tail of the queue list (non-blocking).
       *
       * @return mixed Whatever Redis::rPop returns for the queue key.
       */
      public function rpop()
      {
          return $this->_redis->rPop($this->_queue_name);
      }

      /**
       * Pop one element from the head of the queue list (non-blocking).
       *
       * @return mixed Whatever Redis::lPop returns for the queue key.
       */
      public function lpop()
      {
          return $this->_redis->lPop($this->_queue_name);
      }

      /**
       * Blocking tail pop on the given key(s).
       *
       * @param mixed $key  Key or list of keys forwarded to Redis::brPop.
       * @param mixed $time Timeout forwarded to Redis::brPop.
       * @return mixed Element at index 1 of the reply, or null when the
       *               reply is empty/falsy.
       */
      public function brpop($key, $time)
      {
          $reply = $this->_redis->brPop($key, $time);

          return $reply ? $reply[1] : null;
      }

      /**
       * Blocking head pop on the given key(s).
       *
       * @param mixed $key  Key or list of keys forwarded to Redis::blPop.
       * @param mixed $time Timeout forwarded to Redis::blPop.
       * @return mixed Element at index 1 of the reply, or null when the
       *               reply is empty/falsy.
       */
      public function blpop($key, $time)
      {
          $reply = $this->_redis->blPop($key, $time);

          return $reply ? $reply[1] : null;
      }

      /**
       * Flush the Redis server.
       *
       * NOTE(review): this calls flushAll(), which is not limited to this
       * instance's queue key. If the intent is only to empty this queue,
       * deleting the queue key would be the narrower operation — confirm
       * with callers before changing.
       */
      public function clear()
      {
          $this->_redis->flushAll();
      }
  }
  /**
   * Queue processing.
   *
   * Producer that enqueues messages in PHP-serialized form: each message is
   * a one-element array mapping the given key to the given value.
   *
   * @package
   */
  class IClient
  {
      /** Queue backend; must provide lpush($value). */
      private $mQueuedb;

      /**
       * @param mixed $queueDb Queue backend exposing an lpush() method.
       */
      public function __construct($queueDb)
      {
          $this->mQueuedb = $queueDb;
      }

      /**
       * Serialize [$key => $value] and push it onto the head of the queue.
       *
       * @param mixed $key   Array key of the message.
       * @param mixed $value Message parameters.
       * @return mixed Whatever the backend's lpush() returns.
       */
      public function push($key, $value)
      {
          $message = [$key => $value];
          $payload = serialize($message);

          return $this->mQueuedb->lpush($payload);
      }
  }
  /**
   * Producer that enqueues messages as JSON objects of the form
   * {"method": <key>, "params": <value>}.
   */
  class IJsonClient
  {
      /** Queue backend; must provide lpush($value). */
      private $mQueuedb;

      /**
       * @param mixed $queueDb Queue backend exposing an lpush() method.
       */
      public function __construct($queueDb)
      {
          $this->mQueuedb = $queueDb;
      }

      /**
       * JSON-encode the message and push it onto the head of the queue.
       *
       * @param mixed $key   Stored under the "method" field.
       * @param mixed $value Stored under the "params" field.
       * @return mixed Whatever the backend's lpush() returns, or false when
       *               the message cannot be JSON-encoded.
       */
      public function push($key, $value)
      {
          $payload = json_encode(['method' => $key, 'params' => $value]);

          // json_encode() returns false on failure (e.g. invalid UTF-8 or
          // excessive nesting). Do not enqueue that: a consumer could not
          // decode it. Report failure the same way a failed lpush would.
          if ($payload === false) {
              return false;
          }

          return $this->mQueuedb->lpush($payload);
      }
  }
  /**
   * Consumer-side facade over a queue backend: pops raw payloads and
   * returns them unserialized.
   *
   * NOTE(review): payloads are decoded with unserialize(). That is only safe
   * if everything able to write to the queue is trusted; consider a JSON
   * format if that cannot be guaranteed.
   */
  class IServer
  {
      /** Queue backend providing connect/rpop/lpop/brpop/blpop/scan/close. */
      private $_queuedb;

      /**
       * @param mixed $queueDb Queue backend to read from.
       */
      public function __construct($queueDb) {
          $this->_queuedb = $queueDb;
      }

      /**
       * Ensure the backend connection is open.
       *
       * @return bool Result of the backend's connect().
       */
      public function connect() : bool
      {
          return $this->_queuedb->connect();
      }

      /**
       * Non-blocking pop from the tail of the queue.
       *
       * @return mixed Unserialized message, or false when nothing was popped.
       */
      public function rpop()
      {
          return $this->decodePayload($this->_queuedb->rpop());
      }

      /**
       * Non-blocking pop from the head of the queue.
       *
       * @return mixed Unserialized message, or false when nothing was popped.
       */
      public function lpop()
      {
          return $this->decodePayload($this->_queuedb->lpop());
      }

      /**
       * Blocking pop from the tail of the given key(s).
       *
       * @param mixed $key  Key or list of keys to wait on.
       * @param mixed $time Timeout forwarded to the backend.
       * @return mixed Unserialized message, or false when nothing was popped.
       */
      public function brpop($key,$time)
      {
          return $this->decodePayload($this->_queuedb->brpop($key, $time));
      }

      /**
       * Blocking pop from the head of the given key(s).
       *
       * @param mixed $key  Key or list of keys to wait on.
       * @param mixed $time Timeout forwarded to the backend.
       * @return mixed Unserialized message, or false when nothing was popped.
       */
      public function blpop($key,$time)
      {
          // Previously this delegated to lpop(), which ignores $key/$time and
          // never blocks; it must use the backend's blocking head pop.
          return $this->decodePayload($this->_queuedb->blpop($key, $time));
      }

      /**
       * @return mixed The list of queue keys reported by the backend.
       */
      public function scan() {
          return $this->_queuedb->scan();
      }

      /**
       * Close the backend connection.
       */
      public function stop()
      {
          $this->_queuedb->close();
      }

      /**
       * Turn a raw pop result into a message.
       *
       * The loose comparison is deliberate: backends signal "nothing popped"
       * with either null or false, and both must map to false here.
       *
       * @param mixed $raw Raw value returned by the backend.
       * @return mixed unserialize($raw), or false when $raw is empty.
       */
      private function decodePayload($raw)
      {
          if ($raw != null) {
              return unserialize($raw);
          }

          return false;
      }
  }
  /**
   * Base class for a queue consumer loop.
   *
   * run() repeatedly pops messages from the server object passed to the
   * constructor and hands each one to handle(). When the USE_COROUTINE
   * constant is defined and truthy, every message is handled in its own
   * Swoole coroutine (at most MAX_COROUTINE at a time); otherwise messages
   * are handled inline, one after another.
   *
   * The loop can be paused/resumed from another coroutine and is stopped
   * either via stop() or by one of the signals registered in prepare().
   */
  abstract class ILooper
  {
      /** Set to true to make run() leave its loop. */
      private $_stop = false;
      /** 0 = running normally, 1 = pause requested, 2 = paused. */
      private $_pause = 0;
      /** Coroutine id of the coroutine executing run(). */
      private $_cid = 0;
      /** Server object providing scan/connect/rpop/brpop. */
      private $mServer;
      /** Upper bound on concurrently existing coroutines before run() backs off. */
      const MAX_COROUTINE = 300;

      /**
       * @param mixed $server Queue server the loop reads from.
       */
      protected function __construct($server)
      {
          $this->mServer = $server;
      }

      /**
       * Discard the current output buffer (if any) and install the
       * termination signal handlers.
       */
      public function prepare()
      {
          if (ob_get_level()) ob_end_clean();

          // Register a closure rather than [$this, 'sig_handler']: the handler
          // is private, and an array callable is resolved against whatever
          // scope is active when the signal is dispatched. The closure is
          // bound to this class, so the call is always permitted.
          $handler = function ($signo) {
              $this->sig_handler($signo);
          };
          pcntl_signal(SIGINT, $handler);
          pcntl_signal(SIGHUP, $handler);
          pcntl_signal(SIGQUIT, $handler);
          pcntl_signal(SIGTERM, $handler);
      }

      /**
       * Process one message popped from the queue.
       *
       * In coroutine mode the return value is interpolated into the
       * "EndGoFunction" log line as the method name.
       *
       * @param mixed $msg Message as returned by the server's pop call.
       */
      abstract protected function handle($msg);

      /**
       * Ask the run loop to pause and block until it has done so and all
       * child coroutines have finished. Does nothing (besides logging) when
       * a pause is already requested or in effect.
       */
      public function pause()
      {
          if($this->_pause != 0) {
              Log::record("subcoroutine pause state={$this->_pause} cannot pause.",Log::DEBUG);
              return;
          }
          $this->_pause = 1;
          // run() flips the state from 1 to 2 right before suspending itself.
          do{
              Swoole\Coroutine::sleep(1);
          } while($this->_pause == 1);
          $this->wait();
          Log::record("subcoroutine pause state={$this->_pause} success.",Log::DEBUG);
      }

      /**
       * Resume a paused run loop. When the loop is not in the paused state
       * only the log line is written.
       */
      public function resume()
      {
          if($this->_pause == 2) {
              $this->_pause = 0;
              Swoole\Coroutine::resume($this->_cid);
          }
          Log::record("subcoroutine resume success.",Log::DEBUG);
      }

      /**
       * Request the run loop to stop and wait for child coroutines to end.
       */
      public function stop()
      {
          Log::record(__FUNCTION__,Log::DEBUG);
          $this->_stop = true;
          $this->wait();
      }

      /**
       * Block (sleeping one second per round) until the process is down to
       * at most 10 coroutines and no coroutine whose parent is the run loop
       * remains. Each remaining child is logged with its elapsed time and
       * backtrace.
       */
      private function wait()
      {
          $coroutine_num = function()
          {
              $res = Swoole\Coroutine::stats();
              $num = $res['coroutine_num'];
              return $num;
          };

          // NOTE(review): the threshold of 10 is hard-coded; presumably it
          // leaves room for long-lived non-worker coroutines — confirm.
          do {
              $num = $coroutine_num();
              if($num > 10) {
                  Swoole\Coroutine::sleep(1);
              }
          } while($num > 10);

          do
          {
              $count = 0;
              $coros = Swoole\Coroutine::list();
              foreach ($coros as $cid)
              {
                  $pcid = Swoole\Coroutine::getPcid($cid);
                  if($pcid == $this->_cid) {
                      $count += 1;
                      $time_secs = Swoole\Coroutine::getElapsed($cid);
                      $backtrace = Swoole\Coroutine::getBackTrace($cid);
                      $trace = "wait Coroutine quit elapsed cid={$cid} seconds={$time_secs}\n";
                      foreach ($backtrace as $item) {
                          // Frames of internal functions carry no file/line.
                          $file = $item['file'] ?? '';
                          $line = $item['line'] ?? '';
                          $function = $item['function'] ?? '';
                          $trace .= "{$file}\t{$line}\t{$function}\n";
                      }
                      Log::record($trace,Log::DEBUG);
                  }
              }
              if($count > 0) {
                  Swoole\Coroutine::sleep(1);
              }
              Log::record("wait running subcoroutine count = {$count} quit.",Log::DEBUG);
          }
          while($count > 0);

          $num = $coroutine_num();
          Log::record("subcoroutine wait: quit all. cur coroutine num={$num}",Log::DEBUG);
      }

      /**
       * Main consumer loop. Returns once the stop flag has been set.
       *
       * Exceptions thrown while polling (or, in inline mode, while handling)
       * are logged and the loop continues.
       */
      public function run()
      {
          $this->_cid = Swoole\Coroutine::getCid();
          $queues = $this->mServer->scan();
          while (true)
          {
              try
              {
                  if ($this->_stop) break;

                  // Acknowledge a pause request and park until resume().
                  if($this->_pause == 1) {
                      Log::record("subcoroutine runlooper pause.",Log::DEBUG);
                      $this->_pause = 2;
                      Swoole\Coroutine::suspend();
                      Log::record("subcoroutine runlooper resume success.",Log::DEBUG);
                  }
                  perfor_clear();

                  if(defined('USE_COROUTINE') && USE_COROUTINE)
                  {
                      $res = Swoole\Coroutine::stats();
                      $num = $res['coroutine_num'];
                      $mem = memory_get_usage();
                      if($num < ILooper::MAX_COROUTINE)
                      {
                          if($this->mServer->connect() == false) {
                              Swoole\Coroutine::sleep(1);
                              Log::record("Processor redis disconnect.",Log::ERR);
                              continue;
                          }

                          if(defined('COROUTINE_HOOK_TCP') && COROUTINE_HOOK_TCP)
                          {
                              // Blocking pop with a 1 second timeout.
                              $content = $this->mServer->brpop($queues,1);
                              if(empty($content)) {
                                  continue;
                              }
                          }
                          else
                          {
                              // Non-blocking pop; back off briefly when idle.
                              $content = $this->mServer->rpop();
                              if(empty($content)) {
                                  Swoole\Coroutine::sleep(0.1);
                                  continue;
                              }
                          }

                          $pThis = $this;
                          go(function () use ($content, $num, $mem,$pThis) {
                              $start = microtime(true);
                              Log::record("BeginGoFunction coroutin_num={$num} memory={$mem}", Log::DEBUG);
                              $method = $pThis->handle($content);
                              $use_time = microtime(true) - $start;
                              $res = Swoole\Coroutine::stats();
                              $num = $res['coroutine_num'];
                              $msg = sprintf("EndGoFunction coroutin_num={$num} memory={$mem} request_time=%.6f method={$method}", $use_time);
                              Log::record($msg, Log::DEBUG);
                          });
                      }
                      else {
                          // Too many coroutines in flight: let some finish.
                          Swoole\Coroutine::sleep(0.1);
                      }
                  }
                  else
                  {
                      if($this->mServer->connect() == false) {
                          sleep(1);
                          continue;
                      }

                      // The server class in this file has no pop() method;
                      // calling it raised an Error that the catch below does
                      // not handle. Use the blocking tail pop with a 1 second
                      // timeout, as the coroutine branch does.
                      $content = $this->mServer->brpop($queues,1);
                      if(empty($content)) continue;

                      if($this->_stop)
                      {
                          // Stop was requested while waiting: hand the message
                          // back to the dispatcher instead of processing it.
                          foreach ($content as $key => $params) {
                              DispatcherClient::instance()->push($key, $params);
                          }
                      }
                      else
                      {
                          perfor_clear();
                          perfor_start();
                          $this->handle($content);
                          perfor_end('Handle Request');
                          $info = perfor_log();
                          Log::record("{$info} \r\n",Log::DEBUG);
                      }
                  }
              }
              catch (Exception $e)
              {
                  $err = $e->getMessage();
                  $code = $e->getCode();
                  Log::record("QueueDB pop err: code={$code} err={$err}",Log::ERR);
              }
          }
          Log::record("ILooper Run quit.", Log::DEBUG);
      }

      /**
       * Signal handler: any of the registered termination signals sets the
       * stop flag so that run() exits at its next iteration.
       *
       * @param mixed $signo Signal number delivered by pcntl.
       */
      private function sig_handler($signo)
      {
          Log::record("queue quit at sig_handler.",Log::DEBUG);
          switch($signo) {
              case SIGINT:
              case SIGHUP:
              case SIGQUIT:
              case SIGTERM:
                  $this->_stop = true;
                  break;
              default:
                  break;
          }
      }
  }