Browse Source

fix searcher 僵死问题

stanley-king 8 years ago
parent
commit
05954e6cb3
4 changed files with 76 additions and 46 deletions
  1. 17 4
      centra_srv.php
  2. 54 40
      helper/search/server.php
  3. 1 1
      helper/search/tcp_client.php
  4. 4 1
      test/TestSearch.php

+ 17 - 4
centra_srv.php

@@ -56,6 +56,11 @@ function fork_subprocess($count,$listen_fd)
 {
     if (($child = pcntl_fork()) === 0)
     {
+//        ob_end_clean(); // Discard the output buffer and close
+//        fclose(STDIN);  // Close all of the standard
+//        fclose(STDOUT); // file descriptors as we
+//        fclose(STDERR); // are running as a daemon.
+
         echo "pid = {$child} count = {$count} " . __LINE__ ."\r\n";
         search_work($listen_fd);
         exit();
@@ -67,6 +72,8 @@ function fork_subprocess($count,$listen_fd)
     }
     else
     {
+
+
         echo "pid = {$child} count = {$count} " . __LINE__ ."\r\n";
         $ret = pcntl_waitpid($child,$status,WNOHANG);
         if($ret == 0) {
@@ -92,11 +99,17 @@ function remote_addr()
 }
 
 $listen_fd = stream_socket_server (remote_addr(), $errno, $errstr);
-
-$count = 1;
-while ($count-- > 0) {
-    fork_subprocess($count,$listen_fd);
+if($listen_fd != false)
+{
+    $count = 1;
+    while ($count-- > 0) {
+        fork_subprocess($count,$listen_fd);
+    }
+} else {
+    echo "无法创建socket,请退出之前进程.\n";
 }
 
 
 
+
+

+ 54 - 40
helper/search/server.php

@@ -7,6 +7,7 @@
  */
 
 namespace search;
+use Log;
 
 interface IProcessor
 {
@@ -19,16 +20,16 @@ class CentraHelper
     const read_time_out  = 600;
     const write_time_out = 5;
 
+    private $mListenSocket;
+    private $mEvbase;
+    private $mEv;
 
-    private $socket;
-    private $ev_base;
-    private $ev;
-    private $connections;
-    private $buffers;
-    private $contents;
-    private $connect_id;
+    private $mStreams;
+    private $mBuffers;
+    private $mContents;
+
+    private $mProcessor;
 
-    private $processor;
     private static $stInstance;
 
     public static function instance()
@@ -41,54 +42,57 @@ class CentraHelper
 
     private function __construct()
     {
-        $this->connect_id = 0;
-        $this->connections = array();
-        $this->buffers = array();
+        $this->mStreams = array();
+        $this->mBuffers = array();
+        $this->mContents = [];
     }
     public function init(IProcessor $processor)
     {
-        $this->processor = $processor;
+        $this->mProcessor = $processor;
     }
 
     public function run_loop($sockfd)
     {
-        $this->socket = $sockfd;
+        $this->mListenSocket = $sockfd;
         echo "stream_set_blocking \n";
-        if(stream_set_blocking($this->socket, 0) == false) {
+        if(stream_set_blocking($this->mListenSocket, 0) == false) {
             echo "stream_set_blocking error \n";
         }
-        $this->ev_base = event_base_new();
-        $this->ev = event_new();
+        $this->mEvbase = event_base_new();
+        $this->mEv = event_new();
 
         echo "event_set\n";
-        if(event_set($this->ev, $this->socket, EV_READ | EV_PERSIST, 'ev_accept', $this->ev_base) == false) {
+        if(event_set($this->mEv, $this->mListenSocket, EV_READ | EV_PERSIST, 'ev_accept', $this->mEvbase) == false) {
             echo "event_set error EV_READ | EV_PERSIST\n";
         }
 
         echo "event_base_set\n";
-        if(event_base_set($this->ev, $this->ev_base) == false) {
+        if(event_base_set($this->mEv, $this->mEvbase) == false) {
             echo "event_set error EV_READ | EV_PERSIST\n";
         }
 
         echo "event_add\n";
-        if(event_add($this->ev) == false) {
+        if(event_add($this->mEv) == false) {
             echo "event_add error EV_READ | EV_PERSIST\n";
         }
-        $ret = event_base_loop($this->ev_base);
 
+        $ret = event_base_loop($this->mEvbase);
         echo "event_base_loop ret={$ret}\n";
     }
 
     public function ev_accept($socket, $flag, $base)
     {
-        $this->connect_id += 1;
-        $connection = stream_socket_accept($socket);
-        if($connection == false) return;
+        Log::record("ev_accept socket_fd={$socket}",Log::DEBUG);
+
+        static $bufid = 1;
 
-        stream_set_blocking($connection, 0);
-        $buffer = event_buffer_new($connection, 'ev_read', NULL, 'ev_error', $this->connect_id);
+        $stream = stream_socket_accept($socket);
+        if($stream == false) return;
+
+        stream_set_blocking($stream, 0);
+        $buffer = event_buffer_new($stream, 'ev_read', NULL, 'ev_error', $bufid);
         if($buffer == false) {
-            fclose($connection);
+            fclose($stream);
             return;
         }
 
@@ -98,14 +102,17 @@ class CentraHelper
         event_buffer_priority_set($buffer, 10);
         event_buffer_enable($buffer, EV_READ | EV_PERSIST);
 
-        $this->connections[$this->connect_id] = $connection;
-        $this->buffers[$this->connect_id] = $buffer;
-        $this->contents[$this->connect_id] = "";
+        $this->mStreams[$bufid] = $stream;
+        $this->mBuffers[$bufid] = $buffer;
+        $this->mContents[$bufid] = "";
+
+        $bufid++;
+        if($bufid < 0) $bufid = 1;
     }
 
-    public function ev_read($buffer, $id)
+    public function ev_read($buffer, $bufid)
     {
-        $content = &$this->contents[$id];
+        $content = &$this->mContents[$bufid];
         while ($read = event_buffer_read($buffer, 256)) {
             $content .= $read;
         }
@@ -118,11 +125,12 @@ class CentraHelper
             {
                 $len = substr($content,$start,self::body_header_len);
                 $len = intval($len);
+
                 if($left >= self::body_header_len + $len)
                 {
                     $body = substr($content,$start + self::body_header_len,$len);
-                    if($this->processor != null) {
-                        $data = $this->processor->handle_input($body);
+                    if($this->mProcessor != null) {
+                        $data = $this->mProcessor->handle_input($body);
                         $header = sprintf("%010d",strlen($data));
                         $data = $header . $data;
                         event_buffer_write($buffer,$data,strlen($data));
@@ -142,16 +150,22 @@ class CentraHelper
         if($start > 0) {
             $content = substr($content,$start);
         }
+        ob_clean();
     }
 
-    public function ev_error($buffer, $error, $id)
+    public function ev_error($buffer, $error, $bufid)
     {
-        event_buffer_disable($this->buffers[$id], EV_READ | EV_WRITE);
-        event_buffer_free($this->buffers[$id]);
-        fclose($this->connections[$id]);
-        unset($this->connections[$id]);
-        unset($this->buffers[$id]);
-        unset($this->contents[$id]);
+        $error = socket_strerror($error);
+        Log::record("ev_error id={$bufid} error={$error}",Log::DEBUG);
+
+        event_buffer_disable($buffer, EV_READ | EV_WRITE);
+        event_buffer_free($buffer);
+        stream_socket_shutdown($this->mStreams[$bufid],STREAM_SHUT_RDWR);
+        fclose($this->mStreams[$bufid]);
+
+        unset($this->mStreams[$bufid]);
+        unset($this->mBuffers[$bufid]);
+        unset($this->mContents[$bufid]);
     }
 
     private function remote_addr()

+ 1 - 1
helper/search/tcp_client.php

@@ -127,7 +127,7 @@ class tcp_client
             Log::record("connect search server err --- code:{$errno} : {$err}");
             return false;
         } else {
-            //socket_set_timeout($this->mSocket,self::time_out);
+            socket_set_timeout($this->mSocket,self::time_out);
             return true;
         }
     }

+ 4 - 1
test/TestSearch.php

@@ -53,7 +53,10 @@ class TestSearch extends PHPUnit_Framework_TestCase
 
     public function testRequest()
     {
-        $result = search\tcp_client::instance()->get_result('眼影');
+        for($i = 0; $i < 10000; ++$i) {
+            $result = search\tcp_client::instance()->get_result('眼影');
+            Log::record("testRequest {$i}",Log::DEBUG);
+        }
     }
 
     public function testBrandSpecia()