server.php 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: stanley-king
  5. * Date: 2016/10/5
  6. * Time: 下午11:39
  7. */
  8. namespace search;
  9. interface IProcessor
  10. {
  11. public function handle_input($body);
  12. }
  13. class CentraHelper
  14. {
  15. const body_header_len = 10;
  16. const time_out = 3600;
  17. private $socket;
  18. private $ev_base;
  19. private $ev;
  20. private $connections;
  21. private $buffers;
  22. private $contents;
  23. private $connect_id;
  24. private $processor;
  25. private static $stInstance;
  26. public static function instance()
  27. {
  28. if(self::$stInstance == null) {
  29. self::$stInstance = new CentraHelper();
  30. }
  31. return self::$stInstance;
  32. }
  33. private function __construct()
  34. {
  35. $this->connect_id = 0;
  36. $this->connections = array();
  37. $this->buffers = array();
  38. }
  39. public function init(IProcessor $processor)
  40. {
  41. $this->processor = $processor;
  42. }
  43. public function run_loop($sockfd)
  44. {
  45. $this->socket = $sockfd;
  46. //$this->socket = stream_socket_server ($this->remote_addr(), $errno, $errstr);
  47. stream_set_blocking($this->socket, 0);
  48. $this->ev_base = event_base_new();
  49. $this->ev = event_new();
  50. event_set($this->ev, $this->socket, EV_READ | EV_PERSIST, 'ev_accept', $this->ev_base);
  51. event_base_set($this->ev, $this->ev_base);
  52. event_add($this->ev);
  53. event_base_loop($this->ev_base);
  54. }
  55. public function ev_accept($socket, $flag, $base)
  56. {
  57. $this->connect_id += 1;
  58. $connection = stream_socket_accept($socket);
  59. if($connection == false) return;
  60. stream_set_blocking($connection, 0);
  61. $buffer = event_buffer_new($connection, 'ev_read', NULL, 'ev_error', $this->connect_id);
  62. if($buffer == false) {
  63. fclose($connection);
  64. return;
  65. }
  66. event_buffer_base_set($buffer, $base);
  67. event_buffer_timeout_set($buffer, self::time_out, self::time_out);
  68. event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
  69. event_buffer_priority_set($buffer, 10);
  70. event_buffer_enable($buffer, EV_READ | EV_PERSIST);
  71. // we need to save both buffer and connection outside
  72. $this->connections[$this->connect_id] = $connection;
  73. $this->buffers[$this->connect_id] = $buffer;
  74. $this->contents[$this->connect_id] = "";
  75. }
  76. public function ev_read($buffer, $id)
  77. {
  78. $content = &$this->contents[$id];
  79. while ($read = event_buffer_read($buffer, 256)) {
  80. $content .= $read;
  81. }
  82. $start = 0;
  83. $left = strlen($content);
  84. do
  85. {
  86. if($left > self::body_header_len)
  87. {
  88. $len = substr($content,$start,self::body_header_len);
  89. $len = intval($len);
  90. if($left >= self::body_header_len + $len)
  91. {
  92. $body = substr($content,$start + self::body_header_len,$len);
  93. if($this->processor != null) {
  94. $data = $this->processor->handle_input($body);
  95. $header = sprintf("%010d",strlen($data));
  96. $data = $header . $data;
  97. event_buffer_write($buffer,$data,strlen($data));
  98. }
  99. $start += self::body_header_len + $len;
  100. $left = $left - self::body_header_len - $len;
  101. }
  102. else {
  103. break;
  104. }
  105. }
  106. else {
  107. break;
  108. }
  109. } while ($left > 0);
  110. if($start > 0) {
  111. $content = substr($content,$start);
  112. }
  113. }
  114. public function ev_error($buffer, $error, $id)
  115. {
  116. event_buffer_disable($this->buffers[$id], EV_READ | EV_WRITE);
  117. event_buffer_free($this->buffers[$id]);
  118. fclose($this->connections[$id]);
  119. unset($this->connections[$id]);
  120. unset($this->buffers[$id]);
  121. unset($this->contents[$id]);
  122. }
  123. private function remote_addr()
  124. {
  125. global $config;
  126. $host = $config['searcher']['host'];
  127. $port = $config['searcher']['port'];
  128. return "{$host}:{$port}";
  129. }
  130. }