|
@@ -53,46 +53,54 @@ class CentraHelper
|
|
|
|
|
|
public function run_loop($sockfd)
|
|
|
{
|
|
|
+ //$this->mListenSocket = stream_socket_server ($this->remote_addr(), $errno, $errstr);
|
|
|
$this->mListenSocket = $sockfd;
|
|
|
- echo "stream_set_blocking \n";
|
|
|
+ Log::record("stream_set_blocking",Log::DEBUG);
|
|
|
if(stream_set_blocking($this->mListenSocket, 0) == false) {
|
|
|
- echo "stream_set_blocking error \n";
|
|
|
+ Log::record("stream_set_blocking error",Log::DEBUG);
|
|
|
}
|
|
|
$this->mEvbase = event_base_new();
|
|
|
$this->mEv = event_new();
|
|
|
|
|
|
- echo "event_set\n";
|
|
|
+ Log::record("event_set",Log::DEBUG);
|
|
|
if(event_set($this->mEv, $this->mListenSocket, EV_READ | EV_PERSIST, 'ev_accept', $this->mEvbase) == false) {
|
|
|
- echo "event_set error EV_READ | EV_PERSIST\n";
|
|
|
+ Log::record("event_set error EV_READ | EV_PERSIST",Log::DEBUG);
|
|
|
}
|
|
|
|
|
|
- echo "event_base_set\n";
|
|
|
+ Log::record("event_base_set",Log::DEBUG);
|
|
|
if(event_base_set($this->mEv, $this->mEvbase) == false) {
|
|
|
- echo "event_set error EV_READ | EV_PERSIST\n";
|
|
|
+ Log::record("event_set error EV_READ | EV_PERSIST",Log::DEBUG);
|
|
|
}
|
|
|
|
|
|
- echo "event_add\n";
|
|
|
+ Log::record("event_add",Log::DEBUG);
|
|
|
if(event_add($this->mEv) == false) {
|
|
|
- echo "event_add error EV_READ | EV_PERSIST\n";
|
|
|
+ Log::record("event_add error EV_READ | EV_PERSIST",Log::DEBUG);
|
|
|
}
|
|
|
|
|
|
$ret = event_base_loop($this->mEvbase);
|
|
|
- echo "event_base_loop ret={$ret}\n";
|
|
|
+ Log::record("event_base_loop ret={$ret}",Log::DEBUG);
|
|
|
}
|
|
|
|
|
|
public function ev_accept($socket, $flag, $base)
|
|
|
{
|
|
|
- Log::record("ev_accept socket_fd={$socket}",Log::DEBUG);
|
|
|
+ $pid = posix_getpid();
|
|
|
+ Log::record("ev_accept pid={$pid} socket_fd={$socket}",Log::DEBUG);
|
|
|
|
|
|
static $bufid = 1;
|
|
|
|
|
|
$stream = stream_socket_accept($socket);
|
|
|
- if($stream == false) return;
|
|
|
+ if($stream == false) {
|
|
|
+ Log::record("stream_socket_accept return false pid={$pid} socket_fd={$socket}",Log::DEBUG);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ Log::record("stream_socket_accept pid={$pid} ",Log::DEBUG);
|
|
|
|
|
|
stream_set_blocking($stream, 0);
|
|
|
$buffer = event_buffer_new($stream, 'ev_read', NULL, 'ev_error', $bufid);
|
|
|
if($buffer == false) {
|
|
|
fclose($stream);
|
|
|
+ Log::record("event_buffer_new return false pid={$pid} socket_fd={$socket}",Log::DEBUG);
|
|
|
+
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -112,6 +120,9 @@ class CentraHelper
|
|
|
|
|
|
public function ev_read($buffer, $bufid)
|
|
|
{
|
|
|
+ $pid = posix_getpid();
|
|
|
+ Log::record("ev_read pid={$pid}",Log::DEBUG);
|
|
|
+
|
|
|
$content = &$this->mContents[$bufid];
|
|
|
while ($read = event_buffer_read($buffer, 256)) {
|
|
|
$content .= $read;
|
|
@@ -129,11 +140,18 @@ class CentraHelper
|
|
|
if($left >= self::body_header_len + $len)
|
|
|
{
|
|
|
$body = substr($content,$start + self::body_header_len,$len);
|
|
|
- if($this->mProcessor != null) {
|
|
|
+ if($this->mProcessor != null)
|
|
|
+ {
|
|
|
$data = $this->mProcessor->handle_input($body);
|
|
|
$header = sprintf("%010d",strlen($data));
|
|
|
$data = $header . $data;
|
|
|
- event_buffer_write($buffer,$data,strlen($data));
|
|
|
+ $ret = event_buffer_write($buffer,$data,strlen($data));
|
|
|
+ if($ret == false) {
|
|
|
+ Log::record("event_buffer_write error pid={$pid}",Log::DEBUG);
|
|
|
+ stream_socket_shutdown($this->mStreams[$bufid],STREAM_SHUT_RDWR);
|
|
|
+ fclose($this->mStreams[$bufid]);
|
|
|
+ unset($this->mStreams[$bufid]);
|
|
|
+ }
|
|
|
}
|
|
|
$start += self::body_header_len + $len;
|
|
|
$left = $left - self::body_header_len - $len;
|
|
@@ -150,7 +168,6 @@ class CentraHelper
|
|
|
if($start > 0) {
|
|
|
$content = substr($content,$start);
|
|
|
}
|
|
|
- ob_clean();
|
|
|
}
|
|
|
|
|
|
public function ev_error($buffer, $error, $bufid)
|
|
@@ -160,10 +177,13 @@ class CentraHelper
|
|
|
|
|
|
event_buffer_disable($buffer, EV_READ | EV_WRITE);
|
|
|
event_buffer_free($buffer);
|
|
|
- stream_socket_shutdown($this->mStreams[$bufid],STREAM_SHUT_RDWR);
|
|
|
- fclose($this->mStreams[$bufid]);
|
|
|
|
|
|
- unset($this->mStreams[$bufid]);
|
|
|
+ if(array_key_exists($bufid,$this->mStreams)) {
|
|
|
+ stream_socket_shutdown($this->mStreams[$bufid],STREAM_SHUT_RDWR);
|
|
|
+ fclose($this->mStreams[$bufid]);
|
|
|
+ unset($this->mStreams[$bufid]);
|
|
|
+ }
|
|
|
+
|
|
|
unset($this->mBuffers[$bufid]);
|
|
|
unset($this->mContents[$bufid]);
|
|
|
}
|