123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417 |
- <?php
- /**
- * Created by PhpStorm.
- * User: stanley-king
- * Date: 2018/7/23
- * Time: 10:56 PM
- */
- namespace event;
- use Log;
/**
 * Socket event loop built on the procedural libevent extension API
 * (event_base_new / event_new / event_set / event_add ...).
 *
 * Responsibilities visible in this class:
 *  - watches listening sockets and accepts inbound connections (add_listen / onAccept);
 *  - opens outbound non-blocking connections and re-tries them on a timer (connect / onConnect);
 *  - reads from connected sockets, splits the byte stream into frames and hands each frame
 *    body to the processor (onRead / proc);
 *  - writes frames (write) and tears connections down (close).
 *
 * Wire format: every frame is a body_header_len-byte decimal length header followed by that
 * many body bytes. A header whose value is 0 is a heartbeat and carries no body.
 *
 * Sockets are keyed everywhere by intval() of the socket resource, called "fd" below.
 */
class event_looper
{
    /** Width in bytes of the decimal length header that precedes every frame body. */
    const body_header_len = 10;

    /** Timeout handed to event_add() for read events (the extension takes microseconds). */
    const read_micotime = 6000000000;

    /** Timeout handed to event_add() while a connect is pending or a retry is scheduled. */
    const connect_micotime = 5000000;

    /** @var resource libevent base created in the constructor. */
    protected $mEvBase;

    /** @var array signal number => event resource. */
    protected $mEvSignals;

    /** @var array fd => event resource (listening, connecting and connected sockets). */
    protected $mEvSockets;

    /** @var array timer events; nothing in this class adds to it, it is only drained on shutdown. */
    protected $mEvTimeouts;

    /** @var array fd => connected socket resource. */
    protected $mStreams;

    /** @var array fd => bytes received but not yet consumed by proc(). */
    protected $mContents;

    /** @var IProcessor receiver of onStart / onConnected / onRequest / onClose notifications. */
    protected $mProcessor;

    /**
     * Creates the libevent base and empty bookkeeping tables.
     */
    public function __construct()
    {
        $this->mStreams = [];
        $this->mContents = [];
        $this->mEvSignals = [];
        $this->mEvSockets = [];
        $this->mEvTimeouts = [];
        $this->mEvBase = event_base_new();
    }

    /**
     * Installs the processor and immediately notifies it through onStart().
     *
     * @param IProcessor $processor
     */
    public function init(IProcessor $processor)
    {
        $this->mProcessor = $processor;
        $this->mProcessor->onStart();
    }

    /**
     * Registers SIGINT / SIGQUIT / SIGTERM handlers and runs the loop until it exits.
     */
    public function run_loop()
    {
        $this->add_signal(SIGINT);
        $this->add_signal(SIGQUIT);
        $this->add_signal(SIGTERM);
        $ret = event_base_loop($this->mEvBase);
        Log::record("event_base_loop ret={$ret}", Log::DEBUG);
    }

    /**
     * Starts watching a listening socket; readiness is delivered to onAccept().
     *
     * @param resource $stream listening socket
     * @return false|null false when $stream does not convert to a usable id, otherwise nothing
     */
    public function add_listen($stream)
    {
        $fd = intval($stream);
        // intval() of a failed socket_create() result (false) is 0, so 0 is rejected as well.
        if ($fd <= 0) {
            return false;
        }
        $this->mEvSockets[$fd] = event_new();
        $this->arm_event($this->mEvSockets[$fd], $stream, EV_READ | EV_PERSIST, 'onAccept', "EV_READ | EV_PERSIST");
    }

    /**
     * Registers a handler event for one signal number; delivery goes to onSignal().
     *
     * @param int $val signal number
     */
    private function add_signal($val)
    {
        $this->mEvSignals[$val] = event_new();
        $this->arm_event($this->mEvSignals[$val], $val, EV_SIGNAL, 'onSignal', "EV_SIGNAL sig={$val}");
    }

    /**
     * Configures an event, binds it to this loop's base and schedules it.
     * Each failing step is logged at DEBUG level; no step aborts the sequence.
     *
     * @param resource     $event   event resource from event_new()
     * @param resource|int $target  socket resource or signal number to watch
     * @param int          $flags   EV_* mask
     * @param string       $handler name of the method on this object to call back
     * @param string       $tag     suffix appended to the error log messages
     * @param int|null     $timeout timeout for event_add(), or null to add without one
     * @param mixed        $arg     extra callback argument, or null to register none
     */
    private function arm_event($event, $target, $flags, $handler, $tag, $timeout = null, $arg = null)
    {
        if ($arg === null) {
            $configured = event_set($event, $target, $flags, [$this, $handler]);
        } else {
            $configured = event_set($event, $target, $flags, [$this, $handler], $arg);
        }
        if ($configured == false) {
            Log::record("event_set error {$tag}", Log::DEBUG);
        }
        if (event_base_set($event, $this->mEvBase) == false) {
            Log::record("event_base_set error {$tag}", Log::DEBUG);
        }
        $added = ($timeout === null) ? event_add($event) : event_add($event, $timeout);
        if ($added == false) {
            Log::record("event_add error {$tag}", Log::DEBUG);
        }
    }

    /**
     * Removes every event in the list from the loop and frees it.
     *
     * @param array $events event resources
     */
    private function free_events(array $events)
    {
        foreach ($events as $event) {
            event_del($event);
            event_free($event);
        }
    }

    /**
     * Signal callback. For SIGINT / SIGQUIT / SIGTERM it asks the loop to exit and releases
     * all events and connected sockets held by this object.
     *
     * Note: the processor's onClose() is not invoked for the sockets closed here.
     *
     * @param int $sig  signal number
     * @param int $flag event flags reported by libevent
     */
    public function onSignal($sig, $flag)
    {
        Log::record("ev_signal sig={$sig},flag={$flag}", Log::DEBUG);
        if ($sig != SIGINT && $sig != SIGQUIT && $sig != SIGTERM) {
            return;
        }

        event_base_loopexit($this->mEvBase);

        $this->free_events($this->mEvTimeouts);
        $this->mEvTimeouts = [];

        $this->free_events($this->mEvSockets);
        $this->mEvSockets = [];

        foreach ($this->mStreams as $stream) {
            // 2 = shut down both directions. The previous STREAM_SHUT_RDWR constant belongs to
            // the streams API and only worked because it has the same value.
            @socket_shutdown($stream, 2);
            @socket_close($stream);
        }
        $this->mStreams = [];
        $this->mContents = [];

        $this->free_events($this->mEvSignals);
        $this->mEvSignals = [];
    }

    /**
     * Read-readiness callback for listening sockets: accepts one connection and starts
     * reading from it.
     *
     * @param resource $socket listening socket
     * @param int      $event  event flags reported by libevent
     */
    public function onAccept($socket, $event)
    {
        Log::record("onAccept socket_fd={$socket} flag={$event}", Log::DEBUG);
        $stream = socket_accept($socket);
        if ($stream == false) {
            Log::record("socket_accept return false socket_fd={$socket}", Log::DEBUG);
            return;
        }
        Log::record("socket_accept stream={$stream}", Log::DEBUG);
        $this->add_stream($stream);
    }

    /**
     * Takes ownership of a connected socket: switches it to non-blocking mode, creates its
     * receive buffer and arms a one-shot read event with the read timeout.
     *
     * @param resource $stream connected socket
     * @return int fd key under which the socket is tracked
     */
    private function add_stream($stream)
    {
        $fd = intval($stream);
        socket_set_nonblock($stream);
        $this->mEvSockets[$fd] = event_new();
        $this->mStreams[$fd] = $stream;
        $this->mContents[$fd] = '';
        $this->arm_event(
            $this->mEvSockets[$fd],
            $stream,
            EV_READ | EV_TIMEOUT,
            'onRead',
            "connect fd={$stream}",
            self::read_micotime
        );
        return $fd;
    }

    /**
     * Takes a connection out of the loop and switches its socket to blocking mode.
     * Unknown fds are logged and ignored.
     *
     * @param int $fd
     */
    public function block($fd)
    {
        if (!isset($this->mStreams[$fd]) || !isset($this->mEvSockets[$fd])) {
            Log::record("block unknown fd={$fd}", Log::ERR);
            return;
        }
        event_del($this->mEvSockets[$fd]);
        socket_set_block($this->mStreams[$fd]);
    }

    /**
     * Reverses block(): puts the connection's event back into the loop (without a timeout)
     * and switches its socket to non-blocking mode. Unknown fds are logged and ignored.
     *
     * @param int $fd
     */
    public function unblock($fd)
    {
        if (!isset($this->mStreams[$fd]) || !isset($this->mEvSockets[$fd])) {
            Log::record("unblock unknown fd={$fd}", Log::ERR);
            return;
        }
        event_add($this->mEvSockets[$fd]);
        socket_set_nonblock($this->mStreams[$fd]);
    }

    /**
     * Timer callback placeholder; intentionally does nothing.
     */
    public function onTimer($stream, $event, $params)
    {
    }

    /**
     * Callback for a socket with a pending outbound connect.
     *
     *  - EV_TIMEOUT: start a fresh connect attempt, replacing the previous socket.
     *  - EV_WRITE: the connect finished; SO_ERROR decides whether it succeeded.
     *  - EV_WRITE | EV_READ: treated as a failed attempt; a retry is scheduled after
     *    connect_micotime by re-arming the same event as a pure timeout.
     *
     * @param resource $stream socket the event was registered on
     * @param int      $event  event flags reported by libevent
     * @param array    $params 'host', 'port', 'args' and 'stream' as registered by do_connect()
     */
    public function onConnect($stream, $event, $params)
    {
        $error = socket_last_error();
        Log::record("onConnect stream={$stream},event={$event},socket error={$error}", Log::DEBUG);

        if ($event == EV_TIMEOUT) {
            // Hand the timed-out socket back so do_connect() can close it and reuse its event.
            $previous = isset($params['stream']) ? $params['stream'] : $stream;
            $stream = $this->do_connect($params['host'], $params['port'], $params['args'], $previous);
            if ($stream != false) {
                $fd = $this->add_stream($stream);
                $this->mProcessor->onConnected($fd, $stream, $params['host'], $params['port'], $params['args']);
            }
        } elseif ($event == EV_WRITE) {
            $ret = socket_getopt($stream, SOL_SOCKET, SO_ERROR);
            // Strict comparison: a false return (option lookup failed) must not count as success.
            if ($ret === 0) {
                Log::record("onConnect EV_WRITE success ", Log::DEBUG);
                $fd = intval($stream);
                // Drop the connect-phase event; add_stream() installs the read event.
                event_del($this->mEvSockets[$fd]);
                event_free($this->mEvSockets[$fd]);
                unset($this->mEvSockets[$fd]);
                $fd = $this->add_stream($stream);
                $this->mProcessor->onConnected($fd, $stream, $params['host'], $params['port'], $params['args']);
            } else {
                // NOTE(review): this path only logs; no retry is scheduled and the socket and
                // its event stay allocated. Confirm whether that is intended.
                Log::record("onConnect EV_WRITE Error", Log::DEBUG);
            }
        } elseif ($event == (EV_WRITE | EV_READ)) {
            // Parenthesised on purpose: "==" binds tighter than "|", so the unparenthesised
            // form was always true and made the final else unreachable.
            Log::record("onConnect EV_WRITE | EV_READ", Log::DEBUG);
            $fd = intval($stream);
            event_del($this->mEvSockets[$fd]);
            $this->arm_event(
                $this->mEvSockets[$fd],
                $stream,
                EV_TIMEOUT,
                'onConnect',
                "connect fd={$stream}",
                self::connect_micotime,
                $params
            );
        } else {
            Log::record("onConnect ETIMEOUT", Log::DEBUG);
        }
    }

    /**
     * Starts a non-blocking TCP connect.
     *
     * When socket_connect() does not complete immediately, an event watching the new socket
     * (EV_WRITE | EV_READ | EV_TIMEOUT -> onConnect) is armed and false is returned. When it
     * completes immediately, the connect-phase event is released and the socket is returned.
     *
     * @param string         $host
     * @param int            $port
     * @param mixed          $args       opaque value passed through to onConnected()
     * @param resource|false $old_stream socket of the previous attempt; it is closed and its
     *                                   event is reused for the new socket
     * @return resource|false connected socket, or false while the attempt is pending or the
     *                        socket could not be created
     */
    private function do_connect($host, $port, $args, $old_stream = false)
    {
        $stream = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
        if ($stream === false) {
            Log::record("socket_create error host={$host},port={$port}", Log::ERR);
            return false;
        }
        socket_set_nonblock($stream);
        $fd = intval($stream);

        if ($old_stream == false) {
            $this->mEvSockets[$fd] = event_new();
        } else {
            $oldfd = intval($old_stream);
            socket_close($old_stream);
            if (isset($this->mEvSockets[$oldfd])) {
                // Move the existing event over to the new socket's key.
                $this->mEvSockets[$fd] = $this->mEvSockets[$oldfd];
                unset($this->mEvSockets[$oldfd]);
            } else {
                $this->mEvSockets[$fd] = event_new();
            }
        }

        $ret = socket_connect($stream, $host, $port);
        if ($ret == false) {
            // 'stream' lets onConnect() pass this socket back in when the attempt times out.
            $params = ['host' => $host, 'port' => $port, 'args' => $args, 'stream' => $stream];
            $this->arm_event(
                $this->mEvSockets[$fd],
                $stream,
                EV_WRITE | EV_READ | EV_TIMEOUT,
                'onConnect',
                "connect fd={$stream}",
                self::connect_micotime,
                $params
            );
            return false;
        }

        event_del($this->mEvSockets[$fd]);
        event_free($this->mEvSockets[$fd]);
        unset($this->mEvSockets[$fd]);
        return $stream;
    }

    /**
     * Opens an outbound connection. If it completes immediately the processor is notified
     * right away through onConnected(); otherwise the notification happens later from
     * onConnect().
     *
     * @param string $host
     * @param int    $port
     * @param mixed  $args opaque value passed through to onConnected()
     */
    public function connect($host, $port, $args)
    {
        $stream = $this->do_connect($host, $port, $args, false);
        if ($stream == false) {
            return;
        }
        $bufid = $this->add_stream($stream);
        $this->mProcessor->onConnected($bufid, $stream, $host, $port, $args);
    }

    /**
     * Re-arms the one-shot read event of a connection, if the connection still exists.
     *
     * @param int $fd
     */
    private function rearm_read($fd)
    {
        if (isset($this->mEvSockets[$fd])) {
            event_add($this->mEvSockets[$fd], self::read_micotime);
        }
    }

    /**
     * Read / timeout callback for connected sockets.
     *
     * Reads up to 1024 bytes, appends them to the connection's buffer and lets proc()
     * extract complete frames. An empty read closes the connection. The read event is
     * one-shot, so it is re-armed after every callback unless the connection was closed.
     *
     * @param resource $stream connected socket
     * @param int      $event  event flags reported by libevent
     */
    public function onRead($stream, $event)
    {
        $fd = intval($stream);

        if ($event == EV_TIMEOUT) {
            Log::record("onRead stream={$stream} event=EV_TIMEOUT", Log::DEBUG);
            $this->rearm_read($fd);
            return;
        }

        if ($event != EV_READ) {
            Log::record("onRead stream={$stream} event={$event}", Log::DEBUG);
            return;
        }

        $read = @socket_read($stream, 1024);
        Log::record("onRead read={$read}", Log::DEBUG);

        if ($read === false) {
            // Treated as a would-block / in-progress condition: just wait for the next event.
            $this->rearm_read($fd);
            return;
        }

        if (strlen($read) == 0) {
            $this->close($fd);
            return;
        }

        $this->mContents[$fd] .= $read;
        $this->proc($fd);
        // proc() or the processor may have closed the connection; rearm_read() checks for that.
        $this->rearm_read($fd);
    }

    /**
     * Extracts every complete frame from the connection's receive buffer.
     *
     * Heartbeats (length 0) are skipped, frame bodies are passed to the processor's
     * onRequest(), and any incomplete tail is kept for the next read. A header that is not
     * numeric, or that encodes a negative length, closes the connection.
     *
     * @param int $fd
     */
    private function proc($fd)
    {
        $content = &$this->mContents[$fd];
        $start = 0;
        $left = strlen($content);

        // ">=" so that a buffer holding exactly one header (a lone heartbeat) is consumed now
        // instead of lingering until more bytes arrive.
        while ($left >= self::body_header_len) {
            $header = substr($content, $start, self::body_header_len);
            if (!is_numeric($header)) {
                $this->close($fd);
                return;
            }

            $body_len = intval($header);
            if ($body_len < 0) {
                // is_numeric() accepts a leading sign; a negative length would corrupt the offsets.
                $this->close($fd);
                return;
            }

            if ($body_len == 0) { // this is a heartbeat packet
                $start += self::body_header_len;
                $left -= self::body_header_len;
                continue;
            }

            $frame_len = self::body_header_len + $body_len;
            if ($left < $frame_len) {
                break;
            }

            $body = substr($content, $start + self::body_header_len, $body_len);
            $this->mProcessor->onRequest($fd, $body);
            if (!isset($this->mStreams[$fd])) {
                // The processor closed the connection; stop and do not re-create its buffer.
                return;
            }
            $start += $frame_len;
            $left -= $frame_len;
        }

        if ($start > 0) {
            $rest = substr($content, $start);
            $this->mContents[$fd] = ($rest === false) ? '' : $rest;
        }
    }

    /**
     * Error / timeout callback for connected sockets. On EV_TIMEOUT it sends an empty frame
     * (a heartbeat) and closes the connection if that fails; any other event closes the
     * connection. Nothing in this class registers this callback.
     *
     * @param resource $stream connected socket
     * @param int      $event  event flags reported by libevent
     */
    public function onError($stream, $event)
    {
        $fd = intval($stream);
        if ($event == EV_TIMEOUT) {
            $ret = $this->write($fd, '');
            if ($ret == false) {
                $this->close($fd);
            }
            return;
        }

        Log::record("onError stream={$stream} flag={$event}", Log::DEBUG);
        $this->close($fd);
    }

    /**
     * Sends one frame: a zero-padded 10-digit decimal length header followed by $data.
     *
     * A single socket_write() call is made; a short write is reported as failure and the
     * unsent remainder is not retried.
     *
     * @param int    $fd
     * @param string $data frame body; an empty string produces a heartbeat
     * @return bool true only when the whole frame was written
     */
    public function write($fd, $data)
    {
        if (!is_string($data)) {
            Log::record(__METHOD__ . " write data is not string.", Log::ERR);
            return false;
        }
        if (!array_key_exists($fd, $this->mStreams)) {
            return false;
        }

        $frame = sprintf("%010d", strlen($data)) . $data;
        $frame_len = strlen($frame);
        $ret = socket_write($this->mStreams[$fd], $frame, $frame_len);
        return ($ret === $frame_len);
    }

    /**
     * Closes a connection: frees its event, shuts the socket down, drops its buffer and
     * notifies the processor through onClose().
     *
     * @param int $fd
     * @return false|null false when $fd is not a tracked connection, otherwise nothing
     */
    public function close($fd)
    {
        if (!array_key_exists($fd, $this->mStreams)) {
            Log::record(" close socket fd={$fd}.", Log::ERR);
            return false;
        }

        event_del($this->mEvSockets[$fd]);
        event_free($this->mEvSockets[$fd]);

        $stream = $this->mStreams[$fd];
        // 2 = shut down both directions (same value the previous STREAM_SHUT_RDWR supplied).
        @socket_shutdown($stream, 2);
        socket_close($stream);

        unset($this->mEvSockets[$fd]);
        unset($this->mStreams[$fd]);
        unset($this->mContents[$fd]);

        $this->mProcessor->onClose($fd);
    }
}
|