co.cc 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. #include <iostream>
  2. #include <list>
  3. #include <algorithm>
  4. #include <vector>
  5. #include "swoole_coroutine.h"
  6. #include "swoole_coroutine_socket.h"
  7. #include "swoole_coroutine_system.h"
  8. using swoole::Coroutine;
  9. using swoole::coroutine::Socket;
  10. using swoole::coroutine::System;
  11. using namespace std;
  12. list<string> q;
  13. list<Socket *> slaves;
  14. size_t qs;
  15. int main(int argc, char **argv) {
  16. swoole_event_init(SW_EVENTLOOP_WAIT_EXIT);
  17. signal(SIGPIPE, SIG_IGN);
  18. Coroutine::create([](void *arg) {
  19. System::sleep(2.0);
  20. cout << "CO-1, sleep 2\n";
  21. });
  22. Coroutine::create([](void *arg) {
  23. System::sleep(1);
  24. cout << "CO-2, sleep 1\n";
  25. });
  26. Coroutine::create([](void *arg) {
  27. cout << "CO-3, listen tcp:0.0.0.0:9001\n";
  28. Socket s(SW_SOCK_TCP);
  29. s.bind("0.0.0.0", 9001);
  30. s.listen();
  31. while (1) {
  32. Socket *_client = s.accept();
  33. Coroutine::create(
  34. [](void *arg) {
  35. Socket *client = (Socket *) arg;
  36. while (1) {
  37. char buf[1024];
  38. auto retval = client->recv(buf, sizeof(buf));
  39. if (retval == 0) {
  40. cout << "connection close\n";
  41. break;
  42. } else {
  43. if (strncasecmp("push", buf, 4) == 0) {
  44. q.push_back(string(buf + 5, retval - 5));
  45. qs += retval - 5;
  46. string resp("OK\n");
  47. client->send(resp.c_str(), resp.length());
  48. for (auto it = slaves.begin(); it != slaves.end();) {
  49. auto sc = *it;
  50. auto n = sc->send(buf, retval);
  51. if (n <= 0) {
  52. it = slaves.erase(it);
  53. delete sc;
  54. } else {
  55. it++;
  56. }
  57. }
  58. } else if (strncasecmp("pop", buf, 3) == 0) {
  59. if (q.empty()) {
  60. string resp("EMPTY\n");
  61. client->send(resp.c_str(), resp.length());
  62. } else {
  63. auto data = q.front();
  64. q.pop_front();
  65. qs -= data.length();
  66. client->send(data.c_str(), data.length());
  67. }
  68. } else if (strncasecmp("stat", buf, 4) == 0) {
  69. char stat_buf[64];
  70. int n = snprintf(stat_buf, sizeof(stat_buf), "count=%ld,bytes=%ld\n", q.size(), qs);
  71. client->send(stat_buf, n);
  72. } else {
  73. string resp("ERROR\n");
  74. client->send(resp.c_str(), resp.length());
  75. }
  76. }
  77. }
  78. delete client;
  79. },
  80. _client);
  81. }
  82. });
  83. Coroutine::create([](void *arg) {
  84. Socket s(SW_SOCK_TCP);
  85. s.bind("0.0.0.0", 9002);
  86. s.listen();
  87. while (1) {
  88. Socket *_client = s.accept();
  89. for (auto data : q) {
  90. _client->send(data.c_str(), data.length());
  91. }
  92. slaves.push_back(_client);
  93. }
  94. });
  95. swoole_event_wait();
  96. return 0;
  97. }