123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246 |
- <?php
- namespace event;
- use Log;
- use scope_trace;
- use Event;
- use EventBase;
- use EventListener;
- use EventBufferEvent;
- use Exception;
/**
 * Per-connection reader that turns an incoming byte stream into framed requests.
 *
 * Framing, as implemented by proc(): every frame starts with a header of
 * self::body_header_len ASCII digits holding the body length in decimal,
 * followed by that many body bytes. A header whose value is zero carries no
 * body and is treated as a heartbeat (it is consumed without notifying the
 * processor). Complete bodies are handed to $processor->onRequest($fd, $body).
 */
class SocketReader
{
    /** Number of bytes in the decimal length header that precedes each body. */
    const body_header_len = 10;

    /** @var EventBufferEvent|null Buffer event for the connection; null once torn down. */
    private $mBufferEvent;

    /** @var string Bytes received so far that do not yet form a complete frame. */
    protected $mContent = '';

    /** @var object|null Receiver of complete frames through onRequest($fd, $body). */
    private $mProcessor;

    /** @var mixed Descriptor passed to the constructor; also used as the key given to the parent. */
    private $mFd;

    /** @var object|null Owner that is told to forget this reader through remove($fd). */
    private $mParent;

    /**
     * Wraps $fd in a buffer event and starts listening for READ events.
     *
     * @param EventBase $base      Event base the buffer event is attached to.
     * @param mixed     $fd        Descriptor of the accepted connection.
     * @param object    $processor Object exposing onRequest($fd, $body).
     * @param object    $parent    Object exposing remove($fd).
     *
     * @throws Exception When READ events cannot be enabled on the buffer event.
     */
    public function __construct($base, $fd, $processor, $parent)
    {
        $this->mProcessor = $processor;
        $this->mFd = $fd;
        $this->mParent = $parent;
        $this->mBufferEvent = new EventBufferEvent($base, $fd, EventBufferEvent::OPT_CLOSE_ON_FREE);
        $this->mBufferEvent->setCallbacks([$this, "onRead"], null, [$this, "onEvent"], null);
        if (!$this->mBufferEvent->enable(Event::READ)) {
            Log::record("Failed to enable READ", Log::ERR);
            throw new Exception("Failed to enable READ");
        }
    }

    /**
     * Emits a scope trace when the reader is destroyed.
     */
    public function __destruct()
    {
        $scope = new scope_trace(__METHOD__);
    }

    /**
     * Read callback: drains the input buffer in 1024-byte chunks and parses frames.
     *
     * @param EventBufferEvent $bev Buffer event that fired (unused; the stored one is read).
     * @param mixed            $ctx Callback context (unused).
     */
    public function onRead($bev, $ctx)
    {
        // Held in a variable so the trace spans the whole call, as in the other methods.
        $scope = new scope_trace(__METHOD__);

        // proc() may tear the connection down on a protocol error, hence the null re-check.
        while ($this->mBufferEvent !== null) {
            $chunk = $this->mBufferEvent->read(1024);
            // Compare against the empty string explicitly: a chunk consisting of the
            // single byte "0" is falsy in PHP and would otherwise be silently dropped.
            if (!is_string($chunk) || $chunk === '') {
                break;
            }
            Log::record("read={$chunk}", Log::DEBUG);
            $this->mContent .= $chunk;
            $this->proc();
        }
    }

    /**
     * Status callback: logs errors / EOF and tears the connection down on either.
     *
     * @param EventBufferEvent $bev    Buffer event that fired (unused).
     * @param int              $events Bit mask of EventBufferEvent status flags.
     * @param mixed            $ctx    Callback context (unused).
     */
    public function onEvent($bev, $events, $ctx)
    {
        $scope = new scope_trace(__METHOD__);
        if ($events & EventBufferEvent::ERROR) {
            Log::record('Error from bufferevent');
        }
        if ($events & EventBufferEvent::EOF) {
            Log::record('Socker recv EOF.');
        }
        if ($events & (EventBufferEvent::EOF | EventBufferEvent::ERROR)) {
            $this->shutdown();
        }
    }

    /**
     * Queues raw bytes for sending on this connection.
     *
     * @param string $data Bytes to send; no framing is added here.
     *
     * @return bool Result of EventBufferEvent::write(), or false when the
     *              connection has already been torn down.
     */
    public function write($data)
    {
        $scope = new scope_trace(__METHOD__);
        if ($this->mBufferEvent === null) {
            return false;
        }
        return $this->mBufferEvent->write($data);
    }

    /**
     * Extracts every complete frame from the pending bytes and dispatches it.
     *
     * Bytes belonging to an incomplete trailing frame stay in $mContent. A header
     * that is not made of exactly body_header_len decimal digits is a protocol
     * violation and causes the connection to be torn down.
     */
    private function proc()
    {
        $headerLen = self::body_header_len;
        $buffer = $this->mContent;
        $offset = 0;
        $remaining = strlen($buffer);

        // ">=" so that a buffer holding exactly one header (e.g. a lone heartbeat) is consumed.
        while ($remaining >= $headerLen) {
            $header = substr($buffer, $offset, $headerLen);

            // Digits only: is_numeric() would also accept signs, spaces, decimals and
            // exponents, and a negative length would corrupt the offsets below.
            if (strspn($header, '0123456789') !== $headerLen) {
                Log::record("Invalid frame header, closing connection", Log::ERR);
                $this->shutdown();
                return;
            }

            $bodyLen = intval($header);
            $frameLen = $headerLen + $bodyLen;
            if ($remaining < $frameLen) {
                // Body not fully received yet; wait for more data.
                break;
            }

            // A zero-length frame is a heartbeat: consume it without notifying the processor.
            if ($bodyLen > 0) {
                $body = substr($buffer, $offset + $headerLen, $bodyLen);
                $this->mProcessor->onRequest($this->mFd, $body);
            }

            $offset += $frameLen;
            $remaining -= $frameLen;
        }

        if ($offset > 0) {
            // The cast covers PHP < 8, where substr() returns false at end-of-string.
            $this->mContent = (string) substr($buffer, $offset);
        }
    }

    /**
     * Releases the buffer event and detaches this reader from its parent.
     *
     * Safe to call more than once; later calls are no-ops.
     */
    private function shutdown()
    {
        if ($this->mBufferEvent === null) {
            return;
        }

        // Clear the property first so re-entrant calls and the onRead loop see the closed state.
        $bufferEvent = $this->mBufferEvent;
        $this->mBufferEvent = null;

        $bufferEvent->disable(Event::READ);
        $bufferEvent->close();
        $bufferEvent->free();

        $this->mContent = '';
        if ($this->mParent !== null) {
            $this->mParent->remove($this->mFd);
        }
        $this->mParent = null;
        $this->mProcessor = null;
    }
}
/**
 * Event loop owner: accepts connections on a listener, keeps one SocketReader
 * per accepted descriptor and routes outgoing data to the right reader.
 */
class EventLooper
{
    /** @var EventBase Event base driving the listener, readers and signals. */
    protected $mEvBase;

    /** @var array Signal events registered through add_signal(), keyed by signal number. */
    protected $mEvSignals;

    /** @var array Initialised to an empty array; not otherwise used in this class. */
    protected $mEvTimeouts;

    /** @var EventListener|null Listener created by add_listen(). */
    protected $mListener;

    /** @var array Active SocketReader instances keyed by the descriptor given to onAccept(). */
    protected $mReaders;

    /** @var IProcessor|null Processor supplied to init(). */
    protected $mProcessor;

    /**
     * Creates the event base and empty registries.
     */
    public function __construct()
    {
        $this->mEvSignals = [];
        $this->mEvTimeouts = [];
        $this->mReaders = [];
        $this->mEvBase = new EventBase();
    }

    /**
     * Stores the processor and notifies it through onStart().
     *
     * @param IProcessor $processor Processor that will receive requests and close notifications.
     */
    public function init(IProcessor $processor)
    {
        $this->mProcessor = $processor;
        $this->mProcessor->onStart();
    }

    /**
     * Runs the event base dispatch loop.
     *
     * Signal handlers (SIGINT / SIGQUIT / SIGTERM via add_signal()) are currently
     * not registered here.
     */
    public function run_loop()
    {
        $scope = new scope_trace(__METHOD__);
        Log::record('EventLooper start run loop....', Log::DEBUG);
        $this->mEvBase->dispatch();
    }

    /**
     * Starts accepting connections on $stream.
     *
     * @param mixed $stream Listening target handed to EventListener as-is.
     *
     * @return bool True when the listener was created, false otherwise.
     */
    public function add_listen($stream)
    {
        $this->mListener = new EventListener(
            $this->mEvBase,
            [$this, 'onAccept'],
            $this->mEvBase,
            EventListener::OPT_CLOSE_ON_FREE | EventListener::OPT_REUSEABLE,
            -1,
            $stream
        );
        if (!$this->mListener) {
            Log::record("Couldn't create listener", Log::ERR);
            return false;
        }
        $this->mListener->setErrorCallback([$this, 'onAcceptError']);

        // Explicit success value so callers can tell it apart from the false failure result.
        return true;
    }

    /**
     * Registers onSignal() as the handler for signal number $val.
     *
     * @param int $val Signal number.
     */
    private function add_signal($val)
    {
        $signalEvent = Event::signal($this->mEvBase, $val, [$this, 'onSignal']);
        $this->mEvSignals[$val] = $signalEvent;
        $signalEvent->add();
    }

    /**
     * Signal callback: logs the signal and asks the event loop to exit.
     *
     * @param int   $sig  Signal number.
     * @param mixed $flag Second callback argument supplied by the event library.
     */
    public function onSignal($sig, $flag)
    {
        $scope = new scope_trace(__METHOD__);
        Log::record("ev_signal sig={$sig},flag={$flag}", Log::DEBUG);
        // No argument instead of null: null for a float parameter is deprecated on PHP 8.1+.
        $this->mEvBase->exit();
    }

    /**
     * Accept callback: wraps the new descriptor in a SocketReader and registers it.
     *
     * @param EventListener $listener Listener that accepted the connection (unused).
     * @param mixed         $fd       Descriptor of the accepted connection.
     * @param mixed         $address  Peer address (unused).
     * @param mixed         $ctx      Callback context (unused).
     */
    public function onAccept($listener, $fd, $address, $ctx)
    {
        $scope = new scope_trace(__METHOD__);
        Log::record("fd = {$fd}", Log::DEBUG);
        $this->mReaders[$fd] = new SocketReader($this->mEvBase, $fd, $this->mProcessor, $this);
    }

    /**
     * Listener error callback: logs the last socket error and stops the loop.
     *
     * @param EventListener $listener Listener that reported the error (unused).
     * @param mixed         $ctx      Callback context (unused).
     */
    public function onAcceptError($listener, $ctx)
    {
        $scope = new scope_trace(__METHOD__);
        // Fully qualified: EventUtil has no "use" import, so the bare name would
        // resolve inside the current namespace and fail with "class not found".
        $errno = \EventUtil::getLastSocketErrno();
        $errstr = \EventUtil::getLastSocketError();
        Log::record("Got an error {$errno} {$errstr} on the listener shutting down.");
        $this->mEvBase->exit();
    }

    /**
     * Sends one framed message to the connection registered under $bufid.
     *
     * The payload is prefixed with its byte length as a zero-padded 10-digit
     * decimal header before being passed to the reader.
     *
     * @param mixed  $bufid Key of the target reader.
     * @param string $data  Payload to send.
     *
     * @return bool The reader's write() result, or false when $data is not a
     *              string or no reader is registered under $bufid.
     */
    public function write($bufid, $data)
    {
        $scope = new scope_trace(__METHOD__);
        if (!is_string($data)) {
            Log::record(__METHOD__ . " write data is not string.", Log::ERR);
            // Stop here: strlen() and the reader require a string.
            return false;
        }
        if (!array_key_exists($bufid, $this->mReaders)) {
            return false;
        }

        $frame = sprintf("%010d", strlen($data)) . $data;
        $result = $this->mReaders[$bufid]->write($frame);
        if ($result === false) {
            Log::record(__METHOD__ . " event_buffer_write return false.", Log::ERR);
        }
        return $result;
    }

    /**
     * Forgets the reader registered under $bufid and notifies the processor.
     *
     * NOTE(review): this only drops the looper's reference to the reader; it does
     * not explicitly free the reader's buffer event — confirm the connection is
     * actually released in that case.
     *
     * @param mixed $bufid Key of the reader to drop.
     *
     * @return bool False when no reader is registered under $bufid, true otherwise.
     */
    public function close($bufid)
    {
        $scope = new scope_trace(__METHOD__);
        if (!array_key_exists($bufid, $this->mReaders)) {
            return false;
        }
        unset($this->mReaders[$bufid]);
        if ($this->mProcessor !== null) {
            $this->mProcessor->onClose($bufid);
        }
        return true;
    }

    /**
     * Drops the reader registered under $fd without notifying the processor.
     *
     * @param mixed $fd Key of the reader to drop.
     */
    public function remove($fd)
    {
        $scope = new scope_trace(__METHOD__);
        Log::record("fd = {$fd}", Log::DEBUG);
        unset($this->mReaders[$fd]);
    }
}
|