EventLooper.php 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. <?php
  2. namespace event;
  3. use Log;
  4. use scope_trace;
  5. use Event;
  6. use EventBase;
  7. use EventListener;
  8. use EventBufferEvent;
  9. use Exception;
  10. class SocketReader
  11. {
  12. const body_header_len = 10;
  13. private $mBufferEvent;
  14. protected $mContent;
  15. private $mProcessor;
  16. private $mFd;
  17. private $mParent;
  18. public function __construct($base,$fd,$processor,$parent)
  19. {
  20. $this->mProcessor = $processor;
  21. $this->mFd = $fd;
  22. $this->mParent = $parent;
  23. $this->mBufferEvent = new EventBufferEvent($base, $fd, EventBufferEvent::OPT_CLOSE_ON_FREE);
  24. $this->mBufferEvent->setCallbacks([$this, "onRead"], NULL, [$this, "onEvent"], NULL);
  25. if (!$this->mBufferEvent->enable(Event::READ)) {
  26. Log::record("Failed to enable READ",Log::ERR);
  27. throw new Exception("Failed to enable READ");
  28. }
  29. }
  30. public function __destruct()
  31. {
  32. $scope = new scope_trace(__METHOD__);
  33. }
  34. public function onRead($bev, $ctx)
  35. {
  36. new scope_trace(__METHOD__);
  37. while($read = $this->mBufferEvent->read(1024)) {
  38. Log::record("read={$read}",Log::DEBUG);
  39. $this->mContent .= $read;
  40. $this->proc();
  41. }
  42. }
  43. public function onEvent($bev, $events, $ctx)
  44. {
  45. $scope = new scope_trace(__METHOD__);
  46. if ($events & EventBufferEvent::ERROR) {
  47. Log::record('Error from bufferevent');
  48. }
  49. if ($events & EventBufferEvent::EOF) {
  50. Log::record('Socker recv EOF.');
  51. }
  52. if ($events & (EventBufferEvent::EOF | EventBufferEvent::ERROR)) {
  53. $this->mBufferEvent->disable(Event::READ);
  54. $this->mBufferEvent->setCallbacks(NULL, NULL, NULL, NULL); //这行加上了,才能让对象释放。
  55. $this->mBufferEvent->free();
  56. $this->mParent->remove($this->mFd);
  57. $this->mParent = null;
  58. $this->mProcessor = null;
  59. }
  60. }
  61. public function write($data)
  62. {
  63. $scope = new scope_trace(__METHOD__);
  64. return $this->mBufferEvent->write($data);
  65. }
  66. private function proc()
  67. {
  68. $content = &$this->mContent;
  69. $start = 0;
  70. $left = strlen($content);
  71. while($left > self::body_header_len)
  72. {
  73. $header = substr($content,$start,self::body_header_len);
  74. if(!is_numeric($header)) {
  75. $this->mBufferEvent->close();
  76. return;
  77. }
  78. $body_len = intval($header);
  79. if($body_len == 0) { //这是一个心跳包
  80. $start += self::body_header_len;
  81. $left -= self::body_header_len;
  82. }
  83. else
  84. {
  85. if($left >= self::body_header_len + $body_len)
  86. {
  87. $body = substr($content,$start + self::body_header_len,$body_len);
  88. $this->mProcessor->onRequest($this->mFd,$body);
  89. $start += self::body_header_len + $body_len;
  90. $left -= self::body_header_len + $body_len;
  91. }
  92. else {
  93. break;
  94. }
  95. }
  96. }
  97. if($start > 0)
  98. {
  99. $str = substr($content,$start);
  100. if($str === false) {
  101. $this->mContent = '';
  102. }
  103. else {
  104. $this->mContent = $str;
  105. }
  106. }
  107. }
  108. }
  109. class EventLooper
  110. {
  111. protected $mEvBase;
  112. protected $mEvSignals;
  113. protected $mEvTimeouts;
  114. protected $mListener;
  115. protected $mReaders;
  116. protected $mProcessor;
  117. public function __construct()
  118. {
  119. $this->mEvSignals = [];
  120. $this->mEvTimeouts = [];
  121. $this->mReaders = [];
  122. $this->mEvBase = new EventBase();
  123. }
  124. public function init(IProcessor $processor)
  125. {
  126. $this->mProcessor = $processor;
  127. $this->mProcessor->onStart();
  128. }
  129. public function run_loop()
  130. {
  131. $this->add_signal(SIGINT);
  132. $this->add_signal(SIGQUIT);
  133. $this->add_signal(SIGTERM);
  134. Log::record('EventLooper start run loop....',Log::DEBUG);
  135. $this->mEvBase->loop();
  136. }
  137. public function add_listen($stream)
  138. {
  139. $this->mListener = new EventListener($this->mEvBase, [$this, 'onAccept'], $this->mEvBase,
  140. EventListener::OPT_CLOSE_ON_FREE | EventListener::OPT_REUSEABLE, -1, $stream);
  141. if (!$this->mListener) {
  142. Log::record("Couldn't create listener",Log::ERR);
  143. return false;
  144. }
  145. $this->mListener->setErrorCallback([$this, 'onAcceptError']);
  146. }
  147. private function add_signal($val)
  148. {
  149. $base = $this->mEvBase;
  150. $this->mEvSignals[$val] = Event::signal($base, $val, [$this, 'onSignal']);
  151. $this->mEvSignals[$val]->add();
  152. }
  153. public function onSignal($sig, $flag)
  154. {
  155. $scope = new scope_trace(__METHOD__);
  156. Log::record("ev_signal sig={$sig},flag={$flag}", Log::DEBUG);
  157. $this->mEvBase->exit();
  158. }
  159. public function onAccept($listener, $fd, $address, $ctx)
  160. {
  161. $scope = new scope_trace(__METHOD__);
  162. Log::record("fd = {$fd}",Log::DEBUG);
  163. $this->mReaders[$fd] = new SocketReader($this->mEvBase, $fd,$this->mProcessor,$this);
  164. }
  165. public function onAcceptError($listener, $ctx)
  166. {
  167. $scope = new scope_trace(__METHOD__);
  168. $errno = EventUtil::getLastSocketErrno();
  169. $errstr = EventUtil::getLastSocketError();
  170. Log::record("Got an error {$errno} {$errstr} on the listener shutting down.");
  171. $this->mEvBase->exit(NULL);
  172. }
  173. public function write($bufid,$data)
  174. {
  175. $scope = new scope_trace(__METHOD__);
  176. if(!is_string($data)) {
  177. Log::record(__METHOD__ . " write data is not string.",Log::ERR);
  178. }
  179. if(array_key_exists($bufid,$this->mReaders)) {
  180. $buffer = $this->mReaders[$bufid];
  181. $header = sprintf("%010d",strlen($data));
  182. $data = $header . $data;
  183. $ret = $buffer->write($data);
  184. if($ret == false) {
  185. Log::record(__METHOD__ . " event_buffer_write return false.",Log::ERR);
  186. }
  187. return $ret;
  188. } else {
  189. return false;
  190. }
  191. }
  192. public function close($bufid)
  193. {
  194. $scope = new scope_trace(__METHOD__);
  195. if(!array_key_exists($bufid,$this->mReaders)) {
  196. return false;
  197. }
  198. $buffer = $this->mReaders[$bufid];
  199. $this->mReaders[$bufid] = null;
  200. unset($buffer);
  201. unset($this->mReaders[$bufid]);
  202. if($this->mProcessor != null) {
  203. $this->mProcessor->onClose($bufid);
  204. }
  205. }
  206. public function remove($fd)
  207. {
  208. $scope = new scope_trace(__METHOD__);
  209. Log::record("fd = {$fd}",Log::DEBUG);
  210. unset($this->mReaders[$fd]);
  211. }
  212. }