|
@@ -0,0 +1,146 @@
|
|
|
+<?php
|
|
|
+/**
|
|
|
+ * Created by PhpStorm.
|
|
|
+ * User: stanley-king
|
|
|
+ * Date: 2016/10/5
|
|
|
+ * Time: 下午11:39
|
|
|
+ */
|
|
|
+
|
|
|
+namespace search;
|
|
|
+
|
|
|
+function ev_accept($socket, $flag, $base)
|
|
|
+{
|
|
|
+ CentraHelper::instance()->ev_accept($socket,$flag,$base);
|
|
|
+}
|
|
|
+function ev_read($buffer, $id)
|
|
|
+{
|
|
|
+ CentraHelper::instance()->ev_read($buffer,$id);
|
|
|
+}
|
|
|
+
|
|
|
+function ev_error($buffer, $error, $id)
|
|
|
+{
|
|
|
+ CentraHelper::instance()->ev_error($buffer,$error,$id);
|
|
|
+}
|
|
|
+
|
|
|
+interface IProcessor
|
|
|
+{
|
|
|
+ public function handle_input($body);
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+class CentraHelper
|
|
|
+{
|
|
|
+ const body_len = 10;
|
|
|
+
|
|
|
+ private $socket;
|
|
|
+ private $ev_base;
|
|
|
+ private $ev;
|
|
|
+ private $connections;
|
|
|
+ private $buffers;
|
|
|
+ private $contents;
|
|
|
+ private $connect_id;
|
|
|
+
|
|
|
+ private $processor;
|
|
|
+ private static $stInstance;
|
|
|
+
|
|
|
+ public static function instance()
|
|
|
+ {
|
|
|
+ if(self::$stInstance == null) {
|
|
|
+ self::$stInstance = new CentraHelper();
|
|
|
+ }
|
|
|
+ return self::$stInstance;
|
|
|
+ }
|
|
|
+
|
|
|
+ private function __construct()
|
|
|
+ {
|
|
|
+ $this->connect_id = 0;
|
|
|
+ $this->connections = array();
|
|
|
+ $this->buffers = array();
|
|
|
+ }
|
|
|
+ public function init(IProcessor $processor)
|
|
|
+ {
|
|
|
+ $this->processor = $processor;
|
|
|
+ }
|
|
|
+
|
|
|
+ public function run_loop()
|
|
|
+ {
|
|
|
+ $this->socket = stream_socket_server ('tcp://0.0.0.0:2000', $errno, $errstr);
|
|
|
+ stream_set_blocking($this->socket, 0);
|
|
|
+ $this->ev_base = event_base_new();
|
|
|
+ $this->ev = event_new();
|
|
|
+ event_set($this->ev, $this->socket, EV_READ | EV_PERSIST, 'ev_accept', $this->ev_base);
|
|
|
+ event_base_set($this->ev, $this->ev_base);
|
|
|
+ event_add($this->ev);
|
|
|
+ event_base_loop($this->ev_base);
|
|
|
+ }
|
|
|
+
|
|
|
+ public function ev_accept($socket, $flag, $base)
|
|
|
+ {
|
|
|
+ $this->connect_id += 1;
|
|
|
+ $connection = stream_socket_accept($socket);
|
|
|
+ stream_set_blocking($connection, 0);
|
|
|
+ $buffer = event_buffer_new($connection, 'ev_read', NULL, 'ev_error', $this->connect_id);
|
|
|
+
|
|
|
+ event_buffer_base_set($buffer, $base);
|
|
|
+ event_buffer_timeout_set($buffer, 30, 30);
|
|
|
+ event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
|
|
|
+ event_buffer_priority_set($buffer, 10);
|
|
|
+ event_buffer_enable($buffer, EV_READ | EV_PERSIST);
|
|
|
+
|
|
|
+ // we need to save both buffer and connection outside
|
|
|
+ $this->connections[$this->connect_id] = $connection;
|
|
|
+ $this->buffers[$this->connect_id] = $buffer;
|
|
|
+ $this->contents[$this->connect_id] = "";
|
|
|
+ }
|
|
|
+
|
|
|
+ public function ev_read($buffer, $id)
|
|
|
+ {
|
|
|
+ $content = &$this->contents[$id];
|
|
|
+ while ($read = event_buffer_read($buffer, 256)) {
|
|
|
+ $content .= $read;
|
|
|
+ }
|
|
|
+
|
|
|
+ $start = 0;
|
|
|
+ $left = strlen($content);
|
|
|
+ do
|
|
|
+ {
|
|
|
+ if($left > self::body_len)
|
|
|
+ {
|
|
|
+ $len = substr($content,$start,self::body_len);
|
|
|
+ $len = intval($len);
|
|
|
+ if($left >= self::body_len + $len)
|
|
|
+ {
|
|
|
+ $body = substr($content,$start + self::body_len,$len);
|
|
|
+ if($this->processor != null) {
|
|
|
+ $data = $this->processor->handle_input($body);
|
|
|
+ $header = sprintf("%010d",strlen($data));
|
|
|
+ $data = $header . $data;
|
|
|
+ event_buffer_write($buffer,$data,strlen($data));
|
|
|
+ }
|
|
|
+ $start += self::body_len + $len;
|
|
|
+ $left = $left - self::body_len - $len;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ } while ($left > 0);
|
|
|
+
|
|
|
+ if($start > 0) {
|
|
|
+ $content = substr($content,$start);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public function ev_error($buffer, $error, $id)
|
|
|
+ {
|
|
|
+ event_buffer_disable($this->buffers[$id], EV_READ | EV_WRITE);
|
|
|
+ event_buffer_free($this->buffers[$id]);
|
|
|
+ fclose($this->connections[$id]);
|
|
|
+ unset($this->connections[$id]);
|
|
|
+ unset($this->buffers[$id]);
|
|
|
+ unset($this->contents[$id]);
|
|
|
+ }
|
|
|
+}
|