connect_id = 0; $this->connections = array(); $this->buffers = array(); } public function init(IProcessor $processor) { $this->processor = $processor; } public function run_loop($sockfd) { $this->socket = $sockfd; //$this->socket = stream_socket_server ($this->remote_addr(), $errno, $errstr); stream_set_blocking($this->socket, 0); $this->ev_base = event_base_new(); $this->ev = event_new(); event_set($this->ev, $this->socket, EV_READ | EV_PERSIST, 'ev_accept', $this->ev_base); event_base_set($this->ev, $this->ev_base); event_add($this->ev); event_base_loop($this->ev_base); } public function ev_accept($socket, $flag, $base) { $this->connect_id += 1; $connection = stream_socket_accept($socket); if($connection == false) return; stream_set_blocking($connection, 0); $buffer = event_buffer_new($connection, 'ev_read', NULL, 'ev_error', $this->connect_id); if($buffer == false) { fclose($connection); return; } event_buffer_base_set($buffer, $base); event_buffer_timeout_set($buffer, self::time_out, self::time_out); event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff); event_buffer_priority_set($buffer, 10); event_buffer_enable($buffer, EV_READ | EV_PERSIST); // we need to save both buffer and connection outside $this->connections[$this->connect_id] = $connection; $this->buffers[$this->connect_id] = $buffer; $this->contents[$this->connect_id] = ""; } public function ev_read($buffer, $id) { $content = &$this->contents[$id]; while ($read = event_buffer_read($buffer, 256)) { $content .= $read; } $start = 0; $left = strlen($content); do { if($left > self::body_header_len) { $len = substr($content,$start,self::body_header_len); $len = intval($len); if($left >= self::body_header_len + $len) { $body = substr($content,$start + self::body_header_len,$len); if($this->processor != null) { $data = $this->processor->handle_input($body); $header = sprintf("%010d",strlen($data)); $data = $header . $data; event_buffer_write($buffer,$data,strlen($data)); } $start += self::body_header_len + $len; $left = $left - self::body_header_len - $len; } else { break; } } else { break; } } while ($left > 0); if($start > 0) { $content = substr($content,$start); } } public function ev_error($buffer, $error, $id) { event_buffer_disable($this->buffers[$id], EV_READ | EV_WRITE); event_buffer_free($this->buffers[$id]); fclose($this->connections[$id]); unset($this->connections[$id]); unset($this->buffers[$id]); unset($this->contents[$id]); } private function remote_addr() { global $config; $host = $config['searcher']['host']; $port = $config['searcher']['port']; return "{$host}:{$port}"; } }