Browse Source

fix webacc

stanley-king 6 năm trước cách đây
mục cha
commit
dbcc69bcd6

+ 28 - 7
core/framework/libraries/log.php

@@ -70,20 +70,16 @@ class Log
 
         if($lev == self::SQL) {
             $level = 'SQL';
-            $content = "[{$pid} {$now}] {$level}: {$message}\r\n";
             if(self::open_sql) {
-                $log_file = BASE_DATA_PATH.'/log/'.date('Ymd',time()).'.log';
-                file_put_contents($log_file,$content, FILE_APPEND);
+                self::write($message,$level);
             }
-            self::add_sql_log($message);
+            //self::add_sql_log($message);
             return;
         }
 
         if($lev >= self::cur_level && $lev <= self::RUN) {
             $level = self::get_level($lev);
-            $log_file = BASE_DATA_PATH . '/log/' . date('Ymd', time()) . '.log';
-            $content = "[{$pid} {$now}] {$level}: {$message}\r\n";
-            file_put_contents($log_file, $content, FILE_APPEND);
+            self::write($message,$level);
         }
 
         if($lev == self::ERR) {
@@ -91,6 +87,31 @@ class Log
         }
     }
 
+    private static $cur_file_name;
+    private static $cur_file = null;
+
+    private static function write($message,$level)
+    {
+        $now = @date('Y-m-d H:i:s', time());
+        $pid = posix_getpid();
+
+        $log_file = BASE_DATA_PATH . '/log/' . date('Ymd', time()) . '.log';
+        if(self::$cur_file_name != $log_file)
+        {
+            if(self::$cur_file != null) {
+                fclose(self::$cur_file);
+            }
+            self::$cur_file = fopen($log_file,'a');
+        }
+
+        $content = "[{$pid} {$now}] {$level}: {$message}\r\n";
+        $ret = fwrite(self::$cur_file,$content);
+        if($ret === false) {
+            self::$cur_file = fopen($log_file,'a');
+            fwrite(self::$cur_file,$content);
+        }
+    }
+
     public static function endl($lev = self::ERR)
     {
         $content = "\r\n";

+ 1 - 0
helper/bonus/scaler.php

@@ -35,6 +35,7 @@ class scaler
             }
         }
         uasort($moneys,['\bonus\scaler','rate_desc']);
+        //todo modify usort
 
         $this->mMoneys = [];
         foreach ($moneys as $item) {

+ 17 - 0
helper/event/IProcessor.php

@@ -0,0 +1,17 @@
+<?php
+/**
+ * Created by PhpStorm.
+ * User: stanley-king
+ * Date: 2018/7/23
+ * Time: 下午10:56
+ */
+
+namespace event;
+
+interface IProcessor
+{
+    public function onStart();
+    public function onRequest($bufid, $body);
+    public function onClose($bufid);
+    public function onConnected($bufid,$stream,$host,$port,$args);
+}

+ 452 - 0
helper/event/buffer_looper.php

@@ -0,0 +1,452 @@
+<?php
+/**
+ * Created by PhpStorm.
+ * User: stanley-king
+ * Date: 2018/7/23
+ * Time: 下午10:58
+ */
+
+namespace event;
+
+use Log;
+
+class buffer_looper
+{
+    const body_header_len = 10;
+    const read_time_out   = 600;
+    const write_time_out  = 30;
+
+    protected $mEvBase;
+    protected $mEvAccepts;
+    protected $mEvSignals;
+    protected $mEvConnects;
+
+    protected $mStreams;
+    protected $mBuffers;
+    protected $mContents;
+    protected $mProcessor;
+    protected $mBufferID;
+
+    public function __construct()
+    {
+        $this->mStreams  = [];
+        $this->mBuffers  = [];
+        $this->mContents = [];
+        $this->mEvAccepts = [];
+        $this->mEvSignals = [];
+        $this->mEvConnects = [];
+        $this->mEvTimeouts = [];
+
+        $this->mBufferID = 1;
+        event_init();
+        $this->mEvBase   = event_base_new();
+    }
+
+    public function init(IProcessor $processor)
+    {
+        $this->mProcessor = $processor;
+        $this->mProcessor->onStart();
+    }
+
+    public function run_loop()
+    {
+        $this->add_signal(SIGINT);
+        $this->add_signal(SIGQUIT);
+        $this->add_signal(SIGTERM);
+
+        $ret = event_base_loop($this->mEvBase);
+        Log::record("event_base_loop ret={$ret}",Log::DEBUG);
+    }
+
+    public function add_listen($stream)
+    {
+        $fd = intval($stream);
+        if($fd < 0) return false;
+
+        $this->mEvAccepts[$fd] = event_new();
+        $this->mStreams[$fd] = $stream;
+
+        if(event_set($this->mEvAccepts[$fd], $stream, EV_READ | EV_PERSIST, [$this, 'onAccept']) == false) {
+            Log::record("event_set error EV_READ | EV_PERSIST",Log::DEBUG);
+        }
+
+        if(event_base_set($this->mEvAccepts[$fd], $this->mEvBase) == false) {
+            Log::record("event_set error EV_READ | EV_PERSIST",Log::DEBUG);
+        }
+
+        if(event_add($this->mEvAccepts[$fd]) == false) {
+            Log::record("event_add error EV_READ | EV_PERSIST",Log::DEBUG);
+        }
+    }
+
+    private function add_signal($val)
+    {
+        $this->mEvSignals[$val] = event_new();
+
+        if(event_set($this->mEvSignals[$val], $val, EV_SIGNAL, [$this, 'onSignal']) == false) {
+            Log::record("event_set error EV_SIGNAL sig={$val}",Log::DEBUG);
+        }
+
+        if(event_base_set($this->mEvSignals[$val], $this->mEvBase) == false) {
+            Log::record("event_base_set error EV_SIGNAL sig={$val}",Log::DEBUG);
+        }
+
+        if(event_add($this->mEvSignals[$val]) == false) {
+            Log::record("event_add error EV_SIGNAL sig={$val}",Log::DEBUG);
+        }
+    }
+
+    public function onSignal($sig, $flag)
+    {
+        Log::record("ev_signal sig={$sig},flag={$flag}",Log::DEBUG);
+        if($sig == SIGINT || $sig == SIGQUIT || $sig == SIGTERM)
+        {
+            event_base_loopexit($this->mEvBase);
+
+            foreach ($this->mEvTimeouts as $event) {
+                event_del($event);
+                event_free($event);
+            }
+            $this->mEvTimeouts = [];
+
+            foreach ($this->mBuffers as $bufid => $buffer) {
+                event_buffer_disable($buffer, EV_READ | EV_WRITE);
+                event_buffer_free($buffer);
+            }
+            $this->mBuffers = [];
+
+            foreach ($this->mStreams as $fd => $stream) {
+                @socket_shutdown($this->mStreams[$fd],STREAM_SHUT_RDWR);
+                @socket_close($this->mStreams[$fd]);
+            }
+            $this->mStreams = [];
+
+            $this->mContents = [];
+
+            foreach ($this->mEvSignals as $event) {
+                event_del($event);
+                event_free($event);
+            }
+            $this->mEvSignals = [];
+        }
+    }
+    public function onAccept($socket, $event)
+    {
+        Log::record("onAccept socket_fd={$socket} flag={$event}",Log::DEBUG);
+        $stream = socket_accept($socket);
+        if($stream == false) {
+            Log::record("stream_socket_accept return false socket_fd={$socket}",Log::DEBUG);
+            return;
+        } else {
+            Log::record("stream_socket_accept stream={$stream}",Log::DEBUG);
+            $this->add_stream($stream);
+        }
+    }
+
+    public function onWrite($buffer, $bufid)
+    {
+
+    }
+
+    private function add_stream($stream)
+    {
+        $bufid = $this->mBufferID;
+
+        socket_set_nonblock($stream);
+        $buffer = event_buffer_new($stream, [$this,'onRead'], NULL, [$this,'onError'], $bufid);
+
+        if($buffer == false) {
+            socket_close($stream);
+            return false;
+        }
+        else
+        {
+            event_buffer_base_set($buffer,$this->mEvBase);
+            event_buffer_timeout_set($buffer, self::read_time_out, self::write_time_out);
+            event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
+            event_buffer_priority_set($buffer, 10);
+            event_buffer_enable($buffer, EV_READ | EV_PERSIST);
+
+            $this->mStreams[$bufid] = $stream;
+            $this->mBuffers[$bufid] = $buffer;
+            $this->mContents[$bufid] = "";
+
+            $this->mBufferID++;
+            if($this->mBufferID < 0) $this->mBufferID = 1;
+
+            return $bufid;
+        }
+    }
+
+    public function block($bufid)
+    {
+        $stream = $this->mStreams[$bufid];
+        $buffer = $this->mBuffers[$bufid];
+
+        $success = event_buffer_disable($buffer, EV_READ | EV_PERSIST);
+        Log::record("event_buffer_disable success={$success}",Log::DEBUG);
+
+        $success = socket_set_block($stream);
+        Log::record("socket_set_block success={$success}",Log::DEBUG);
+    }
+
+    public function unblock($bufid)
+    {
+        $stream = $this->mStreams[$bufid];
+        $buffer = $this->mBuffers[$bufid];
+
+        $success = socket_set_nonblock($stream);
+        Log::record("socket_set_nonblock success={$success}",Log::DEBUG);
+
+        $success = event_buffer_enable($buffer, EV_READ | EV_PERSIST);
+        Log::record("event_buffer_enable success={$success}",Log::DEBUG);
+    }
+
+    public function onConnectTimer($stream, $event, $params)
+    {
+        $error = socket_last_error();
+        Log::record("onConnectTimer sig={$stream},flag={$event},error={$error}",Log::DEBUG);
+        $stream = $this->do_connect($params['host'],$params['port'],$params['args'],$params['stream']);
+        if($stream != false) {
+            $bufid = $this->add_stream($stream);
+            $this->mProcessor->onConnected($bufid, $stream,$params['host'],$params['port'],$params['args']);
+        }
+    }
+
+    public function onConnect($stream, $event, $params)
+    {
+        $error = socket_last_error();
+        Log::record("onConnect sig={$stream},flag={$event},socket error={$error}",Log::DEBUG);
+
+        if($event == EV_WRITE) {
+            $ret = socket_getopt($stream,SOL_SOCKET,SO_ERROR);
+            if($ret == 0) {
+                Log::record("onConnect EV_WRITE success ",Log::DEBUG);
+                $fd = intval($stream);
+                event_del($this->mEvConnects[$fd]);
+                event_free($this->mEvConnects[$fd]);
+                event_del($this->mEvTimeouts[$fd]);
+                event_free($this->mEvTimeouts[$fd]);
+                unset($this->mEvConnects[$fd]);
+                unset($this->mEvTimeouts[$fd]);
+                $bufid = $this->add_stream($stream);
+                $this->mProcessor->onConnected($bufid, $stream,$params['host'],$params['port'],$params['args']);
+            }
+            else {
+                Log::record("onConnect EV_WRITE Error",Log::DEBUG);
+            }
+        }
+        elseif($event == EV_WRITE | EV_READ) {
+            Log::record("onConnect EV_WRITE | EV_READ",Log::DEBUG);
+        }
+        else {
+            Log::record("onConnect ETIMEOUT",Log::DEBUG);
+        }
+    }
+
+    private function do_connect($host,$port,$args,$old_stream = false)
+    {
+        $stream = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
+        if($stream === false) return false;
+
+        socket_set_nonblock($stream);
+        $fd = intval($stream);
+
+        if($old_stream == false)
+        {
+            $this->mEvConnects[$fd] = event_new();
+            $this->mEvTimeouts[$fd] = event_timer_new();
+        }
+        else {
+            $oldfd = intval($old_stream);
+            $this->mEvConnects[$fd] = $this->mEvConnects[$oldfd];
+            $this->mEvTimeouts[$fd] = $this->mEvTimeouts[$oldfd];
+            unset($this->mEvConnects[$oldfd]);
+            unset($this->mEvTimeouts[$oldfd]);
+        }
+
+        $ret = socket_connect($stream,$host,$port);
+        if($ret == false)
+        {
+            if(event_set($this->mEvConnects[$fd], $stream, EV_WRITE | EV_READ, [$this, 'onConnect'],['host' => $host,'port' => $port,'args' => $args]) == false) {
+                Log::record("event_set error connect fd={$stream}",Log::DEBUG);
+            }
+            if(event_base_set($this->mEvConnects[$fd], $this->mEvBase) == false) {
+                Log::record("event_base_set error connect fd={$stream}",Log::DEBUG);
+            }
+            if(event_timer_set($this->mEvTimeouts[$fd], [$this, 'onConnectTimer'],['stream' => $stream, 'host' => $host,'port' => $port,'args' => $args]) == false) {
+                Log::record("event_set error connect fd={$stream}",Log::DEBUG);
+            }
+            if(event_base_set($this->mEvTimeouts[$fd], $this->mEvBase) == false) {
+                Log::record("event_base_set error connect fd={$stream}",Log::DEBUG);
+            }
+            if(event_add($this->mEvConnects[$fd]) == false) {
+                Log::record("event_add error connect fd={$stream}",Log::DEBUG);
+            }
+
+            if(event_timer_add($this->mEvTimeouts[$fd],1000000) == false) {
+                Log::record("event_base_set error connect fd={$stream}",Log::DEBUG);
+            }
+            return false;
+        }
+        else {
+            event_del($this->mEvConnects[$fd]);
+            event_free($this->mEvConnects[$fd]);
+            event_del($this->mEvTimeouts[$fd]);
+            event_free($this->mEvTimeouts[$fd]);
+
+            unset($this->mEvConnects[$fd]);
+            unset($this->mEvTimeouts[$fd]);
+
+            return $stream;
+        }
+    }
+    public function connect($host,$port,$args)
+    {
+        $stream = $this->do_connect($host,$port,$args,false);
+        if($stream != false) {
+            $bufid = $this->add_stream($stream);
+            $this->mProcessor->onConnected($bufid, $stream,$host,$port,$args);
+        }
+
+    }
+
+    public function onRead($buffer, $bufid)
+    {
+        Log::record("onRead bufid={$bufid}",Log::DEBUG);
+        while($read = event_buffer_read($buffer, 1024)) {
+            Log::record("read={$read}",Log::DEBUG);
+            $this->mContents[$bufid] .= $read;
+            $this->proc($bufid);
+        }
+    }
+
+    public function onError($buffer, $event, $bufid)
+    {
+        Log::record("onError bufid={$bufid} flag={$event}",Log::DEBUG);
+
+        if ($event == (EVBUFFER_READ | EVBUFFER_TIMEOUT)) {
+            event_buffer_enable($buffer, EV_READ);
+            $ret = $this->write($bufid,'');
+            if($ret == false) {
+                $this->close($bufid);
+            }
+        }
+        else
+        {
+            event_buffer_disable($buffer, EV_READ | EV_WRITE);
+            event_buffer_free($buffer);
+
+            if(array_key_exists($bufid,$this->mStreams)) {
+                @socket_shutdown($this->mStreams[$bufid],STREAM_SHUT_RDWR);
+                socket_close($this->mStreams[$bufid]);
+                unset($this->mStreams[$bufid]);
+            }
+
+            if(array_key_exists($bufid,$this->mBuffers)) {
+                unset($this->mBuffers[$bufid]);
+            }
+            if(array_key_exists($bufid,$this->mContents)) {
+                unset($this->mContents[$bufid]);
+            }
+
+            if($this->mProcessor != null) {
+                $this->mProcessor->onClose($bufid);
+            }
+        }
+    }
+
+    private function proc($bufid)
+    {
+        $content = $this->mContents[$bufid];
+
+        $start = 0;
+        $left = strlen($content);
+
+        while($left > self::body_header_len)
+        {
+            $header = substr($content,$start,self::body_header_len);
+            if(!is_numeric($header)) {
+                $this->close($bufid);
+                return;
+            }
+
+            $body_len = intval($header);
+            if($body_len == 0) { //这是一个心跳包
+                $start += self::body_header_len;
+                $left  -= self::body_header_len;
+            }
+            else
+            {
+                if($left >= self::body_header_len + $body_len)
+                {
+                    $body = substr($content,$start + self::body_header_len,$body_len);
+                    $this->mProcessor->onRequest($bufid,$body);
+
+                    $start += self::body_header_len + $body_len;
+                    $left  -= self::body_header_len + $body_len;
+                }
+                else {
+                    break;
+                }
+            }
+        }
+
+        if($start > 0)
+        {
+            $str = substr($content,$start);
+            if($str === false) {
+                $this->mContents[$bufid] = '';
+            }
+            else {
+                $this->mContents[$bufid] = $str;
+            }
+        }
+    }
+
+    public function write($bufid,$data)
+    {
+        if(!is_string($data)) {
+            Log::record(__METHOD__ . " write data is not string.",Log::ERR);
+        }
+        if(array_key_exists($bufid,$this->mBuffers)) {
+            $buffer = $this->mBuffers[$bufid];
+            $header = sprintf("%010d",strlen($data));
+            $data = $header . $data;
+            $ret = event_buffer_write($buffer,$data,strlen($data));
+            if($ret == false) {
+                Log::record(__METHOD__ . " event_buffer_write return false.",Log::ERR);
+            }
+            return $ret;
+        } else {
+            return false;
+        }
+    }
+
+    public function close($bufid)
+    {
+        if(!array_key_exists($bufid,$this->mBuffers)) return false;
+
+        Log::record(__METHOD__ . " close socket.",Log::DEBUG);
+
+        $buffer = $this->mBuffers[$bufid];
+        event_buffer_disable($buffer, EV_READ | EV_WRITE);
+        event_buffer_free($buffer);
+
+        if(array_key_exists($bufid,$this->mStreams)) {
+            @socket_shutdown($this->mStreams[$bufid],STREAM_SHUT_RDWR);
+            socket_close($this->mStreams[$bufid]);
+            unset($this->mStreams[$bufid]);
+        }
+
+        if(array_key_exists($bufid,$this->mBuffers)) {
+            unset($this->mBuffers[$bufid]);
+        }
+        if(array_key_exists($bufid,$this->mContents)) {
+            unset($this->mContents[$bufid]);
+        }
+        if($this->mProcessor != null) {
+            $this->mProcessor->onClose($bufid);
+        }
+    }
+}

+ 421 - 0
helper/event/event_looper.php

@@ -0,0 +1,421 @@
+<?php
+/**
+ * Created by PhpStorm.
+ * User: stanley-king
+ * Date: 2018/7/23
+ * Time: 下午10:56
+ */
+
+namespace event;
+
+use Log;
+
+class event_looper
+{
+    const body_header_len  = 10;
+    const read_micotime    = 6000000000;
+    const connect_micotime = 5000000;
+
+    protected $mEvBase;
+    protected $mEvSignals;
+    protected $mEvSockets;
+    protected $mEvTimeouts;
+
+    protected $mStreams;
+    protected $mContents;
+
+    protected $mProcessor;
+
+    public function __construct()
+    {
+        $this->mStreams  = [];
+        $this->mContents = [];
+
+        $this->mEvSignals = [];
+        $this->mEvSockets = [];
+
+        $this->mEvTimeouts = [];
+
+        event_init();
+        $this->mEvBase   = event_base_new();
+    }
+
+    public function init(IProcessor $processor)
+    {
+        $this->mProcessor = $processor;
+        $this->mProcessor->onStart();
+    }
+
+    public function run_loop()
+    {
+        $this->add_signal(SIGINT);
+        $this->add_signal(SIGQUIT);
+        $this->add_signal(SIGTERM);
+
+        $ret = event_base_loop($this->mEvBase);
+        Log::record("event_base_loop ret={$ret}",Log::DEBUG);
+    }
+
+    public function add_listen($stream)
+    {
+        $fd = intval($stream);
+        if($fd < 0) return false;
+
+        $this->mEvSockets[$fd] = event_new();
+        if(event_set($this->mEvSockets[$fd], $stream, EV_READ | EV_PERSIST, [$this, 'onAccept']) == false) {
+            Log::record("event_set error EV_READ | EV_PERSIST",Log::DEBUG);
+        }
+
+        if(event_base_set($this->mEvSockets[$fd], $this->mEvBase) == false) {
+            Log::record("event_set error EV_READ | EV_PERSIST",Log::DEBUG);
+        }
+
+        if(event_add($this->mEvSockets[$fd]) == false) {
+            Log::record("event_add error EV_READ | EV_PERSIST",Log::DEBUG);
+        }
+    }
+
+    private function add_signal($val)
+    {
+        $this->mEvSignals[$val] = event_new();
+
+        if(event_set($this->mEvSignals[$val], $val, EV_SIGNAL, [$this, 'onSignal']) == false) {
+            Log::record("event_set error EV_SIGNAL sig={$val}",Log::DEBUG);
+        }
+
+        if(event_base_set($this->mEvSignals[$val], $this->mEvBase) == false) {
+            Log::record("event_base_set error EV_SIGNAL sig={$val}",Log::DEBUG);
+        }
+
+        if(event_add($this->mEvSignals[$val]) == false) {
+            Log::record("event_add error EV_SIGNAL sig={$val}",Log::DEBUG);
+        }
+    }
+
+    public function onSignal($sig, $flag)
+    {
+        Log::record("ev_signal sig={$sig},flag={$flag}",Log::DEBUG);
+        if($sig == SIGINT || $sig == SIGQUIT || $sig == SIGTERM)
+        {
+            event_base_loopexit($this->mEvBase);
+
+            foreach ($this->mEvTimeouts as $event) {
+                event_del($event);
+                event_free($event);
+            }
+            $this->mEvTimeouts = [];
+
+            foreach ($this->mEvSockets as $fd => $event) {
+                event_del($event);
+                event_free($event);
+            }
+            $this->mEvSockets = [];
+
+            foreach ($this->mStreams as $fd => $stream) {
+                @socket_shutdown($this->mStreams[$fd],STREAM_SHUT_RDWR);
+                @socket_close($this->mStreams[$fd]);
+            }
+            $this->mStreams  = [];
+
+            $this->mContents = [];
+
+            foreach ($this->mEvSignals as $event) {
+                event_del($event);
+                event_free($event);
+            }
+            $this->mEvSignals = [];
+        }
+    }
+    public function onAccept($socket, $event)
+    {
+        Log::record("onAccept socket_fd={$socket} flag={$event}",Log::DEBUG);
+        $stream = socket_accept($socket);
+        if($stream == false) {
+            Log::record("stream_socket_accept return false socket_fd={$socket}",Log::DEBUG);
+            return;
+        }
+        Log::record("stream_socket_accept stream={$stream}",Log::DEBUG);
+        $this->add_stream($stream);
+    }
+
+    private function add_stream($stream)
+    {
+        $fd = intval($stream);
+        socket_set_nonblock($stream);
+
+        $this->mEvSockets[$fd] = event_new();
+        $this->mStreams[$fd] = $stream;
+        $this->mContents[$fd] = '';
+
+        if(event_set($this->mEvSockets[$fd], $stream, EV_READ | EV_TIMEOUT, [$this, 'onRead']) == false) {
+            Log::record("event_set error connect fd={$stream}",Log::DEBUG);
+        }
+        if(event_base_set($this->mEvSockets[$fd], $this->mEvBase) == false) {
+            Log::record("event_base_set error connect fd={$stream}",Log::DEBUG);
+        }
+        if(event_add($this->mEvSockets[$fd],self::read_micotime) == false) {
+            Log::record("event_add error connect fd={$stream}",Log::DEBUG);
+        }
+
+        return $fd;
+    }
+
+    public function block($fd)
+    {
+        $stream = $this->mStreams[$fd];
+        $event = $this->mEvSockets[$fd];
+
+        event_del($event);
+        socket_set_block($stream);
+    }
+
+    public function unblock($fd)
+    {
+        $stream = $this->mStreams[$fd];
+        $event = $this->mEvSockets[$fd];
+
+        event_add($event);
+        socket_set_nonblock($stream);
+    }
+
+    public function onTimer($stream, $event, $params)
+    {
+
+    }
+
+    public function onConnect($stream, $event, $params)
+    {
+        $error = socket_last_error();
+        Log::record("onConnect stream={$stream},event={$event},socket error={$error}",Log::DEBUG);
+
+        if($event == EV_TIMEOUT)
+        {
+            $stream = $this->do_connect($params['host'],$params['port'],$params['args'],$params['stream']);
+            if($stream != false) {
+                $fd = $this->add_stream($stream);
+                $this->mProcessor->onConnected($fd, $stream,$params['host'],$params['port'],$params['args']);
+            }
+        }
+        elseif($event == EV_WRITE)
+        {
+            $ret = socket_getopt($stream,SOL_SOCKET,SO_ERROR);
+            if($ret == 0) {
+                Log::record("onConnect EV_WRITE success ",Log::DEBUG);
+                $fd = intval($stream);
+                event_del($this->mEvSockets[$fd]);
+                event_free($this->mEvSockets[$fd]);
+                unset($this->mEvSockets[$fd]);
+
+                $fd = $this->add_stream($stream);
+                $this->mProcessor->onConnected($fd, $stream,$params['host'],$params['port'],$params['args']);
+            }
+            else {
+                Log::record("onConnect EV_WRITE Error",Log::DEBUG);
+            }
+        }
+        elseif($event == EV_WRITE | EV_READ)
+        {
+            Log::record("onConnect EV_WRITE | EV_READ",Log::DEBUG);
+
+            $fd = intval($stream);
+            event_del($this->mEvSockets[$fd]);
+
+            if(event_set($this->mEvSockets[$fd], $stream, EV_TIMEOUT, [$this, 'onConnect'],$params) == false) {
+                Log::record("event_set error connect fd={$stream}",Log::DEBUG);
+            }
+            if(event_base_set($this->mEvSockets[$fd], $this->mEvBase) == false) {
+                Log::record("event_base_set error connect fd={$stream}",Log::DEBUG);
+            }
+            if(event_add($this->mEvSockets[$fd],self::connect_micotime) == false) {
+                Log::record("event_add error connect fd={$stream}",Log::DEBUG);
+            }
+        }
+        else {
+            Log::record("onConnect ETIMEOUT",Log::DEBUG);
+        }
+    }
+
+    private function do_connect($host,$port,$args,$old_stream = false)
+    {
+        $stream = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
+        socket_set_nonblock($stream);
+        $fd = intval($stream);
+
+        if($old_stream == false) {
+            $this->mEvSockets[$fd] = event_new();
+        }
+        else {
+            $oldfd = intval($old_stream);
+            $this->mEvSockets[$fd] = $this->mEvSockets[$oldfd];
+            unset($this->mEvSockets[$oldfd]);
+        }
+
+        $ret = socket_connect($stream,$host,$port);
+        if($ret == false)
+        {
+            if(event_set($this->mEvSockets[$fd], $stream, EV_WRITE | EV_READ | EV_TIMEOUT, [$this, 'onConnect'],
+                    ['host' => $host,'port' => $port,'args' => $args]) == false) {
+                Log::record("event_set error connect fd={$stream}",Log::DEBUG);
+            }
+            if(event_base_set($this->mEvSockets[$fd], $this->mEvBase) == false) {
+                Log::record("event_base_set error connect fd={$stream}",Log::DEBUG);
+            }
+            if(event_add($this->mEvSockets[$fd],self::connect_micotime) == false) {
+                Log::record("event_add error connect fd={$stream}",Log::DEBUG);
+            }
+            return false;
+        }
+        else {
+            event_del($this->mEvSockets[$fd]);
+            event_free($this->mEvSockets[$fd]);
+            unset($this->mEvSockets[$fd]);
+
+            return $stream;
+        }
+    }
+
+    public function connect($host,$port,$args)
+    {
+        $stream = $this->do_connect($host,$port,$args,false);
+        if($stream != false) {
+            $bufid = $this->add_stream($stream);
+            $this->mProcessor->onConnected($bufid, $stream,$host,$port,$args);
+        }
+    }
+
+    public function onRead($stream, $event)
+    {
+        $fd = intval($stream);
+        if($event == EV_TIMEOUT) {
+            Log::record("onRead stream={$stream} event=EV_TIMEOUT",Log::DEBUG);
+            event_add($this->mEvSockets[$fd],self::read_micotime);
+        }
+        elseif($event == EV_READ)
+        {
+            $read = @socket_read($stream, 1024);
+            Log::record("onRead read={$read}",Log::DEBUG);
+
+            if($read === false) { //WOULD_BLOCK INPROCESS 事件
+                event_add($this->mEvSockets[$fd],self::read_micotime);
+                return;
+            }
+            else
+            {
+                if(strlen($read) == 0) {
+                    $this->close($fd);
+                }
+                else {
+                    $this->mContents[$fd] .= $read;
+                    $this->proc($fd);
+                    event_add($this->mEvSockets[$fd],self::read_micotime);
+                }
+            }
+        }
+        else {
+            Log::record("onRead stream={$stream} event={$event}",Log::DEBUG);
+        }
+    }
+
+    private function proc($fd)
+    {
+        $content = &$this->mContents[$fd];
+        $start = 0;
+        $left = strlen($content);
+
+        while($left > self::body_header_len)
+        {
+            $header = substr($content,$start,self::body_header_len);
+            if(!is_numeric($header)) {
+                $this->close($fd);
+                return;
+            }
+
+            $body_len = intval($header);
+            if($body_len == 0) { //这是一个心跳包
+                $start += self::body_header_len;
+                $left  -= self::body_header_len;
+            }
+            else
+            {
+                if($left >= self::body_header_len + $body_len)
+                {
+                    $body = substr($content,$start + self::body_header_len,$body_len);
+                    $this->mProcessor->onRequest($fd,$body);
+
+                    $start += self::body_header_len + $body_len;
+                    $left  -= self::body_header_len + $body_len;
+                }
+                else {
+                    break;
+                }
+            }
+        }
+
+        if($start > 0)
+        {
+            $str = substr($content,$start);
+            if($str === false) {
+                $this->mContents[$fd] = '';
+            }
+            else {
+                $this->mContents[$fd] = $str;
+            }
+        }
+    }
+
+    public function onError($stream, $event)
+    {
+        $fd = intval($stream);
+        if ($event == EV_TIMEOUT)
+        {
+            $ret = $this->write($fd,'');
+            if($ret == false) {
+                $this->close($fd);
+            }
+        }
+        else
+        {
+            Log::record("onError stream={$stream} flag={$event}",Log::DEBUG);
+            $this->close($fd);
+        }
+    }
+
+    public function write($fd,$data)
+    {
+        if(!is_string($data)) {
+            Log::record(__METHOD__ . " write data is not string.",Log::ERR);
+            return false;
+        }
+
+        if(array_key_exists($fd,$this->mStreams)) {
+            $stream = $this->mStreams[$fd];
+            $header = sprintf("%010d",strlen($data));
+            $data = $header . $data;
+            $ret = socket_write($stream,$data,strlen($data));
+            return ($ret == strlen($data));
+        } else {
+            return false;
+        }
+    }
+
+    public function close($fd)
+    {
+        if(!array_key_exists($fd,$this->mStreams)) {
+            Log::record(" close socket fd={$fd}.",Log::ERR);
+            return false;
+        }
+        else
+        {
+            event_del($this->mEvSockets[$fd]);
+            event_free($this->mEvSockets[$fd]);
+            $stream = $this->mStreams[$fd];
+            @socket_shutdown($stream,STREAM_SHUT_RDWR);
+            socket_close($stream);
+            unset($this->mEvSockets[$fd]);
+            unset($this->mStreams[$fd]);
+            unset($this->mContents[$fd]);
+
+            $this->mProcessor->onClose($fd);
+        }
+    }
+}

+ 4 - 870
helper/event_looper.php

@@ -6,877 +6,11 @@
  * Time: 下午2:28
  */
 
-interface IProcessor
-{
-    public function onStart();
-    public function onRequest($bufid, $body);
-    public function onClose($bufid);
-    public function onConnected($bufid,$stream,$host,$port,$args);
-}
-
-class event_looper
-{
-    const body_header_len  = 10;
-    const read_micotime    = 6000000000;
-    const connect_micotime = 5000000;
-
-    protected $mEvBase;
-    protected $mEvSignals;
-    protected $mEvSockets;
-    protected $mEvTimeouts;
-
-    protected $mStreams;
-    protected $mContents;
-
-    protected $mProcessor;
-
-    public function __construct()
-    {
-        $this->mStreams  = [];
-        $this->mContents = [];
-
-        $this->mEvSignals = [];
-        $this->mEvSockets = [];
-
-        $this->mEvTimeouts = [];
-
-        event_init();
-        $this->mEvBase   = event_base_new();
-    }
-
-    public function init(IProcessor $processor)
-    {
-        $this->mProcessor = $processor;
-        $this->mProcessor->onStart();
-    }
-
-    public function run_loop()
-    {
-        $this->add_signal(SIGINT);
-        $this->add_signal(SIGQUIT);
-        $this->add_signal(SIGTERM);
-
-        $ret = event_base_loop($this->mEvBase);
-        Log::record("event_base_loop ret={$ret}",Log::DEBUG);
-    }
-
-    public function add_listen($stream)
-    {
-        $fd = intval($stream);
-        if($fd < 0) return false;
-
-        $this->mEvSockets[$fd] = event_new();
-        if(event_set($this->mEvSockets[$fd], $stream, EV_READ | EV_PERSIST, [$this, 'onAccept']) == false) {
-            Log::record("event_set error EV_READ | EV_PERSIST",Log::DEBUG);
-        }
-
-        if(event_base_set($this->mEvSockets[$fd], $this->mEvBase) == false) {
-            Log::record("event_set error EV_READ | EV_PERSIST",Log::DEBUG);
-        }
-
-        if(event_add($this->mEvSockets[$fd]) == false) {
-            Log::record("event_add error EV_READ | EV_PERSIST",Log::DEBUG);
-        }
-    }
-
-    private function add_signal($val)
-    {
-        $this->mEvSignals[$val] = event_new();
-
-        if(event_set($this->mEvSignals[$val], $val, EV_SIGNAL, [$this, 'onSignal']) == false) {
-            Log::record("event_set error EV_SIGNAL sig={$val}",Log::DEBUG);
-        }
-
-        if(event_base_set($this->mEvSignals[$val], $this->mEvBase) == false) {
-            Log::record("event_base_set error EV_SIGNAL sig={$val}",Log::DEBUG);
-        }
-
-        if(event_add($this->mEvSignals[$val]) == false) {
-            Log::record("event_add error EV_SIGNAL sig={$val}",Log::DEBUG);
-        }
-    }
-
-    public function onSignal($sig, $flag)
-    {
-        Log::record("ev_signal sig={$sig},flag={$flag}",Log::DEBUG);
-        if($sig == SIGINT || $sig == SIGQUIT || $sig == SIGTERM)
-        {
-            event_base_loopexit($this->mEvBase);
-
-            foreach ($this->mEvTimeouts as $event) {
-                event_del($event);
-                event_free($event);
-            }
-            $this->mEvTimeouts = [];
-
-            foreach ($this->mEvSockets as $fd => $event) {
-                event_del($event);
-                event_free($event);
-
-                @socket_shutdown($this->mStreams[$fd],STREAM_SHUT_RDWR);
-                @socket_close($this->mStreams[$fd]);
-            }
-            $this->mEvSockets = [];
-            $this->mStreams  = [];
-            $this->mContents = [];
-
-            foreach ($this->mEvSignals as $event) {
-                event_del($event);
-                event_free($event);
-            }
-            $this->mEvSignals = [];
-        }
-    }
-    public function onAccept($socket, $event)
-    {
-        Log::record("onAccept socket_fd={$socket} flag={$event}",Log::DEBUG);
-        $stream = socket_accept($socket);
-        if($stream == false) {
-            Log::record("stream_socket_accept return false socket_fd={$socket}",Log::DEBUG);
-            return;
-        }
-        Log::record("stream_socket_accept stream={$stream}",Log::DEBUG);
-        $this->add_stream($stream);
-    }
-
-    private function add_stream($stream)
-    {
-        $fd = intval($stream);
-        socket_set_nonblock($stream);
-
-        $this->mEvSockets[$fd] = event_new();
-        $this->mStreams[$fd] = $stream;
-        $this->mContents[$fd] = '';
-
-        if(event_set($this->mEvSockets[$fd], $stream, EV_READ | EV_TIMEOUT, [$this, 'onRead']) == false) {
-            Log::record("event_set error connect fd={$stream}",Log::DEBUG);
-        }
-        if(event_base_set($this->mEvSockets[$fd], $this->mEvBase) == false) {
-            Log::record("event_base_set error connect fd={$stream}",Log::DEBUG);
-        }
-        if(event_add($this->mEvSockets[$fd],self::read_micotime) == false) {
-            Log::record("event_add error connect fd={$stream}",Log::DEBUG);
-        }
-
-        return $fd;
-    }
-
-    public function block($fd)
-    {
-        $stream = $this->mStreams[$fd];
-        $event = $this->mEvSockets[$fd];
-
-        event_del($event);
-        socket_set_block($stream);
-    }
-
-    public function unblock($fd)
-    {
-        $stream = $this->mStreams[$fd];
-        $event = $this->mEvSockets[$fd];
-
-        event_add($event);
-        socket_set_nonblock($stream);
-    }
-
-    public function onTimer($stream, $event, $params)
-    {
-
-    }
-
-    public function onConnect($stream, $event, $params)
-    {
-        $error = socket_last_error();
-        Log::record("onConnect stream={$stream},event={$event},socket error={$error}",Log::DEBUG);
-
-        if($event == EV_TIMEOUT)
-        {
-            $stream = $this->do_connect($params['host'],$params['port'],$params['args'],$params['stream']);
-            if($stream != false) {
-                $fd = $this->add_stream($stream);
-                $this->mProcessor->onConnected($fd, $stream,$params['host'],$params['port'],$params['args']);
-            }
-        }
-        elseif($event == EV_WRITE)
-        {
-            $ret = socket_getopt($stream,SOL_SOCKET,SO_ERROR);
-            if($ret == 0) {
-                Log::record("onConnect EV_WRITE success ",Log::DEBUG);
-                $fd = intval($stream);
-                event_del($this->mEvSockets[$fd]);
-                event_free($this->mEvSockets[$fd]);
-                unset($this->mEvSockets[$fd]);
-
-                $fd = $this->add_stream($stream);
-                $this->mProcessor->onConnected($fd, $stream,$params['host'],$params['port'],$params['args']);
-            }
-            else {
-                Log::record("onConnect EV_WRITE Error",Log::DEBUG);
-            }
-        }
-        elseif($event == EV_WRITE | EV_READ)
-        {
-            Log::record("onConnect EV_WRITE | EV_READ",Log::DEBUG);
-
-            $fd = intval($stream);
-            event_del($this->mEvSockets[$fd]);
-
-            if(event_set($this->mEvSockets[$fd], $stream, EV_TIMEOUT, [$this, 'onConnect'],$params) == false) {
-                Log::record("event_set error connect fd={$stream}",Log::DEBUG);
-            }
-            if(event_base_set($this->mEvSockets[$fd], $this->mEvBase) == false) {
-                Log::record("event_base_set error connect fd={$stream}",Log::DEBUG);
-            }
-            if(event_add($this->mEvSockets[$fd],self::connect_micotime) == false) {
-                Log::record("event_add error connect fd={$stream}",Log::DEBUG);
-            }
-        }
-        else {
-            Log::record("onConnect ETIMEOUT",Log::DEBUG);
-        }
-    }
-
-    private function do_connect($host,$port,$args,$old_stream = false)
-    {
-        $stream = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
-        socket_set_nonblock($stream);
-        $fd = intval($stream);
-
-        if($old_stream == false) {
-            $this->mEvSockets[$fd] = event_new();
-        }
-        else {
-            $oldfd = intval($old_stream);
-            $this->mEvSockets[$fd] = $this->mEvSockets[$oldfd];
-            unset($this->mEvSockets[$oldfd]);
-        }
-
-        $ret = socket_connect($stream,$host,$port);
-        if($ret == false)
-        {
-            if(event_set($this->mEvSockets[$fd], $stream, EV_WRITE | EV_READ | EV_TIMEOUT, [$this, 'onConnect'],
-                    ['host' => $host,'port' => $port,'args' => $args]) == false) {
-                Log::record("event_set error connect fd={$stream}",Log::DEBUG);
-            }
-            if(event_base_set($this->mEvSockets[$fd], $this->mEvBase) == false) {
-                Log::record("event_base_set error connect fd={$stream}",Log::DEBUG);
-            }
-            if(event_add($this->mEvSockets[$fd],self::connect_micotime) == false) {
-                Log::record("event_add error connect fd={$stream}",Log::DEBUG);
-            }
-            return false;
-        }
-        else {
-            event_del($this->mEvSockets[$fd]);
-            event_free($this->mEvSockets[$fd]);
-            unset($this->mEvSockets[$fd]);
-
-            return $stream;
-        }
-    }
-
-    public function connect($host,$port,$args)
-    {
-        $stream = $this->do_connect($host,$port,$args,false);
-        if($stream != false) {
-            $bufid = $this->add_stream($stream);
-            $this->mProcessor->onConnected($bufid, $stream,$host,$port,$args);
-        }
-    }
-
-    public function onRead($stream, $event)
-    {
-        $fd = intval($stream);
-        if($event == EV_TIMEOUT) {
-            Log::record("onRead stream={$stream} event=EV_TIMEOUT",Log::DEBUG);
-            event_add($this->mEvSockets[$fd],self::read_micotime);
-        }
-        elseif($event == EV_READ)
-        {
-            Log::record("onRead stream={$stream} event=EV_READ",Log::DEBUG);
-            $read = @socket_read($stream, 1024);
-            if($read === false) { //WOULD_BLOCK INPROCESS 事件
-                event_add($this->mEvSockets[$fd],self::read_micotime);
-                return;
-            }
-            else
-            {
-                if(strlen($read) == 0) {
-                    $this->close($fd);
-                }
-                else {
-                    $this->mContents[$fd] .= $read;
-                    $this->proc($fd);
-                    event_add($this->mEvSockets[$fd],self::read_micotime);
-                }
-            }
-        }
-        else {
-            Log::record("onRead stream={$stream} event={$event}",Log::DEBUG);
-        }
-    }
-
-    static private function all_digit($str)
-    {
-        $len = strlen($str);
-        $base = ord('0');
-        for($offset = 0; $offset < $len; $offset++)
-        {
-            $code = ord(substr($str, $offset,1)) - $base;
-            if($code < 0 || $code > 9) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    private function proc($fd)
-    {
-        $content = &$this->mContents[$fd];
-        $start = 0;
-        $left = strlen($content);
-
-        while($left > self::body_header_len)
-        {
-            $header = substr($content,$start,self::body_header_len);
-            if(!self::all_digit($header)) {
-                $this->close($fd);
-                return;
-            }
-
-            $body_len = intval($header);
-            if($body_len == 0) { //这是一个心跳包
-                $start += self::body_header_len;
-                $left  -= self::body_header_len;
-            }
-            else
-            {
-                if($left >= self::body_header_len + $body_len)
-                {
-                    $body = substr($content,$start + self::body_header_len,$body_len);
-                    $this->mProcessor->onRequest($fd,$body);
-
-                    $start += self::body_header_len + $body_len;
-                    $left  -= self::body_header_len + $body_len;
-                }
-                else {
-                    break;
-                }
-            }
-        }
-
-        if($start > 0)
-        {
-            $str = substr($content,$start);
-            if($str == false) {
-                $this->mContents[$fd] = '';
-            }
-            else {
-                $this->mContents[$fd] = $str;
-            }
-        }
-    }
-
-    public function onError($stream, $event)
-    {
-        $fd = intval($stream);
-        if ($event == EV_TIMEOUT)
-        {
-            $ret = $this->write($fd,'');
-            if($ret == false) {
-                $this->close($fd);
-            }
-        }
-        else
-        {
-            Log::record("onError stream={$stream} flag={$event}",Log::DEBUG);
-            $this->close($fd);
-        }
-    }
-
-    public function write($fd,$data)
-    {
-        if(!is_string($data)) {
-            Log::record(__METHOD__ . " write data is not string.",Log::ERR);
-            return false;
-        }
-
-        if(array_key_exists($fd,$this->mStreams)) {
-            $stream = $this->mStreams[$fd];
-            $header = sprintf("%010d",strlen($data));
-            $data = $header . $data;
-            $ret = socket_write($stream,$data,strlen($data));
-            return ($ret == strlen($data));
-        } else {
-            return false;
-        }
-    }
-
-    public function close($fd)
-    {
-        if(!array_key_exists($fd,$this->mStreams)) {
-            Log::record(" close socket fd={$fd}.",Log::ERR);
-            return false;
-        }
-        else
-        {
-            event_del($this->mEvSockets[$fd]);
-            event_free($this->mEvSockets[$fd]);
-            $stream = $this->mStreams[$fd];
-            @socket_shutdown($stream,STREAM_SHUT_RDWR);
-            socket_close($stream);
-            unset($this->mEvSockets[$fd]);
-            unset($this->mStreams[$fd]);
-            unset($this->mContents[$fd]);
-
-            $this->mProcessor->onClose($fd);
-        }
-    }
-}
-
-
-class eventbuffer_looper
-{
-    const body_header_len = 10;
-    const read_time_out   = 1800;
-    const write_time_out  = 5;
-
-    protected $mEvBase;
-    protected $mEvAccepts;
-    protected $mEvSignals;
-    protected $mEvConnects;
-
-    protected $mStreams;
-    protected $mBuffers;
-    protected $mContents;
-    protected $mProcessor;
-    protected $mBufferID;
-
-    public function __construct()
-    {
-        $this->mStreams  = [];
-        $this->mBuffers  = [];
-        $this->mContents = [];
-        $this->mEvAccepts = [];
-        $this->mEvSignals = [];
-        $this->mEvConnects = [];
-        $this->mEvTimeouts = [];
-
-        $this->mBufferID = 1;
-        event_init();
-        $this->mEvBase   = event_base_new();
-    }
-
-    public function init(IProcessor $processor)
-    {
-        $this->mProcessor = $processor;
-        $this->mProcessor->onStart();
-    }
-
-    public function run_loop()
-    {
-        $this->add_signal(SIGINT);
-        $this->add_signal(SIGQUIT);
-        $this->add_signal(SIGTERM);
-
-        $ret = event_base_loop($this->mEvBase);
-        Log::record("event_base_loop ret={$ret}",Log::DEBUG);
-    }
-
-    public function add_listen($stream)
-    {
-        $fd = intval($stream);
-        if($fd < 0) return false;
-
-        $this->mEvAccepts[$fd] = event_new();
-        if(event_set($this->mEvAccepts[$fd], $stream, EV_READ | EV_PERSIST, [$this, 'onAccept']) == false) {
-            Log::record("event_set error EV_READ | EV_PERSIST",Log::DEBUG);
-        }
-
-        if(event_base_set($this->mEvAccepts[$fd], $this->mEvBase) == false) {
-            Log::record("event_set error EV_READ | EV_PERSIST",Log::DEBUG);
-        }
-
-        if(event_add($this->mEvAccepts[$fd]) == false) {
-            Log::record("event_add error EV_READ | EV_PERSIST",Log::DEBUG);
-        }
-    }
-
-    private function add_signal($val)
-    {
-        $this->mEvSignals[$val] = event_new();
-
-        if(event_set($this->mEvSignals[$val], $val, EV_SIGNAL, [$this, 'onSignal']) == false) {
-            Log::record("event_set error EV_SIGNAL sig={$val}",Log::DEBUG);
-        }
-
-        if(event_base_set($this->mEvSignals[$val], $this->mEvBase) == false) {
-            Log::record("event_base_set error EV_SIGNAL sig={$val}",Log::DEBUG);
-        }
-
-        if(event_add($this->mEvSignals[$val]) == false) {
-            Log::record("event_add error EV_SIGNAL sig={$val}",Log::DEBUG);
-        }
-    }
-
-    public function onSignal($sig, $flag)
-    {
-        Log::record("ev_signal sig={$sig},flag={$flag}",Log::DEBUG);
-        if($sig == SIGINT || $sig == SIGQUIT || $sig == SIGTERM)
-        {
-            event_base_loopexit($this->mEvBase);
-
-            foreach ($this->mBuffers as $bufid => $buffer) {
-                event_buffer_disable($buffer, EV_READ | EV_WRITE);
-                event_buffer_free($buffer);
-
-                @socket_shutdown($this->mStreams[$bufid],STREAM_SHUT_RDWR);
-                socket_close($this->mStreams[$bufid]);
-            }
-            $this->mStreams = [];
-            $this->mContents = [];
-            $this->mBuffers = [];
-        }
-    }
-    public function onAccept($socket, $event)
-    {
-        Log::record("onAccept socket_fd={$socket} flag={$event}",Log::DEBUG);
-        $stream = socket_accept($socket);
-        if($stream == false) {
-            Log::record("stream_socket_accept return false socket_fd={$socket}",Log::DEBUG);
-            return;
-        }
-        Log::record("stream_socket_accept stream={$stream}",Log::DEBUG);
-        $this->add_stream($stream);
-    }
-
-    public function onWrite($buffer, $bufid)
-    {
-
-    }
-
-    private function add_stream($stream)
-    {
-        $bufid = $this->mBufferID;
-        socket_set_nonblock($stream);
-        $buffer = event_buffer_new($stream, [$this,'onRead'], [$this,'onWrite'], [$this,'onError'], $bufid);
-
-        if($buffer == false) {
-            socket_close($stream);
-            return false;
-        }
-        else
-        {
-            event_buffer_base_set($buffer,$this->mEvBase);
-            event_buffer_timeout_set($buffer, self::read_time_out, self::write_time_out);
-            event_buffer_watermark_set($buffer, EV_READ, 0, 1024*1024);
-            event_buffer_watermark_set($buffer, EV_WRITE, 0, 1024*1024);
-            event_buffer_priority_set($buffer, 10);
-            event_buffer_enable($buffer, EV_READ | EV_PERSIST);
-
-            $this->mStreams[$bufid] = $stream;
-            $this->mBuffers[$bufid] = $buffer;
-            $this->mContents[$bufid] = "";
-
-            $this->mBufferID++;
-            if($this->mBufferID < 0) $this->mBufferID = 1;
-
-            return $bufid;
-        }
-    }
-
-    public function block($bufid)
-    {
-        $stream = $this->mStreams[$bufid];
-        $buffer = $this->mBuffers[$bufid];
-
-        $success = event_buffer_disable($buffer, EV_READ | EV_PERSIST);
-        Log::record("event_buffer_disable success={$success}",Log::DEBUG);
-
-        $success = socket_set_block($stream);
-        Log::record("socket_set_block success={$success}",Log::DEBUG);
-    }
-
-    public function unblock($bufid)
-    {
-        $stream = $this->mStreams[$bufid];
-        $buffer = $this->mBuffers[$bufid];
-
-        $success = socket_set_nonblock($stream);
-        Log::record("socket_set_nonblock success={$success}",Log::DEBUG);
-
-        $success = event_buffer_enable($buffer, EV_READ | EV_PERSIST);
-        Log::record("event_buffer_enable success={$success}",Log::DEBUG);
-    }
-
-    public function onConnectTimer($stream, $event, $params)
-    {
-        $error = socket_last_error();
-        Log::record("onConnectTimer sig={$stream},flag={$event},error={$error}",Log::DEBUG);
-        $stream = $this->do_connect($params['host'],$params['port'],$params['args'],$params['stream']);
-        if($stream != false) {
-            $bufid = $this->add_stream($stream);
-            $this->mProcessor->onConnected($bufid, $stream,$params['host'],$params['port'],$params['args']);
-        }
-    }
-
-    public function onConnect($stream, $event, $params)
-    {
-        $error = socket_last_error();
-        Log::record("onConnect sig={$stream},flag={$event},socket error={$error}",Log::DEBUG);
-
-        if($event == EV_WRITE) {
-            $ret = socket_getopt($stream,SOL_SOCKET,SO_ERROR);
-            if($ret == 0) {
-                Log::record("onConnect EV_WRITE success ",Log::DEBUG);
-                $fd = intval($stream);
-                event_del($this->mEvConnects[$fd]);
-                event_free($this->mEvConnects[$fd]);
-                event_del($this->mEvTimeouts[$fd]);
-                event_free($this->mEvTimeouts[$fd]);
-                unset($this->mEvConnects[$fd]);
-                unset($this->mEvTimeouts[$fd]);
-                $bufid = $this->add_stream($stream);
-                $this->mProcessor->onConnected($bufid, $stream,$params['host'],$params['port'],$params['args']);
-            }
-            else {
-                Log::record("onConnect EV_WRITE Error",Log::DEBUG);
-            }
-        }
-        elseif($event == EV_WRITE | EV_READ) {
-            Log::record("onConnect EV_WRITE | EV_READ",Log::DEBUG);
-        }
-        else {
-            Log::record("onConnect ETIMEOUT",Log::DEBUG);
-        }
-    }
-
-    private function do_connect($host,$port,$args,$old_stream = false)
-    {
-        $stream = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
-        socket_set_nonblock($stream);
-        $fd = intval($stream);
-
-        if($old_stream == false)
-        {
-            $this->mEvConnects[$fd] = event_new();
-            $this->mEvTimeouts[$fd] = event_timer_new();
-        }
-        else {
-            $oldfd = intval($old_stream);
-            $this->mEvConnects[$fd] = $this->mEvConnects[$oldfd];
-            $this->mEvTimeouts[$fd] = $this->mEvTimeouts[$oldfd];
-            unset($this->mEvConnects[$oldfd]);
-            unset($this->mEvTimeouts[$oldfd]);
-        }
-
-        $ret = socket_connect($stream,$host,$port);
-        if($ret == false)
-        {
-            if(event_set($this->mEvConnects[$fd], $stream, EV_WRITE | EV_READ, [$this, 'onConnect'],['host' => $host,'port' => $port,'args' => $args]) == false) {
-                Log::record("event_set error connect fd={$stream}",Log::DEBUG);
-            }
-            if(event_base_set($this->mEvConnects[$fd], $this->mEvBase) == false) {
-                Log::record("event_base_set error connect fd={$stream}",Log::DEBUG);
-            }
-            if(event_timer_set($this->mEvTimeouts[$fd], [$this, 'onConnectTimer'],['stream' => $stream, 'host' => $host,'port' => $port,'args' => $args]) == false) {
-                Log::record("event_set error connect fd={$stream}",Log::DEBUG);
-            }
-            if(event_base_set($this->mEvTimeouts[$fd], $this->mEvBase) == false) {
-                Log::record("event_base_set error connect fd={$stream}",Log::DEBUG);
-            }
-            if(event_add($this->mEvConnects[$fd]) == false) {
-                Log::record("event_add error connect fd={$stream}",Log::DEBUG);
-            }
-
-            if(event_timer_add($this->mEvTimeouts[$fd],1000000) == false) {
-                Log::record("event_base_set error connect fd={$stream}",Log::DEBUG);
-            }
-            return false;
-        }
-        else {
-            event_del($this->mEvConnects[$fd]);
-            event_free($this->mEvConnects[$fd]);
-            event_del($this->mEvTimeouts[$fd]);
-            event_free($this->mEvTimeouts[$fd]);
-
-            unset($this->mEvConnects[$fd]);
-            unset($this->mEvTimeouts[$fd]);
-
-            return $stream;
-        }
-    }
-    public function connect($host,$port,$args)
-    {
-        $stream = $this->do_connect($host,$port,$args,false);
-        if($stream != false) {
-            $bufid = $this->add_stream($stream);
-            $this->mProcessor->onConnected($bufid, $stream,$host,$port,$args);
-        }
-    }
-
-    static private function all_digit($str)
-    {
-        $len = strlen($str);
-        $base = ord('0');
-        for($offset = 0; $offset < $len; $offset++)
-        {
-            $code = ord(substr($str, $offset,1)) - $base;
-            if($code < 0 || $code > 9) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    public function onRead($buffer, $bufid)
-    {
-        Log::record("onRead bufid={$bufid}",Log::DEBUG);
-
-        $content = &$this->mContents[$bufid];
-//        $read = event_buffer_read($this->mBuffers[$bufid], 1024 * 1024);
-        $read = event_buffer_read_all($this->mBuffers[$bufid]);
-        if(!empty($read)) {
-            Log::record("read={$read}",Log::DEBUG);
-            $read = substr($read,23);
-            $content .= $read;
-        }
-        else {
-            return;
-        }
-//        while (true)
-//        {
-////            $read = event_buffer_read($buffer, 1024);
-//            $read = event_buffer_read($this->mBuffers[$bufid], 1024 * 1024);
-//            if(empty($read)) {
-//                break;
-//            } else {
-//                Log::record("read={$read}",Log::DEBUG);
-//                $read = substr($read,12);
-//                $content .= $read;
-//                break;
-//            }
-//        }
-
-        $start = 0;
-        $left = strlen($content);
-        do
-        {
-            if($left > self::body_header_len)
-            {
-                $header = substr($content,$start,self::body_header_len);
-                $body_len = intval($header);
-
-                if(self::all_digit($header) == false) {
-                    Log::record("content={$content}",Log::DEBUG);
-                    break;
-                }
-
-                if($left >= self::body_header_len + $body_len)
-                {
-                    $body = substr($content,$start + self::body_header_len,$body_len);
-                    if($this->mProcessor != null && !empty($body)) {
-//                        Log::record("head={$header} body={$body}",Log::DEBUG);
-                        $this->mProcessor->onRequest($bufid,$body);
-                    }
-
-                    $start += self::body_header_len + $body_len;
-                    $left = $left - self::body_header_len - $body_len;
-                }
-                else {
-                    break;
-                }
-            }
-            else {
-                break;
-            }
-        } while ($left > 0);
-
-        if($start > 0) {
-            $content = substr($content,$start);
-        }
-    }
-
-    public function onError($buffer, $event, $bufid)
-    {
-        Log::record("onError bufid={$bufid} flag={$event}",Log::DEBUG);
-
-        if ($event == (EVBUFFER_READ | EVBUFFER_TIMEOUT)) {
-            event_buffer_enable($buffer, EV_READ);
-            $this->write($bufid,'');
-        }
-        else
-        {
-            event_buffer_disable($buffer, EV_READ | EV_WRITE);
-            event_buffer_free($buffer);
-
-            if(array_key_exists($bufid,$this->mStreams)) {
-                @socket_shutdown($this->mStreams[$bufid],STREAM_SHUT_RDWR);
-                socket_close($this->mStreams[$bufid]);
-                unset($this->mStreams[$bufid]);
-            }
-
-            if(array_key_exists($bufid,$this->mBuffers)) {
-                unset($this->mBuffers[$bufid]);
-            }
-            if(array_key_exists($bufid,$this->mContents)) {
-                unset($this->mContents[$bufid]);
-            }
-
-            if($this->mProcessor != null) {
-                $this->mProcessor->onClose($bufid);
-            }
-        }
-    }
-
-    public function write($bufid,$data)
-    {
-        if(!is_string($data)) {
-            Log::record(__METHOD__ . " write data is not string.",Log::ERR);
-        }
-        if(array_key_exists($bufid,$this->mBuffers)) {
-            $buffer = $this->mBuffers[$bufid];
-            $header = sprintf("%010d",strlen($data));
-            $data = $header . $data;
-            $ret = event_buffer_write($buffer,$data,strlen($data));
-            if($ret == false) {
-                Log::record(__METHOD__ . " event_buffer_write return false.",Log::ERR);
-            }
-            return $ret;
-        } else {
-            return false;
-        }
-    }
-
-    public function close($bufid)
-    {
-        if(!array_key_exists($bufid,$this->mBuffers)) return false;
-
-        Log::record(__METHOD__ . " close socket.",Log::DEBUG);
-
-        $buffer = $this->mBuffers[$bufid];
-        event_buffer_disable($buffer, EV_READ | EV_WRITE);
-        event_buffer_free($buffer);
-
-        if(array_key_exists($bufid,$this->mStreams)) {
-            @socket_shutdown($this->mStreams[$bufid],STREAM_SHUT_RDWR);
-            socket_close($this->mStreams[$bufid]);
-            unset($this->mStreams[$bufid]);
-        }
-
-        if(array_key_exists($bufid,$this->mBuffers)) {
-            unset($this->mBuffers[$bufid]);
-        }
-        if(array_key_exists($bufid,$this->mContents)) {
-            unset($this->mContents[$bufid]);
-        }
-        if($this->mProcessor != null) {
-            $this->mProcessor->onClose($bufid);
-        }
-    }
-}
+require_once(BASE_ROOT_PATH . '/helper/event/IProcessor.php');
+require_once(BASE_ROOT_PATH . '/helper/event/event_looper.php');
+require_once(BASE_ROOT_PATH . '/helper/event/buffer_looper.php');
 
-class process_looper extends event_looper
+class process_looper extends event\buffer_looper
 {
     private static $stInstance;
     public static function instance()

+ 13 - 5
helper/room/base_room.php

@@ -120,18 +120,23 @@ abstract class base_room extends base_info
         $user = $input['user'];
 
         $userinfo = $this->find($user);
-        if($userinfo == false) return false;
+        if($userinfo == false) {
+            return false;
+        }
 
         $type = $this->validate_type($input['type']);
         $content = $this->validate_content($input['content']);
+        $seq = $input['seq'];
+        if(empty($seq)) $seq = "";
 
-        if($type == false || $content == false) return false;
+        if($type == false || $content == false) {
+            return false;
+        }
 
         $msgid = $this->record_message($userinfo['userid'],$type,$content);
         if($msgid > 0) {
-            $this->relay_broadcast('message',['msgid' => $msgid, 'from' => $userinfo,'type' => $input['type'],'content' => $content,'send_time' => time()]);
-//            $this->relay_users([$user],'message',['msgid' => $msgid,'from' => $userinfo,'type' => $input['type'],'content' => $content]);
-            return [''];
+            $this->relay_broadcast('message',['msgid' => $msgid, 'from' => $userinfo,'type' => $input['type'],'content' => $content,'send_time' => time(),'seq' => $seq]);
+            return ['act' => 'room','op' => 'message','room' => $this->room_id()];
         }
         else {
             return false;
@@ -147,6 +152,9 @@ abstract class base_room extends base_info
         elseif($stype == proto_type::msg_stype_pic) {
             return proto_type::msg_type_text;
         }
+        elseif($stype == proto_type::msg_stype_audio) {
+            return proto_type::msg_type_text;
+        }
         elseif($stype == proto_type::msg_stype_bargain) {
             return proto_type::msg_type_bargain;
         }

+ 1 - 1
helper/room/factory_processor.php

@@ -8,7 +8,7 @@
 
 namespace room;
 
-use IProcessor;
+use event\IProcessor;
 use process_looper;
 use errcode;
 use uniquer;

+ 2 - 1
helper/room/proto_type.php

@@ -52,7 +52,8 @@ class proto_type
 
     //[1,20) 基本的消息
     const msg_stype_text = 'text';
-    const msg_stype_pic  = 'pic';
+    const msg_stype_pic  = 'image';
+    const msg_stype_audio  = 'audio';
 
     //[20...) 其它自定义类型消息
     const msg_stype_bargain = 'bargain';

+ 15 - 0
helper/room/room_client.php

@@ -132,6 +132,21 @@ class room_client extends tcp_client
             return $result['data'];
         }
     }
+    public function message($room,$user,$type,$content)
+    {
+        $param = ["act" => 'room','op' => 'message','room' => $room,'user' => $user,'type' => $type,'content' => $content];
+        $result = $this->request($param);
+        if(empty($result)) return false;
+
+        $code = intval($result['code']);
+        if($code != 200) {
+            return false;
+        }
+        else {
+            return true;
+        }
+    }
+
     public function push($content)
     {
         $param = ["act" => 'factory','op' => 'push','content' => $content];

+ 12 - 10
helper/room/room_processor.php

@@ -8,7 +8,7 @@
 
 namespace room;
 
-use IProcessor;
+use event\IProcessor;
 use process_looper;
 use errcode;
 use algorithm;
@@ -56,15 +56,19 @@ class room_processor implements IProcessor
         elseif($act == proto_type::act_access) {
             $ret = $this->onAccess($bufid,$input);
             process_looper::instance()->write($bufid,$ret);
-            return $ret;
+            return true;
         }
         elseif($act == proto_type::act_room) {
             $ret = $this->onRoom($bufid,$input);
-            return $ret;
+            process_looper::instance()->write($bufid,$ret);
+            return true;
         }
         elseif($act == proto_type::act_chatwo) {
             $ret = $this->onChatwo($bufid,$input);
-            return $ret;
+            return true;
+        }
+        else {
+            return false;
         }
     }
 
@@ -159,8 +163,6 @@ class room_processor implements IProcessor
                 $room = $this->mFactory->build($roomid);
                 if($room != false) {
                     $this->mRooms[$roomid] = $room;
-                }
-                else {
                     $this->broad_access(self::build_message($roomid,$room->room_type()));
                 }
             }
@@ -269,16 +271,16 @@ class room_processor implements IProcessor
             $function = $input['op'] . 'Op';
             if (method_exists($room,$function))
             {
-                $success = $room->$function($input);
-                if($success) {
+                $ret = $room->$function($input);
+                if($ret != false) {
                     $this->write_roomsg($bufid,$room);
                 }
 
-                return true;
+                return $this->success($ret);
             }
         }
 
-        return false;
+        return $this->error(errcode::ErrRoom);
     }
     ////////////////////////////////////////Peer Message Handler////////////////////////////////////////////////////////
     private function onChatwo($bufid, $input)

+ 1 - 1
helper/search/processor.php

@@ -15,7 +15,7 @@ require_once (BASE_ROOT_PATH . '/helper/message/subscriber.php');
 
 use StatesHelper;
 use Log;
-use IProcessor;
+use event\IProcessor;
 use process_looper;
 
 class processor implements IProcessor

BIN
mac_webacc


+ 2 - 2
mobile/control/login.php

@@ -59,8 +59,8 @@ class loginControl extends mobileHomeControl
             $has_author = wechat_helper::has_userinfo();
         }
 
-        $ret = ['ismember'   => $has_mobile, 'isauthor' => $has_author, //for前向兼容
-                'hasmobile' => $has_mobile,  'hasauthor'=> $has_author,
+        $ret = ['ismember'  => $has_mobile, 'isauthor' => $has_author, //for前向兼容
+                'hasmobile' => $has_mobile, 'hasauthor'=> $has_author,
                 'member_id'  => session_helper::memberid(),
                 'HPHPSESSID' => session_helper::session_id(),
                 'userinfo' => $this->userinfo()];

+ 2 - 1
mobile/control/member_talk.php

@@ -282,7 +282,8 @@ class member_talkControl extends mobileControl //mbMemberControl
         $format = [];
         if($msg_clue['type'] == 'room')
         {
-            if(!empty($list)){
+            if(!empty($list))
+            {
                 foreach ($list as $item){
                     $format[] = [
                         "act"     => $msg_clue['type'],

+ 28 - 6
mobile/templates/default/game/test_shake.php

@@ -10,7 +10,10 @@
 <div class="maincontent">
     <input type="text" placeholder="输入内容" id="msg">
     <input type="text" placeholder="输入内容" id="to_user">
-    <button id="room_msg">房间消息</button>
+
+    <div>
+     <button id="room_single_msg">单条房间消息</button>      <button id="room_millon_msg">压力房间消息</button>
+    </div>
     <button id="peer_msg">单点消息</button>
 
     <div class="msg_list"></div>
@@ -39,7 +42,7 @@
             var msg = {
                 act: 'access',
                 op: 'login',
-                seq : seq++,
+                seq : "" + seq++,
                 token: token,
                 msgtype: 'message',
             };
@@ -53,7 +56,6 @@
             console.log(JSON.parse(datas.data));
         };
 
-
         client.onerror = function () {
             console.log('error');
         };
@@ -69,7 +71,27 @@
 
     WebStreamer();
 
-    document.getElementById('room_msg').addEventListener('click', function ()
+    document.getElementById('room_single_msg').addEventListener('click', function ()
+    {
+        var inputVal = $('#msg').val();
+        if(!inputVal) {
+            return;
+        }
+
+        var msg = {
+            act: 'room',
+            op:  'message',
+            msgtype: 'message',
+            seq : "" + seq++,
+            room:room,
+            user:user,
+            type : "text",
+            content:inputVal
+        };
+        client.send(JSON.stringify(msg));
+    });
+
+    document.getElementById('room_millon_msg').addEventListener('click', function ()
     {
         var inputVal = $('#msg').val();
         if(!inputVal) {
@@ -82,7 +104,7 @@
                 act: 'room',
                 op:  'message',
                 msgtype: 'message',
-                seq : seq++,
+                seq : "" + seq++,
                 room:room,
                 user:user,
                 type : "text",
@@ -108,7 +130,7 @@
             act: 'chatwo',
             op:  'message',
             msgtype: 'message',
-            seq : seq++,
+            seq : "" + seq++,
             from:user,
             to: to_user,
             type : "text",

+ 332 - 2
test/TestEvent.php

@@ -23,7 +23,7 @@ require_once(BASE_ROOT_PATH . '/helper/room/factory.php');
 require_once(BASE_ROOT_PATH . '/helper/room/room_client.php');
 require_once(BASE_ROOT_PATH . '/helper/room/factory_client.php');
 
-class simp_processor implements IProcessor
+class simp_processor implements event\IProcessor
 {
     public function __construct()
     {
@@ -91,12 +91,13 @@ class TestServer extends PHPUnit_Framework_TestCase
         $socket = $this->open_socket($host,$port);
 
         $processor = new simp_processor();
-        $looper = new event_looper();
+        $looper = new event\buffer_looper();
 
         $looper->init($processor);
         $looper->add_listen($socket);
         $looper->run_loop();
     }
+
     public function testSyncFactroy()
     {
         global $config;
@@ -118,17 +119,23 @@ class TestServer extends PHPUnit_Framework_TestCase
             }
         }
     }
+
     public function testAsyncOrgFactroy()
     {
         global $config;
         $host = $config['room_factory']['host'];
         $port = $config['room_factory']['port'];
 
+        $contents = [];
+
         $socket = $this->open_socket($host,$port,false);
         while (true)
         {
             $stream = socket_accept($socket);
+            $fd = intval($stream);
+            $contents[$fd] = '';
             socket_set_nonblock($stream);
+
             while (true)
             {
                 $rfds = [$stream];
@@ -139,6 +146,8 @@ class TestServer extends PHPUnit_Framework_TestCase
 
                 if (in_array($stream, $rfds))
                 {
+                    $rfd = intval($stream);
+
                     $read = @socket_read($stream, 1024);
                     if($read === false) {
                         continue;
@@ -148,7 +157,16 @@ class TestServer extends PHPUnit_Framework_TestCase
                         if (strlen($read) == 0)
                             break;
                         else {
+                            $contents[$rfd] .= $read;
                             Log::record("read={$read}", Log::DEBUG);
+
+                            $ret = $this->proc($contents[$fd]);
+                            if($ret === false) {
+                                $x = 0;
+                            }
+                            else {
+                                $contents[$rfd] = $ret;
+                            }
                         }
                     }
                 }
@@ -198,4 +216,316 @@ class TestServer extends PHPUnit_Framework_TestCase
         return $sock;
     }
 
+    const body_header_len = 10;
+
+    public function testProc()
+    {
+        $content = $this->get_content();
+        $this->proc($content);
+    }
+
+    private function proc($content)
+    {
+        $start = 0;
+        $left = strlen($content);
+
+        $i = 0;
+        while($left > self::body_header_len)
+        {
+            $header = substr($content,$start,self::body_header_len);
+            if(!is_numeric($header)) {
+                return false;
+            }
+
+            $body_len = intval($header);
+            if($body_len == 0) { //这是一个心跳包
+                $start += self::body_header_len;
+                $left  -= self::body_header_len;
+                $i++;
+            }
+            elseif($body_len < 0) {
+                return false;
+            }
+            else
+            {
+                if($left >= self::body_header_len + $body_len)
+                {
+                    $i++;
+                    $body = substr($content,$start + self::body_header_len,$body_len);
+                    $start += self::body_header_len + $body_len;
+                    $left  -= self::body_header_len + $body_len;
+                }
+                else {
+                    break;
+                }
+            }
+        }
+
+        if($start > 0)
+        {
+            $str = substr($content,$start);
+            if($str === false) {
+                return '';
+            }
+            else {
+                return $str;
+            }
+        }
+        return $content;
+    }
+    private function get_content()
+    {
+        return '0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 88
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 89
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 90
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 91
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 92
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 93
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 94
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 95
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 96
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 97
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 98
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 99
+}0000000090{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 100
+}0000000090{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 101
+}0000000090{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 102
+}0000000090{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 103
+}0000000090{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 104
+}0000000090{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 105
+}0000000090{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 106
+}0000000090{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 107
+}0000000090{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 108
+}0000000090{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 109
+}0000000090{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 110
+}"room" : 37,
+	"set" : 69
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 70
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 71
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 72
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 73
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 74
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 75
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 76
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 77
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 78
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 79
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 80
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 81
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 82
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 83
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 84
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 85
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 86
+}0000000089{
+	"act" : "access",
+	"msgtype" : "message",
+	"op" : "build",
+	"room" : 37,
+	"set" : 87
+}0000000089{
+	"act" : "access",
+	"msgt';
+    }
+
 }

+ 5 - 0
test/TestLog.php

@@ -21,6 +21,11 @@ class TestLog extends PHPUnit_Framework_TestCase
     public function testPairlog()
     {
         $checker = new scope_trace(__METHOD__);
+        $input = '0';
+        $ret = empty($input);
+        if('0' === false) {
+            $ret = 10;
+        }
     }
 
     public function testLoc()

+ 9 - 1
test/TestRoomFactory.php

@@ -96,7 +96,15 @@ class TestRoomFactory extends PHPUnit_Framework_TestCase
     public function testSocketInvite()
     {
         $user = 39623;
-        for ($i = 0; $i < 10000; $i++) {
+        for ($i = 0; $i < 10000000; $i++) {
+            $ret = room\factory_client::instance()->invite(37,self::admin_member_id,[$user]);
+        }
+    }
+
+    public function testSocketCreate()
+    {
+        $user = 39623;
+        for ($i = 0; $i < 100000; $i++) {
             $ret = room\factory_client::instance()->invite(37,self::admin_member_id,[$user]);
         }
     }

+ 21 - 0
test/TestRoomSrv.php

@@ -89,4 +89,25 @@ class TestRoomSrv extends PHPUnit_Framework_TestCase
         $rinfo = new room\base_info($params);
         return $rinfo->creator();
     }
+
+    public function testBuild()
+    {
+        $client = new room\room_client("192.168.0.200",2001);
+        for ($i = 0; $i < 100000; $i++) {
+            $ret = $client->build(37);
+        }
+    }
+
+    public function testMessage()
+    {
+        $user = 39623;
+
+        $client = new room\room_client("192.168.0.200",2001);
+        $ret = $client->build(37);
+
+        for ($i = 0; $i < 100000; $i++) {
+            $ret = $client->message(37,$user,'text','wwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwww');
+
+        }
+    }
 }

+ 101 - 0
test/event.php

@@ -0,0 +1,101 @@
+<?php
+$socket = stream_socket_server ('tcp://0.0.0.0:2002', $errno, $errstr);
+stream_set_blocking($socket, 0);
+event_init();
+$base = event_base_new();
+$event = event_new();
+event_set($event, $socket, EV_READ | EV_PERSIST, 'ev_accept', $base);
+event_base_set($event, $base);
+event_add($event);
+event_base_loop($base);
+
+$GLOBALS['connections'] = array();
+$GLOBALS['buffers'] = array();
+
+function ev_accept($socket, $flag, $base) {
+    static $id = 0;
+
+    $connection = stream_socket_accept($socket);
+    stream_set_blocking($connection, 0);
+
+    $id += 1;
+
+    $buffer = event_buffer_new($connection, 'ev_read', NULL, 'ev_error', $id);
+    event_buffer_base_set($buffer, $base);
+    event_buffer_timeout_set($buffer, 30, 30);
+    event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
+    event_buffer_priority_set($buffer, 10);
+    event_buffer_enable($buffer, EV_READ | EV_PERSIST);
+
+    // we need to save both buffer and connection outside
+    $GLOBALS['connections'][$id] = $connection;
+    $GLOBALS['buffers'][$id] = $buffer;
+}
+
+function ev_error($buffer, $error, $id) {
+    event_buffer_disable($GLOBALS['buffers'][$id], EV_READ | EV_WRITE);
+    event_buffer_free($GLOBALS['buffers'][$id]);
+    fclose($GLOBALS['connections'][$id]);
+    unset($GLOBALS['buffers'][$id], $GLOBALS['connections'][$id]);
+}
+
+
+function proc($content)
+{
+    $body_header_len = 10;
+
+    $start = 0;
+    $left = strlen($content);
+    while($left > $body_header_len)
+    {
+        $header = substr($content,$start,$body_header_len);
+        if(!is_numeric($header)) {
+            return;
+        }
+
+        $body_len = intval($header);
+        if($body_len == 0) { //这是一个心跳包
+            $start += $body_header_len;
+            $left  -= $body_header_len;
+        }
+        else
+        {
+            if($left >= $body_header_len + $body_len)
+            {
+                $body = substr($content,$start + $body_header_len,$body_len);
+//                    $this->mProcessor->onRequest($fd,$body);
+
+                $start += $body_header_len + $body_len;
+                $left  -= $body_header_len + $body_len;
+            }
+            else {
+                break;
+            }
+        }
+    }
+
+    if($start > 0)
+    {
+        $str = substr($content,$start);
+        if($str === false) {
+            return '';
+        }
+        else {
+            return $str;
+        }
+    }
+    return $content;
+}
+
+function ev_read($buffer, $id) {
+    $content = '';
+    while ($read = event_buffer_read($buffer, 1024))
+    {
+        echo $read;
+        $content .= $read;
+        $content = proc($content);
+        echo $content;
+//        var_dump($read);
+    }
+}
+?>

BIN
ugcman


BIN
webacc