mStreams = []; $this->mContents = []; $this->mEvSignals = []; $this->mEvSockets = []; $this->mEvTimeouts = []; $this->mEvBase = event_base_new(); }

    /**
     * Attach the processor that receives connection/request callbacks and
     * immediately notify it via onStart().
     *
     * @param IProcessor $processor
     */
    public function init(IProcessor $processor)
    {
        $this->mProcessor = $processor;
        $this->mProcessor->onStart();
    }

    /**
     * Register handlers for SIGINT/SIGQUIT/SIGTERM and run the libevent loop
     * until it exits. The loop's return value is only logged.
     */
    public function run_loop()
    {
        $this->add_signal(SIGINT);
        $this->add_signal(SIGQUIT);
        $this->add_signal(SIGTERM);

        $ret = event_base_loop($this->mEvBase);
        Log::record("event_base_loop ret={$ret}",Log::DEBUG);
    }

    /**
     * Register a listening socket: a persistent read event is installed whose
     * callback is onAccept().
     *
     * @param mixed $stream listening socket; its intval() is used as the key
     * @return false|null false when the derived key is negative, otherwise nothing
     */
    public function add_listen($stream)
    {
        $fd = intval($stream);
        if ($fd < 0) {
            return false;
        }

        $this->mEvSockets[$fd] = event_new();
        if (event_set($this->mEvSockets[$fd], $stream, EV_READ | EV_PERSIST, [$this, 'onAccept']) == false) {
            Log::record("event_set error EV_READ | EV_PERSIST",Log::DEBUG);
        }
        if (event_base_set($this->mEvSockets[$fd], $this->mEvBase) == false) {
            // The message previously said "event_set", which pointed at the wrong call.
            Log::record("event_base_set error EV_READ | EV_PERSIST",Log::DEBUG);
        }
        if (event_add($this->mEvSockets[$fd]) == false) {
            Log::record("event_add error EV_READ | EV_PERSIST",Log::DEBUG);
        }
    }

    /**
     * Install a signal event for $val whose callback is onSignal().
     * Failures of the individual libevent calls are logged, not raised.
     *
     * @param int $val signal number
     */
    private function add_signal($val)
    {
        $this->mEvSignals[$val] = event_new();
        if (event_set($this->mEvSignals[$val], $val, EV_SIGNAL, [$this, 'onSignal']) == false) {
            Log::record("event_set error EV_SIGNAL sig={$val}",Log::DEBUG);
        }
        if (event_base_set($this->mEvSignals[$val], $this->mEvBase) == false) {
            Log::record("event_base_set error EV_SIGNAL sig={$val}",Log::DEBUG);
        }
        if (event_add($this->mEvSignals[$val]) == false) {
            Log::record("event_add error EV_SIGNAL sig={$val}",Log::DEBUG);
        }
    }

    /**
     * Signal callback. On SIGINT/SIGQUIT/SIGTERM it asks the loop to exit and
     * releases every timeout event, socket event, stream and signal event
     * tracked by this object. Other signals are only logged.
     *
     * @param int $sig  signal number
     * @param int $flag event flags passed by libevent
     */
    public function onSignal($sig, $flag)
    {
        Log::record("ev_signal sig={$sig},flag={$flag}",Log::DEBUG);
        if ($sig == SIGINT || $sig == SIGQUIT || $sig == SIGTERM) {
            event_base_loopexit($this->mEvBase);

            foreach ($this->mEvTimeouts as $event) {
                event_del($event);
                event_free($event);
            }
            $this->mEvTimeouts = [];

            foreach ($this->mEvSockets as $fd => $event) {
                event_del($event);
                event_free($event);
            }
            $this->mEvSockets = [];

            foreach ($this->mStreams as $fd => $stream) {
                // Best-effort teardown: errors on already-dead sockets are suppressed.
                @socket_shutdown($stream,STREAM_SHUT_RDWR);
                @socket_close($stream);
            }
            $this->mStreams = [];
            $this->mContents = [];

            foreach ($this->mEvSignals as $event) {
                event_del($event);
                event_free($event);
            }
            $this->mEvSignals = [];
        }
    }

    /**
     * Read-ready callback for a listening socket: accept one connection and
     * start tracking it through add_stream().
     *
     * @param mixed $socket listening socket
     * @param int   $event  event flags passed by libevent
     */
    public function onAccept($socket, $event)
    {
        Log::record("onAccept socket_fd={$socket} flag={$event}",Log::DEBUG);

        $stream = socket_accept($socket);
        if ($stream == false) {
            Log::record("socket_accept return false socket_fd={$socket}",Log::DEBUG);
            return;
        }
        Log::record("socket_accept stream={$stream}",Log::DEBUG);
        $this->add_stream($stream);
    }

    /**
     * Start tracking a connected socket: switch it to non-blocking mode, give
     * it an empty receive buffer and arm a one-shot read/timeout event whose
     * callback is onRead().
     *
     * @param mixed $stream connected socket
     * @return int key (intval of the socket) under which the connection is stored
     */
    private function add_stream($stream)
    {
        $fd = intval($stream);
        socket_set_nonblock($stream);

        $this->mEvSockets[$fd] = event_new();
        $this->mStreams[$fd] = $stream;
        $this->mContents[$fd] = '';

        if (event_set($this->mEvSockets[$fd], $stream, EV_READ | EV_TIMEOUT, [$this, 'onRead']) == false) {
            Log::record("event_set error connect fd={$stream}",Log::DEBUG);
        }
        if (event_base_set($this->mEvSockets[$fd], $this->mEvBase) == false) {
            Log::record("event_base_set error connect fd={$stream}",Log::DEBUG);
        }
        if (event_add($this->mEvSockets[$fd],self::read_micotime) == false) {
            Log::record("event_add error connect fd={$stream}",Log::DEBUG);
        }
        return $fd;
    }

    /**
     * Remove the connection's event from the loop and put the socket into
     * blocking mode.
     *
     * @param int $fd key returned by add_stream()
     */
    public function block($fd)
    {
        $stream = $this->mStreams[$fd];
        $event = $this->mEvSockets[$fd];
        event_del($event);
        socket_set_block($stream);
    }

    /**
     * Re-add the connection's event to the loop (without a timeout) and put
     * the socket back into non-blocking mode.
     *
     * @param int $fd key returned by add_stream()
     */
    public function unblock($fd)
    {
        $stream = $this->mStreams[$fd];
        $event = $this->mEvSockets[$fd];
        event_add($event);
        socket_set_nonblock($stream);
    }

    /**
     * Timer callback placeholder; intentionally empty.
     */
    public function onTimer($stream, $event, $params)
    {
    }

    /**
     * Callback for an outgoing connection attempt started by do_connect().
     *
     *  - EV_TIMEOUT: start a new attempt via do_connect(), handing over the
     *    previous socket so it can be closed and its event reused.
     *  - EV_WRITE: check SO_ERROR; on 0 promote the socket to a tracked
     *    stream and notify the processor.
     *  - any other read/write readiness: treat the attempt as failed and arm
     *    a timeout-only event so the EV_TIMEOUT branch retries later.
     *
     * @param mixed $stream socket the event fired for
     * @param int   $event  event flags passed by libevent
     * @param array $params 'host', 'port', 'args' and (when available) 'stream'
     */
    public function onConnect($stream, $event, $params)
    {
        $error = socket_last_error();
        Log::record("onConnect stream={$stream},event={$event},socket error={$error}",Log::DEBUG);

        if ($event == EV_TIMEOUT) {
            // Tolerate params that lack 'stream' instead of hitting an undefined index.
            $old_stream = isset($params['stream']) ? $params['stream'] : false;
            $stream = $this->do_connect($params['host'],$params['port'],$params['args'],$old_stream);
            if ($stream != false) {
                $fd = $this->add_stream($stream);
                $this->mProcessor->onConnected($fd, $stream,$params['host'],$params['port'],$params['args']);
            }
        } elseif ($event == EV_WRITE) {
            $ret = socket_getopt($stream,SOL_SOCKET,SO_ERROR);
            // Strict comparison: a false return (getsockopt failure) must not
            // be mistaken for SO_ERROR == 0.
            if ($ret === 0) {
                Log::record("onConnect EV_WRITE success ",Log::DEBUG);
                $fd = intval($stream);
                event_del($this->mEvSockets[$fd]);
                event_free($this->mEvSockets[$fd]);
                unset($this->mEvSockets[$fd]);

                $fd = $this->add_stream($stream);
                $this->mProcessor->onConnected($fd, $stream,$params['host'],$params['port'],$params['args']);
            } else {
                Log::record("onConnect EV_WRITE Error",Log::DEBUG);
            }
        } elseif (($event & (EV_WRITE | EV_READ)) != 0) {
            // The former test `$event == EV_WRITE | EV_READ` parsed as
            // `($event == EV_WRITE) | EV_READ` and was always true. The mask
            // test states the intent explicitly while still sending every
            // read/write readiness notification down the retry path.
            Log::record("onConnect EV_WRITE | EV_READ",Log::DEBUG);
            $fd = intval($stream);
            event_del($this->mEvSockets[$fd]);
            if (event_set($this->mEvSockets[$fd], $stream, EV_TIMEOUT, [$this, 'onConnect'],$params) == false) {
                Log::record("event_set error connect fd={$stream}",Log::DEBUG);
            }
            if (event_base_set($this->mEvSockets[$fd], $this->mEvBase) == false) {
                Log::record("event_base_set error connect fd={$stream}",Log::DEBUG);
            }
            if (event_add($this->mEvSockets[$fd],self::connect_micotime) == false) {
                Log::record("event_add error connect fd={$stream}",Log::DEBUG);
            }
        } else {
            Log::record("onConnect ETIMEOUT",Log::DEBUG);
        }
    }

    /**
     * Start a non-blocking TCP connect to $host:$port.
     *
     * When socket_connect() does not succeed immediately, an event for
     * read/write/timeout is armed with onConnect() as callback and false is
     * returned; the outcome is delivered to onConnect() later.
     *
     * @param string $host
     * @param int    $port
     * @param mixed  $args       opaque value forwarded to the processor
     * @param mixed  $old_stream socket of a previous attempt; it is closed and
     *                           its event object is reused. false for a first attempt.
     * @return mixed the connected socket, or false when the connect is pending
     *               or the socket could not be created
     */
    private function do_connect($host,$port,$args,$old_stream = false)
    {
        $stream = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
        if ($stream === false) {
            Log::record("socket_create error host={$host},port={$port}",Log::DEBUG);
            return false;
        }
        socket_set_nonblock($stream);
        $fd = intval($stream);

        if ($old_stream == false) {
            $this->mEvSockets[$fd] = event_new();
        } else {
            // Retry: drop the previous socket and move its event to the new key.
            $oldfd = intval($old_stream);
            socket_close($old_stream);
            $this->mEvSockets[$fd] = $this->mEvSockets[$oldfd];
            unset($this->mEvSockets[$oldfd]);
        }

        $ret = socket_connect($stream,$host,$port);
        if ($ret == false) {
            // 'stream' is what the EV_TIMEOUT branch of onConnect() hands back
            // as $old_stream; without it each retry leaked a socket and an event.
            $params = ['host' => $host,'port' => $port,'args' => $args,'stream' => $stream];
            if (event_set($this->mEvSockets[$fd], $stream, EV_WRITE | EV_READ | EV_TIMEOUT, [$this, 'onConnect'], $params) == false) {
                Log::record("event_set error connect fd={$stream}",Log::DEBUG);
            }
            if (event_base_set($this->mEvSockets[$fd], $this->mEvBase) == false) {
                Log::record("event_base_set error connect fd={$stream}",Log::DEBUG);
            }
            if (event_add($this->mEvSockets[$fd],self::connect_micotime) == false) {
                Log::record("event_add error connect fd={$stream}",Log::DEBUG);
            }
            return false;
        } else {
            // Connected synchronously: the connect event is not needed;
            // the caller registers the stream through add_stream().
            event_del($this->mEvSockets[$fd]);
            event_free($this->mEvSockets[$fd]);
            unset($this->mEvSockets[$fd]);
            return $stream;
        }
    }

    /**
     * Open an outgoing connection. If it completes immediately the stream is
     * tracked and the processor's onConnected() is called right away;
     * otherwise completion is handled by onConnect().
     *
     * @param string $host
     * @param int    $port
     * @param mixed  $args opaque value forwarded to the processor
     */
    public function connect($host,$port,$args)
    {
        $stream = $this->do_connect($host,$port,$args,false);
        if ($stream != false) {
            $bufid = $this->add_stream($stream);
            $this->mProcessor->onConnected($bufid, $stream,$host,$port,$args);
        }
    }

    /**
     * Read/timeout callback for a tracked connection. The event is one-shot,
     * so every path that keeps the connection open re-arms it.
     *
     * @param mixed $stream socket the event fired for
     * @param int   $event  event flags passed by libevent
     */
    public function onRead($stream, $event)
    {
        $fd = intval($stream);
        if ($event == EV_TIMEOUT) {
            Log::record("onRead stream={$stream} event=EV_TIMEOUT",Log::DEBUG);
            event_add($this->mEvSockets[$fd],self::read_micotime);
        } elseif ($event == EV_READ) {
            $read = @socket_read($stream, 1024);
            Log::record("onRead read={$read}",Log::DEBUG);
            if ($read === false) {
                // Treated as a would-block / in-progress condition: wait for the next event.
                event_add($this->mEvSockets[$fd],self::read_micotime);
                return;
            }
            if (strlen($read) == 0) {
                // Empty read: the connection is closed.
                $this->close($fd);
                return;
            }

            $this->mContents[$fd] .= $read;
            $this->proc($fd);
            // proc() (or the processor it calls) may have closed the
            // connection, in which case the event no longer exists.
            if (isset($this->mEvSockets[$fd])) {
                event_add($this->mEvSockets[$fd],self::read_micotime);
            }
        } else {
            Log::record("onRead stream={$stream} event={$event}",Log::DEBUG);
        }
    }

    /**
     * Extract complete frames from the connection's receive buffer.
     *
     * A frame is a header of self::body_header_len decimal digits holding the
     * body length, followed by the body. A zero length is a heartbeat and is
     * skipped; every other complete frame is passed to the processor's
     * onRequest(). Unconsumed bytes stay in the buffer.
     *
     * @param int $fd key returned by add_stream()
     */
    private function proc($fd)
    {
        $content = $this->mContents[$fd];
        $start = 0;
        $left = strlen($content);

        while ($left >= self::body_header_len) {
            $header = substr($content,$start,self::body_header_len);
            // Digits only: is_numeric() also accepts signs, exponents and
            // whitespace, and a negative length can stall or corrupt the loop.
            if (!ctype_digit($header)) {
                $this->close($fd);
                return;
            }

            $body_len = intval($header);
            if ($body_len == 0) {
                // This is a heartbeat packet.
                $start += self::body_header_len;
                $left -= self::body_header_len;
            } elseif ($left >= self::body_header_len + $body_len) {
                $body = substr($content,$start + self::body_header_len,$body_len);
                $this->mProcessor->onRequest($fd,$body);
                if (!array_key_exists($fd,$this->mStreams)) {
                    // The processor closed the connection; do not recreate its buffer.
                    return;
                }
                $start += self::body_header_len + $body_len;
                $left -= self::body_header_len + $body_len;
            } else {
                // Incomplete frame: wait for more data.
                break;
            }
        }

        if ($start > 0) {
            $str = substr($content,$start);
            // substr() can return false when nothing remains.
            $this->mContents[$fd] = ($str === false) ? '' : $str;
        }
    }

    /**
     * Error/timeout callback. On timeout an empty frame is written as a probe
     * and the connection is closed if that write fails; any other event closes
     * the connection.
     *
     * @param mixed $stream socket the event fired for
     * @param int   $event  event flags passed by libevent
     */
    public function onError($stream, $event)
    {
        $fd = intval($stream);
        if ($event == EV_TIMEOUT) {
            $ret = $this->write($fd,'');
            if ($ret == false) {
                $this->close($fd);
            }
        } else {
            Log::record("onError stream={$stream} flag={$event}",Log::DEBUG);
            $this->close($fd);
        }
    }

    /**
     * Send one frame: a 10-digit zero-padded decimal length followed by $data.
     *
     * @param int    $fd   key returned by add_stream()
     * @param string $data frame body; an empty string sends a header-only frame
     * @return bool true only when the whole frame was written in one call
     */
    public function write($fd,$data)
    {
        if (!is_string($data)) {
            Log::record(__METHOD__ . " write data is not string.",Log::ERR);
            return false;
        }
        if (!array_key_exists($fd,$this->mStreams)) {
            return false;
        }

        $stream = $this->mStreams[$fd];
        $header = sprintf("%010d",strlen($data));
        $data = $header . $data;
        $ret = socket_write($stream,$data,strlen($data));
        return ($ret === strlen($data));
    }

    /**
     * Stop tracking a connection: free its event, shut down and close the
     * socket, drop its buffer and notify the processor via onClose().
     *
     * @param int $fd key returned by add_stream()
     * @return false|null false when $fd is not a tracked connection
     */
    public function close($fd)
    {
        if (!array_key_exists($fd,$this->mStreams)) {
            Log::record(" close socket fd={$fd}.",Log::ERR);
            return false;
        } else {
            event_del($this->mEvSockets[$fd]);
            event_free($this->mEvSockets[$fd]);

            $stream = $this->mStreams[$fd];
            @socket_shutdown($stream,STREAM_SHUT_RDWR);
            socket_close($stream);

            unset($this->mEvSockets[$fd]);
            unset($this->mStreams[$fd]);
            unset($this->mContents[$fd]);
            $this->mProcessor->onClose($fd);
        }
    }
}