mStreams = array(); $this->mBuffers = array(); $this->mContents = []; } public function init(IProcessor $processor) { $this->mProcessor = $processor; } public function run_loop($sockfd) { $this->mListenSocket = $sockfd; Log::record("stream_set_blocking",Log::DEBUG); if(stream_set_blocking($this->mListenSocket, 0) == false) { Log::record("stream_set_blocking error",Log::DEBUG); } $this->mEvbase = event_base_new(); $this->mEv = event_new(); Log::record("event_set",Log::DEBUG); if(event_set($this->mEv, $this->mListenSocket, EV_READ | EV_PERSIST, 'ev_accept', $this->mEvbase) == false) { Log::record("event_set error EV_READ | EV_PERSIST",Log::DEBUG); } Log::record("event_base_set",Log::DEBUG); if(event_base_set($this->mEv, $this->mEvbase) == false) { Log::record("event_set error EV_READ | EV_PERSIST",Log::DEBUG); } Log::record("event_add",Log::DEBUG); if(event_add($this->mEv) == false) { Log::record("event_add error EV_READ | EV_PERSIST",Log::DEBUG); } $ret = event_base_loop($this->mEvbase); Log::record("event_base_loop ret={$ret}",Log::DEBUG); } public function ev_accept($socket, $flag, $base) { $pid = posix_getpid(); Log::record("ev_accept pid={$pid} socket_fd={$socket}",Log::DEBUG); static $bufid = 1; $stream = stream_socket_accept($socket); if($stream == false) { Log::record("stream_socket_accept return false pid={$pid} socket_fd={$socket}",Log::DEBUG); return; } Log::record("stream_socket_accept pid={$pid} stream={$stream}",Log::DEBUG); stream_set_blocking($stream, 0); $buffer = event_buffer_new($stream, 'ev_read', NULL, 'ev_error', $bufid); if($buffer == false) { fclose($stream); Log::record("event_buffer_new return false pid={$pid} socket_fd={$socket}",Log::DEBUG); return; } event_buffer_base_set($buffer, $base); event_buffer_timeout_set($buffer, self::read_time_out, self::write_time_out); event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff); event_buffer_priority_set($buffer, 10); event_buffer_enable($buffer, EV_READ | EV_PERSIST); $this->mStreams[$bufid] = $stream; $this->mBuffers[$bufid] = $buffer; $this->mContents[$bufid] = ""; $bufid++; if($bufid < 0) $bufid = 1; } public function ev_read($buffer, $bufid) { $pid = posix_getpid(); Log::record("ev_read begin pid={$pid}",Log::DEBUG); $content = &$this->mContents[$bufid]; while (true) { $read = event_buffer_read($buffer, 256); if(empty($read)) { break; } else { $content .= $read; } } $start = 0; $left = strlen($content); do { if($left > self::body_header_len) { $len = substr($content,$start,self::body_header_len); $len = intval($len); if($left >= self::body_header_len + $len) { $body = substr($content,$start + self::body_header_len,$len); if($this->mProcessor != null) { $data = $this->mProcessor->handle_input($body); $header = sprintf("%010d",strlen($data)); $data = $header . $data; $ret = event_buffer_write($buffer,$data,strlen($data)); if($ret == false) break; } $start += self::body_header_len + $len; $left = $left - self::body_header_len - $len; } else { break; } } else { break; } } while ($left > 0); if($start > 0) { $content = substr($content,$start); } Log::record("ev_read end pid={$pid}",Log::DEBUG); } public function ev_error($buffer, $error, $bufid) { $error = socket_strerror($error); Log::record("ev_error id={$bufid} error={$error}",Log::DEBUG); event_buffer_disable($buffer, EV_READ | EV_WRITE); event_buffer_free($buffer); if(array_key_exists($bufid,$this->mStreams)) { stream_socket_shutdown($this->mStreams[$bufid],STREAM_SHUT_RDWR); fclose($this->mStreams[$bufid]); unset($this->mStreams[$bufid]); } unset($this->mBuffers[$bufid]); unset($this->mContents[$bufid]); } private function remote_addr() { global $config; $host = $config['searcher']['host']; $port = $config['searcher']['port']; return "{$host}:{$port}"; } }