server.php 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: stanley-king
  5. * Date: 2016/10/5
  6. * Time: 下午11:39
  7. */
  8. namespace search;
  9. use Log;
  10. interface IProcessor
  11. {
  12. public function handle_input($body);
  13. }
  14. class CentraHelper
  15. {
  16. const body_header_len = 10;
  17. const read_time_out = 600;
  18. const write_time_out = 5;
  19. private $mListenSocket;
  20. private $mEvbase;
  21. private $mEv;
  22. private $mStreams;
  23. private $mBuffers;
  24. private $mContents;
  25. private $mProcessor;
  26. private static $stInstance;
  27. public static function instance()
  28. {
  29. if(self::$stInstance == null) {
  30. self::$stInstance = new CentraHelper();
  31. }
  32. return self::$stInstance;
  33. }
  34. private function __construct()
  35. {
  36. $this->mStreams = array();
  37. $this->mBuffers = array();
  38. $this->mContents = [];
  39. }
  40. public function init(IProcessor $processor)
  41. {
  42. $this->mProcessor = $processor;
  43. }
  44. public function run_loop($sockfd)
  45. {
  46. $this->mListenSocket = $sockfd;
  47. Log::record("stream_set_blocking",Log::DEBUG);
  48. if(stream_set_blocking($this->mListenSocket, 0) == false) {
  49. Log::record("stream_set_blocking error",Log::DEBUG);
  50. }
  51. $this->mEvbase = event_base_new();
  52. $this->mEv = event_new();
  53. Log::record("event_set",Log::DEBUG);
  54. if(event_set($this->mEv, $this->mListenSocket, EV_READ | EV_PERSIST, 'ev_accept', $this->mEvbase) == false) {
  55. Log::record("event_set error EV_READ | EV_PERSIST",Log::DEBUG);
  56. }
  57. Log::record("event_base_set",Log::DEBUG);
  58. if(event_base_set($this->mEv, $this->mEvbase) == false) {
  59. Log::record("event_set error EV_READ | EV_PERSIST",Log::DEBUG);
  60. }
  61. Log::record("event_add",Log::DEBUG);
  62. if(event_add($this->mEv) == false) {
  63. Log::record("event_add error EV_READ | EV_PERSIST",Log::DEBUG);
  64. }
  65. $ret = event_base_loop($this->mEvbase);
  66. Log::record("event_base_loop ret={$ret}",Log::DEBUG);
  67. }
  68. public function ev_accept($socket, $flag, $base)
  69. {
  70. $pid = posix_getpid();
  71. Log::record("ev_accept pid={$pid} socket_fd={$socket}",Log::DEBUG);
  72. static $bufid = 1;
  73. $stream = stream_socket_accept($socket);
  74. if($stream == false) {
  75. Log::record("stream_socket_accept return false pid={$pid} socket_fd={$socket}",Log::DEBUG);
  76. return;
  77. }
  78. Log::record("stream_socket_accept pid={$pid} stream={$stream}",Log::DEBUG);
  79. stream_set_blocking($stream, 0);
  80. $buffer = event_buffer_new($stream, 'ev_read', NULL, 'ev_error', $bufid);
  81. if($buffer == false) {
  82. fclose($stream);
  83. Log::record("event_buffer_new return false pid={$pid} socket_fd={$socket}",Log::DEBUG);
  84. return;
  85. }
  86. event_buffer_base_set($buffer, $base);
  87. event_buffer_timeout_set($buffer, self::read_time_out, self::write_time_out);
  88. event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
  89. event_buffer_priority_set($buffer, 10);
  90. event_buffer_enable($buffer, EV_READ | EV_PERSIST);
  91. $this->mStreams[$bufid] = $stream;
  92. $this->mBuffers[$bufid] = $buffer;
  93. $this->mContents[$bufid] = "";
  94. $bufid++;
  95. if($bufid < 0) $bufid = 1;
  96. }
  97. public function ev_read($buffer, $bufid)
  98. {
  99. $pid = posix_getpid();
  100. Log::record("ev_read begin pid={$pid}",Log::DEBUG);
  101. $content = &$this->mContents[$bufid];
  102. while (true)
  103. {
  104. $read = event_buffer_read($buffer, 256);
  105. if(empty($read)) {
  106. break;
  107. } else {
  108. $content .= $read;
  109. }
  110. }
  111. $start = 0;
  112. $left = strlen($content);
  113. do
  114. {
  115. if($left > self::body_header_len)
  116. {
  117. $len = substr($content,$start,self::body_header_len);
  118. $len = intval($len);
  119. if($left >= self::body_header_len + $len)
  120. {
  121. $body = substr($content,$start + self::body_header_len,$len);
  122. if($this->mProcessor != null)
  123. {
  124. $data = $this->mProcessor->handle_input($body);
  125. $header = sprintf("%010d",strlen($data));
  126. $data = $header . $data;
  127. $ret = event_buffer_write($buffer,$data,strlen($data));
  128. if($ret == false) break;
  129. }
  130. $start += self::body_header_len + $len;
  131. $left = $left - self::body_header_len - $len;
  132. }
  133. else {
  134. break;
  135. }
  136. }
  137. else {
  138. break;
  139. }
  140. } while ($left > 0);
  141. if($start > 0) {
  142. $content = substr($content,$start);
  143. }
  144. Log::record("ev_read end pid={$pid}",Log::DEBUG);
  145. }
  146. public function ev_error($buffer, $error, $bufid)
  147. {
  148. $error = socket_strerror($error);
  149. Log::record("ev_error id={$bufid} error={$error}",Log::DEBUG);
  150. event_buffer_disable($buffer, EV_READ | EV_WRITE);
  151. event_buffer_free($buffer);
  152. if(array_key_exists($bufid,$this->mStreams)) {
  153. stream_socket_shutdown($this->mStreams[$bufid],STREAM_SHUT_RDWR);
  154. fclose($this->mStreams[$bufid]);
  155. unset($this->mStreams[$bufid]);
  156. }
  157. unset($this->mBuffers[$bufid]);
  158. unset($this->mContents[$bufid]);
  159. }
  160. private function remote_addr()
  161. {
  162. global $config;
  163. $host = $config['searcher']['host'];
  164. $port = $config['searcher']['port'];
  165. return "{$host}:{$port}";
  166. }
  167. }