|
@@ -7,31 +7,10 @@
|
|
|
*/
|
|
|
|
|
|
namespace search;
|
|
|
-use Log;
|
|
|
|
|
|
-interface IProcessor
|
|
|
+class CentraHelper extends srv_base
|
|
|
{
|
|
|
- public function handle_input($body);
|
|
|
-}
|
|
|
-
|
|
|
-class CentraHelper
|
|
|
-{
|
|
|
- const body_header_len = 10;
|
|
|
- const read_time_out = 600;
|
|
|
- const write_time_out = 5;
|
|
|
-
|
|
|
- private $mListenSocket;
|
|
|
- private $mEvbase;
|
|
|
- private $mEv;
|
|
|
-
|
|
|
- private $mStreams;
|
|
|
- private $mBuffers;
|
|
|
- private $mContents;
|
|
|
-
|
|
|
- private $mProcessor;
|
|
|
-
|
|
|
private static $stInstance;
|
|
|
-
|
|
|
public static function instance()
|
|
|
{
|
|
|
if(self::$stInstance == null) {
|
|
@@ -40,162 +19,8 @@ class CentraHelper
|
|
|
return self::$stInstance;
|
|
|
}
|
|
|
|
|
|
- private function __construct()
|
|
|
+ protected function __construct()
|
|
|
{
|
|
|
- $this->mStreams = array();
|
|
|
- $this->mBuffers = array();
|
|
|
- $this->mContents = [];
|
|
|
- }
|
|
|
- public function init(IProcessor $processor)
|
|
|
- {
|
|
|
- $this->mProcessor = $processor;
|
|
|
- }
|
|
|
-
|
|
|
- public function run_loop($sockfd)
|
|
|
- {
|
|
|
- $this->mListenSocket = $sockfd;
|
|
|
- Log::record("stream_set_blocking",Log::DEBUG);
|
|
|
- if(stream_set_blocking($this->mListenSocket, 0) == false) {
|
|
|
- Log::record("stream_set_blocking error",Log::DEBUG);
|
|
|
- }
|
|
|
- $this->mEvbase = event_base_new();
|
|
|
- $this->mEv = event_new();
|
|
|
-
|
|
|
- Log::record("event_set",Log::DEBUG);
|
|
|
- if(event_set($this->mEv, $this->mListenSocket, EV_READ | EV_PERSIST, 'ev_accept', $this->mEvbase) == false) {
|
|
|
- Log::record("event_set error EV_READ | EV_PERSIST",Log::DEBUG);
|
|
|
- }
|
|
|
-
|
|
|
- Log::record("event_base_set",Log::DEBUG);
|
|
|
- if(event_base_set($this->mEv, $this->mEvbase) == false) {
|
|
|
- Log::record("event_set error EV_READ | EV_PERSIST",Log::DEBUG);
|
|
|
- }
|
|
|
-
|
|
|
- Log::record("event_add",Log::DEBUG);
|
|
|
- if(event_add($this->mEv) == false) {
|
|
|
- Log::record("event_add error EV_READ | EV_PERSIST",Log::DEBUG);
|
|
|
- }
|
|
|
-
|
|
|
- $ret = event_base_loop($this->mEvbase);
|
|
|
- Log::record("event_base_loop ret={$ret}",Log::DEBUG);
|
|
|
- }
|
|
|
-
|
|
|
- public function ev_accept($socket, $flag, $base)
|
|
|
- {
|
|
|
- $pid = posix_getpid();
|
|
|
- Log::record("ev_accept pid={$pid} socket_fd={$socket}",Log::DEBUG);
|
|
|
-
|
|
|
- static $bufid = 1;
|
|
|
-
|
|
|
- $stream = stream_socket_accept($socket);
|
|
|
- if($stream == false) {
|
|
|
- Log::record("stream_socket_accept return false pid={$pid} socket_fd={$socket}",Log::DEBUG);
|
|
|
- return;
|
|
|
- }
|
|
|
- Log::record("stream_socket_accept pid={$pid} stream={$stream}",Log::DEBUG);
|
|
|
-
|
|
|
- stream_set_blocking($stream, 0);
|
|
|
- $buffer = event_buffer_new($stream, 'ev_read', NULL, 'ev_error', $bufid);
|
|
|
- if($buffer == false) {
|
|
|
- fclose($stream);
|
|
|
- Log::record("event_buffer_new return false pid={$pid} socket_fd={$socket}",Log::DEBUG);
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- event_buffer_base_set($buffer, $base);
|
|
|
- event_buffer_timeout_set($buffer, self::read_time_out, self::write_time_out);
|
|
|
- event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
|
|
|
- event_buffer_priority_set($buffer, 10);
|
|
|
- event_buffer_enable($buffer, EV_READ | EV_PERSIST);
|
|
|
-
|
|
|
- $this->mStreams[$bufid] = $stream;
|
|
|
- $this->mBuffers[$bufid] = $buffer;
|
|
|
- $this->mContents[$bufid] = "";
|
|
|
-
|
|
|
- $bufid++;
|
|
|
- if($bufid < 0) $bufid = 1;
|
|
|
- }
|
|
|
-
|
|
|
- public function ev_read($buffer, $bufid)
|
|
|
- {
|
|
|
- $pid = posix_getpid();
|
|
|
- Log::record("ev_read begin pid={$pid}",Log::DEBUG);
|
|
|
-
|
|
|
- $content = &$this->mContents[$bufid];
|
|
|
- while (true)
|
|
|
- {
|
|
|
- $read = event_buffer_read($buffer, 256);
|
|
|
- if(empty($read)) {
|
|
|
- break;
|
|
|
- } else {
|
|
|
- $content .= $read;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- $start = 0;
|
|
|
- $left = strlen($content);
|
|
|
- do
|
|
|
- {
|
|
|
- if($left > self::body_header_len)
|
|
|
- {
|
|
|
- $len = substr($content,$start,self::body_header_len);
|
|
|
- $len = intval($len);
|
|
|
-
|
|
|
- if($left >= self::body_header_len + $len)
|
|
|
- {
|
|
|
- $body = substr($content,$start + self::body_header_len,$len);
|
|
|
- if($this->mProcessor != null)
|
|
|
- {
|
|
|
- $data = $this->mProcessor->handle_input($body);
|
|
|
- $header = sprintf("%010d",strlen($data));
|
|
|
- $data = $header . $data;
|
|
|
- $ret = event_buffer_write($buffer,$data,strlen($data));
|
|
|
- if($ret == false) break;
|
|
|
- }
|
|
|
-
|
|
|
- $start += self::body_header_len + $len;
|
|
|
- $left = $left - self::body_header_len - $len;
|
|
|
- }
|
|
|
- else {
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- else {
|
|
|
- break;
|
|
|
- }
|
|
|
- } while ($left > 0);
|
|
|
-
|
|
|
- if($start > 0) {
|
|
|
- $content = substr($content,$start);
|
|
|
- }
|
|
|
- Log::record("ev_read end pid={$pid}",Log::DEBUG);
|
|
|
- }
|
|
|
-
|
|
|
- public function ev_error($buffer, $error, $bufid)
|
|
|
- {
|
|
|
- $error = socket_strerror($error);
|
|
|
- Log::record("ev_error id={$bufid} error={$error}",Log::DEBUG);
|
|
|
-
|
|
|
- event_buffer_disable($buffer, EV_READ | EV_WRITE);
|
|
|
- event_buffer_free($buffer);
|
|
|
-
|
|
|
- if(array_key_exists($bufid,$this->mStreams)) {
|
|
|
- stream_socket_shutdown($this->mStreams[$bufid],STREAM_SHUT_RDWR);
|
|
|
- fclose($this->mStreams[$bufid]);
|
|
|
- unset($this->mStreams[$bufid]);
|
|
|
- }
|
|
|
-
|
|
|
- unset($this->mBuffers[$bufid]);
|
|
|
- unset($this->mContents[$bufid]);
|
|
|
- }
|
|
|
-
|
|
|
- private function remote_addr()
|
|
|
- {
|
|
|
- global $config;
|
|
|
-
|
|
|
- $host = $config['searcher']['host'];
|
|
|
- $port = $config['searcher']['port'];
|
|
|
-
|
|
|
- return "{$host}:{$port}";
|
|
|
+ parent::__construct();
|
|
|
}
|
|
|
}
|