iqueue.php 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  1. <?php
  2. declare(strict_types=1);
  3. namespace queue;
  4. require_once(BASE_ROOT_PATH . '/helper/performance_helper.php');
  5. use Redis;
  6. use Exception;
  7. use Log;
  8. use Swoole;
  9. class IQueueDB
  10. {
  11. private $_redis;
  12. private $_queue_name;
  13. //存定义存储表的数量,系统会随机分配存储
  14. private $_comode;
  15. public function __construct($queue_name,$comode = false)
  16. {
  17. if ( !extension_loaded('redis') ) {
  18. throw_exception('redis failed to load');
  19. }
  20. $this->_queue_name = $queue_name;
  21. $this->_comode = $comode;
  22. if ($this->_comode) {
  23. $this->_redis = new Swoole\Coroutine\Redis();
  24. } else {
  25. $this->_redis = new Redis();
  26. }
  27. $this->connect();
  28. }
  29. public function connect(): bool
  30. {
  31. if ($this->_comode)
  32. {
  33. if ($this->_redis->connected) {
  34. return true;
  35. }
  36. else {
  37. $this->close();
  38. $ret = $this->_redis->connect(C('coroutine.redis_host'), C('coroutine.redis_port'));
  39. Log::record("IQueueDB::connect ret = {$ret}", Log::DEBUG);
  40. return $this->_redis->connected;
  41. }
  42. }
  43. elseif ($this->_redis->isConnected()) {
  44. return true;
  45. }
  46. else {
  47. $this->close();
  48. $ret = $this->_redis->connect(C('queue.host'), C('queue.port'));
  49. Log::record("IQueueDB::connect ret = {$ret}", Log::DEBUG);
  50. return $this->_redis->isConnected();
  51. }
  52. }
  53. public function close()
  54. {
  55. if ($this->_comode)
  56. {
  57. if ($this->_redis->connected) {
  58. $this->_redis->close();
  59. }
  60. } elseif ($this->_redis->isConnected()) {
  61. $this->_redis->close();
  62. } else {
  63. Log::record("redis has closed",Log::DEBUG);
  64. }
  65. }
  66. public function rpush($value)
  67. {
  68. try
  69. {
  70. if($this->connect()) {
  71. $ret = $this->_redis->rPush($this->_queue_name, $value);
  72. Log::record("IQueueDB::lpush ret={$ret}", Log::DEBUG);
  73. return $ret;
  74. } else {
  75. Log::record("IQueueDB::rpush connect=false", Log::DEBUG);
  76. return false;
  77. }
  78. }
  79. catch(Exception $e) {
  80. Log::record("IQueueDB::rpush " . $e->getMessage(),Log::ERR);
  81. return false;
  82. }
  83. }
  84. public function lpush($value)
  85. {
  86. try
  87. {
  88. if ($this->connect()) {
  89. $ret = $this->_redis->lPush($this->_queue_name, $value);
  90. Log::record("IQueueDB::lpush ret={$ret}", Log::DEBUG);
  91. return $ret;
  92. } else {
  93. Log::record("IQueueDB::lpush connect=false", Log::DEBUG);
  94. return false;
  95. }
  96. }
  97. catch(Exception $e) {
  98. Log::record("IQueueDB::lpush " . $e->getMessage(),Log::ERR);
  99. return false;
  100. }
  101. }
  102. public function scan()
  103. {
  104. $list_key[] = $this->_queue_name;
  105. return $list_key;
  106. }
  107. public function rpop($key)
  108. {
  109. $result = $this->_redis->rPop($key);
  110. $tmp = serialize($result);
  111. Log::record("IQueueDB::rpop={$tmp}",Log::DEBUG);
  112. if ($result) {
  113. return $result[1];
  114. } else {
  115. return null;
  116. }
  117. }
  118. public function lpop($key)
  119. {
  120. $result = $this->_redis->lPop($key);
  121. $tmp = serialize($result);
  122. Log::record("IQueueDB::lpop={$tmp}",Log::DEBUG);
  123. if ($result) {
  124. return $result[1];
  125. } else {
  126. return null;
  127. }
  128. }
  129. public function brpop($key, $time)
  130. {
  131. $result = $this->_redis->brPop($key, $time);
  132. $tmp = serialize($result);
  133. Log::record("IQueueDB::brPop={$tmp}",Log::DEBUG);
  134. if ($result) {
  135. return $result[1];
  136. } else {
  137. return null;
  138. }
  139. }
  140. public function blpop($key, $time)
  141. {
  142. $result = $this->_redis->blPop($key, $time);
  143. $tmp = serialize($result);
  144. Log::record("IQueueDB::blPop={$tmp}",Log::DEBUG);
  145. if ($result) {
  146. return $result[1];
  147. } else {
  148. return null;
  149. }
  150. }
  151. public function clear() {
  152. $this->_redis->flushAll();
  153. }
  154. }
  155. /**
  156. * 队列处理
  157. *
  158. *
  159. * @package
  160. */
  161. class IClient
  162. {
  163. private $mQueuedb;
  164. public function __construct($queueDb)
  165. {
  166. $this->mQueuedb = $queueDb;
  167. }
  168. public function push($key, $value)
  169. {
  170. return $this->mQueuedb->lpush(serialize([$key=>$value]));
  171. }
  172. }
  173. class IServer
  174. {
  175. private $_queuedb;
  176. public function __construct($queueDb) {
  177. $this->_queuedb = $queueDb;
  178. }
  179. public function connect() : bool
  180. {
  181. return $this->_queuedb->connect();
  182. }
  183. public function rpop($key)
  184. {
  185. $result = $this->_queuedb->rpop($key);
  186. if($result != null) {
  187. return unserialize($result);
  188. } else {
  189. return false;
  190. }
  191. }
  192. public function lpop($key)
  193. {
  194. $result = $this->_queuedb->lpop($key);
  195. if($result != null) {
  196. return unserialize($result);
  197. } else {
  198. return false;
  199. }
  200. }
  201. public function brpop($key,$time)
  202. {
  203. $result = $this->_queuedb->brpop($key,$time);
  204. if($result != null) {
  205. return unserialize($result);
  206. } else {
  207. return false;
  208. }
  209. }
  210. public function blpop($key,$time)
  211. {
  212. $result = $this->_queuedb->lpop($key,$time);
  213. if($result != null) {
  214. return unserialize($result);
  215. } else {
  216. return false;
  217. }
  218. }
  219. public function scan() {
  220. return $this->_queuedb->scan();
  221. }
  222. public function stop()
  223. {
  224. $this->_queuedb->close();
  225. }
  226. }
  227. abstract class ILooper
  228. {
  229. private $_stop = false;
  230. private $mServer;
  231. const MAX_COROUTINE = 1000;
  232. protected function __construct($server)
  233. {
  234. $this->mServer = $server;
  235. }
  236. public function prepare()
  237. {
  238. if (ob_get_level()) ob_end_clean();
  239. pcntl_signal(SIGINT, [$this,'sig_handler']);
  240. pcntl_signal(SIGHUP, [$this,'sig_handler']);
  241. pcntl_signal(SIGQUIT, [$this,'sig_handler']);
  242. pcntl_signal(SIGTERM, [$this,'sig_handler']);
  243. }
  244. abstract protected function handle($msg);
  245. public function stop()
  246. {
  247. Log::record(__FUNCTION__,Log::DEBUG);
  248. $this->_stop = true;
  249. }
  250. public function run()
  251. {
  252. $queues = $this->mServer->scan();
  253. while (true)
  254. {
  255. try
  256. {
  257. if ($this->_stop) break;
  258. perfor_clear();
  259. if(defined('USE_COROUTINE') && USE_COROUTINE)
  260. {
  261. $res = Swoole\Coroutine::stats();
  262. $num = $res['coroutine_num'];
  263. $mem = memory_get_usage();
  264. Log::record("IQueueDB::run coroutin_num={$num} memory={$mem}",Log::DEBUG);
  265. if($num < ILooper::MAX_COROUTINE)
  266. {
  267. if($this->mServer->connect() == false) {
  268. Swoole\Coroutine::sleep(1);
  269. Log::record("Processor redis disconnect.",Log::ERR);
  270. continue;
  271. }
  272. $content = $this->mServer->rpop($queues);
  273. if(empty($content)) {
  274. Swoole\Coroutine::sleep(1);
  275. continue;
  276. }
  277. if($this->_stop)
  278. {
  279. foreach ($content as $key => $params) {
  280. DispatcherClient::instance()->push($key, $params);
  281. }
  282. }
  283. else
  284. {
  285. go(function ()use ($content,$num,$mem) {
  286. $start = microtime(true);
  287. Log::record("BeginGoFunction coroutin_num={$num} memory={$mem}",Log::DEBUG);
  288. $method = $this->handle($content);
  289. $use_time = microtime(true) - $start;
  290. $msg = sprintf("EndGoFunction coroutin_num={$num} memory={$mem} request_time=%.6f method={$method}",$use_time);
  291. Log::record($msg,Log::DEBUG);
  292. });
  293. }
  294. }
  295. else {
  296. Swoole\Coroutine::sleep(0.1);
  297. }
  298. }
  299. else
  300. {
  301. if($this->mServer->connect() == false) {
  302. sleep(1);
  303. continue;
  304. }
  305. $content = $this->mServer->pop($queues,1);
  306. if(empty($content)) continue;
  307. if($this->_stop)
  308. {
  309. foreach ($content as $key => $params) {
  310. DispatcherClient::instance()->push($key, $params);
  311. }
  312. }
  313. else
  314. {
  315. perfor_clear();
  316. perfor_start();
  317. $this->handle($content);
  318. perfor_end('Handle Request');
  319. $info = perfor_log();
  320. Log::record("{$info} \r\n",Log::DEBUG);
  321. }
  322. }
  323. }
  324. catch (Exception $e)
  325. {
  326. $err = $e->getMessage();
  327. $code = $e->getCode();
  328. Log::record("QueueDB pop err: code={$code} err={$err}",Log::ERR);
  329. }
  330. }
  331. Log::record("ILooper Run quit.", Log::DEBUG);
  332. }
  333. private function sig_handler($signo)
  334. {
  335. Log::record("queue quit at sig_handler.",Log::DEBUG);
  336. switch($signo) {
  337. case SIGINT:
  338. case SIGHUP:
  339. case SIGQUIT:
  340. case SIGTERM:
  341. $this->_stop = true;
  342. break;
  343. default:
  344. break;
  345. }
  346. }
  347. }