mysql_proxy_server.php 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. <?php
  /**
   * Small MySQL proxy built on a Swoole TCP server.
   *
   * Every payload received from a client is sent as an asynchronous query on one
   * of a fixed number of pooled mysqli connections; when the result arrives it
   * is written back to the client that asked for it. When all connections are
   * in use, requests are parked in a bounded wait queue.
   *
   * NOTE(review): client payloads are executed verbatim as SQL. The listener is
   * bound to 127.0.0.1 only; confirm that every local client is trusted.
   */
  class DBServer
  {
      /** @var int Number of MySQL connections opened when the worker starts. */
      protected $pool_size = 20;

      /** @var array Idle connections: list of array('mysqli', 'db_sock', 'fd'). */
      protected $idle_pool = array();

      /** @var array Connections with a query in flight, keyed by their db socket. */
      protected $busy_pool = array();

      /** @var array Requests waiting for a connection: list of array('fd', 'sql'). */
      protected $wait_queue = array();

      /** @var int Maximum wait queue length; new requests beyond it are rejected. */
      protected $wait_queue_max = 100;

      /** @var string MySQL host used for pooled connections. */
      protected $db_host = '127.0.0.1';

      /** @var string MySQL user used for pooled connections. */
      protected $db_user = 'root';

      /** @var string MySQL password used for pooled connections. */
      protected $db_password = 'root';

      /** @var string Default schema selected on pooled connections. */
      protected $db_name = 'www4swoole';

      /**
       * @var swoole_server
       */
      protected $serv;

      /**
       * Creates the TCP server on 127.0.0.1:9509, registers the callbacks and
       * starts it.
       *
       * A single worker is used because the pools live in this object's memory.
       */
      public function run()
      {
          $serv = new swoole_server("127.0.0.1", 9509);
          $serv->set(array(
              'worker_num' => 1,
              'max_request' => 0,
          ));
          $serv->on('WorkerStart', array($this, 'onStart'));
          $serv->on('Receive', array($this, 'onReceive'));
          $serv->start();
      }

      /**
       * WorkerStart callback: remembers the server and fills the idle pool.
       *
       * Connections that cannot be established are skipped, so the pool may end
       * up smaller than $pool_size.
       *
       * @param swoole_server $serv
       */
      public function onStart($serv)
      {
          $this->serv = $serv;
          for ($i = 0; $i < $this->pool_size; $i++) {
              $conn = $this->createConnection();
              if ($conn !== null) {
                  $this->idle_pool[] = $conn;
              }
          }
          echo "Server: start.Swoole version is [" . SWOOLE_VERSION . "]\n";
      }

      /**
       * Event-loop callback fired when a MySQL socket becomes readable.
       *
       * Reaps the asynchronous query result, forwards it to the waiting client,
       * returns the connection to the idle pool and then serves queued requests.
       *
       * @param int $db_sock Socket of the MySQL connection that has data ready.
       */
      public function onSQLReady($db_sock)
      {
          // An event on a socket with no query in flight has no client to answer;
          // dereferencing the missing pool entry would be a fatal error.
          if (!isset($this->busy_pool[$db_sock])) {
              echo __METHOD__ . ": no pending query for db_sock=$db_sock, event ignored\n";
              return;
          }

          $db_res = $this->busy_pool[$db_sock];
          $mysqli = $db_res['mysqli'];
          $fd = $db_res['fd'];
          echo __METHOD__ . ": client_sock=$fd|db_sock=$db_sock\n";

          $result = $mysqli->reap_async_query();
          if (is_object($result)) {
              // Statement produced a result set.
              $ret = var_export($result->fetch_all(MYSQLI_ASSOC), true) . "\n";
              $this->serv->send($fd, $ret);
              $result->free();
          } elseif ($result === true) {
              // Successful statement without a result set (INSERT/UPDATE/...):
              // reap_async_query() returns boolean true, not a result object.
              $this->serv->send($fd, sprintf("Query OK, %d row(s) affected\n", $mysqli->affected_rows));
          } else {
              $this->serv->send($fd, sprintf("MySQLi Error: %s\n", $mysqli->error));
          }

          // Release the connection.
          unset($this->busy_pool[$db_sock]);
          $db_res['fd'] = 0;
          $this->idle_pool[] = $db_res;

          // Serve waiting requests, but only while there is both a request and a
          // free connection; never shift from an empty queue.
          while (count($this->wait_queue) > 0 && count($this->idle_pool) > 0) {
              $req = array_shift($this->wait_queue);
              $this->doQuery($req['fd'], $req['sql']);
          }
      }

      /**
       * Receive callback: runs the payload as a query, or queues it when no
       * connection is idle.
       *
       * @param swoole_server $serv
       * @param int           $fd         Client connection descriptor.
       * @param int           $reactor_id Unused.
       * @param string        $data       Raw payload, treated as the SQL text.
       */
      public function onReceive($serv, $fd, $reactor_id, $data)
      {
          echo "Received: $data\n";
          if (count($this->idle_pool) == 0) {
              // No idle database connection.
              $this->enqueueOrReject($fd, $data);
          } else {
              $this->doQuery($fd, $data);
          }
      }

      /**
       * Sends $sql asynchronously on an idle connection and marks it busy.
       *
       * If sending fails because the server connection was lost (errno 2006 or
       * 2013) the connection is replaced and the query retried once. If the query
       * still cannot be sent, the client gets the error immediately and the
       * connection goes back to the idle pool, because no result event would ever
       * fire for it.
       *
       * @param int    $fd  Client connection descriptor to answer.
       * @param string $sql SQL text to execute.
       */
      public function doQuery($fd, $sql)
      {
          // Take a connection out of the idle pool.
          $db = array_pop($this->idle_pool);
          if ($db === null) {
              $this->enqueueOrReject($fd, $sql);
              return;
          }

          $mysqli = $db['mysqli'];
          $result = $mysqli->query($sql, MYSQLI_ASYNC);

          if ($result === false && ($mysqli->errno == 2013 || $mysqli->errno == 2006)) {
              // Stop watching the dead socket before closing it, then open a
              // replacement so the pool entry carries the new socket.
              swoole_event_del($db['db_sock']);
              $mysqli->close();
              $fresh = $this->createConnection();
              if ($fresh === null) {
                  // The dead connection is dropped, so the pool shrinks by one.
                  $this->serv->send($fd, "MySQLi Error: unable to reconnect to the database\n");
                  return;
              }
              $db = $fresh;
              $mysqli = $db['mysqli'];
              $result = $mysqli->query($sql, MYSQLI_ASYNC);
          }

          if ($result === false) {
              $this->serv->send($fd, sprintf("MySQLi Error: %s\n", $mysqli->error));
              $this->idle_pool[] = $db;
              return;
          }

          $db['fd'] = $fd;
          // Move the connection into the busy pool.
          $this->busy_pool[$db['db_sock']] = $db;
      }

      /**
       * Parks a request in the wait queue, or tells the client to retry later
       * when the queue is already full.
       *
       * @param int    $fd  Client connection descriptor.
       * @param string $sql SQL text to run once a connection is free.
       */
      protected function enqueueOrReject($fd, $sql)
      {
          if (count($this->wait_queue) < $this->wait_queue_max) {
              $this->wait_queue[] = array(
                  'fd' => $fd,
                  'sql' => $sql,
              );
          } else {
              $this->serv->send($fd, "request too many, Please try again later.");
          }
      }

      /**
       * Opens one MySQL connection and registers its socket with the event loop.
       *
       * @return array|null Pool entry array('mysqli', 'db_sock', 'fd'), or null
       *                    when the connection could not be established.
       */
      protected function createConnection()
      {
          $db = new mysqli;
          $db->connect($this->db_host, $this->db_user, $this->db_password, $this->db_name);
          if ($db->connect_errno) {
              echo __METHOD__ . ": connect failed: " . $db->connect_error . "\n";
              return null;
          }
          $db_sock = swoole_get_mysqli_sock($db);
          swoole_event_add($db_sock, array($this, 'onSQLReady'));
          return array(
              'mysqli' => $db,
              'db_sock' => $db_sock,
              'fd' => 0,
          );
      }
  }
  // Script entry point: build the proxy and hand control to its server loop.
  $dbServer = new DBServer;
  $dbServer->run();