EventLooper.php 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. <?php
  2. namespace event;
  3. use Log;
  4. use scope_trace;
  5. use Event;
  6. use EventBase;
  7. use EventListener;
  8. use EventBufferEvent;
  9. use Exception;
  /**
   * Reads length-prefixed frames from one accepted connection.
   *
   * Wire format handled by proc(): every frame starts with a header of
   * self::body_header_len ASCII decimal digits (zero padded) giving the body
   * length in bytes, followed by the body itself. A header of all zeros has no
   * body and is treated as a heartbeat.
   */
  class SocketReader
  {
      /** Size in bytes of the decimal length header that precedes each body. */
      const body_header_len = 10;

      /** @var EventBufferEvent|null Buffer event for the socket; null once torn down. */
      private $mBufferEvent;

      /** @var string Bytes received so far that do not yet form a complete frame. */
      protected $mContent = '';

      /** @var object|null Receives onRequest($fd, $body) for every complete frame. */
      private $mProcessor;

      /** @var mixed Descriptor handed over by the listener; used as the connection id. */
      private $mFd;

      /** @var object|null Owner of this reader; its remove($fd) is called on teardown. */
      private $mParent;

      /**
       * @param EventBase $base      Event base the buffer event is attached to.
       * @param mixed     $fd        Accepted socket descriptor.
       * @param object    $processor Object exposing onRequest($fd, $body).
       * @param object    $parent    Object exposing remove($fd).
       *
       * @throws Exception When reading cannot be enabled on the buffer event.
       */
      public function __construct($base, $fd, $processor, $parent)
      {
          $this->mProcessor = $processor;
          $this->mFd = $fd;
          $this->mParent = $parent;
          $this->mBufferEvent = new EventBufferEvent($base, $fd, EventBufferEvent::OPT_CLOSE_ON_FREE);
          $this->mBufferEvent->setCallbacks([$this, "onRead"], NULL, [$this, "onEvent"], NULL);
          if (!$this->mBufferEvent->enable(Event::READ)) {
              Log::record("Failed to enable READ", Log::ERR);
              throw new Exception("Failed to enable READ");
          }
      }

      public function __destruct()
      {
          $scope = new scope_trace(__METHOD__);
      }

      /**
       * Read callback: drains the input buffer in 1024-byte chunks and feeds
       * the frame parser after every chunk.
       *
       * @param EventBufferEvent $bev Buffer event that fired (unused; the stored one is read).
       * @param mixed            $ctx Callback argument (unused).
       */
      public function onRead($bev, $ctx)
      {
          // Held in a local like in every other method of this file, so the
          // trace object lives for the whole call instead of being dropped at once.
          $scope = new scope_trace(__METHOD__);

          // proc() may tear the connection down on a malformed header, so the
          // buffer event has to be re-checked before every read.
          while ($this->mBufferEvent !== null) {
              $read = $this->mBufferEvent->read(1024);
              // Compare against the empty string explicitly: a chunk that is
              // exactly "0" is falsy in PHP and would otherwise be discarded.
              if (!is_string($read) || $read === '') {
                  break;
              }
              Log::record("read={$read}", Log::DEBUG);
              $this->mContent .= $read;
              $this->proc();
          }
      }

      /**
       * Event callback: logs errors / EOF and releases the connection.
       *
       * @param EventBufferEvent $bev    Buffer event that fired (unused).
       * @param int              $events Bit mask of EventBufferEvent::* flags.
       * @param mixed            $ctx    Callback argument (unused).
       */
      public function onEvent($bev, $events, $ctx)
      {
          $scope = new scope_trace(__METHOD__);
          if ($events & EventBufferEvent::ERROR) {
              Log::record('Error from bufferevent');
          }
          if ($events & EventBufferEvent::EOF) {
              Log::record('Socker recv EOF.');
          }
          if ($events & (EventBufferEvent::EOF | EventBufferEvent::ERROR)) {
              $this->teardown();
          }
      }

      /**
       * Queues $data on the connection's output buffer.
       *
       * @param string $data Raw bytes to send.
       *
       * @return bool Result of EventBufferEvent::write(), or false when the
       *              connection has already been torn down.
       */
      public function write($data)
      {
          $scope = new scope_trace(__METHOD__);
          if ($this->mBufferEvent === null) {
              return false;
          }
          return $this->mBufferEvent->write($data);
      }

      /**
       * Extracts every complete frame from the receive buffer, dispatches
       * non-empty bodies to the processor and keeps the unconsumed tail.
       * A header that is not made of decimal digits only ends the connection.
       */
      private function proc()
      {
          $headerLen = self::body_header_len;
          $offset = 0;
          $remaining = strlen($this->mContent);

          // ">=" so that a buffer holding exactly one heartbeat is consumed
          // right away instead of waiting for further bytes to arrive.
          while ($remaining >= $headerLen) {
              $header = substr($this->mContent, $offset, $headerLen);

              // Digits only: is_numeric() would also accept signs, decimals,
              // exponents and padding, which yield negative or bogus lengths.
              if (strspn($header, '0123456789') !== $headerLen) {
                  $this->teardown();
                  return;
              }

              $bodyLen = (int) $header;
              $frameLen = $headerLen + $bodyLen;
              if ($remaining < $frameLen) {
                  // Body not fully received yet; wait for more data.
                  break;
              }

              // A zero length is a heartbeat packet: nothing to dispatch.
              if ($bodyLen > 0) {
                  $body = substr($this->mContent, $offset + $headerLen, $bodyLen);
                  $this->mProcessor->onRequest($this->mFd, $body);
              }

              $offset += $frameLen;
              $remaining -= $frameLen;
          }

          if ($offset > 0) {
              $rest = substr($this->mContent, $offset);
              // substr() yields false on older PHP when nothing is left.
              $this->mContent = ($rest === false) ? '' : $rest;
          }
      }

      /**
       * Releases the buffer event, unregisters from the parent and drops all
       * references. Safe to call more than once.
       */
      private function teardown()
      {
          if ($this->mBufferEvent !== null) {
              $this->mBufferEvent->disable(Event::READ);
              $this->mBufferEvent->close();
              $this->mBufferEvent->free();
              $this->mBufferEvent = null;
          }
          $this->mContent = '';
          if ($this->mParent !== null) {
              $this->mParent->remove($this->mFd);
              $this->mParent = null;
          }
          $this->mProcessor = null;
      }
  }
  /**
   * Owns the EventBase, the listening socket and one SocketReader per
   * accepted connection, and forwards framed writes to those readers.
   */
  class EventLooper
  {
      /** @var EventBase Event base all events and listeners are attached to. */
      protected $mEvBase;

      /** @var array Signal events created by add_signal(), keyed by signal number. */
      protected $mEvSignals;

      /** @var array Initialised to an empty array; not used elsewhere in this class. */
      protected $mEvTimeouts;

      /** @var EventListener|null Listener created by add_listen(). */
      protected $mListener;

      /** @var array SocketReader instances keyed by the accepted descriptor. */
      protected $mReaders;

      /** @var IProcessor|null Processor set by init(). */
      protected $mProcessor;

      public function __construct()
      {
          $this->mEvSignals = [];
          $this->mEvTimeouts = [];
          $this->mReaders = [];
          $this->mEvBase = new EventBase();
      }

      /**
       * Stores the processor and invokes its onStart() hook.
       *
       * @param IProcessor $processor Receives onStart / onRequest / onClose calls.
       */
      public function init(IProcessor $processor)
      {
          $this->mProcessor = $processor;
          $this->mProcessor->onStart();
      }

      /**
       * Hands control to the event base via EventBase::dispatch().
       */
      public function run_loop()
      {
          $scope = new scope_trace(__METHOD__);
          // Signal handling is currently disabled:
          // $this->add_signal(SIGINT);
          // $this->add_signal(SIGQUIT);
          // $this->add_signal(SIGTERM);
          Log::record('EventLooper start run loop....', Log::DEBUG);
          // $this->mEvBase->loop();
          $this->mEvBase->dispatch();
      }

      /**
       * Creates the connection listener on $stream.
       *
       * @param mixed $stream Target passed straight to the EventListener constructor.
       *
       * @return bool True when the listener was created, false otherwise.
       */
      public function add_listen($stream)
      {
          $this->mListener = new EventListener(
              $this->mEvBase,
              [$this, 'onAccept'],
              $this->mEvBase,
              EventListener::OPT_CLOSE_ON_FREE | EventListener::OPT_REUSEABLE,
              -1,
              $stream
          );
          if (!$this->mListener) {
              Log::record("Couldn't create listener", Log::ERR);
              return false;
          }
          $this->mListener->setErrorCallback([$this, 'onAcceptError']);
          // Explicit success value so callers can tell success from failure.
          return true;
      }

      /**
       * Registers onSignal() as the handler for signal number $val.
       */
      private function add_signal($val)
      {
          $signalEvent = Event::signal($this->mEvBase, $val, [$this, 'onSignal']);
          $this->mEvSignals[$val] = $signalEvent;
          $signalEvent->add();
      }

      /**
       * Signal callback: logs the signal and asks the event base to exit.
       */
      public function onSignal($sig, $flag)
      {
          $scope = new scope_trace(__METHOD__);
          Log::record("ev_signal sig={$sig},flag={$flag}", Log::DEBUG);
          $this->mEvBase->exit(NULL);
      }

      /**
       * Accept callback: wraps the new descriptor in a SocketReader.
       *
       * @param EventListener $listener Listener that accepted the connection.
       * @param mixed         $fd       Descriptor of the accepted connection.
       * @param mixed         $address  Peer address as supplied by the listener (unused).
       * @param mixed         $ctx      Callback argument (unused).
       */
      public function onAccept($listener, $fd, $address, $ctx)
      {
          $scope = new scope_trace(__METHOD__);
          Log::record("fd = {$fd}", Log::DEBUG);
          try {
              $this->mReaders[$fd] = new SocketReader($this->mEvBase, $fd, $this->mProcessor, $this);
          } catch (Exception $e) {
              // The reader constructor throws when READ cannot be enabled; keep
              // that failure from escaping into the event loop callback.
              Log::record("Failed to create reader for fd = {$fd}: " . $e->getMessage(), Log::ERR);
          }
      }

      /**
       * Listener error callback: logs the last socket error and stops the loop.
       */
      public function onAcceptError($listener, $ctx)
      {
          $scope = new scope_trace(__METHOD__);
          // Fully qualified: this file lives in namespace "event" and has no
          // "use EventUtil;", so the bare name would resolve to event\EventUtil.
          $errno = \EventUtil::getLastSocketErrno();
          $errstr = \EventUtil::getLastSocketError();
          Log::record("Got an error {$errno} {$errstr} on the listener shutting down.");
          $this->mEvBase->exit(NULL);
      }

      /**
       * Sends $data to connection $bufid, prefixed with a 10-digit zero-padded
       * decimal length header.
       *
       * @param mixed  $bufid Connection id (the descriptor used as key in onAccept).
       * @param string $data  Payload bytes.
       *
       * @return bool False when $data is not a string, the connection is
       *              unknown or the underlying write fails.
       */
      public function write($bufid, $data)
      {
          $scope = new scope_trace(__METHOD__);
          if (!is_string($data)) {
              Log::record(__METHOD__ . " write data is not string.", Log::ERR);
              // Reported as an error, so do not put the value on the wire.
              return false;
          }
          if (!array_key_exists($bufid, $this->mReaders)) {
              return false;
          }

          $reader = $this->mReaders[$bufid];
          $framed = sprintf("%010d", strlen($data)) . $data;
          $ret = $reader->write($framed);
          if (!$ret) {
              Log::record(__METHOD__ . " event_buffer_write return false.", Log::ERR);
          }
          return $ret;
      }

      /**
       * Drops the reader registered for $bufid and notifies the processor.
       *
       * NOTE(review): this only releases the looper's reference to the reader;
       * whether the socket is closed at that moment depends on the reader being
       * destroyed - confirm.
       *
       * @param mixed $bufid Connection id.
       *
       * @return bool False when the connection is unknown, true otherwise.
       */
      public function close($bufid)
      {
          $scope = new scope_trace(__METHOD__);
          if (!array_key_exists($bufid, $this->mReaders)) {
              return false;
          }
          unset($this->mReaders[$bufid]);
          if ($this->mProcessor !== null) {
              $this->mProcessor->onClose($bufid);
          }
          return true;
      }

      /**
       * Forgets the reader for $fd. Called by SocketReader when its connection
       * ends. Unlike close(), the processor is not notified here.
       *
       * @param mixed $fd Connection id.
       */
      public function remove($fd)
      {
          $scope = new scope_trace(__METHOD__);
          Log::record("fd = {$fd}", Log::DEBUG);
          unset($this->mReaders[$fd]);
      }
  }