zmq.php 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. <?php
  // Swoole TCP server listening on all interfaces, port 9501.
  $serv = new swoole_server('0.0.0.0', 9501);

  // One ZeroMQ context shared by both sockets below.
  $context = new ZMQContext();

  // Create the PUSH and PULL sockets first, then bind each to its own port.
  $sender = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
  $receiver = new ZMQSocket($context, ZMQ::SOCKET_PULL);

  $sender->bind('tcp://*:5557');
  $receiver->bind('tcp://*:5558');
  /**
   * Read callback for the ZeroMQ PULL socket.
   *
   * Registered by name through swoole_event_add() for the socket's
   * notification file descriptor. Every message currently queued on the
   * global $receiver socket is printed on its own line.
   *
   * The ZeroMQ notification descriptor is edge-triggered and may signal
   * readiness when no complete message is available, so the socket is
   * drained with non-blocking receives: a blocking recv() could stall the
   * event loop, and reading a single message could leave others unread
   * until the next edge.
   *
   * @return void
   */
  function onZMQR()
  {
      global $receiver;

      // recv() in non-blocking mode returns false once nothing is queued.
      while (($string = $receiver->recv(ZMQ::MODE_DONTWAIT)) !== false) {
          echo $string, PHP_EOL;
      }
  }
  // Server options: one worker and one reactor.
  $serv->set([
      'worker_num' => 1,
      'reactor_num' => 1,
      // Optional settings, currently disabled:
      // 'tcp_defer_accept' => 5,
      // 'daemonize' => true,
      // 'log_file' => '/tmp/swoole.log',
  ]);
  // On worker start, register the PULL socket's file descriptor with the
  // Swoole event loop so that onZMQR() is invoked on read events.
  $serv->on('workerStart', function ($serv, $worker_id) {
      global $receiver;

      swoole_event_add(
          $receiver->getsockopt(ZMQ::SOCKOPT_FD),
          'onZMQR',
          null,
          SWOOLE_EVENT_READ
      );

      echo "worker start\n";
  });
  // Log every new client connection along with the current process id.
  $serv->on('connect', function ($serv, $fd, $reactor_id) {
      $pid = posix_getpid();

      echo "[#{$pid}]\tClient@[{$fd}:{$reactor_id}]: Connect.\n";
  });
  /*
   * Handle data received from a TCP client.
   *
   * The raw payload is logged; when the trimmed payload equals "zmqtest",
   * a message is pushed out through the ZeroMQ PUSH socket. Every request
   * is answered with "OK" followed by a newline.
   */
  $serv->on('receive', function (swoole_server $serv, $fd, $reactor_id, $data) {
      // Closures do not see top-level variables automatically; without this
      // import $sender is undefined here and the send() call below fails.
      global $sender;

      $cmd = trim($data);
      echo "[#".posix_getpid()."]\tClient[$fd]: $data\n";

      // trim() always returns a string, so a strict comparison is safe.
      if ($cmd === "zmqtest") {
          echo 'aaaaaaaaaaaa'. PHP_EOL;
          $sender->send("msg to zmq");
      }

      $serv->send($fd, 'OK'.PHP_EOL);
      //$serv->close($fd);
  });
  // Log every client disconnect along with the current process id.
  $serv->on('close', function ($serv, $fd, $reactor_id) {
      $pid = posix_getpid();

      echo "[#{$pid}]\tClient@[{$fd}:{$reactor_id}]: Close.\n";
  });
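  // NOTE(review): the start call below is commented out, so this script
  // configures the server but never runs it. Confirm this is intentional.
  //$serv->start();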