123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459 |
- <?php
- /**
- * Created by PhpStorm.
- * User: stanley-king
- * Date: 2018/7/23
- * Time: 10:58 PM
- */
- namespace event;
- use Log;
- class buffer_looper
- {
/** Byte length of the zero-padded decimal length header that prefixes every frame (see write() and proc()). */
const body_header_len = 10;
/** Read timeout handed to event_buffer_timeout_set() (presumably seconds - confirm against the libevent extension). */
const read_time_out = 600;
/** Write timeout handed to event_buffer_timeout_set() (presumably seconds - confirm against the libevent extension). */
const write_time_out = 30;

/** @var resource libevent base returned by event_base_new() */
protected $mEvBase;
/** @var array accept events for listening sockets, keyed by intval($stream) */
protected $mEvAccepts;
/** @var array signal events, keyed by signal number */
protected $mEvSignals;
/** @var array events watching in-progress outgoing connects, keyed by intval($socket) */
protected $mEvConnects;
/**
 * Retry timers for in-progress outgoing connects, keyed by intval($socket).
 * Declared explicitly: it used to be created implicitly by the constructor.
 *
 * @var array
 */
protected $mEvTimeouts;
/** @var array sockets (listening and connected), keyed by intval($stream) */
protected $mStreams;
/** @var array buffered events of connected sockets, keyed by buffer id */
protected $mBuffers;
/** @var array received-but-not-yet-parsed bytes per buffer id */
protected $mContents;
/** @var IProcessor callback object installed by init() */
protected $mProcessor;
/**
 * Starts with empty registries and a freshly created libevent base.
 */
public function __construct()
{
    // Chained assignment runs right to left, so mStreams is set first and mEvTimeouts last.
    $this->mEvTimeouts = $this->mEvConnects = $this->mEvSignals = $this->mEvAccepts
        = $this->mContents = $this->mBuffers = $this->mStreams = [];

    // event_init(); // for version 2.16 of the libevent C library
    $this->mEvBase = event_base_new();
}
/**
 * Installs the processor that receives connection and request callbacks,
 * and immediately notifies it through onStart().
 *
 * @param IProcessor $processor
 */
public function init(IProcessor $processor)
{
    $this->mProcessor = $processor;
    $processor->onStart();
}
/**
 * PHP error handler (installed by run_loop()): logs the error plus a backtrace.
 * E_NOTICE is ignored.
 *
 * @param int    $level   PHP error level
 * @param string $message error text
 * @param string $file    file in which the error was raised
 * @param int    $line    line on which the error was raised
 */
public function handle_error($level, $message, $file, $line)
{
    if ($level == E_NOTICE) {
        return;
    }

    $trace = "buffer_looper: level={$level},msg={$message} file={$file},line={$line}\n";
    foreach (debug_backtrace() as $item) {
        // Frames entered from internal code (e.g. extension callbacks) carry no
        // 'file'/'line' keys; reading them blindly raised errors inside the handler.
        $frame_file = isset($item['file']) ? $item['file'] : '';
        $frame_line = isset($item['line']) ? $item['line'] : '';
        $frame_func = isset($item['function']) ? $item['function'] : '';
        $trace .= "{$frame_file}\t{$frame_line}\t{$frame_func}\n";
    }
    Log::record($trace, Log::ERR);
}
/**
 * Installs the error handler, registers the termination signals and
 * runs the event loop until event_base_loop() returns.
 */
public function run_loop()
{
    set_error_handler([$this, 'handle_error']);

    foreach ([SIGINT, SIGQUIT, SIGTERM] as $signo) {
        $this->add_signal($signo);
    }

    Log::record("event_base_loop running....",Log::DEBUG);
    $ret = event_base_loop($this->mEvBase);
    Log::record("event_base_loop ret={$ret}",Log::DEBUG);
}
/**
 * Registers a listening socket; onAccept() is invoked whenever it becomes readable.
 *
 * Registration stays best-effort (every step is attempted and failures are logged),
 * but the outcome is now reported to the caller.
 *
 * @param resource $stream listening socket
 * @return bool true when every registration step succeeded, false otherwise
 */
public function add_listen($stream)
{
    $fd = intval($stream);
    if ($fd < 0) {
        return false;
    }

    $event = event_new();
    $this->mEvAccepts[$fd] = $event;
    $this->mStreams[$fd] = $stream;

    $ok = true;
    if (event_set($event, $stream, EV_READ | EV_PERSIST, [$this, 'onAccept']) == false) {
        Log::record("event_set error EV_READ | EV_PERSIST",Log::DEBUG);
        $ok = false;
    }
    if (event_base_set($event, $this->mEvBase) == false) {
        // This message used to be a copy of the event_set one, which hid which step failed.
        Log::record("event_base_set error EV_READ | EV_PERSIST",Log::DEBUG);
        $ok = false;
    }
    if (event_add($event) == false) {
        Log::record("event_add error EV_READ | EV_PERSIST",Log::DEBUG);
        $ok = false;
    }

    return $ok;
}
/**
 * Creates an event for one signal and attaches it to the base; the signal
 * is delivered to onSignal(). Each failing step is only logged.
 *
 * @param int $val signal number
 */
private function add_signal($val)
{
    $event = event_new();
    $this->mEvSignals[$val] = $event;

    if (!event_set($event, $val, EV_SIGNAL, [$this, 'onSignal'])) {
        Log::record("event_set error EV_SIGNAL sig={$val}",Log::DEBUG);
    }

    if (!event_base_set($event, $this->mEvBase)) {
        Log::record("event_base_set error EV_SIGNAL sig={$val}",Log::DEBUG);
    }

    if (!event_add($event)) {
        Log::record("event_add error EV_SIGNAL sig={$val}",Log::DEBUG);
    }
}
/**
 * Signal callback. On SIGINT, SIGQUIT or SIGTERM it asks the loop to exit and
 * releases every event, buffer and socket this object tracks.
 *
 * @param int $sig  signal number
 * @param int $flag event flags passed by libevent
 */
public function onSignal($sig, $flag)
{
    Log::record("ev_signal sig={$sig},flag={$flag}",Log::DEBUG);
    if ($sig != SIGINT && $sig != SIGQUIT && $sig != SIGTERM) {
        return;
    }

    event_base_loopexit($this->mEvBase);

    // Release timer, pending-connect and accept events before their sockets are
    // closed. Accept and connect events used to be left registered here.
    foreach ([$this->mEvTimeouts, $this->mEvConnects, $this->mEvAccepts] as $events) {
        foreach ($events as $event) {
            event_del($event);
            event_free($event);
        }
    }
    $this->mEvTimeouts = [];
    $this->mEvConnects = [];
    $this->mEvAccepts = [];

    foreach ($this->mBuffers as $buffer) {
        event_buffer_disable($buffer, EV_READ | EV_WRITE);
        event_buffer_free($buffer);
    }
    $this->mBuffers = [];

    foreach ($this->mStreams as $stream) {
        socket_close($stream);
    }
    $this->mStreams = [];
    $this->mContents = [];

    foreach ($this->mEvSignals as $event) {
        event_del($event);
        event_free($event);
    }
    $this->mEvSignals = [];
}
/**
 * Accept callback for a listening socket: accepts one connection and hands
 * it to add_stream(). A failed accept is only logged.
 *
 * @param resource $socket listening socket that became readable
 * @param int      $event  event flags passed by libevent
 */
public function onAccept($socket, $event)
{
    Log::record("onAccept",Log::DEBUG);
    Log::record("onAccept socket_fd={$socket} flag={$event}",Log::DEBUG);

    $stream = socket_accept($socket);
    if ($stream == false) {
        Log::record("socket_accept return false socket_fd={$socket}",Log::DEBUG);
        return;
    }

    Log::record("socket_accept stream={$stream}",Log::DEBUG);
    $this->add_stream($stream);
}
/**
 * Write callback placeholder; does nothing.
 *
 * add_stream() passes NULL as the write callback, so nothing in this class
 * registers this method.
 *
 * @param resource $buffer buffered event
 * @param int      $bufid  buffer id
 */
public function onWrite($buffer, $bufid)
{
    // Intentionally empty.
}
/**
 * Wraps a connected socket in a buffered event and starts reading from it.
 *
 * The socket is switched to non-blocking mode. If the buffered event cannot
 * be created the socket is closed.
 *
 * @param resource $stream connected socket
 * @return int|false buffer id (intval of the socket), or false on failure
 */
private function add_stream($stream)
{
    $bufid = intval($stream);
    socket_set_nonblock($stream);

    $bev = event_buffer_new($stream, [$this,'onRead'], NULL, [$this,'onError'], $bufid);
    if ($bev == false) {
        socket_close($stream);
        return false;
    }

    event_buffer_base_set($bev, $this->mEvBase);
    event_buffer_timeout_set($bev, self::read_time_out, self::write_time_out);
    event_buffer_watermark_set($bev, EV_READ, 0, 0xffffff);
    event_buffer_priority_set($bev, 10);
    event_buffer_enable($bev, EV_READ | EV_PERSIST);

    $this->mStreams[$bufid] = $stream;
    $this->mBuffers[$bufid] = $bev;
    $this->mContents[$bufid] = "";

    return $bufid;
}
/**
 * Stops event-driven reading on a connection and switches its socket to blocking mode.
 *
 * @param int $bufid buffer id returned when the connection was registered
 * @return bool false when $bufid is unknown or a step failed, true otherwise
 */
public function block($bufid)
{
    // Same guard as write() and close(): an unknown id used to raise
    // undefined-index errors and pass null to the extension functions.
    if (!array_key_exists($bufid,$this->mStreams) || !array_key_exists($bufid,$this->mBuffers)) {
        return false;
    }

    $success = event_buffer_disable($this->mBuffers[$bufid], EV_READ | EV_PERSIST);
    Log::record("event_buffer_disable success={$success}",Log::DEBUG);
    $disabled = (bool)$success;

    $success = socket_set_block($this->mStreams[$bufid]);
    Log::record("socket_set_block success={$success}",Log::DEBUG);

    return $disabled && (bool)$success;
}
/**
 * Switches a connection's socket back to non-blocking mode and resumes
 * event-driven reading; the counterpart of block().
 *
 * @param int $bufid buffer id returned when the connection was registered
 * @return bool false when $bufid is unknown or a step failed, true otherwise
 */
public function unblock($bufid)
{
    // Same guard as write() and close(): an unknown id used to raise
    // undefined-index errors and pass null to the extension functions.
    if (!array_key_exists($bufid,$this->mStreams) || !array_key_exists($bufid,$this->mBuffers)) {
        return false;
    }

    $success = socket_set_nonblock($this->mStreams[$bufid]);
    Log::record("socket_set_nonblock success={$success}",Log::DEBUG);
    $nonblocking = (bool)$success;

    $success = event_buffer_enable($this->mBuffers[$bufid], EV_READ | EV_PERSIST);
    Log::record("event_buffer_enable success={$success}",Log::DEBUG);

    return $nonblocking && (bool)$success;
}
/**
 * Retry timer callback for an outgoing connect: starts a new attempt via
 * do_connect(), handing over the socket of the previous attempt.
 *
 * @param mixed $stream first callback argument supplied by libevent for the timer
 * @param int   $event  event flags passed by libevent
 * @param array $params 'stream', 'host', 'port' and 'args' captured by do_connect()
 */
public function onConnectTimer($stream, $event, $params)
{
    $error = socket_last_error();
    Log::record("onConnectTimer sig={$stream},flag={$event},error={$error}",Log::DEBUG);

    $stream = $this->do_connect($params['host'],$params['port'],$params['args'],$params['stream']);
    if ($stream == false) {
        // Still pending: do_connect() has re-armed the connect event and this timer.
        return;
    }

    $bufid = $this->add_stream($stream);
    if ($bufid === false) {
        // add_stream() has already closed the socket; reporting it as connected
        // would hand the processor an invalid id and a dead socket.
        Log::record(__METHOD__ . " add_stream failed.",Log::ERR);
        return;
    }
    $this->mProcessor->onConnected($bufid, $stream,$params['host'],$params['port'],$params['args']);
}
/**
 * Callback for the event watching an in-progress outgoing connect.
 *
 * When the socket is reported writable and SO_ERROR is 0, the connect and
 * retry-timer events are released, the socket is registered through
 * add_stream() and the processor is notified. Every other outcome is only
 * logged.
 *
 * @param resource $stream connecting socket
 * @param int      $event  event flags passed by libevent
 * @param array    $params 'host', 'port' and 'args' captured by do_connect()
 */
public function onConnect($stream, $event, $params)
{
    $error = socket_last_error();
    Log::record("onConnect sig={$stream},flag={$event},socket error={$error}",Log::DEBUG);

    if ($event == EV_WRITE) {
        $ret = socket_getopt($stream,SOL_SOCKET,SO_ERROR);
        // Strict comparison: socket_getopt() returns false on failure, and
        // `false == 0` used to count that as a successful connect.
        if ($ret === 0) {
            Log::record("onConnect EV_WRITE success ",Log::DEBUG);

            $fd = intval($stream);
            event_del($this->mEvConnects[$fd]);
            event_free($this->mEvConnects[$fd]);
            event_del($this->mEvTimeouts[$fd]);
            event_free($this->mEvTimeouts[$fd]);
            unset($this->mEvConnects[$fd]);
            unset($this->mEvTimeouts[$fd]);

            $bufid = $this->add_stream($stream);
            if ($bufid === false) {
                // add_stream() has already closed the socket; do not report it as connected.
                Log::record(__METHOD__ . " add_stream failed.",Log::ERR);
                return;
            }
            $this->mProcessor->onConnected($bufid, $stream,$params['host'],$params['port'],$params['args']);
        }
        else {
            Log::record("onConnect EV_WRITE Error",Log::DEBUG);
        }
    }
    // Parentheses are required: `==` binds tighter than `|`, so the old
    // `$event == EV_WRITE | EV_READ` was always truthy and the final branch
    // could never run.
    elseif ($event == (EV_WRITE | EV_READ)) {
        Log::record("onConnect EV_WRITE | EV_READ",Log::DEBUG);
    }
    else {
        Log::record("onConnect ETIMEOUT",Log::DEBUG);
    }
}
/**
 * Starts one non-blocking TCP connect attempt.
 *
 * If socket_connect() does not succeed immediately, a connect event
 * (handled by onConnect()) and a retry timer (handled by onConnectTimer())
 * are armed for the new socket and false is returned. On immediate success
 * the events are released and the connected socket is returned.
 *
 * @param string         $host       remote address
 * @param int            $port       remote port
 * @param mixed          $args       opaque value forwarded to the processor's onConnected()
 * @param resource|false $old_stream socket of the previous attempt; it is closed and its events are reused
 * @return resource|false connected socket, or false when the attempt is pending or no socket could be created
 */
private function do_connect($host,$port,$args,$old_stream = false)
{
    $stream = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
    if ($stream === false) {
        return false;
    }
    socket_set_nonblock($stream);
    $fd = intval($stream);

    $conn_ev = false;
    $timer_ev = false;
    if ($old_stream != false) {
        $oldfd = intval($old_stream);
        if (isset($this->mEvConnects[$oldfd])) {
            $conn_ev = $this->mEvConnects[$oldfd];
            // The previous connect event may still be pending on the socket
            // about to be closed. Detach it first: libevent forbids
            // re-initialising an event that is still pending.
            event_del($conn_ev);
        }
        if (isset($this->mEvTimeouts[$oldfd])) {
            $timer_ev = $this->mEvTimeouts[$oldfd];
        }

        socket_close($old_stream);
        Log::record("close old stream={$old_stream}",Log::DEBUG);

        unset($this->mEvConnects[$oldfd]);
        unset($this->mEvTimeouts[$oldfd]);
    }

    // Reuse the previous attempt's events when present, otherwise create new ones.
    $this->mEvConnects[$fd] = ($conn_ev !== false) ? $conn_ev : event_new();
    $this->mEvTimeouts[$fd] = ($timer_ev !== false) ? $timer_ev : event_timer_new();

    $ret = socket_connect($stream,$host,$port);
    if ($ret == false) {
        if (event_set($this->mEvConnects[$fd], $stream, EV_WRITE | EV_READ, [$this, 'onConnect'],['host' => $host,'port' => $port,'args' => $args]) == false) {
            Log::record("event_set error connect fd={$stream}",Log::DEBUG);
        }
        if (event_base_set($this->mEvConnects[$fd], $this->mEvBase) == false) {
            Log::record("event_base_set error connect fd={$stream}",Log::DEBUG);
        }
        if (event_add($this->mEvConnects[$fd]) == false) {
            Log::record("event_add error connect fd={$stream}",Log::DEBUG);
        }

        // The two timer messages below used to be copies of the event_set /
        // event_base_set ones, which named the wrong failing call.
        if (event_timer_set($this->mEvTimeouts[$fd], [$this, 'onConnectTimer'],['stream' => $stream, 'host' => $host,'port' => $port,'args' => $args]) == false) {
            Log::record("event_timer_set error connect fd={$stream}",Log::DEBUG);
        }
        if (event_base_set($this->mEvTimeouts[$fd], $this->mEvBase) == false) {
            Log::record("event_base_set error connect fd={$stream}",Log::DEBUG);
        }
        // Timer interval argument: 1000000 (presumably microseconds, i.e. one second - confirm).
        if (event_timer_add($this->mEvTimeouts[$fd],1000000) == false) {
            Log::record("event_timer_add error connect fd={$stream}",Log::DEBUG);
        }
        return false;
    }

    // Connected immediately: the events are not needed.
    event_del($this->mEvConnects[$fd]);
    event_free($this->mEvConnects[$fd]);
    event_del($this->mEvTimeouts[$fd]);
    event_free($this->mEvTimeouts[$fd]);
    unset($this->mEvConnects[$fd]);
    unset($this->mEvTimeouts[$fd]);

    return $stream;
}
/**
 * Opens an outgoing connection. The processor's onConnected() is called
 * right away if the connect completes immediately; otherwise it is called
 * later from onConnect() or onConnectTimer().
 *
 * @param string $host remote address
 * @param int    $port remote port
 * @param mixed  $args opaque value forwarded to onConnected()
 */
public function connect($host,$port,$args)
{
    $stream = $this->do_connect($host,$port,$args,false);
    if ($stream == false) {
        return;
    }

    $bufid = $this->add_stream($stream);
    if ($bufid === false) {
        // add_stream() has already closed the socket; do not report it as connected.
        Log::record(__METHOD__ . " add_stream failed.",Log::ERR);
        return;
    }
    $this->mProcessor->onConnected($bufid, $stream,$host,$port,$args);
}
/**
 * Read callback of a buffered event: drains the buffer in chunks of up to
 * 1024 bytes, appends them to the connection's pending content and parses
 * complete frames with proc().
 *
 * @param resource $buffer buffered event
 * @param int      $bufid  buffer id
 */
public function onRead($buffer, $bufid)
{
    Log::record("onRead bufid={$bufid}",Log::DEBUG);

    while (true) {
        $read = event_buffer_read($buffer, 1024);
        // Compare explicitly instead of by truthiness: a chunk that is exactly
        // "0" is falsy, so it used to be consumed from the buffer and then dropped.
        if (!is_string($read) || $read === '') {
            break;
        }

        Log::record("read={$read}",Log::DEBUG);
        $this->mContents[$bufid] .= $read;
        $this->proc($bufid);

        // proc() or the request handler may have closed this connection, which
        // frees $buffer; stop instead of reading from a freed resource.
        if (!array_key_exists($bufid,$this->mBuffers)) {
            return;
        }
    }
}
/**
 * Error callback of a buffered event.
 *
 * A read timeout re-enables reading and sends an empty frame; the connection
 * is closed only if that write fails. Any other condition tears the
 * connection down and notifies the processor through onClose().
 *
 * @param resource $buffer buffered event
 * @param int      $event  EVBUFFER_* flags describing the condition
 * @param int      $bufid  buffer id
 */
public function onError($buffer, $event, $bufid)
{
    Log::record("onError bufid={$bufid} flag={$event}",Log::DEBUG);

    if ($event != (EVBUFFER_READ | EVBUFFER_TIMEOUT)) {
        event_buffer_disable($buffer, EV_READ | EV_WRITE);
        event_buffer_free($buffer);

        if (array_key_exists($bufid,$this->mStreams)) {
            socket_close($this->mStreams[$bufid]);
            unset($this->mStreams[$bufid]);
        }
        // unset() on a missing key is a no-op, so no existence check is needed.
        unset($this->mBuffers[$bufid]);
        unset($this->mContents[$bufid]);

        if ($this->mProcessor != null) {
            $this->mProcessor->onClose($bufid);
        }
        return;
    }

    // Read timeout: keep the connection and probe it with an empty frame.
    event_buffer_enable($buffer, EV_READ);
    if ($this->write($bufid,'') == false) {
        $this->close($bufid);
    }
}
/**
 * Parses the pending content of a connection into frames.
 *
 * A frame is a body_header_len-byte numeric length header followed by that
 * many body bytes. A zero-length frame is a heartbeat and is skipped; every
 * other complete frame is passed to the processor's onRequest(). Bytes of an
 * incomplete trailing frame stay buffered. A malformed header closes the
 * connection.
 *
 * @param int $bufid buffer id
 */
private function proc($bufid)
{
    $content = $this->mContents[$bufid];
    $start = 0;
    $left = strlen($content);

    // `>=` rather than `>`: a heartbeat is exactly one header long and should
    // be consumed as soon as it has fully arrived.
    while ($left >= self::body_header_len) {
        $header = substr($content,$start,self::body_header_len);
        if (!is_numeric($header)) {
            $this->close($bufid);
            return;
        }

        $body_len = intval($header);
        if ($body_len < 0) {
            // A negative length such as "-000000010" passes is_numeric() and
            // would advance the cursor by zero, looping forever on peer input.
            $this->close($bufid);
            return;
        }

        if ($body_len == 0) { // heartbeat frame
            $start += self::body_header_len;
            $left -= self::body_header_len;
        }
        elseif ($left >= self::body_header_len + $body_len) {
            $body = substr($content,$start + self::body_header_len,$body_len);
            $this->mProcessor->onRequest($bufid,$body);

            // The handler may have closed this connection; stop parsing and
            // do not re-create its content entry below.
            if (!array_key_exists($bufid,$this->mContents)) {
                return;
            }

            $start += self::body_header_len + $body_len;
            $left -= self::body_header_len + $body_len;
        }
        else {
            break; // incomplete frame: wait for more data
        }
    }

    if ($start > 0) {
        $rest = substr($content,$start);
        // Older PHP versions return false when nothing is left.
        $this->mContents[$bufid] = ($rest === false) ? '' : $rest;
    }
}
/**
 * Sends one frame: a 10-digit zero-padded decimal length header followed by $data.
 * An empty $data yields a header-only frame, which proc() treats as a heartbeat.
 *
 * @param int    $bufid buffer id
 * @param string $data  frame body
 * @return bool result of event_buffer_write(); false when $bufid is unknown
 *              or $data cannot be represented as a string
 */
public function write($bufid,$data)
{
    if (!is_string($data)) {
        Log::record(__METHOD__ . " write data is not string.",Log::ERR);

        // Values PHP converts to a string keep working as before. Arrays,
        // resources and other objects used to produce a header that did not
        // match the bytes sent, so they are rejected.
        $castable = $data === null
            || is_scalar($data)
            || (is_object($data) && method_exists($data, '__toString'));
        if (!$castable) {
            return false;
        }
        $data = (string)$data;
    }

    if (!array_key_exists($bufid,$this->mBuffers)) {
        return false;
    }

    $frame = sprintf("%010d",strlen($data)) . $data;
    $ret = event_buffer_write($this->mBuffers[$bufid],$frame,strlen($frame));
    if ($ret == false) {
        Log::record(__METHOD__ . " event_buffer_write return false.",Log::ERR);
    }
    return $ret;
}
/**
 * Closes a connection: frees its buffered event, closes its socket, drops
 * its pending content and notifies the processor through onClose().
 *
 * @param int $bufid buffer id
 * @return false|null false when $bufid is unknown; no value otherwise
 */
public function close($bufid)
{
    if (!array_key_exists($bufid,$this->mBuffers)) {
        return false;
    }

    Log::record(__METHOD__ . " close socket.",Log::DEBUG);

    $bev = $this->mBuffers[$bufid];
    event_buffer_disable($bev, EV_READ | EV_WRITE);
    event_buffer_free($bev);

    if (array_key_exists($bufid,$this->mStreams)) {
        socket_close($this->mStreams[$bufid]);
        unset($this->mStreams[$bufid]);
    }
    // unset() on a missing key is a no-op, so no existence check is needed.
    unset($this->mBuffers[$bufid]);
    unset($this->mContents[$bufid]);

    if ($this->mProcessor != null) {
        $this->mProcessor->onClose($bufid);
    }
}
- }
|