mStreams = [];
        $this->mBuffers = [];
        $this->mContents = [];
        $this->mEvAccepts = [];
        $this->mEvSignals = [];
        $this->mEvConnects = [];
        $this->mEvTimeouts = [];
        //event_init(); //for libevent C library version 2.16
        $this->mEvBase = event_base_new();
    }

    /**
     * Stores the processor that receives connection callbacks and notifies it
     * that the looper is starting.
     *
     * @param IProcessor $processor Receiver of onStart/onConnected/onRequest/onClose.
     */
    public function init(IProcessor $processor)
    {
        $this->mProcessor = $processor;
        $this->mProcessor->onStart();
    }

    /**
     * PHP error handler installed by run_loop(); logs the error together with
     * a backtrace. E_NOTICE is ignored.
     *
     * @param int    $level   PHP error level.
     * @param string $message Error message.
     * @param string $file    File in which the error was raised.
     * @param int    $line    Line on which the error was raised.
     */
    public function handle_error($level, $message, $file, $line)
    {
        if($level == E_NOTICE) return;

        $trace = "buffer_looper: level={$level},msg={$message} file={$file},line={$line}\n";
        foreach (debug_backtrace() as $item) {
            // Frames produced by internal functions/callbacks have no
            // 'file'/'line' entries; reading them blindly would raise a new
            // error from inside the error handler.
            $item_file = isset($item['file']) ? $item['file'] : '';
            $item_line = isset($item['line']) ? $item['line'] : '';
            $trace .= "{$item_file}\t{$item_line}\t{$item['function']}\n";
        }
        Log::record($trace,Log::ERR);
    }

    /**
     * Installs the error handler, registers SIGINT/SIGQUIT/SIGTERM handlers
     * and runs the event loop until it returns.
     */
    public function run_loop()
    {
        set_error_handler([$this, 'handle_error']);

        $this->add_signal(SIGINT);
        $this->add_signal(SIGQUIT);
        $this->add_signal(SIGTERM);

        Log::record("event_base_loop running....",Log::DEBUG);
        $ret = event_base_loop($this->mEvBase);
        Log::record("event_base_loop ret={$ret}",Log::DEBUG);
    }

    /**
     * Registers a listening socket; onAccept() is invoked whenever it becomes
     * readable.
     *
     * @param resource $stream Listening socket.
     * @return false|null false when the socket id is negative, otherwise no value.
     */
    public function add_listen($stream)
    {
        $fd = intval($stream);
        if($fd < 0) return false;

        $this->mEvAccepts[$fd] = event_new();
        $this->mStreams[$fd] = $stream;

        if(event_set($this->mEvAccepts[$fd], $stream, EV_READ | EV_PERSIST, [$this, 'onAccept']) == false) {
            Log::record("event_set error EV_READ | EV_PERSIST",Log::DEBUG);
        }
        if(event_base_set($this->mEvAccepts[$fd], $this->mEvBase) == false) {
            // Message now names the call that actually failed.
            Log::record("event_base_set error EV_READ | EV_PERSIST",Log::DEBUG);
        }
        if(event_add($this->mEvAccepts[$fd]) == false) {
            Log::record("event_add error EV_READ | EV_PERSIST",Log::DEBUG);
        }
    }

    /**
     * Registers onSignal() as the handler for one signal number.
     *
     * @param int $val Signal number.
     */
    private function add_signal($val)
    {
        $this->mEvSignals[$val] = event_new();

        if(event_set($this->mEvSignals[$val], $val, EV_SIGNAL, [$this, 'onSignal']) == false) {
            Log::record("event_set error EV_SIGNAL sig={$val}",Log::DEBUG);
        }
        if(event_base_set($this->mEvSignals[$val], $this->mEvBase) == false) {
            Log::record("event_base_set error EV_SIGNAL sig={$val}",Log::DEBUG);
        }
        if(event_add($this->mEvSignals[$val]) == false) {
            Log::record("event_add error EV_SIGNAL sig={$val}",Log::DEBUG);
        }
    }

    /**
     * Signal callback. On SIGINT/SIGQUIT/SIGTERM asks the loop to exit and
     * releases every event, buffered event and socket tracked by this object.
     *
     * @param int $sig  Signal number.
     * @param int $flag Event flags passed by the event loop.
     */
    public function onSignal($sig, $flag)
    {
        Log::record("ev_signal sig={$sig},flag={$flag}",Log::DEBUG);
        if($sig != SIGINT && $sig != SIGQUIT && $sig != SIGTERM) {
            return;
        }

        event_base_loopexit($this->mEvBase);

        foreach ($this->mEvTimeouts as $event) {
            event_del($event);
            event_free($event);
        }
        $this->mEvTimeouts = [];

        // Pending outbound connects were previously left registered.
        foreach ($this->mEvConnects as $event) {
            event_del($event);
            event_free($event);
        }
        $this->mEvConnects = [];

        foreach ($this->mBuffers as $bufid => $buffer) {
            event_buffer_disable($buffer, EV_READ | EV_WRITE);
            event_buffer_free($buffer);
        }
        $this->mBuffers = [];

        // Release the accept events before their listening sockets are
        // closed below; they were previously never freed.
        foreach ($this->mEvAccepts as $event) {
            event_del($event);
            event_free($event);
        }
        $this->mEvAccepts = [];

        foreach ($this->mStreams as $fd => $stream) {
            socket_close($stream);
        }
        $this->mStreams = [];
        $this->mContents = [];

        foreach ($this->mEvSignals as $event) {
            event_del($event);
            event_free($event);
        }
        $this->mEvSignals = [];
    }

    /**
     * Accept callback for listening sockets: accepts one connection and hands
     * it to add_stream().
     *
     * @param resource $socket Listening socket that became readable.
     * @param int      $event  Event flags passed by the event loop.
     */
    public function onAccept($socket, $event)
    {
        Log::record("onAccept",Log::DEBUG);
        Log::record("onAccept socket_fd={$socket} flag={$event}",Log::DEBUG);

        $stream = socket_accept($socket);
        if($stream == false) {
            Log::record("socket_accept return false socket_fd={$socket}",Log::DEBUG);
            return;
        }

        Log::record("socket_accept stream={$stream}",Log::DEBUG);
        $this->add_stream($stream);
    }

    /**
     * Write callback placeholder; intentionally empty.
     *
     * @param resource $buffer Buffered event.
     * @param int      $bufid  Connection id.
     */
    public function onWrite($buffer, $bufid)
    {
    }

    /**
     * Switches a connected socket to non-blocking mode, wraps it in a
     * buffered event (read callback onRead, error callback onError) and
     * starts tracking it.
     *
     * @param resource $stream Connected socket.
     * @return int|false Connection id (integer value of the socket resource),
     *                   or false when the buffered event could not be created
     *                   (the socket is closed in that case).
     */
    private function add_stream($stream)
    {
        $bufid = intval($stream);
        socket_set_nonblock($stream);

        $buffer = event_buffer_new($stream, [$this,'onRead'], NULL, [$this,'onError'], $bufid);
        if($buffer == false) {
            socket_close($stream);
            return false;
        }

        event_buffer_base_set($buffer,$this->mEvBase);
        event_buffer_timeout_set($buffer, self::read_time_out, self::write_time_out);
        // Read watermarks: low 0, high 0xffffff.
        event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
        event_buffer_priority_set($buffer, 10);
        event_buffer_enable($buffer, EV_READ | EV_PERSIST);

        $this->mStreams[$bufid] = $stream;
        $this->mBuffers[$bufid] = $buffer;
        $this->mContents[$bufid] = "";
        return $bufid;
    }

    /**
     * Disables event-driven reading for a connection and puts its socket into
     * blocking mode.
     *
     * @param int $bufid Connection id.
     * @return false|null false when the connection id is unknown.
     */
    public function block($bufid)
    {
        if(!array_key_exists($bufid,$this->mStreams) || !array_key_exists($bufid,$this->mBuffers)) {
            return false;
        }
        $stream = $this->mStreams[$bufid];
        $buffer = $this->mBuffers[$bufid];

        $success = event_buffer_disable($buffer, EV_READ | EV_PERSIST);
        Log::record("event_buffer_disable success={$success}",Log::DEBUG);
        $success = socket_set_block($stream);
        Log::record("socket_set_block success={$success}",Log::DEBUG);
    }

    /**
     * Reverses block(): puts the socket back into non-blocking mode and
     * re-enables event-driven reading.
     *
     * @param int $bufid Connection id.
     * @return false|null false when the connection id is unknown.
     */
    public function unblock($bufid)
    {
        if(!array_key_exists($bufid,$this->mStreams) || !array_key_exists($bufid,$this->mBuffers)) {
            return false;
        }
        $stream = $this->mStreams[$bufid];
        $buffer = $this->mBuffers[$bufid];

        $success = socket_set_nonblock($stream);
        Log::record("socket_set_nonblock success={$success}",Log::DEBUG);
        $success = event_buffer_enable($buffer, EV_READ | EV_PERSIST);
        Log::record("event_buffer_enable success={$success}",Log::DEBUG);
    }

    /**
     * Timer callback for a pending outbound connect: retries the connect on a
     * fresh socket, replacing the one stored in $params['stream'].
     *
     * @param mixed $stream First callback argument supplied by the event loop (only logged).
     * @param int   $event  Event flags.
     * @param array $params Keys 'stream', 'host', 'port', 'args' as set by do_connect().
     */
    public function onConnectTimer($stream, $event, $params)
    {
        $error = socket_last_error();
        Log::record("onConnectTimer sig={$stream},flag={$event},error={$error}",Log::DEBUG);

        $stream = $this->do_connect($params['host'],$params['port'],$params['args'],$params['stream']);
        if($stream != false) {
            $bufid = $this->add_stream($stream);
            $this->mProcessor->onConnected($bufid, $stream,$params['host'],$params['port'],$params['args']);
        }
    }

    /**
     * I/O callback for a pending outbound connect. When the socket is
     * reported writable and SO_ERROR is 0 the connect succeeded: the connect
     * and timer events are released and the socket is promoted to a tracked
     * connection. Otherwise only a log entry is written and the retry timer
     * is left in place.
     *
     * @param resource $stream Connecting socket.
     * @param int      $event  Event flags.
     * @param array    $params Keys 'host', 'port', 'args' as set by do_connect().
     */
    public function onConnect($stream, $event, $params)
    {
        $error = socket_last_error();
        Log::record("onConnect sig={$stream},flag={$event},socket error={$error}",Log::DEBUG);

        if($event == EV_WRITE) {
            $ret = socket_getopt($stream,SOL_SOCKET,SO_ERROR);
            if($ret == 0) {
                Log::record("onConnect EV_WRITE success ",Log::DEBUG);
                $fd = intval($stream);

                event_del($this->mEvConnects[$fd]);
                event_free($this->mEvConnects[$fd]);
                event_del($this->mEvTimeouts[$fd]);
                event_free($this->mEvTimeouts[$fd]);
                unset($this->mEvConnects[$fd]);
                unset($this->mEvTimeouts[$fd]);

                $bufid = $this->add_stream($stream);
                $this->mProcessor->onConnected($bufid, $stream,$params['host'],$params['port'],$params['args']);
            } else {
                Log::record("onConnect EV_WRITE Error",Log::DEBUG);
            }
        }
        // Parentheses are required: `==` binds tighter than `|`, so the
        // unparenthesised form was always truthy and hid the final branch.
        elseif($event == (EV_WRITE | EV_READ)) {
            Log::record("onConnect EV_WRITE | EV_READ",Log::DEBUG);
        }
        else {
            Log::record("onConnect ETIMEOUT",Log::DEBUG);
        }
    }

    /**
     * Starts a non-blocking connect on a new socket.
     *
     * If socket_connect() does not succeed immediately, an I/O event
     * (onConnect) and a timer (onConnectTimer) are armed for the socket and
     * false is returned. On immediate success the events are released and
     * the socket is returned.
     *
     * @param string         $host       Remote host.
     * @param int            $port       Remote port.
     * @param mixed          $args       Opaque value forwarded to the callbacks.
     * @param resource|false $old_stream Socket of a previous attempt whose events
     *                                   are to be reused, or false for a first attempt.
     * @return resource|false Connected socket, or false when the connect is
     *                        still pending or the socket could not be created.
     */
    private function do_connect($host,$port,$args,$old_stream = false)
    {
        $stream = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
        if($stream === false) return false;

        socket_set_nonblock($stream);
        $fd = intval($stream);

        if($old_stream == false) {
            $this->mEvConnects[$fd] = event_new();
            $this->mEvTimeouts[$fd] = event_timer_new();
        } else {
            $oldfd = intval($old_stream);
            // The previous attempt's connect event may still be pending on
            // the old socket; remove it from the loop before the socket is
            // closed and before the event is re-armed with event_set().
            event_del($this->mEvConnects[$oldfd]);
            socket_close($old_stream);
            Log::record("close old stream={$old_stream}",Log::DEBUG);

            $this->mEvConnects[$fd] = $this->mEvConnects[$oldfd];
            $this->mEvTimeouts[$fd] = $this->mEvTimeouts[$oldfd];
            unset($this->mEvConnects[$oldfd]);
            unset($this->mEvTimeouts[$oldfd]);
        }

        $ret = socket_connect($stream,$host,$port);
        if($ret == false) {
            if(event_set($this->mEvConnects[$fd], $stream, EV_WRITE | EV_READ, [$this, 'onConnect'],['host' => $host,'port' => $port,'args' => $args]) == false) {
                Log::record("event_set error connect fd={$stream}",Log::DEBUG);
            }
            if(event_base_set($this->mEvConnects[$fd], $this->mEvBase) == false) {
                Log::record("event_base_set error connect fd={$stream}",Log::DEBUG);
            }
            if(event_add($this->mEvConnects[$fd]) == false) {
                Log::record("event_add error connect fd={$stream}",Log::DEBUG);
            }

            if(event_timer_set($this->mEvTimeouts[$fd], [$this, 'onConnectTimer'],['stream' => $stream, 'host' => $host,'port' => $port,'args' => $args]) == false) {
                // Message now names the call that actually failed.
                Log::record("event_timer_set error connect fd={$stream}",Log::DEBUG);
            }
            if(event_base_set($this->mEvTimeouts[$fd], $this->mEvBase) == false) {
                Log::record("event_base_set error connect fd={$stream}",Log::DEBUG);
            }
            // Timeout value 1000000; presumably microseconds (one second) -
            // confirm against the libevent extension documentation.
            if(event_timer_add($this->mEvTimeouts[$fd],1000000) == false) {
                // Message now names the call that actually failed.
                Log::record("event_timer_add error connect fd={$stream}",Log::DEBUG);
            }
            return false;
        }

        event_del($this->mEvConnects[$fd]);
        event_free($this->mEvConnects[$fd]);
        event_del($this->mEvTimeouts[$fd]);
        event_free($this->mEvTimeouts[$fd]);
        unset($this->mEvConnects[$fd]);
        unset($this->mEvTimeouts[$fd]);
        return $stream;
    }

    /**
     * Opens an outbound connection. If the connect completes immediately the
     * processor's onConnected() is called right away; otherwise it is called
     * later from onConnect()/onConnectTimer().
     *
     * @param string $host Remote host.
     * @param int    $port Remote port.
     * @param mixed  $args Opaque value forwarded to onConnected().
     */
    public function connect($host,$port,$args)
    {
        $stream = $this->do_connect($host,$port,$args,false);
        if($stream != false) {
            $bufid = $this->add_stream($stream);
            $this->mProcessor->onConnected($bufid, $stream,$host,$port,$args);
        }
    }

    /**
     * Read callback: drains the buffered event in chunks of up to 1024 bytes,
     * appends them to the connection's pending content and runs the frame
     * parser after each chunk.
     *
     * @param resource $buffer Buffered event.
     * @param int      $bufid  Connection id.
     */
    public function onRead($buffer, $bufid)
    {
        Log::record("onRead bufid={$bufid}",Log::DEBUG);
        while(true) {
            $read = event_buffer_read($buffer, 1024);
            // Compare explicitly: a chunk consisting of "0" is falsy in PHP
            // and must not end the loop.
            if(!is_string($read) || $read === '') {
                break;
            }
            Log::record("read={$read}",Log::DEBUG);
            $this->mContents[$bufid] .= $read;
            $this->proc($bufid);

            // proc() may have closed the connection (bad header, or the
            // processor closed it); $buffer is freed then and must not be
            // read again.
            if(!array_key_exists($bufid,$this->mBuffers)) {
                return;
            }
        }
    }

    /**
     * Error callback of a buffered event. A read timeout re-enables reading
     * and sends an empty frame via write(); the connection is closed if that
     * write fails. Any other condition tears the connection down and
     * notifies the processor.
     *
     * @param resource $buffer Buffered event.
     * @param int      $event  Error flags.
     * @param int      $bufid  Connection id.
     */
    public function onError($buffer, $event, $bufid)
    {
        Log::record("onError bufid={$bufid} flag={$event}",Log::DEBUG);

        if ($event == (EVBUFFER_READ | EVBUFFER_TIMEOUT)) {
            event_buffer_enable($buffer, EV_READ);
            $ret = $this->write($bufid,'');
            if($ret == false) {
                $this->close($bufid);
            }
            return;
        }

        event_buffer_disable($buffer, EV_READ | EV_WRITE);
        event_buffer_free($buffer);

        if(array_key_exists($bufid,$this->mStreams)) {
            socket_close($this->mStreams[$bufid]);
            unset($this->mStreams[$bufid]);
        }
        if(array_key_exists($bufid,$this->mBuffers)) {
            unset($this->mBuffers[$bufid]);
        }
        if(array_key_exists($bufid,$this->mContents)) {
            unset($this->mContents[$bufid]);
        }
        if($this->mProcessor != null) {
            $this->mProcessor->onClose($bufid);
        }
    }

    /**
     * Parses the pending content of a connection into frames. Each frame is a
     * numeric length header of self::body_header_len bytes followed by that
     * many body bytes. A zero length is a heartbeat and is skipped; every
     * other complete frame is passed to the processor's onRequest(). An
     * invalid header closes the connection. Unconsumed bytes stay pending.
     *
     * @param int $bufid Connection id.
     */
    private function proc($bufid)
    {
        $content = $this->mContents[$bufid];
        $start = 0;
        $left = strlen($content);

        // `>=` so that a trailing frame of exactly header length (a
        // heartbeat) is consumed now rather than on the next read.
        while($left >= self::body_header_len) {
            $header = substr($content,$start,self::body_header_len);
            if(!is_numeric($header)) {
                $this->close($bufid);
                return;
            }

            $body_len = intval($header);
            if($body_len < 0) {
                // is_numeric() accepts negative numbers; a negative length
                // would move the cursor backwards or not at all and could
                // make this loop spin forever.
                $this->close($bufid);
                return;
            }

            if($body_len == 0) {
                // This is a heartbeat packet.
                $start += self::body_header_len;
                $left -= self::body_header_len;
            }
            elseif($left >= self::body_header_len + $body_len) {
                $body = substr($content,$start + self::body_header_len,$body_len);
                $this->mProcessor->onRequest($bufid,$body);

                // The processor may have closed this connection; stop
                // parsing and do not re-create its pending content.
                if(!array_key_exists($bufid,$this->mContents)) {
                    return;
                }

                $start += self::body_header_len + $body_len;
                $left -= self::body_header_len + $body_len;
            }
            else {
                // Incomplete frame: wait for more data.
                break;
            }
        }

        if($start > 0) {
            $str = substr($content,$start);
            if($str === false) {
                $this->mContents[$bufid] = '';
            } else {
                $this->mContents[$bufid] = $str;
            }
        }
    }

    /**
     * Sends one frame: a zero-padded 10-digit decimal length header followed
     * by the data.
     *
     * @param int    $bufid Connection id.
     * @param string $data  Frame body; an empty string produces a heartbeat frame.
     * @return mixed Result of event_buffer_write(), or false when the
     *               connection id is unknown.
     */
    public function write($bufid,$data)
    {
        if(!is_string($data)) {
            Log::record(__METHOD__ . " write data is not string.",Log::ERR);
        }

        if(!array_key_exists($bufid,$this->mBuffers)) {
            return false;
        }

        $buffer = $this->mBuffers[$bufid];
        $header = sprintf("%010d",strlen($data));
        $data = $header . $data;
        $ret = event_buffer_write($buffer,$data,strlen($data));
        if($ret == false) {
            Log::record(__METHOD__ . " event_buffer_write return false.",Log::ERR);
        }
        return $ret;
    }

    /**
     * Closes a tracked connection: frees its buffered event, closes the
     * socket, forgets its state and notifies the processor via onClose().
     *
     * @param int $bufid Connection id.
     * @return false|null false when the connection id is unknown.
     */
    public function close($bufid)
    {
        if(!array_key_exists($bufid,$this->mBuffers)) return false;

        Log::record(__METHOD__ . " close socket.",Log::DEBUG);
        $buffer = $this->mBuffers[$bufid];
        event_buffer_disable($buffer, EV_READ | EV_WRITE);
        event_buffer_free($buffer);

        if(array_key_exists($bufid,$this->mStreams)) {
            socket_close($this->mStreams[$bufid]);
            unset($this->mStreams[$bufid]);
        }
        if(array_key_exists($bufid,$this->mBuffers)) {
            unset($this->mBuffers[$bufid]);
        }
        if(array_key_exists($bufid,$this->mContents)) {
            unset($this->mContents[$bufid]);
        }
        if($this->mProcessor != null) {
            $this->mProcessor->onClose($bufid);
        }
    }
}