123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- <?php
- /**
- * Created by PhpStorm.
- * User: stanley-king
- * Date: 2016/10/5
- * Time: 下午11:39
- */
- namespace search;
- use Log;
/**
 * Contract for the object that turns one request body into a response.
 *
 * An implementation is handed to CentraHelper::init() and is invoked once per
 * complete request frame read from a client connection.
 */
interface IProcessor
{
    /**
     * Handle a single request body.
     *
     * @param string $body Raw payload of one request, without the length header.
     *
     * @return string Response payload; the caller measures it with strlen()
     *                and prefixes its own length header before sending.
     */
    public function handle_input($body);
}
/**
 * Event-driven request dispatcher built on the procedural libevent functions
 * (event_*, event_buffer_*).
 *
 * It accepts connections on a listening stream socket, reads length-prefixed
 * requests from every connection, hands each request body to the configured
 * IProcessor and writes the processor's result back with the same framing.
 *
 * Wire format (both directions): a header of body_header_len decimal
 * characters holding the body length, immediately followed by the body.
 * Responses are written with a zero-padded header ("%010d").
 */
class CentraHelper
{
    /** Number of characters in the decimal length header of every frame. */
    const body_header_len = 10;
    /** Read timeout handed to event_buffer_timeout_set() (seconds per the libevent docs). */
    const read_time_out = 600;
    /** Write timeout handed to event_buffer_timeout_set() (seconds per the libevent docs). */
    const write_time_out = 5;

    /** @var resource Listening stream socket passed to run_loop(). */
    private $mListenSocket;
    /** @var resource Event base created by event_base_new(). */
    private $mEvbase;
    /** @var resource Accept event created by event_new(). */
    private $mEv;
    /** @var array Accepted client streams, keyed by connection id. */
    private $mStreams;
    /** @var array Buffered events, keyed by connection id. */
    private $mBuffers;
    /** @var array Bytes received but not yet consumed, keyed by connection id. */
    private $mContents;
    /** @var IProcessor|null Handler for request bodies; null until init() is called. */
    private $mProcessor;

    /** @var CentraHelper|null Lazily created singleton instance. */
    private static $stInstance;

    /**
     * Return the shared instance, creating it on first use.
     *
     * @return CentraHelper
     */
    public static function instance()
    {
        if (self::$stInstance === null) {
            self::$stInstance = new self();
        }
        return self::$stInstance;
    }

    /**
     * Private: instances are obtained through instance().
     */
    private function __construct()
    {
        $this->mStreams = [];
        $this->mBuffers = [];
        $this->mContents = [];
    }

    /**
     * Set the processor that receives every complete request body.
     *
     * @param IProcessor $processor
     */
    public function init(IProcessor $processor)
    {
        $this->mProcessor = $processor;
    }

    /**
     * Register the listening socket with a new event base and run the event loop.
     *
     * Setup failures are only logged; the method carries on regardless, as the
     * original best-effort behaviour did. The call returns when
     * event_base_loop() returns.
     *
     * @param resource $sockfd Listening stream socket.
     */
    public function run_loop($sockfd)
    {
        $this->mListenSocket = $sockfd;

        Log::record("stream_set_blocking", Log::DEBUG);
        if (!stream_set_blocking($this->mListenSocket, 0)) {
            Log::record("stream_set_blocking error", Log::DEBUG);
        }

        $this->mEvbase = event_base_new();
        $this->mEv = event_new();

        // The callback is bound to this instance so the class does not depend
        // on a global function named "ev_accept" being defined elsewhere.
        // The event base is passed as the callback argument ($base in ev_accept).
        Log::record("event_set", Log::DEBUG);
        if (!event_set($this->mEv, $this->mListenSocket, EV_READ | EV_PERSIST, [$this, 'ev_accept'], $this->mEvbase)) {
            Log::record("event_set error EV_READ | EV_PERSIST", Log::DEBUG);
        }

        Log::record("event_base_set", Log::DEBUG);
        if (!event_base_set($this->mEv, $this->mEvbase)) {
            Log::record("event_base_set error", Log::DEBUG);
        }

        Log::record("event_add", Log::DEBUG);
        if (!event_add($this->mEv)) {
            Log::record("event_add error EV_READ | EV_PERSIST", Log::DEBUG);
        }

        $ret = event_base_loop($this->mEvbase);
        Log::record("event_base_loop ret={$ret}", Log::DEBUG);
    }

    /**
     * Accept callback: take one pending connection and attach a buffered event to it.
     *
     * @param resource $socket Listening socket that became readable.
     * @param int      $flag   Event flags supplied by libevent (unused).
     * @param resource $base   Event base registered as callback argument in run_loop().
     */
    public function ev_accept($socket, $flag, $base)
    {
        // Next connection id; shared by all calls for the lifetime of the process.
        static $bufid = 1;

        $pid = posix_getpid();
        Log::record("ev_accept pid={$pid} socket_fd={$socket}", Log::DEBUG);

        $stream = stream_socket_accept($socket);
        if (!$stream) {
            Log::record("stream_socket_accept return false pid={$pid} socket_fd={$socket}", Log::DEBUG);
            return;
        }
        Log::record("stream_socket_accept pid={$pid} stream={$stream}", Log::DEBUG);
        stream_set_blocking($stream, 0);

        // Plain copy of the id: this value is what ev_read/ev_error receive later.
        $id = $bufid;
        $buffer = event_buffer_new($stream, [$this, 'ev_read'], null, [$this, 'ev_error'], $id);
        if (!$buffer) {
            fclose($stream);
            Log::record("event_buffer_new return false pid={$pid} socket_fd={$socket}", Log::DEBUG);
            return;
        }

        event_buffer_base_set($buffer, $base);
        event_buffer_timeout_set($buffer, self::read_time_out, self::write_time_out);
        event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
        event_buffer_priority_set($buffer, 10);
        event_buffer_enable($buffer, EV_READ | EV_PERSIST);

        $this->mStreams[$id] = $stream;
        $this->mBuffers[$id] = $buffer;
        $this->mContents[$id] = "";

        // PHP integers turn into floats on overflow instead of going negative,
        // so wrap explicitly before the counter leaves the integer range.
        $bufid = ($bufid >= PHP_INT_MAX) ? 1 : $bufid + 1;
    }

    /**
     * Read callback: collect newly arrived bytes and process every complete frame.
     *
     * Bytes of an incomplete trailing frame stay buffered for the next call.
     * A negative length header cannot be framed, so the connection is closed.
     *
     * @param resource $buffer Buffered event that has data to read.
     * @param mixed    $bufid  Connection id assigned in ev_accept().
     */
    public function ev_read($buffer, $bufid)
    {
        $pid = posix_getpid();
        Log::record("ev_read begin pid={$pid}", Log::DEBUG);

        $pending = isset($this->mContents[$bufid]) ? (string) $this->mContents[$bufid] : '';
        $content = $pending . $this->drain_buffer($buffer);

        $offset = 0;
        $remaining = strlen($content);

        // ">=" so that a zero-length frame at the very end of the data is
        // handled right away instead of waiting for further input.
        while ($remaining >= self::body_header_len) {
            $len = intval(substr($content, $offset, self::body_header_len));
            if ($len < 0) {
                // A negative length would keep the offset from advancing and
                // spin forever; the stream cannot be re-synchronised, so drop it.
                Log::record("ev_read invalid body length={$len} id={$bufid}", Log::DEBUG);
                $this->close_connection($buffer, $bufid);
                Log::record("ev_read end pid={$pid}", Log::DEBUG);
                return;
            }

            $frameLen = self::body_header_len + $len;
            if ($remaining < $frameLen) {
                // Body not fully received yet.
                break;
            }

            if ($this->mProcessor != null) {
                $body = ($len > 0) ? substr($content, $offset + self::body_header_len, $len) : '';
                $data = $this->mProcessor->handle_input($body);
                $packet = sprintf("%010d", strlen($data)) . $data;
                if (event_buffer_write($buffer, $packet, strlen($packet)) == false) {
                    // The frame is not consumed, so it is handed to the processor
                    // again on the next read event.
                    // NOTE(review): confirm this retry is intended.
                    break;
                }
            }

            $offset += $frameLen;
            $remaining -= $frameLen;
        }

        $this->mContents[$bufid] = ($offset > 0) ? (string) substr($content, $offset) : $content;

        Log::record("ev_read end pid={$pid}", Log::DEBUG);
    }

    /**
     * Error callback: log the condition and tear the connection down.
     *
     * @param resource $buffer Buffered event that reported the condition.
     * @param int      $error  Value supplied by libevent to the error callback.
     * @param mixed    $bufid  Connection id assigned in ev_accept().
     */
    public function ev_error($buffer, $error, $bufid)
    {
        // NOTE(review): the value is passed to socket_strerror() as if it were
        // an errno; verify against the libevent docs whether the error callback
        // delivers an errno or a set of event flags.
        $error = socket_strerror($error);
        Log::record("ev_error id={$bufid} error={$error}", Log::DEBUG);

        $this->close_connection($buffer, $bufid);
    }

    /**
     * Build "host:port" from the global $config['searcher'] entry.
     *
     * @return string
     */
    private function remote_addr()
    {
        global $config;

        $searcher = $config['searcher'];
        return "{$searcher['host']}:{$searcher['port']}";
    }

    /**
     * Read everything currently available from a buffered event.
     *
     * @param resource $buffer
     *
     * @return string Concatenation of all chunks read (possibly empty).
     */
    private function drain_buffer($buffer)
    {
        $data = '';
        while (true) {
            $chunk = event_buffer_read($buffer, 256);
            // Compare against the empty string explicitly: empty() would also
            // stop on a chunk consisting of the single character "0".
            if (!is_string($chunk) || $chunk === '') {
                break;
            }
            $data .= $chunk;
        }
        return $data;
    }

    /**
     * Free the buffered event, close the client stream and forget the connection.
     *
     * @param resource $buffer Buffered event of the connection.
     * @param mixed    $bufid  Connection id assigned in ev_accept().
     */
    private function close_connection($buffer, $bufid)
    {
        event_buffer_disable($buffer, EV_READ | EV_WRITE);
        event_buffer_free($buffer);

        if (array_key_exists($bufid, $this->mStreams)) {
            stream_socket_shutdown($this->mStreams[$bufid], STREAM_SHUT_RDWR);
            fclose($this->mStreams[$bufid]);
            unset($this->mStreams[$bufid]);
        }

        unset($this->mBuffers[$bufid], $this->mContents[$bufid]);
    }
}
|