|
@@ -15,6 +15,7 @@ interface IProcessor
|
|
|
{
|
|
|
public function onRequest($bufid, $body);
|
|
|
public function onClose($bufid);
|
|
|
+ public function onConnected($bufid,$stream,$host,$port,$args);
|
|
|
}
|
|
|
|
|
|
abstract class srv_base
|
|
@@ -23,20 +24,29 @@ abstract class srv_base
|
|
|
const read_time_out = 600;
|
|
|
const write_time_out = 5;
|
|
|
|
|
|
- private $mListenSocket;
|
|
|
- protected $mEvbase;
|
|
|
- protected $mEv;
|
|
|
+ protected $mEvBase;
|
|
|
+ protected $mEvAccepts;
|
|
|
+ protected $mEvSignals;
|
|
|
+ protected $mEvConnects;
|
|
|
|
|
|
protected $mStreams;
|
|
|
protected $mBuffers;
|
|
|
protected $mContents;
|
|
|
protected $mProcessor;
|
|
|
+ protected $mBufferID;
|
|
|
|
|
|
protected function __construct()
|
|
|
{
|
|
|
$this->mStreams = [];
|
|
|
$this->mBuffers = [];
|
|
|
$this->mContents = [];
|
|
|
+ $this->mEvAccepts = [];
|
|
|
+ $this->mEvSignals = [];
|
|
|
+ $this->mEvConnects = [];
|
|
|
+ $this->mEvTimeouts = [];
|
|
|
+
|
|
|
+ $this->mBufferID = 1;
|
|
|
+ $this->mEvBase = event_base_new();
|
|
|
}
|
|
|
|
|
|
public function init(IProcessor $processor)
|
|
@@ -44,71 +54,246 @@ abstract class srv_base
|
|
|
$this->mProcessor = $processor;
|
|
|
}
|
|
|
|
|
|
- public function run_loop($sockfd)
|
|
|
+ public function run_loop($sock)
|
|
|
{
|
|
|
- $this->mListenSocket = $sockfd;
|
|
|
- $this->mEvbase = event_base_new();
|
|
|
- $this->mEv = event_new();
|
|
|
+ $this->add_listen($sock);
|
|
|
+ $this->add_signal(SIGINT);
|
|
|
+ $this->add_signal(SIGQUIT);
|
|
|
+ $this->add_signal(SIGTERM);
|
|
|
+
|
|
|
+ $ret = event_base_loop($this->mEvBase);
|
|
|
+ Log::record("event_base_loop ret={$ret}",Log::DEBUG);
|
|
|
+ }
|
|
|
|
|
|
- Log::record("event_set",Log::DEBUG);
|
|
|
- if(event_set($this->mEv, $this->mListenSocket, EV_READ | EV_PERSIST, 'ev_accept', $this->mEvbase) == false) {
|
|
|
+ private function add_listen($fd)
|
|
|
+ {
|
|
|
+ $fd = intval($fd);
|
|
|
+ if($fd < 0) return false;
|
|
|
+
|
|
|
+ $this->mEvAccepts[$fd] = event_new();
|
|
|
+ if(event_set($this->mEvAccepts[$fd], $fd, EV_READ | EV_PERSIST, [$this, 'onAccept'], $this->mEvBase) == false) {
|
|
|
Log::record("event_set error EV_READ | EV_PERSIST",Log::DEBUG);
|
|
|
}
|
|
|
|
|
|
- Log::record("event_base_set",Log::DEBUG);
|
|
|
- if(event_base_set($this->mEv, $this->mEvbase) == false) {
|
|
|
+ if(event_base_set($this->mEvAccepts[$fd], $this->mEvBase) == false) {
|
|
|
Log::record("event_set error EV_READ | EV_PERSIST",Log::DEBUG);
|
|
|
}
|
|
|
|
|
|
- Log::record("event_add",Log::DEBUG);
|
|
|
- if(event_add($this->mEv) == false) {
|
|
|
+ if(event_add($this->mEvAccepts[$fd]) == false) {
|
|
|
Log::record("event_add error EV_READ | EV_PERSIST",Log::DEBUG);
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- $ret = event_base_loop($this->mEvbase);
|
|
|
- Log::record("event_base_loop ret={$ret}",Log::DEBUG);
|
|
|
+ private function add_signal($val)
|
|
|
+ {
|
|
|
+ $this->mEvSignals[$val] = event_new();
|
|
|
+
|
|
|
+ if(event_set($this->mEvSignals[$val], $val, EV_SIGNAL, [$this, 'onSignal']) == false) {
|
|
|
+ Log::record("event_set error EV_SIGNAL sig={$val}",Log::DEBUG);
|
|
|
+ }
|
|
|
+
|
|
|
+ if(event_base_set($this->mEvSignals[$val], $this->mEvBase) == false) {
|
|
|
+ Log::record("event_base_set error EV_SIGNAL sig={$val}",Log::DEBUG);
|
|
|
+ }
|
|
|
+
|
|
|
+ if(event_add($this->mEvSignals[$val]) == false) {
|
|
|
+ Log::record("event_add error EV_SIGNAL sig={$val}",Log::DEBUG);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- public function ev_accept($socket, $flag, $base)
|
|
|
+ public function onSignal($sig, $flag)
|
|
|
{
|
|
|
- $pid = posix_getpid();
|
|
|
- Log::record("ev_accept pid={$pid} socket_fd={$socket}",Log::DEBUG);
|
|
|
+ Log::record("ev_signal sig={$sig},flag={$flag}",Log::DEBUG);
|
|
|
+ if($sig == SIGINT || $sig == SIGQUIT || $sig == SIGTERM)
|
|
|
+ {
|
|
|
+ event_base_loopexit($this->mEvBase);
|
|
|
|
|
|
- static $bufid = 1;
|
|
|
+ foreach ($this->mBuffers as $bufid => $buffer) {
|
|
|
+ event_buffer_disable($buffer, EV_READ | EV_WRITE);
|
|
|
+ event_buffer_free($buffer);
|
|
|
|
|
|
+ @socket_shutdown($this->mStreams[$bufid],STREAM_SHUT_RDWR);
|
|
|
+ socket_close($this->mStreams[$bufid]);
|
|
|
+ }
|
|
|
+ $this->mStreams = [];
|
|
|
+ $this->mContents = [];
|
|
|
+ $this->mBuffers = [];
|
|
|
+ }
|
|
|
+ }
|
|
|
+ public function onAccept($socket, $event, $base)
|
|
|
+ {
|
|
|
+ Log::record("onAccept socket_fd={$socket} flag={$event}",Log::DEBUG);
|
|
|
$stream = socket_accept($socket);
|
|
|
if($stream == false) {
|
|
|
- Log::record("stream_socket_accept return false pid={$pid} socket_fd={$socket}",Log::DEBUG);
|
|
|
+ Log::record("stream_socket_accept return false socket_fd={$socket}",Log::DEBUG);
|
|
|
return;
|
|
|
}
|
|
|
- Log::record("stream_socket_accept pid={$pid} stream={$stream}",Log::DEBUG);
|
|
|
+ Log::record("stream_socket_accept stream={$stream}",Log::DEBUG);
|
|
|
+ $this->add_stream($stream,$base);
|
|
|
+ }
|
|
|
|
|
|
+ private function add_stream($stream,$base)
|
|
|
+ {
|
|
|
+ $bufid = $this->mBufferID;
|
|
|
socket_set_nonblock($stream);
|
|
|
- $buffer = event_buffer_new($stream, 'ev_read', NULL, 'ev_error', $bufid);
|
|
|
+ $buffer = event_buffer_new($stream, [$this,'onRead'], NULL, [$this,'onError'], $bufid);
|
|
|
+
|
|
|
if($buffer == false) {
|
|
|
- fclose($stream);
|
|
|
- Log::record("event_buffer_new return false pid={$pid} socket_fd={$socket}",Log::DEBUG);
|
|
|
- return;
|
|
|
+ socket_close($stream);
|
|
|
+ return false;
|
|
|
}
|
|
|
+ else
|
|
|
+ {
|
|
|
+ event_buffer_base_set($buffer, $base);
|
|
|
+ event_buffer_timeout_set($buffer, self::read_time_out, self::write_time_out);
|
|
|
+ event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
|
|
|
+ event_buffer_priority_set($buffer, 10);
|
|
|
+ event_buffer_enable($buffer, EV_READ | EV_PERSIST);
|
|
|
+
|
|
|
+ $this->mStreams[$bufid] = $stream;
|
|
|
+ $this->mBuffers[$bufid] = $buffer;
|
|
|
+ $this->mContents[$bufid] = "";
|
|
|
+
|
|
|
+ $this->mBufferID++;
|
|
|
+ if($this->mBufferID < 0) $this->mBufferID = 1;
|
|
|
+
|
|
|
+ return $bufid;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public function block($bufid)
|
|
|
+ {
|
|
|
+ $stream = $this->mStreams[$bufid];
|
|
|
+ $buffer = $this->mBuffers[$bufid];
|
|
|
+
|
|
|
+ $success = event_buffer_disable($buffer, EV_READ | EV_PERSIST);
|
|
|
+ Log::record("event_buffer_disable success={$success}",Log::DEBUG);
|
|
|
+
|
|
|
+ $success = socket_set_block($stream);
|
|
|
+ Log::record("socket_set_block success={$success}",Log::DEBUG);
|
|
|
+ }
|
|
|
|
|
|
- event_buffer_base_set($buffer, $base);
|
|
|
- event_buffer_timeout_set($buffer, self::read_time_out, self::write_time_out);
|
|
|
- event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
|
|
|
- event_buffer_priority_set($buffer, 10);
|
|
|
- event_buffer_enable($buffer, EV_READ | EV_PERSIST);
|
|
|
+ public function unblock($bufid)
|
|
|
+ {
|
|
|
+ $stream = $this->mStreams[$bufid];
|
|
|
+ $buffer = $this->mBuffers[$bufid];
|
|
|
|
|
|
- $this->mStreams[$bufid] = $stream;
|
|
|
- $this->mBuffers[$bufid] = $buffer;
|
|
|
- $this->mContents[$bufid] = "";
|
|
|
+ $success = socket_set_nonblock($stream);
|
|
|
+ Log::record("socket_set_nonblock success={$success}",Log::DEBUG);
|
|
|
|
|
|
- $bufid++;
|
|
|
- if($bufid < 0) $bufid = 1;
|
|
|
+ $success = event_buffer_enable($buffer, EV_READ | EV_PERSIST);
|
|
|
+ Log::record("event_buffer_enable success={$success}",Log::DEBUG);
|
|
|
}
|
|
|
|
|
|
- public function ev_read($buffer, $bufid)
|
|
|
+ public function onConnectTimer($stream, $event, $params)
|
|
|
{
|
|
|
- $pid = posix_getpid();
|
|
|
- Log::record("ev_read begin pid={$pid}",Log::DEBUG);
|
|
|
+ $error = socket_last_error();
|
|
|
+ Log::record("onConnectTimer sig={$stream},flag={$event},error={$error}",Log::DEBUG);
|
|
|
+ $stream = $this->do_connect($params['host'],$params['port'],$params['args'],$params['stream']);
|
|
|
+ if($stream != false) {
|
|
|
+ $bufid = $this->add_stream($stream,$this->mEvBase);
|
|
|
+ $this->mProcessor->onConnected($bufid, $stream,$params['host'],$params['port'],$params['args']);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public function onConnect($stream, $event, $params)
|
|
|
+ {
|
|
|
+ $error = socket_last_error();
|
|
|
+ Log::record("onConnect sig={$stream},flag={$event},socket error={$error}",Log::DEBUG);
|
|
|
+
|
|
|
+ if($event == EV_WRITE) {
|
|
|
+ $ret = socket_getopt($stream,SOL_SOCKET,SO_ERROR);
|
|
|
+ if($ret == 0) {
|
|
|
+ Log::record("onConnect EV_WRITE success ",Log::DEBUG);
|
|
|
+ $fd = intval($stream);
|
|
|
+ event_del($this->mEvConnects[$fd]);
|
|
|
+ event_free($this->mEvConnects[$fd]);
|
|
|
+ event_del($this->mEvTimeouts[$fd]);
|
|
|
+ event_free($this->mEvTimeouts[$fd]);
|
|
|
+ unset($this->mEvConnects[$fd]);
|
|
|
+ unset($this->mEvTimeouts[$fd]);
|
|
|
+ $bufid = $this->add_stream($stream, $this->mEvBase);
|
|
|
+ $this->mProcessor->onConnected($bufid, $stream,$params['host'],$params['port'],$params['args']);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ Log::record("onConnect EV_WRITE Error",Log::DEBUG);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ elseif($event == EV_WRITE | EV_READ) {
|
|
|
+ Log::record("onConnect EV_WRITE | EV_READ",Log::DEBUG);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ Log::record("onConnect ETIMEOUT",Log::DEBUG);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private function do_connect($host,$port,$args,$old_stream = false)
|
|
|
+ {
|
|
|
+ $stream = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
|
|
|
+ socket_set_nonblock($stream);
|
|
|
+ $fd = intval($stream);
|
|
|
+
|
|
|
+ if($old_stream == false)
|
|
|
+ {
|
|
|
+ $this->mEvConnects[$fd] = event_new();
|
|
|
+ $this->mEvTimeouts[$fd] = event_timer_new();
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ $oldfd = intval($old_stream);
|
|
|
+ $this->mEvConnects[$fd] = $this->mEvConnects[$oldfd];
|
|
|
+ $this->mEvTimeouts[$fd] = $this->mEvTimeouts[$oldfd];
|
|
|
+ unset($this->mEvConnects[$oldfd]);
|
|
|
+ unset($this->mEvTimeouts[$oldfd]);
|
|
|
+ }
|
|
|
+
|
|
|
+ $ret = socket_connect($stream,$host,$port);
|
|
|
+ if($ret == false)
|
|
|
+ {
|
|
|
+ if(event_set($this->mEvConnects[$fd], $stream, EV_WRITE | EV_READ, [$this, 'onConnect'],['host' => $host,'port' => $port,'args' => $args]) == false) {
|
|
|
+ Log::record("event_set error connect fd={$stream}",Log::DEBUG);
|
|
|
+ }
|
|
|
+ if(event_base_set($this->mEvConnects[$fd], $this->mEvBase) == false) {
|
|
|
+ Log::record("event_base_set error connect fd={$stream}",Log::DEBUG);
|
|
|
+ }
|
|
|
+ if(event_timer_set($this->mEvTimeouts[$fd], [$this, 'onConnectTimer'],['stream' => $stream, 'host' => $host,'port' => $port,'args' => $args]) == false) {
|
|
|
+ Log::record("event_set error connect fd={$stream}",Log::DEBUG);
|
|
|
+ }
|
|
|
+ if(event_base_set($this->mEvTimeouts[$fd], $this->mEvBase) == false) {
|
|
|
+ Log::record("event_base_set error connect fd={$stream}",Log::DEBUG);
|
|
|
+ }
|
|
|
+ if(event_add($this->mEvConnects[$fd]) == false) {
|
|
|
+ Log::record("event_add error connect fd={$stream}",Log::DEBUG);
|
|
|
+ }
|
|
|
+
|
|
|
+ if(event_timer_add($this->mEvTimeouts[$fd],1000000) == false) {
|
|
|
+ Log::record("event_base_set error connect fd={$stream}",Log::DEBUG);
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ event_del($this->mEvConnects[$fd]);
|
|
|
+ event_free($this->mEvConnects[$fd]);
|
|
|
+ event_del($this->mEvTimeouts[$fd]);
|
|
|
+ event_free($this->mEvTimeouts[$fd]);
|
|
|
+
|
|
|
+ unset($this->mEvConnects[$fd]);
|
|
|
+ unset($this->mEvTimeouts[$fd]);
|
|
|
+
|
|
|
+ return $stream;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ public function connect($host,$port,$args)
|
|
|
+ {
|
|
|
+ $stream = $this->do_connect($host,$port,$args,false);
|
|
|
+ if($stream != false) {
|
|
|
+ $bufid = $this->add_stream($stream,$this->mEvBase);
|
|
|
+ $this->mProcessor->onConnected($bufid, $stream,$host,$port,$args);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public function onRead($buffer, $bufid)
|
|
|
+ {
|
|
|
+ Log::record("onRead bufid={$bufid}",Log::DEBUG);
|
|
|
|
|
|
$content = &$this->mContents[$bufid];
|
|
|
while (true)
|
|
@@ -154,32 +339,37 @@ abstract class srv_base
|
|
|
if($start > 0) {
|
|
|
$content = substr($content,$start);
|
|
|
}
|
|
|
- Log::record("ev_read end pid={$pid}",Log::DEBUG);
|
|
|
}
|
|
|
|
|
|
- public function ev_error($buffer, $error, $bufid)
|
|
|
+ public function onError($buffer, $event, $bufid)
|
|
|
{
|
|
|
- $error = socket_strerror($error);
|
|
|
- Log::record("ev_error id={$bufid} error={$error}",Log::DEBUG);
|
|
|
-
|
|
|
- event_buffer_disable($buffer, EV_READ | EV_WRITE);
|
|
|
- event_buffer_free($buffer);
|
|
|
+ Log::record("onError bufid={$bufid} flag={$event}",Log::DEBUG);
|
|
|
|
|
|
- if(array_key_exists($bufid,$this->mStreams)) {
|
|
|
- socket_shutdown($this->mStreams[$bufid],STREAM_SHUT_RDWR);
|
|
|
- fclose($this->mStreams[$bufid]);
|
|
|
- unset($this->mStreams[$bufid]);
|
|
|
+ if ($event == (EVBUFFER_READ | EVBUFFER_TIMEOUT)) {
|
|
|
+ event_buffer_enable($buffer, EV_READ);
|
|
|
+ $this->write($bufid,'');
|
|
|
}
|
|
|
+ else
|
|
|
+ {
|
|
|
+ event_buffer_disable($buffer, EV_READ | EV_WRITE);
|
|
|
+ event_buffer_free($buffer);
|
|
|
|
|
|
- if(array_key_exists($bufid,$this->mBuffers)) {
|
|
|
- unset($this->mBuffers[$bufid]);
|
|
|
- }
|
|
|
- if(array_key_exists($bufid,$this->mContents)) {
|
|
|
- unset($this->mContents[$bufid]);
|
|
|
- }
|
|
|
+ if(array_key_exists($bufid,$this->mStreams)) {
|
|
|
+ @socket_shutdown($this->mStreams[$bufid],STREAM_SHUT_RDWR);
|
|
|
+ socket_close($this->mStreams[$bufid]);
|
|
|
+ unset($this->mStreams[$bufid]);
|
|
|
+ }
|
|
|
|
|
|
- if($this->mProcessor != null) {
|
|
|
- $this->mProcessor->onClose($bufid);
|
|
|
+ if(array_key_exists($bufid,$this->mBuffers)) {
|
|
|
+ unset($this->mBuffers[$bufid]);
|
|
|
+ }
|
|
|
+ if(array_key_exists($bufid,$this->mContents)) {
|
|
|
+ unset($this->mContents[$bufid]);
|
|
|
+ }
|
|
|
+
|
|
|
+ if($this->mProcessor != null) {
|
|
|
+ $this->mProcessor->onClose($bufid);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -194,6 +384,7 @@ abstract class srv_base
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
public function close($bufid)
|
|
|
{
|
|
|
if(!array_key_exists($bufid,$this->mBuffers)) return false;
|
|
@@ -203,8 +394,8 @@ abstract class srv_base
|
|
|
event_buffer_free($buffer);
|
|
|
|
|
|
if(array_key_exists($bufid,$this->mStreams)) {
|
|
|
- socket_shutdown($this->mStreams[$bufid],STREAM_SHUT_RDWR);
|
|
|
- fclose($this->mStreams[$bufid]);
|
|
|
+ @socket_shutdown($this->mStreams[$bufid],STREAM_SHUT_RDWR);
|
|
|
+ socket_close($this->mStreams[$bufid]);
|
|
|
unset($this->mStreams[$bufid]);
|
|
|
}
|
|
|
|