iqueue.php 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. <?php
  2. declare(strict_types=1);
  3. namespace queue;
  4. require_once(BASE_ROOT_PATH . '/helper/performance_helper.php');
  5. use Redis;
  6. use Exception;
  7. use Log;
  8. use Swoole;
  9. class IQueueDB
  10. {
  11. private $_redis;
  12. private $_queue_name;
  13. public function __construct($queue_name)
  14. {
  15. if ( !extension_loaded('redis') ) {
  16. throw_exception('redis failed to load');
  17. }
  18. $this->_queue_name = $queue_name;
  19. $this->_redis = new Redis();
  20. $this->connect();
  21. }
  22. public function connect(): bool
  23. {
  24. if ($this->_redis->isConnected()) {
  25. return true;
  26. }
  27. else {
  28. $this->close();
  29. $ret = $this->_redis->connect(C('queue.host'), C('queue.port'));
  30. Log::record("IQueueDB::connect ret = {$ret}", Log::DEBUG);
  31. return $this->_redis->isConnected();
  32. }
  33. }
  34. public function close()
  35. {
  36. if ($this->_redis->isConnected()) {
  37. $this->_redis->close();
  38. } else {
  39. Log::record("redis has closed",Log::DEBUG);
  40. }
  41. }
  42. public function rpush($value)
  43. {
  44. try
  45. {
  46. if($this->connect()) {
  47. $ret = $this->_redis->rPush($this->_queue_name, $value);
  48. return $ret;
  49. } else {
  50. Log::record("IQueueDB::rpush connect=false", Log::DEBUG);
  51. return false;
  52. }
  53. }
  54. catch(Exception $e) {
  55. Log::record("IQueueDB::rpush " . $e->getMessage(),Log::ERR);
  56. return false;
  57. }
  58. }
  59. public function lpush($value)
  60. {
  61. try
  62. {
  63. if ($this->connect()) {
  64. $ret = $this->_redis->lPush($this->_queue_name, $value);
  65. return $ret;
  66. } else {
  67. Log::record("IQueueDB::lpush connect=false", Log::DEBUG);
  68. return false;
  69. }
  70. }
  71. catch(Exception $e) {
  72. Log::record("IQueueDB::lpush " . $e->getMessage(),Log::ERR);
  73. return false;
  74. }
  75. }
  76. public function scan()
  77. {
  78. $list_key[] = $this->_queue_name;
  79. return $list_key;
  80. }
  81. public function rpop()
  82. {
  83. $key = $this->_queue_name;
  84. $result = $this->_redis->rPop($key);
  85. return $result;
  86. }
  87. public function lpop()
  88. {
  89. $key = $this->_queue_name;
  90. $result = $this->_redis->lPop($key);
  91. return $result;
  92. }
  93. public function brpop($key, $time)
  94. {
  95. $result = $this->_redis->brPop($key, $time);
  96. if ($result) {
  97. return $result[1];
  98. } else {
  99. return null;
  100. }
  101. }
  102. public function blpop($key, $time)
  103. {
  104. $result = $this->_redis->blPop($key, $time);
  105. if ($result) {
  106. return $result[1];
  107. } else {
  108. return null;
  109. }
  110. }
  111. public function clear() {
  112. $this->_redis->flushAll();
  113. }
  114. }
  115. /**
  116. * 队列处理
  117. *
  118. *
  119. * @package
  120. */
  121. class IClient
  122. {
  123. private $mQueuedb;
  124. public function __construct($queueDb)
  125. {
  126. $this->mQueuedb = $queueDb;
  127. }
  128. public function push($key, $value)
  129. {
  130. return $this->mQueuedb->lpush(serialize([$key=>$value]));
  131. }
  132. }
  133. class IServer
  134. {
  135. private $_queuedb;
  136. public function __construct($queueDb) {
  137. $this->_queuedb = $queueDb;
  138. }
  139. public function connect() : bool
  140. {
  141. return $this->_queuedb->connect();
  142. }
  143. public function rpop()
  144. {
  145. $result = $this->_queuedb->rpop();
  146. if($result != null) {
  147. return unserialize($result);
  148. } else {
  149. return false;
  150. }
  151. }
  152. public function lpop()
  153. {
  154. $result = $this->_queuedb->lpop();
  155. if($result != null) {
  156. return unserialize($result);
  157. } else {
  158. return false;
  159. }
  160. }
  161. public function brpop($key,$time)
  162. {
  163. $result = $this->_queuedb->brpop($key,$time);
  164. if($result != null) {
  165. return unserialize($result);
  166. } else {
  167. return false;
  168. }
  169. }
  170. public function blpop($key,$time)
  171. {
  172. $result = $this->_queuedb->lpop($key,$time);
  173. if($result != null) {
  174. return unserialize($result);
  175. } else {
  176. return false;
  177. }
  178. }
  179. public function scan() {
  180. return $this->_queuedb->scan();
  181. }
  182. public function stop()
  183. {
  184. $this->_queuedb->close();
  185. }
  186. }
  187. abstract class ILooper
  188. {
  189. private $_stop = false;
  190. private $_pause = 0; // 0,正常运行,1,申请暂停,2,暂停成功
  191. private $_cid = 0;
  192. private $mServer;
  193. const MAX_COROUTINE = 20;
  194. protected function __construct($server)
  195. {
  196. $this->mServer = $server;
  197. }
  198. public function prepare()
  199. {
  200. if (ob_get_level()) ob_end_clean();
  201. pcntl_signal(SIGINT, [$this,'sig_handler']);
  202. pcntl_signal(SIGHUP, [$this,'sig_handler']);
  203. pcntl_signal(SIGQUIT, [$this,'sig_handler']);
  204. pcntl_signal(SIGTERM, [$this,'sig_handler']);
  205. }
  206. abstract protected function handle($msg);
  207. public function pause()
  208. {
  209. if($this->_pause != 0) {
  210. Log::record("subcoroutine pause state={$this->_pause} cannot pause.",Log::DEBUG);
  211. return;
  212. }
  213. $this->_pause = 1;
  214. do{
  215. Swoole\Coroutine::sleep(1);
  216. } while($this->_pause == 1);
  217. $this->wait();
  218. Log::record("subcoroutine pause state={$this->_pause} success.",Log::DEBUG);
  219. }
  220. public function resume()
  221. {
  222. if($this->_pause == 2) {
  223. $this->_pause = 0;
  224. Swoole\Coroutine::resume($this->_cid);
  225. }
  226. Log::record("subcoroutine resume success.",Log::DEBUG);
  227. }
  228. public function stop()
  229. {
  230. Log::record(__FUNCTION__,Log::DEBUG);
  231. $this->_stop = true;
  232. $this->wait();
  233. }
  234. private function wait()
  235. {
  236. $coroutine_num = function()
  237. {
  238. $res = Swoole\Coroutine::stats();
  239. $num = $res['coroutine_num'];
  240. return $num;
  241. };
  242. do {
  243. $num = $coroutine_num();
  244. if($num > 10) {
  245. Swoole\Coroutine::sleep(1);
  246. }
  247. } while($num > 10);
  248. do
  249. {
  250. $count = 0;
  251. $coros = Swoole\Coroutine::list();
  252. foreach ($coros as $cid)
  253. {
  254. $pcid = Swoole\Coroutine::getPcid($cid);
  255. if($pcid == $this->_cid) {
  256. $count += 1;
  257. $time_secs = Swoole\Coroutine::getElapsed($cid);
  258. $backtrace = Swoole\Coroutine::getBackTrace($cid);
  259. $trace = "wait Coroutine quit elapsed cid={$cid} seconds={$time_secs}\n";
  260. foreach ($backtrace as $item) {
  261. $trace .= "{$item['file']}\t{$item['line']}\t{$item['function']}\n";
  262. }
  263. Log::record($trace,Log::DEBUG);
  264. }
  265. }
  266. if($count > 0) {
  267. Swoole\Coroutine::sleep(1);
  268. }
  269. Log::record("wait running subcoroutine count = {$count} quit.",Log::DEBUG);
  270. }
  271. while($count > 0);
  272. $num = $coroutine_num();
  273. Log::record("subcoroutine wait: quit all. cur coroutine num={$num}",Log::DEBUG);
  274. }
  275. public function run()
  276. {
  277. $this->_cid = Swoole\Coroutine::getCid();
  278. $queues = $this->mServer->scan();
  279. while (true)
  280. {
  281. try
  282. {
  283. if ($this->_stop) break;
  284. if($this->_pause == 1) {
  285. Log::record("subcoroutine runlooper pause.",Log::DEBUG);
  286. $this->_pause = 2;
  287. Swoole\Coroutine::suspend();
  288. Log::record("subcoroutine runlooper resume success.",Log::DEBUG);
  289. }
  290. perfor_clear();
  291. if(defined('USE_COROUTINE') && USE_COROUTINE)
  292. {
  293. $res = Swoole\Coroutine::stats();
  294. $num = $res['coroutine_num'];
  295. $mem = memory_get_usage();
  296. if($num < ILooper::MAX_COROUTINE)
  297. {
  298. if($this->mServer->connect() == false) {
  299. Swoole\Coroutine::sleep(1);
  300. Log::record("Processor redis disconnect.",Log::ERR);
  301. continue;
  302. }
  303. if(defined('COROUTINE_HOOK_TCP') && COROUTINE_HOOK_TCP)
  304. {
  305. $content = $this->mServer->brpop($queues,1);
  306. if(empty($content)) {
  307. continue;
  308. }
  309. }
  310. else
  311. {
  312. $content = $this->mServer->rpop();
  313. if(empty($content)) {
  314. Swoole\Coroutine::sleep(0.1);
  315. continue;
  316. }
  317. }
  318. $pThis = $this;
  319. go(function () use ($content, $num, $mem,$pThis) {
  320. $start = microtime(true);
  321. Log::record("BeginGoFunction coroutin_num={$num} memory={$mem}", Log::DEBUG);
  322. $method = $pThis->handle($content);
  323. $use_time = microtime(true) - $start;
  324. $res = Swoole\Coroutine::stats();
  325. $num = $res['coroutine_num'];
  326. $msg = sprintf("EndGoFunction coroutin_num={$num} memory={$mem} request_time=%.6f method={$method}", $use_time);
  327. Log::record($msg, Log::DEBUG);
  328. });
  329. }
  330. else {
  331. Swoole\Coroutine::sleep(0.1);
  332. }
  333. }
  334. else
  335. {
  336. if($this->mServer->connect() == false) {
  337. sleep(1);
  338. continue;
  339. }
  340. $content = $this->mServer->pop($queues,1);
  341. if(empty($content)) continue;
  342. if($this->_stop)
  343. {
  344. foreach ($content as $key => $params) {
  345. DispatcherClient::instance()->push($key, $params);
  346. }
  347. }
  348. else
  349. {
  350. perfor_clear();
  351. perfor_start();
  352. $this->handle($content);
  353. perfor_end('Handle Request');
  354. $info = perfor_log();
  355. Log::record("{$info} \r\n",Log::DEBUG);
  356. }
  357. }
  358. }
  359. catch (Exception $e)
  360. {
  361. $err = $e->getMessage();
  362. $code = $e->getCode();
  363. Log::record("QueueDB pop err: code={$code} err={$err}",Log::ERR);
  364. }
  365. }
  366. Log::record("ILooper Run quit.", Log::DEBUG);
  367. }
  368. private function sig_handler($signo)
  369. {
  370. Log::record("queue quit at sig_handler.",Log::DEBUG);
  371. switch($signo) {
  372. case SIGINT:
  373. case SIGHUP:
  374. case SIGQUIT:
  375. case SIGTERM:
  376. $this->_stop = true;
  377. break;
  378. default:
  379. break;
  380. }
  381. }
  382. }