mProcessor = $processor; $this->mFd = $fd; $this->mParent = $parent; $this->mBufferEvent = new EventBufferEvent($base, $fd, EventBufferEvent::OPT_CLOSE_ON_FREE); $this->mBufferEvent->setCallbacks([$this, "onRead"], NULL, [$this, "onEvent"], NULL); if (!$this->mBufferEvent->enable(Event::READ)) { Log::record("Failed to enable READ",Log::ERR); throw new Exception("Failed to enable READ"); } } public function __destruct() { $scope = new scope_trace(__METHOD__); } public function onRead($bev, $ctx) { new scope_trace(__METHOD__); while($read = $this->mBufferEvent->read(1024)) { Log::record("read={$read}",Log::DEBUG); $this->mContent .= $read; $this->proc(); } } public function onEvent($bev, $events, $ctx) { $scope = new scope_trace(__METHOD__); if ($events & EventBufferEvent::ERROR) { Log::record('Error from bufferevent'); } if ($events & EventBufferEvent::EOF) { Log::record('Socker recv EOF.'); } if ($events & (EventBufferEvent::EOF | EventBufferEvent::ERROR)) { $this->mBufferEvent->disable(Event::READ); $this->mBufferEvent->close(); $this->mBufferEvent->free(); $this->mBufferEvent = null; $this->mParent->remove($this->mFd); $this->mParent = null; $this->mProcessor = null; } } public function write($data) { $scope = new scope_trace(__METHOD__); return $this->mBufferEvent->write($data); } private function proc() { $content = &$this->mContent; $start = 0; $left = strlen($content); while($left > self::body_header_len) { $header = substr($content,$start,self::body_header_len); if(!is_numeric($header)) { $this->mBufferEvent->close(); return; } $body_len = intval($header); if($body_len == 0) { //这是一个心跳包 $start += self::body_header_len; $left -= self::body_header_len; } elseif($left >= self::body_header_len + $body_len) { $body = substr($content,$start + self::body_header_len,$body_len); $this->mProcessor->onRequest($this->mFd,$body); $start += self::body_header_len + $body_len; $left -= self::body_header_len + $body_len; } else { break; } } if($start > 0) { $str = 
substr($content,$start); if($str === false) { $this->mContent = ''; } else { $this->mContent = $str; } } } }

/**
 * Thin wrapper around an EventBase that accepts connections through a single
 * EventListener and keeps one SocketReader per accepted descriptor.
 *
 * Readers are stored in $mReaders keyed by the fd handed to onAccept(); that
 * same key is the "$bufid" expected by write() and close().
 */
class EventLooper
{
    /** Event base created in the constructor and driven by run_loop(). */
    protected $mEvBase;
    /** Signal events keyed by signal number (filled by add_signal()). */
    protected $mEvSignals;
    /** Initialised to an empty array; not used anywhere in this class. */
    protected $mEvTimeouts;
    /** Listener created by add_listen(). */
    protected $mListener;
    /** SocketReader instances keyed by accepted fd. */
    protected $mReaders;
    /** Processor supplied through init(); null until then. */
    protected $mProcessor;

    public function __construct()
    {
        $this->mEvSignals = [];
        $this->mEvTimeouts = [];
        $this->mReaders = [];
        $this->mEvBase = new EventBase();
    }

    /**
     * Stores the processor and immediately invokes its onStart() hook.
     *
     * @param IProcessor $processor receives onStart(), onClose() and is handed
     *                              to every SocketReader created in onAccept()
     */
    public function init(IProcessor $processor)
    {
        $this->mProcessor = $processor;
        $this->mProcessor->onStart();
    }

    /**
     * Logs a start message and hands control to EventBase::dispatch().
     */
    public function run_loop()
    {
        // Kept in a local so the trace object lives until the method returns.
        $scope = new scope_trace(__METHOD__);
        // Signal handlers are currently disabled:
        // $this->add_signal(SIGINT);
        // $this->add_signal(SIGQUIT);
        // $this->add_signal(SIGTERM);
        Log::record('EventLooper start run loop....',Log::DEBUG);
        // $this->mEvBase->loop();
        $this->mEvBase->dispatch();
    }

    /**
     * Creates the connection listener on $stream and wires onAccept() and
     * onAcceptError() as its callbacks.
     *
     * @param mixed $stream listen target forwarded unchanged to EventListener
     * @return false|null false when the listener evaluates as falsy after
     *                    construction; nothing (null) otherwise
     */
    public function add_listen($stream)
    {
        $this->mListener = new EventListener(
            $this->mEvBase,
            [$this, 'onAccept'],
            $this->mEvBase,
            EventListener::OPT_CLOSE_ON_FREE | EventListener::OPT_REUSEABLE,
            -1,
            $stream
        );
        if (!$this->mListener) {
            Log::record("Couldn't create listener",Log::ERR);
            return false;
        }
        $this->mListener->setErrorCallback([$this, 'onAcceptError']);
    }

    /**
     * Registers a signal event for $val that is routed to onSignal().
     *
     * @param int $val signal number
     */
    private function add_signal($val)
    {
        $signalEvent = Event::signal($this->mEvBase, $val, [$this, 'onSignal']);
        $this->mEvSignals[$val] = $signalEvent;
        $signalEvent->add();
    }

    /**
     * Signal callback: logs the signal and asks the event base to exit.
     */
    public function onSignal($sig, $flag)
    {
        $scope = new scope_trace(__METHOD__);
        Log::record("ev_signal sig={$sig},flag={$flag}", Log::DEBUG);
        $this->mEvBase->exit(NULL);
    }

    /**
     * Listener callback: wraps the accepted descriptor in a SocketReader and
     * stores it under that descriptor.
     */
    public function onAccept($listener, $fd, $address, $ctx)
    {
        $scope = new scope_trace(__METHOD__);
        Log::record("fd = {$fd}",Log::DEBUG);
        $this->mReaders[$fd] = new SocketReader($this->mEvBase, $fd, $this->mProcessor, $this);
    }

    /**
     * Listener error callback: logs the last socket error and asks the event
     * base to exit.
     */
    public function onAcceptError($listener, $ctx)
    {
        $scope = new scope_trace(__METHOD__);
        $errno = EventUtil::getLastSocketErrno();
        $errstr = EventUtil::getLastSocketError();
        Log::record("Got an error {$errno} {$errstr} on the listener shutting down.");
        $this->mEvBase->exit(NULL);
    }

    /**
     * Sends $data to the reader registered under $bufid, prefixed with a
     * 10-digit zero-padded decimal byte length.
     *
     * @param mixed  $bufid key into the reader map (the fd seen in onAccept())
     * @param string $data  payload; anything that is not a string is rejected
     * @return mixed false when $data is not a string or $bufid is unknown,
     *               otherwise whatever the reader's write() returned
     */
    public function write($bufid, $data)
    {
        $scope = new scope_trace(__METHOD__);

        if (!is_string($data)) {
            Log::record(__METHOD__ . " write data is not string.",Log::ERR);
            // Stop here: calling strlen() on a non-string would either throw
            // or produce a length header that does not match the payload.
            return false;
        }

        if (!array_key_exists($bufid, $this->mReaders)) {
            return false;
        }

        $reader = $this->mReaders[$bufid];
        $header = sprintf("%010d", strlen($data));
        $ret = $reader->write($header . $data);
        if (!$ret) {
            Log::record(__METHOD__ . " event_buffer_write return false.",Log::ERR);
        }
        return $ret;
    }

    /**
     * Drops the reader registered under $bufid and notifies the processor.
     *
     * NOTE(review): this only releases this object's reference to the reader;
     * whether the underlying socket is closed at that point depends on the
     * reader being destroyed — confirm against SocketReader.
     *
     * @param mixed $bufid key into the reader map
     * @return false|null false when $bufid is unknown; nothing (null) otherwise
     */
    public function close($bufid)
    {
        $scope = new scope_trace(__METHOD__);

        if (!array_key_exists($bufid, $this->mReaders)) {
            return false;
        }

        unset($this->mReaders[$bufid]);

        if ($this->mProcessor !== null) {
            $this->mProcessor->onClose($bufid);
        }
    }

    /**
     * Forgets the reader stored under $fd without notifying the processor.
     * SocketReader::onEvent() calls this after EOF or an error.
     */
    public function remove($fd)
    {
        $scope = new scope_trace(__METHOD__);
        Log::record("fd = {$fd}",Log::DEBUG);
        unset($this->mReaders[$fd]);
    }
}