123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- <?php
- /**
- * Created by PhpStorm.
- * User: stanley-king
- * Date: 2016/10/5
- * Time: 下午11:39
- */
- namespace search;
- interface IProcessor
- {
- public function handle_input($body);
- }
- class CentraHelper
- {
- const body_header_len = 10;
- const time_out = 3600;
- private $socket;
- private $ev_base;
- private $ev;
- private $connections;
- private $buffers;
- private $contents;
- private $connect_id;
- private $processor;
- private static $stInstance;
- public static function instance()
- {
- if(self::$stInstance == null) {
- self::$stInstance = new CentraHelper();
- }
- return self::$stInstance;
- }
- private function __construct()
- {
- $this->connect_id = 0;
- $this->connections = array();
- $this->buffers = array();
- }
- public function init(IProcessor $processor)
- {
- $this->processor = $processor;
- }
- public function run_loop($sockfd)
- {
- $this->socket = $sockfd;
- //$this->socket = stream_socket_server ($this->remote_addr(), $errno, $errstr);
- stream_set_blocking($this->socket, 0);
- $this->ev_base = event_base_new();
- $this->ev = event_new();
- event_set($this->ev, $this->socket, EV_READ | EV_PERSIST, 'ev_accept', $this->ev_base);
- event_base_set($this->ev, $this->ev_base);
- event_add($this->ev);
- event_base_loop($this->ev_base);
- }
- public function ev_accept($socket, $flag, $base)
- {
- $this->connect_id += 1;
- $connection = stream_socket_accept($socket);
- if($connection == false) return;
- stream_set_blocking($connection, 0);
- $buffer = event_buffer_new($connection, 'ev_read', NULL, 'ev_error', $this->connect_id);
- if($buffer == false) {
- fclose($connection);
- return;
- }
- event_buffer_base_set($buffer, $base);
- event_buffer_timeout_set($buffer, self::time_out, self::time_out);
- event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
- event_buffer_priority_set($buffer, 10);
- event_buffer_enable($buffer, EV_READ | EV_PERSIST);
- // we need to save both buffer and connection outside
- $this->connections[$this->connect_id] = $connection;
- $this->buffers[$this->connect_id] = $buffer;
- $this->contents[$this->connect_id] = "";
- }
- public function ev_read($buffer, $id)
- {
- $content = &$this->contents[$id];
- while ($read = event_buffer_read($buffer, 256)) {
- $content .= $read;
- }
- $start = 0;
- $left = strlen($content);
- do
- {
- if($left > self::body_header_len)
- {
- $len = substr($content,$start,self::body_header_len);
- $len = intval($len);
- if($left >= self::body_header_len + $len)
- {
- $body = substr($content,$start + self::body_header_len,$len);
- if($this->processor != null) {
- $data = $this->processor->handle_input($body);
- $header = sprintf("%010d",strlen($data));
- $data = $header . $data;
- event_buffer_write($buffer,$data,strlen($data));
- }
- $start += self::body_header_len + $len;
- $left = $left - self::body_header_len - $len;
- }
- else {
- break;
- }
- }
- else {
- break;
- }
- } while ($left > 0);
- if($start > 0) {
- $content = substr($content,$start);
- }
- }
- public function ev_error($buffer, $error, $id)
- {
- event_buffer_disable($this->buffers[$id], EV_READ | EV_WRITE);
- event_buffer_free($this->buffers[$id]);
- fclose($this->connections[$id]);
- unset($this->connections[$id]);
- unset($this->buffers[$id]);
- unset($this->contents[$id]);
- }
- private function remote_addr()
- {
- global $config;
- $host = $config['searcher']['host'];
- $port = $config['searcher']['port'];
- return "{$host}:{$port}";
- }
- }
|