event_looper.php 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: stanley-king
  5. * Date: 2018/7/23
  6. * Time: 10:56 PM
  7. */
  8. namespace event;
  9. use Log;
  /**
   * Single-threaded socket event loop built on the legacy libevent function
   * API (event_base_new / event_new / event_set / event_add ...).
   *
   * Wire format used by proc() and write(): each frame is a 10-character,
   * zero-padded decimal length header followed by that many body bytes.
   * A header whose value is 0 is treated as a heartbeat and carries no body.
   *
   * Connections are keyed by intval() of the socket value (called "fd" below).
   * The processor passed to init() receives onStart(), onConnected(),
   * onRequest() and onClose() calls.
   */
  class event_looper
  {
      /** Length in bytes of the decimal frame-length header. */
      const body_header_len = 10;
      /** Timeout handed to event_add() for read events (microseconds per the libevent extension docs). */
      const read_micotime = 6000000000;
      /** Timeout handed to event_add() for connect attempts and for the delay before a retry. */
      const connect_micotime = 5000000;

      /** Event base created by event_base_new(). */
      protected $mEvBase;
      /** Signal number => signal event. */
      protected $mEvSignals;
      /** fd => event for listening sockets, connecting sockets and established streams. */
      protected $mEvSockets;
      /** Timer events; only ever cleared by this class, never filled. */
      protected $mEvTimeouts;
      /** fd => established socket. */
      protected $mStreams;
      /** fd => bytes received but not yet framed. */
      protected $mContents;
      /** Processor supplied through init(). */
      protected $mProcessor;

      /**
       * Initialises the empty registries and creates the event base.
       */
      public function __construct()
      {
          $this->mStreams = [];
          $this->mContents = [];
          $this->mEvSignals = [];
          $this->mEvSockets = [];
          $this->mEvTimeouts = [];
          $this->mEvBase = event_base_new();
      }

      /**
       * Stores the processor and immediately calls its onStart() hook.
       *
       * @param IProcessor $processor
       */
      public function init(IProcessor $processor)
      {
          $this->mProcessor = $processor;
          $this->mProcessor->onStart();
      }

      /**
       * Registers SIGINT/SIGQUIT/SIGTERM handlers and runs the loop until
       * event_base_loop() returns.
       */
      public function run_loop()
      {
          foreach ([SIGINT, SIGQUIT, SIGTERM] as $signo) {
              $this->add_signal($signo);
          }

          $ret = event_base_loop($this->mEvBase);
          Log::record("event_base_loop ret={$ret}",Log::DEBUG);
      }

      /**
       * Registers a listening socket; onAccept() runs whenever it is readable.
       *
       * @param mixed $stream listening socket
       * @return bool true when the event was registered, false otherwise
       */
      public function add_listen($stream)
      {
          $fd = intval($stream);
          // A failed socket_create()/socket_bind() chain hands us false, whose
          // intval() is 0 and would slip past a plain "< 0" check.
          if ($stream === false || $stream === null || $fd < 0) {
              return false;
          }

          $event = event_new();
          $this->mEvSockets[$fd] = $event;
          if (!$this->arm_event($event, $stream, EV_READ | EV_PERSIST, 'onAccept', "EV_READ | EV_PERSIST")) {
              // Do not keep a half-configured event in the registry.
              $this->release_event($fd);
              return false;
          }
          return true;
      }

      /**
       * Registers a handler event for one signal number.
       *
       * @param int $val signal number
       */
      private function add_signal($val)
      {
          $event = event_new();
          $this->mEvSignals[$val] = $event;
          $this->arm_event($event, $val, EV_SIGNAL, 'onSignal', "EV_SIGNAL sig={$val}");
      }

      /**
       * Configures an event, attaches it to the base and schedules it.
       * Stops at the first failing step and logs which step failed.
       *
       * @param mixed  $event   event created by event_new()
       * @param mixed  $target  socket or signal number to watch
       * @param int    $flags   EV_* flags
       * @param string $method  name of the method on $this used as callback
       * @param string $tag     suffix appended to the error log message
       * @param int    $timeout timeout for event_add(); -1 means none
       * @param mixed  $arg     extra argument forwarded to the callback
       * @return bool true when all three steps succeeded
       */
      private function arm_event($event, $target, $flags, $method, $tag, $timeout = -1, $arg = null)
      {
          if (event_set($event, $target, $flags, [$this, $method], $arg) == false) {
              Log::record("event_set error {$tag}",Log::DEBUG);
              return false;
          }
          if (event_base_set($event, $this->mEvBase) == false) {
              Log::record("event_base_set error {$tag}",Log::DEBUG);
              return false;
          }
          if (event_add($event, $timeout) == false) {
              Log::record("event_add error {$tag}",Log::DEBUG);
              return false;
          }
          return true;
      }

      /**
       * Removes and frees the event registered for $fd, if there is one.
       *
       * @param int $fd
       */
      private function release_event($fd)
      {
          if (!isset($this->mEvSockets[$fd])) {
              return;
          }
          $event = $this->mEvSockets[$fd];
          unset($this->mEvSockets[$fd]);
          if (is_resource($event)) {
              event_del($event);
              event_free($event);
          }
      }

      /**
       * Removes and frees every event in the given list.
       *
       * @param array $events
       */
      private function free_events(array $events)
      {
          foreach ($events as $event) {
              if (is_resource($event)) {
                  event_del($event);
                  event_free($event);
              }
          }
      }

      /**
       * Signal callback. On SIGINT/SIGQUIT/SIGTERM it asks the loop to exit and
       * tears down every event and established socket. Other signals are only
       * logged.
       *
       * @param int $sig  signal number
       * @param int $flag event flags reported by the loop
       */
      public function onSignal($sig, $flag)
      {
          Log::record("ev_signal sig={$sig},flag={$flag}",Log::DEBUG);
          if ($sig != SIGINT && $sig != SIGQUIT && $sig != SIGTERM) {
              return;
          }

          event_base_loopexit($this->mEvBase);

          $this->free_events($this->mEvTimeouts);
          $this->mEvTimeouts = [];

          $this->free_events($this->mEvSockets);
          $this->mEvSockets = [];

          foreach ($this->mStreams as $stream) {
              // STREAM_SHUT_RDWR is numerically 2, which socket_shutdown()
              // interprets as "both directions".
              @socket_shutdown($stream,STREAM_SHUT_RDWR);
              @socket_close($stream);
          }
          $this->mStreams = [];
          $this->mContents = [];

          $this->free_events($this->mEvSignals);
          $this->mEvSignals = [];
      }

      /**
       * Listening-socket callback: accepts one connection and starts reading it.
       *
       * @param mixed $socket listening socket
       * @param int   $event  event flags reported by the loop
       */
      public function onAccept($socket, $event)
      {
          Log::record("onAccept socket_fd={$socket} flag={$event}",Log::DEBUG);
          $stream = socket_accept($socket);
          if ($stream == false) {
              Log::record("socket_accept return false socket_fd={$socket}",Log::DEBUG);
              return;
          }
          Log::record("socket_accept stream={$stream}",Log::DEBUG);
          $this->add_stream($stream);
      }

      /**
       * Registers an established socket: switches it to non-blocking mode,
       * creates its receive buffer and schedules a one-shot read event.
       *
       * @param mixed $stream connected socket
       * @return int fd key under which the stream is stored
       */
      private function add_stream($stream)
      {
          $fd = intval($stream);
          socket_set_nonblock($stream);

          $event = event_new();
          $this->mEvSockets[$fd] = $event;
          $this->mStreams[$fd] = $stream;
          $this->mContents[$fd] = '';

          // The read event is not persistent; onRead() re-adds it after each wake-up.
          $this->arm_event($event, $stream, EV_READ | EV_TIMEOUT, 'onRead', "connect fd={$stream}", self::read_micotime);
          return $fd;
      }

      /**
       * Re-adds the read event of $fd unless the connection is gone.
       *
       * @param int $fd
       */
      private function rearm_read($fd)
      {
          if (isset($this->mEvSockets[$fd])) {
              event_add($this->mEvSockets[$fd],self::read_micotime);
          }
      }

      /**
       * Takes a stream out of the loop and switches it to blocking mode.
       *
       * @param int $fd
       * @return bool false when $fd is unknown, true otherwise
       */
      public function block($fd)
      {
          if (!isset($this->mStreams[$fd]) || !isset($this->mEvSockets[$fd])) {
              return false;
          }
          event_del($this->mEvSockets[$fd]);
          socket_set_block($this->mStreams[$fd]);
          return true;
      }

      /**
       * Switches a stream back to non-blocking mode and returns it to the loop.
       *
       * @param int $fd
       * @return bool false when $fd is unknown, true otherwise
       */
      public function unblock($fd)
      {
          if (!isset($this->mStreams[$fd]) || !isset($this->mEvSockets[$fd])) {
              return false;
          }
          socket_set_nonblock($this->mStreams[$fd]);
          // Same timeout as every other place that schedules the read event.
          event_add($this->mEvSockets[$fd],self::read_micotime);
          return true;
      }

      /**
       * Timer callback placeholder; intentionally does nothing.
       */
      public function onTimer($stream, $event, $params)
      {
      }

      /**
       * Callback for an outgoing connection attempt.
       *
       * EV_TIMEOUT: the attempt (or the retry delay) expired, so a new attempt
       * is started on a fresh socket. EV_WRITE and/or EV_READ: the attempt
       * finished; SO_ERROR decides whether it succeeded. On success the socket
       * is promoted to a normal stream and the processor's onConnected() is
       * called; on failure a retry is scheduled after connect_micotime.
       *
       * @param mixed $stream socket the event was registered on
       * @param int   $event  event flags reported by the loop
       * @param array $params 'host', 'port', 'args' and 'stream' as stored by do_connect()
       */
      public function onConnect($stream, $event, $params)
      {
          $error = socket_last_error();
          Log::record("onConnect stream={$stream},event={$event},socket error={$error}",Log::DEBUG);

          if ($event == EV_TIMEOUT) {
              // Hand the previous socket over so do_connect() can close it and
              // reuse its event instead of leaking both.
              $old = isset($params['stream']) ? $params['stream'] : $stream;
              $fresh = $this->do_connect($params['host'],$params['port'],$params['args'],$old);
              if ($fresh !== false) {
                  $fd = $this->add_stream($fresh);
                  $this->mProcessor->onConnected($fd, $fresh,$params['host'],$params['port'],$params['args']);
              }
              return;
          }

          if (($event & (EV_WRITE | EV_READ)) == 0) {
              Log::record("onConnect unexpected event={$event}",Log::DEBUG);
              return;
          }

          // Strict comparison: socket_getopt() returns false on failure, which
          // must not be mistaken for "no pending error".
          $so_error = socket_getopt($stream,SOL_SOCKET,SO_ERROR);
          if ($so_error === 0) {
              Log::record("onConnect connected event={$event}",Log::DEBUG);
              $this->release_event(intval($stream));
              $fd = $this->add_stream($stream);
              $this->mProcessor->onConnected($fd, $stream,$params['host'],$params['port'],$params['args']);
              return;
          }

          $shown = ($so_error === false) ? 'unknown' : $so_error;
          Log::record("onConnect connect failed event={$event},so_error={$shown}",Log::DEBUG);
          $this->schedule_reconnect($stream, $params);
      }

      /**
       * Turns the event of a failed attempt into a pure timer that fires
       * onConnect() with EV_TIMEOUT after connect_micotime.
       *
       * @param mixed $stream socket of the failed attempt
       * @param array $params connection parameters to carry over
       */
      private function schedule_reconnect($stream, $params)
      {
          $fd = intval($stream);
          if (!isset($this->mEvSockets[$fd])) {
              return;
          }
          $event = $this->mEvSockets[$fd];
          event_del($event);

          // The timeout branch of onConnect() needs the old socket to close it.
          $params['stream'] = $stream;
          $this->arm_event($event, $stream, EV_TIMEOUT, 'onConnect', "connect fd={$stream}", self::connect_micotime, $params);
      }

      /**
       * Starts a non-blocking TCP connect.
       *
       * @param string $host
       * @param int    $port
       * @param mixed  $args       opaque value passed through to onConnected()
       * @param mixed  $old_stream socket of the previous attempt, or false
       * @return mixed the connected socket when socket_connect() succeeded at
       *               once; false when the attempt is pending or could not start
       */
      private function do_connect($host,$port,$args,$old_stream = false)
      {
          $stream = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
          if ($stream === false) {
              Log::record("socket_create error connect host={$host},port={$port}",Log::ERR);
              if ($old_stream != false) {
                  // Nothing to retry on; drop what the previous attempt held.
                  $this->release_event(intval($old_stream));
                  @socket_close($old_stream);
              }
              return false;
          }
          socket_set_nonblock($stream);
          $fd = intval($stream);

          if ($old_stream == false) {
              $this->mEvSockets[$fd] = event_new();
          }
          else {
              // Retry: close the previous socket and move its event to the new key.
              $oldfd = intval($old_stream);
              socket_close($old_stream);
              if (isset($this->mEvSockets[$oldfd])) {
                  $this->mEvSockets[$fd] = $this->mEvSockets[$oldfd];
                  unset($this->mEvSockets[$oldfd]);
              }
              else {
                  $this->mEvSockets[$fd] = event_new();
              }
          }

          $ret = socket_connect($stream,$host,$port);
          if ($ret == false) {
              // Pending (or failed) attempt: wait for completion or timeout.
              $params = ['host' => $host,'port' => $port,'args' => $args,'stream' => $stream];
              $this->arm_event($this->mEvSockets[$fd], $stream, EV_WRITE | EV_READ | EV_TIMEOUT, 'onConnect',
                  "connect fd={$stream}", self::connect_micotime, $params);
              return false;
          }

          // Connected immediately: the connect event is no longer needed.
          $this->release_event($fd);
          return $stream;
      }

      /**
       * Opens an outgoing connection. The processor's onConnected() is called
       * right away when the connect completes immediately, otherwise later
       * from onConnect().
       *
       * @param string $host
       * @param int    $port
       * @param mixed  $args opaque value passed through to onConnected()
       */
      public function connect($host,$port,$args)
      {
          $stream = $this->do_connect($host,$port,$args,false);
          if ($stream === false) {
              return;
          }
          $bufid = $this->add_stream($stream);
          $this->mProcessor->onConnected($bufid, $stream,$host,$port,$args);
      }

      /**
       * Reports whether a socket error code means "try again later".
       * Code 0 (nothing recorded) is treated as retryable.
       *
       * @param int $errno value from socket_last_error()
       * @return bool
       */
      private function is_retryable_errno($errno)
      {
          if ($errno === 0) {
              return true;
          }
          foreach (['SOCKET_EAGAIN', 'SOCKET_EWOULDBLOCK', 'SOCKET_EINTR', 'SOCKET_EINPROGRESS'] as $name) {
              if (defined($name) && constant($name) === $errno) {
                  return true;
              }
          }
          return false;
      }

      /**
       * Read callback for an established stream: appends received bytes to the
       * buffer, dispatches complete frames and re-schedules the read event.
       *
       * @param mixed $stream socket that became readable or timed out
       * @param int   $event  event flags reported by the loop
       */
      public function onRead($stream, $event)
      {
          $fd = intval($stream);

          if ($event == EV_TIMEOUT) {
              Log::record("onRead stream={$stream} event=EV_TIMEOUT",Log::DEBUG);
              $this->rearm_read($fd);
              return;
          }
          if ($event != EV_READ) {
              Log::record("onRead stream={$stream} event={$event}",Log::DEBUG);
              return;
          }

          $read = @socket_read($stream, 1024);
          Log::record("onRead read={$read}",Log::DEBUG);

          if ($read === false) {
              // Would-block / in-progress: wait for the next readiness event.
              // Anything else is a dead connection and must be closed instead
              // of being re-armed forever.
              $errno = socket_last_error($stream);
              if ($this->is_retryable_errno($errno)) {
                  $this->rearm_read($fd);
              }
              else {
                  Log::record("onRead socket error={$errno} fd={$fd}",Log::DEBUG);
                  $this->close($fd);
              }
              return;
          }

          if (strlen($read) == 0) {
              // Zero-length read: the peer closed the connection.
              $this->close($fd);
              return;
          }

          $this->mContents[$fd] .= $read;
          $this->proc($fd);
          // proc() may have closed the connection; rearm_read() checks for that.
          $this->rearm_read($fd);
      }

      /**
       * Extracts every complete frame from the buffer of $fd and passes each
       * body to the processor's onRequest(); heartbeats are skipped. A header
       * that is not ten decimal digits closes the connection.
       *
       * @param int $fd
       */
      private function proc($fd)
      {
          $hlen = self::body_header_len;
          $content = $this->mContents[$fd];
          $start = 0;
          $left = strlen($content);

          while ($left >= $hlen) {
              $header = substr($content,$start,$hlen);
              // Digits only: is_numeric() would also accept signs, exponents
              // and whitespace, and a negative length stalls this loop.
              if (!ctype_digit($header)) {
                  $this->close($fd);
                  return;
              }

              $body_len = intval($header);
              if ($body_len == 0) { // this is a heartbeat packet
                  $start += $hlen;
                  $left -= $hlen;
                  continue;
              }
              if ($left < $hlen + $body_len) {
                  break; // body not fully received yet
              }

              $body = substr($content,$start + $hlen,$body_len);
              $start += $hlen + $body_len;
              $left -= $hlen + $body_len;

              $this->mProcessor->onRequest($fd,$body);
              if (!isset($this->mStreams[$fd])) {
                  // The handler closed this connection; do not resurrect its buffer.
                  return;
              }
          }

          if ($start > 0) {
              $rest = substr($content,$start);
              $this->mContents[$fd] = ($rest === false) ? '' : $rest;
          }
      }

      /**
       * Error/timeout callback. No event in this class registers it. On
       * EV_TIMEOUT it sends a heartbeat and closes the stream if that fails;
       * on any other event it closes the stream.
       *
       * @param mixed $stream
       * @param int   $event
       */
      public function onError($stream, $event)
      {
          $fd = intval($stream);
          if ($event == EV_TIMEOUT) {
              if ($this->write($fd,'') == false) {
                  $this->close($fd);
              }
              return;
          }

          Log::record("onError stream={$stream} flag={$event}",Log::DEBUG);
          $this->close($fd);
      }

      /**
       * Sends one frame (length header + $data) on the stream $fd.
       *
       * NOTE(review): a single socket_write() is issued; a short write is
       * reported as failure after part of the frame may already be sent.
       *
       * @param int    $fd
       * @param string $data body bytes; '' produces a heartbeat frame
       * @return bool true only when the whole frame was written
       */
      public function write($fd,$data)
      {
          if (!is_string($data)) {
              Log::record(__METHOD__ . " write data is not string.",Log::ERR);
              return false;
          }
          if (!array_key_exists($fd,$this->mStreams)) {
              return false;
          }

          $frame = sprintf("%010d",strlen($data)) . $data;
          $len = strlen($frame);
          $ret = socket_write($this->mStreams[$fd],$frame,$len);
          return ($ret === $len);
      }

      /**
       * Closes an established stream, releases its event and buffer, and
       * calls the processor's onClose().
       *
       * @param int $fd
       * @return bool false when $fd is unknown, true after a successful close
       */
      public function close($fd)
      {
          if (!array_key_exists($fd,$this->mStreams)) {
              Log::record(" close socket fd={$fd}.",Log::ERR);
              return false;
          }

          $stream = $this->mStreams[$fd];
          // Forget the fd first so a close() issued from onClose() finds nothing.
          unset($this->mStreams[$fd]);
          unset($this->mContents[$fd]);
          $this->release_event($fd);

          @socket_shutdown($stream,STREAM_SHUT_RDWR);
          socket_close($stream);

          $this->mProcessor->onClose($fd);
          return true;
      }
  }