queue.php 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. <?php
  2. declare(strict_types=0);
  /**
   * Queue processing: Redis-backed producer, consumer and storage classes.
   *
   *
   * @package
   */
  /**
   * Producer side of the queue: serializes a key/value pair and pushes it onto
   * a Redis-backed queue through QueueDB.
   */
  class QueueClient
  {
      /**
       * Open QueueDB handles, indexed by "host:port/queue_name".
       *
       * A single shared handle would pin every later push() to whichever queue
       * was used first, so one handle is kept per distinct target instead.
       *
       * @var array<string, QueueDB>
       */
      private static $queuedb = [];

      /**
       * Push one key/value pair onto the named queue.
       *
       * The pair is stored as serialize([$key => $value]). The QueueDB handle
       * for a given host, port and queue name is created on first use and then
       * reused by later calls with the same target.
       *
       * @param string $queue_name Queue name handed to QueueDB.
       * @param string $host       Redis host handed to QueueDB.
       * @param int    $port       Redis port handed to QueueDB.
       * @param mixed  $key        Array key of the stored pair.
       * @param mixed  $value      Value of the stored pair.
       *
       * @return mixed Whatever QueueDB::push() returns for the serialized payload.
       */
      public static function push($queue_name, $host, $port, $key, $value)
      {
          $poolKey = "{$host}:{$port}/{$queue_name}";

          if (!isset(self::$queuedb[$poolKey])) {
              self::$queuedb[$poolKey] = new QueueDB($queue_name, $host, $port);
          }

          return self::$queuedb[$poolKey]->push(serialize([$key => $value]));
      }
  }
  /**
   * Consumer side of the queue: pops serialized payloads through QueueDB and
   * hands them back unserialized.
   */
  class QueueServer
  {
      /** @var QueueDB Storage handle created in the constructor. */
      private $_queuedb;

      /**
       * Create the QueueDB handle this server reads from.
       *
       * @param string $queue_name Queue name handed to QueueDB.
       * @param string $host       Redis host handed to QueueDB.
       * @param int    $port       Redis port handed to QueueDB.
       */
      public function __construct($queue_name, $host, $port)
      {
          $this->_queuedb = new QueueDB($queue_name, $host, $port);
      }

      /**
       * Pop one payload and unserialize it.
       *
       * @param mixed $key  Forwarded unchanged to QueueDB::pop().
       * @param mixed $time Forwarded unchanged to QueueDB::pop().
       *
       * @return mixed The unserialized payload, or false when nothing was popped.
       *               unserialize() itself also yields false for a payload it
       *               cannot decode, so the two cases look the same to callers.
       */
      public function pop($key, $time)
      {
          $payload = $this->_queuedb->pop($key, $time);

          // Loose comparison on purpose: an empty-string payload is treated
          // the same as "nothing popped".
          if ($payload == null) {
              return false;
          }

          // NOTE(review): unserialize() runs without an allowed_classes
          // restriction; confirm that only trusted producers write to this queue.
          return unserialize($payload);
      }

      /**
       * Forward to QueueDB::scan().
       *
       * @return mixed Whatever QueueDB::scan() returns.
       */
      public function scan()
      {
          return $this->_queuedb->scan();
      }
  }
  /**
   * Thin wrapper around a phpredis client that uses a Redis list as a queue:
   * push() adds with LPUSH and pop() removes with the blocking BRPOP.
   *
   * NOTE(review): the original carried the comment "defines the number of
   * storage tables; the system allocates storage at random". Nothing in this
   * class does that, so the remark looks stale.
   */
  class QueueDB
  {
      /** @var Redis phpredis client opened by the constructor. */
      private $_redis;

      /** @var string List key that push() writes to and scan() reports. */
      private $queue_name = '';

      /**
       * Remember the queue name and open the Redis connection.
       *
       * @param string $name List key used by push().
       * @param string $host Redis host.
       * @param int    $port Redis port.
       *
       * @throws RuntimeException When the redis extension is not loaded, or
       *                          when connect() reports failure by returning false.
       */
      public function __construct($name, $host, $port)
      {
          $this->queue_name = $name;

          // A value returned from a constructor is discarded, so the failure
          // must be thrown; otherwise execution continues into `new Redis()`
          // and dies with a fatal "class not found" error.
          if (!extension_loaded('redis')) {
              throw new RuntimeException('redis failed to load');
          }

          $this->_redis = new Redis();

          // Third argument is the connect timeout.
          if ($this->_redis->connect($host, $port, 20) === false) {
              throw new RuntimeException("redis connect to {$host}:{$port} failed");
          }

          // NOTE(review): pop() blocks for a caller-supplied time; presumably a
          // wait longer than this read timeout fails on the socket first.
          // Confirm against the timeouts consumers actually pass.
          $this->_redis->setOption(Redis::OPT_READ_TIMEOUT, 10);
      }

      /**
       * Add a value to the queue configured in the constructor.
       *
       * @param string $value Payload to store.
       *
       * @return mixed The lPush() result on success. On failure an Exception
       *               object is RETURNED, not thrown; this is kept for backward
       *               compatibility, so callers must check with instanceof.
       *               The original exception is available via getPrevious().
       */
      public function push($value)
      {
          try {
              return $this->_redis->lPush($this->queue_name, $value);
          } catch (Exception $e) {
              return new Exception($e->getMessage(), 0, $e);
          }
      }

      /**
       * Report the queue name this instance was constructed with.
       *
       * @return string
       */
      public function scan()
      {
          return $this->queue_name;
      }

      /**
       * Blocking pop from the list named by $key.
       *
       * The list read here is the one named by $key, not necessarily the one
       * given to the constructor.
       *
       * @param string|array $key  List key(s) passed to brPop().
       * @param int          $time Blocking timeout passed to brPop().
       *
       * @return mixed|null Element 1 of the brPop() reply (the popped value),
       *                    or null when the reply is empty or falsy.
       */
      public function pop($key, $time)
      {
          $reply = $this->_redis->brPop($key, $time);

          return $reply ? $reply[1] : null;
      }

      /**
       * Run FLUSHALL on the connected server.
       *
       * WARNING: this is not limited to the queue; FLUSHALL removes every key
       * the server holds.
       */
      public function clear()
      {
          $this->_redis->flushAll();
      }
  }