WriterConsumer.py 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. from threading import Thread
  2. from queue import Queue,Empty
  3. import logging
  4. log = logging.getLogger('consumer')
  5. class WriterConsumer(Thread):
  6. def __init__(self, handler, name):
  7. Thread.__init__(self)
  8. self._messages = Queue(1000000)
  9. self._stopped = False
  10. self._handler = handler
  11. self._max_threshold = 50000
  12. self._max_batch_size = 200000
  13. self._name = name
  14. def run(self):
  15. while True:
  16. size = self._messages.qsize()
  17. log.debug("%s messages size=%d", self._name, size)
  18. self._single()
  19. # if size > self._max_threshold:
  20. # self._batch()
  21. # else:
  22. # self._single()
  23. if self._stopped and self._messages.empty():
  24. break
  25. def _batch(self):
  26. log.debug("%s start collect batch count", self._name)
  27. try:
  28. batches = dict()
  29. count = 0;
  30. while count < self._max_batch_size:
  31. method, params = self._messages.get(False)
  32. self._messages.task_done()
  33. path = self._handler.get_path(method, params)
  34. if path is None:
  35. continue
  36. if path not in batches:
  37. batches[path] = list()
  38. batches[path].append((method, params))
  39. count += 1
  40. except Empty as ex:
  41. log.info("%s consumer queue is empty", self._name)
  42. finally:
  43. log.info("%s collect batch count=%d", self._name, count)
  44. if count > 0:
  45. self._handler.write_batch(batches)
  46. log.info("%s write batch count=%d", self._name, count)
  47. def _single(self):
  48. try:
  49. method, params = self._messages.get(timeout=1)
  50. self._messages.task_done()
  51. self._handler.write(method, params)
  52. except Empty as ex:
  53. log.info("%s consumer queue is empty", self._name)
  54. def put(self, method,params):
  55. self._messages.put_nowait((method, params))
  56. def quit(self):
  57. log.debug("%s thread has quit", self._name)
  58. self._stopped = True
  59. self._messages.join()