|
@@ -8,29 +8,15 @@
|
|
|
|
|
|
namespace search;
|
|
|
|
|
|
-function ev_accept($socket, $flag, $base)
|
|
|
-{
|
|
|
- CentraHelper::instance()->ev_accept($socket,$flag,$base);
|
|
|
-}
|
|
|
-function ev_read($buffer, $id)
|
|
|
-{
|
|
|
- CentraHelper::instance()->ev_read($buffer,$id);
|
|
|
-}
|
|
|
-
|
|
|
-function ev_error($buffer, $error, $id)
|
|
|
-{
|
|
|
- CentraHelper::instance()->ev_error($buffer,$error,$id);
|
|
|
-}
|
|
|
-
|
|
|
interface IProcessor
|
|
|
{
|
|
|
public function handle_input($body);
|
|
|
}
|
|
|
|
|
|
-
|
|
|
class CentraHelper
|
|
|
{
|
|
|
- const body_len = 10;
|
|
|
+ const body_header_len = 10;
|
|
|
+ const time_out = 3600;
|
|
|
|
|
|
private $socket;
|
|
|
private $ev_base;
|
|
@@ -62,9 +48,10 @@ class CentraHelper
|
|
|
$this->processor = $processor;
|
|
|
}
|
|
|
|
|
|
- public function run_loop()
|
|
|
+ public function run_loop($sockfd)
|
|
|
{
|
|
|
- $this->socket = stream_socket_server ('tcp://0.0.0.0:2000', $errno, $errstr);
|
|
|
+ $this->socket = $sockfd;
|
|
|
+ //$this->socket = stream_socket_server ($this->remote_addr(), $errno, $errstr);
|
|
|
stream_set_blocking($this->socket, 0);
|
|
|
$this->ev_base = event_base_new();
|
|
|
$this->ev = event_new();
|
|
@@ -82,7 +69,7 @@ class CentraHelper
|
|
|
$buffer = event_buffer_new($connection, 'ev_read', NULL, 'ev_error', $this->connect_id);
|
|
|
|
|
|
event_buffer_base_set($buffer, $base);
|
|
|
- event_buffer_timeout_set($buffer, 30, 30);
|
|
|
+ event_buffer_timeout_set($buffer, self::time_out, self::time_out);
|
|
|
event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
|
|
|
event_buffer_priority_set($buffer, 10);
|
|
|
event_buffer_enable($buffer, EV_READ | EV_PERSIST);
|
|
@@ -104,21 +91,21 @@ class CentraHelper
|
|
|
$left = strlen($content);
|
|
|
do
|
|
|
{
|
|
|
- if($left > self::body_len)
|
|
|
+ if($left > self::body_header_len)
|
|
|
{
|
|
|
- $len = substr($content,$start,self::body_len);
|
|
|
+ $len = substr($content,$start,self::body_header_len);
|
|
|
$len = intval($len);
|
|
|
- if($left >= self::body_len + $len)
|
|
|
+ if($left >= self::body_header_len + $len)
|
|
|
{
|
|
|
- $body = substr($content,$start + self::body_len,$len);
|
|
|
+ $body = substr($content,$start + self::body_header_len,$len);
|
|
|
if($this->processor != null) {
|
|
|
$data = $this->processor->handle_input($body);
|
|
|
$header = sprintf("%010d",strlen($data));
|
|
|
$data = $header . $data;
|
|
|
event_buffer_write($buffer,$data,strlen($data));
|
|
|
}
|
|
|
- $start += self::body_len + $len;
|
|
|
- $left = $left - self::body_len - $len;
|
|
|
+ $start += self::body_header_len + $len;
|
|
|
+ $left = $left - self::body_header_len - $len;
|
|
|
}
|
|
|
else {
|
|
|
break;
|
|
@@ -143,4 +130,14 @@ class CentraHelper
|
|
|
unset($this->buffers[$id]);
|
|
|
unset($this->contents[$id]);
|
|
|
}
|
|
|
+
|
|
|
+ private function remote_addr()
|
|
|
+ {
|
|
|
+ global $config;
|
|
|
+
|
|
|
+ $host = $config['searcher']['host'];
|
|
|
+ $port = $config['searcher']['port'];
|
|
|
+
|
|
|
+ return "{$host}:{$port}";
|
|
|
+ }
|
|
|
}
|