iqueue.php 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. <?php
  2. declare(strict_types=1);
  3. namespace queue;
  4. require_once(BASE_ROOT_PATH . '/helper/performance_helper.php');
  5. use Redis;
  6. use Exception;
  7. use Log;
  8. use refill\util;
  9. use Swoole;
  /**
   * Thin wrapper around a Redis list used as a queue.
   *
   * Depending on the $comode flag the underlying client is either a
   * Swoole\Coroutine\Redis instance or a plain phpredis Redis instance.
   * All push/pop helpers without a key argument operate on the queue name
   * given to the constructor.
   */
  class IQueueDB
  {
      /** Underlying Redis client (Swoole coroutine client or phpredis). */
      private $_redis;

      /** Name of the Redis list that rpush/lpush/rpop/lpop operate on. */
      private $_queue_name;

      /** When truthy, the Swoole coroutine Redis client is used. */
      private $_comode;

      /**
       * Creates the client matching $comode and connects immediately.
       *
       * @param mixed $queue_name Redis key of the queue list.
       * @param mixed $comode     Truthy to use Swoole\Coroutine\Redis.
       */
      public function __construct($queue_name,$comode = false)
      {
          if (!extension_loaded('redis')) {
              throw_exception('redis failed to load');
          }

          $this->_queue_name = $queue_name;
          $this->_comode = $comode;
          $this->_redis = $this->_comode ? new Swoole\Coroutine\Redis() : new Redis();

          $this->connect();
      }

      /**
       * Reports the client's own view of the connection state: the
       * `connected` property in coroutine mode, isConnected() otherwise.
       */
      private function linkIsUp()
      {
          if ($this->_comode) {
              return $this->_redis->connected;
          }

          return $this->_redis->isConnected();
      }

      /**
       * Ensures the client is connected, reconnecting when necessary.
       *
       * Host and port are read from the 'coroutine.redis_*' config keys in
       * coroutine mode and from 'queue.host' / 'queue.port' otherwise.
       *
       * @return bool Connection state after the attempt.
       */
      public function connect(): bool
      {
          if ($this->linkIsUp()) {
              return true;
          }

          $this->close();

          if ($this->_comode) {
              $ret = $this->_redis->connect(C('coroutine.redis_host'), C('coroutine.redis_port'));
          } else {
              $ret = $this->_redis->connect(C('queue.host'), C('queue.port'));
          }
          Log::record("IQueueDB::connect ret = {$ret}", Log::DEBUG);

          return $this->linkIsUp();
      }

      /**
       * Closes the connection if it is currently open. In non-coroutine mode
       * an already-closed connection is reported in the debug log.
       */
      public function close()
      {
          if ($this->linkIsUp()) {
              $this->_redis->close();
              return;
          }

          if (!$this->_comode) {
              Log::record("redis has closed",Log::DEBUG);
          }
      }

      /**
       * Appends $value to the tail of the queue.
       *
       * @return mixed The client's rPush() result, or false when the
       *               connection is down or an Exception was caught.
       */
      public function rpush($value)
      {
          try {
              if (!$this->connect()) {
                  Log::record("IQueueDB::rpush connect=false", Log::DEBUG);
                  return false;
              }

              return $this->_redis->rPush($this->_queue_name, $value);
          } catch (Exception $e) {
              Log::record("IQueueDB::rpush " . $e->getMessage(),Log::ERR);
              return false;
          }
      }

      /**
       * Prepends $value to the head of the queue.
       *
       * @return mixed The client's lPush() result, or false when the
       *               connection is down or an Exception was caught.
       */
      public function lpush($value)
      {
          try {
              if (!$this->connect()) {
                  Log::record("IQueueDB::lpush connect=false", Log::DEBUG);
                  return false;
              }

              return $this->_redis->lPush($this->_queue_name, $value);
          } catch (Exception $e) {
              Log::record("IQueueDB::lpush " . $e->getMessage(),Log::ERR);
              return false;
          }
      }

      /**
       * Lists the queue keys served by this instance.
       *
       * @return array Single-element list holding the configured queue name.
       */
      public function scan()
      {
          return [$this->_queue_name];
      }

      /** Pops from the tail of the queue; returns the client's rPop() result unchanged. */
      public function rpop()
      {
          return $this->_redis->rPop($this->_queue_name);
      }

      /** Pops from the head of the queue; returns the client's lPop() result unchanged. */
      public function lpop()
      {
          return $this->_redis->lPop($this->_queue_name);
      }

      /**
       * Blocking tail pop on $key (passed straight to the client's brPop()).
       *
       * @return mixed Element at index 1 of the client's reply, or null when
       *               the reply is empty/falsy.
       */
      public function brpop($key, $time)
      {
          $reply = $this->_redis->brPop($key, $time);

          return $reply ? $reply[1] : null;
      }

      /**
       * Blocking head pop on $key (passed straight to the client's blPop()).
       *
       * @return mixed Element at index 1 of the client's reply, or null when
       *               the reply is empty/falsy.
       */
      public function blpop($key, $time)
      {
          $reply = $this->_redis->blPop($key, $time);

          return $reply ? $reply[1] : null;
      }

      /**
       * Calls flushAll() on the client.
       *
       * NOTE(review): flushAll() is not scoped to this queue's key; confirm
       * that wiping the whole Redis server is really what callers expect.
       */
      public function clear()
      {
          $this->_redis->flushAll();
      }
  }
  /**
   * Queue producer.
   *
   * Wraps a queue backend and pushes messages onto it as serialized
   * single-entry arrays of the form [$key => $value].
   *
   * @package
   */
  class IClient
  {
      /** Queue backend; must expose lpush($value). */
      private $mQueuedb;

      /**
       * @param mixed $queueDb Queue backend used for all pushes.
       */
      public function __construct($queueDb)
      {
          $this->mQueuedb = $queueDb;
      }

      /**
       * Serializes [$key => $value] and pushes it onto the head of the queue.
       *
       * @return mixed Whatever the backend's lpush() returns.
       */
      public function push($key, $value)
      {
          $payload = serialize([$key => $value]);

          return $this->mQueuedb->lpush($payload);
      }
  }
  /**
   * Queue consumer.
   *
   * Pops raw entries from a queue backend and unserializes them. Every pop
   * method returns the unserialized message, or false when nothing was
   * popped.
   */
  class IServer
  {
      /** Queue backend; must expose connect/rpop/lpop/brpop/blpop/scan/close. */
      private $_queuedb;

      /**
       * @param mixed $queueDb Queue backend used for all operations.
       */
      public function __construct($queueDb)
      {
          $this->_queuedb = $queueDb;
      }

      /**
       * Ensures the backend is connected.
       *
       * @return bool The backend's connect() result.
       */
      public function connect() : bool
      {
          return $this->_queuedb->connect();
      }

      /**
       * Non-blocking pop from the tail of the backend's queue.
       *
       * @return mixed Unserialized message, or false when nothing was popped.
       */
      public function rpop()
      {
          return $this->decode($this->_queuedb->rpop());
      }

      /**
       * Non-blocking pop from the head of the backend's queue.
       *
       * @return mixed Unserialized message, or false when nothing was popped.
       */
      public function lpop()
      {
          return $this->decode($this->_queuedb->lpop());
      }

      /**
       * Blocking pop from the tail of $key, forwarded to the backend's brpop().
       *
       * @param mixed $key  Queue key(s) to wait on.
       * @param mixed $time Timeout forwarded to the backend.
       * @return mixed Unserialized message, or false when nothing was popped.
       */
      public function brpop($key,$time)
      {
          return $this->decode($this->_queuedb->brpop($key, $time));
      }

      /**
       * Blocking pop from the head of $key, forwarded to the backend's blpop().
       *
       * @param mixed $key  Queue key(s) to wait on.
       * @param mixed $time Timeout forwarded to the backend.
       * @return mixed Unserialized message, or false when nothing was popped.
       */
      public function blpop($key,$time)
      {
          // Previously this delegated to lpop(), which ignored both arguments
          // and never blocked.
          return $this->decode($this->_queuedb->blpop($key, $time));
      }

      /**
       * Lists the queue keys known to the backend.
       *
       * @return mixed The backend's scan() result.
       */
      public function scan()
      {
          return $this->_queuedb->scan();
      }

      /** Closes the backend connection. */
      public function stop()
      {
          $this->_queuedb->close();
      }

      /**
       * Turns a raw popped value into a message.
       *
       * Anything that is not a non-empty string (null, false, '' ...) means
       * "nothing popped" and yields false; this also keeps non-string values
       * away from unserialize(), which would raise a TypeError under
       * strict_types.
       *
       * NOTE(review): unserialize() is applied to whatever is stored in the
       * queue. That is only safe while every producer is trusted; consider
       * restricting allowed classes or switching to JSON if that may not hold.
       *
       * @param mixed $raw Value returned by the backend.
       * @return mixed Unserialized payload, or false.
       */
      private function decode($raw)
      {
          if (!is_string($raw) || $raw === '') {
              return false;
          }

          return unserialize($raw);
      }
  }
  /**
   * Base class for a queue worker loop.
   *
   * run() repeatedly pops messages from the injected server and passes each
   * one to handle(), which subclasses implement. When the USE_COROUTINE
   * constant is defined and truthy, each message is handled inside a Swoole
   * coroutine (at most MAX_COROUTINE at a time); otherwise messages are
   * handled inline, one after another.
   *
   * The loop ends once the stop flag is set, either through stop() or by one
   * of the signals registered in prepare().
   */
  abstract class ILooper
  {
      /** Stop flag, checked at the top of every loop iteration. */
      private $_stop = false;

      /** Message source; must expose scan(), connect(), rpop() and brpop(). */
      private $mServer;

      /** Upper bound on concurrently running coroutines before the loop backs off. */
      const MAX_COROUTINE = 200;

      /**
       * @param mixed $server Message source polled by run().
       */
      protected function __construct($server)
      {
          $this->mServer = $server;
      }

      /**
       * Discards one active output buffer (if any) and registers the
       * termination signal handlers.
       *
       * NOTE(review): the handlers only run if signal dispatching
       * (pcntl_async_signals, ticks or pcntl_signal_dispatch) is enabled
       * somewhere outside this class - confirm.
       */
      public function prepare()
      {
          if (ob_get_level()) ob_end_clean();

          pcntl_signal(SIGINT, [$this,'sig_handler']);
          pcntl_signal(SIGHUP, [$this,'sig_handler']);
          pcntl_signal(SIGQUIT, [$this,'sig_handler']);
          pcntl_signal(SIGTERM, [$this,'sig_handler']);
      }

      /**
       * Processes one popped message.
       *
       * @param mixed $msg Message as returned by the server's pop methods.
       * @return mixed In coroutine mode the return value is written to the
       *               debug log as "method=..."; otherwise it is ignored.
       */
      abstract protected function handle($msg);

      /** Requests the loop to end at its next iteration. */
      public function stop()
      {
          Log::record(__FUNCTION__,Log::DEBUG);
          $this->_stop = true;
      }

      /**
       * Runs the worker loop until the stop flag is set.
       *
       * Exceptions thrown during an iteration are logged and the loop
       * carries on with the next iteration.
       */
      public function run()
      {
          $queues = $this->mServer->scan();

          while (true)
          {
              try
              {
                  if ($this->_stop) break;

                  perfor_clear();
                  if (defined('USE_COROUTINE') && USE_COROUTINE) {
                      $this->runCoroutineStep($queues);
                  } else {
                      $this->runBlockingStep($queues);
                  }
              }
              catch (Exception $e)
              {
                  $err = $e->getMessage();
                  $code = $e->getCode();
                  Log::record("QueueDB pop err: code={$code} err={$err}",Log::ERR);
              }
          }

          Log::record("ILooper Run quit.", Log::DEBUG);
      }

      /**
       * One loop iteration in coroutine mode: pop a message and hand it to a
       * new coroutine, or back off when the coroutine limit is reached.
       *
       * @param mixed $queues Queue keys obtained from the server's scan().
       */
      private function runCoroutineStep($queues)
      {
          $res = Swoole\Coroutine::stats();
          $num = $res['coroutine_num'];
          $mem = memory_get_usage();

          if ($num >= self::MAX_COROUTINE) {
              // Too many coroutines in flight; wait briefly before polling again.
              Swoole\Coroutine::sleep(0.1);
              return;
          }

          if ($this->mServer->connect() == false) {
              Swoole\Coroutine::sleep(1);
              Log::record("Processor redis disconnect.",Log::ERR);
              return;
          }

          if (defined('COROUTINE_HOOK_TCP') && COROUTINE_HOOK_TCP) {
              // The blocking pop itself provides the wait between polls.
              $content = $this->mServer->brpop($queues,1);
              if (empty($content)) {
                  return;
              }
          } else {
              $content = $this->mServer->rpop();
              if (empty($content)) {
                  Swoole\Coroutine::sleep(1);
                  return;
              }
          }

          if ($this->_stop) {
              // Stop was requested after the pop: put the message back
              // instead of processing it.
              foreach ($content as $key => $params) {
                  util::push_queue($key, $params);
              }
              return;
          }

          go(function ()use ($content,$num,$mem)
          {
              $start = microtime(true);
              Log::record("BeginGoFunction coroutin_num={$num} memory={$mem}",Log::DEBUG);
              $method = $this->handle($content);
              $use_time = microtime(true) - $start;
              $msg = sprintf("EndGoFunction coroutin_num={$num} memory={$mem} request_time=%.6f method={$method}",$use_time);
              Log::record($msg,Log::DEBUG);
          });
      }

      /**
       * One loop iteration in non-coroutine mode: blocking pop, then handle
       * the message inline with performance logging.
       *
       * @param mixed $queues Queue keys obtained from the server's scan().
       */
      private function runBlockingStep($queues)
      {
          if ($this->mServer->connect() == false) {
              sleep(1);
              return;
          }

          // Same blocking pop the coroutine branch uses; the previous pop()
          // call targeted a method the server class in this file does not
          // define.
          $content = $this->mServer->brpop($queues,1);
          if (empty($content)) return;

          if ($this->_stop) {
              // Stop was requested after the pop: put the message back
              // instead of processing it.
              foreach ($content as $key => $params) {
                  DispatcherClient::instance()->push($key, $params);
              }
              return;
          }

          perfor_clear();
          perfor_start();
          $this->handle($content);
          perfor_end('Handle Request');
          $info = perfor_log();
          Log::record("{$info} \r\n",Log::DEBUG);
      }

      /**
       * Signal callback registered by prepare(); sets the stop flag for
       * SIGINT, SIGHUP, SIGQUIT and SIGTERM.
       *
       * @param int $signo Signal number delivered to the process.
       */
      private function sig_handler($signo)
      {
          Log::record("queue quit at sig_handler.",Log::DEBUG);

          switch($signo) {
              case SIGINT:
              case SIGHUP:
              case SIGQUIT:
              case SIGTERM:
                  $this->_stop = true;
                  break;
              default:
                  break;
          }
      }
  }