WriterConsumer.py 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. from threading import Thread
  2. from queue import Queue,Empty
  3. import logging
  4. log = logging.getLogger('consumer')
  5. class WriterConsumer(Thread):
  6. def __init__(self, handler, name,threshold):
  7. Thread.__init__(self)
  8. self._messages = Queue(1000000)
  9. self._stopped = False
  10. self._handler = handler
  11. self._max_threshold = threshold
  12. self._max_batch_size = 200000
  13. self._name = name
  14. def run(self):
  15. while True:
  16. size = self._messages.qsize()
  17. log.debug("%s messages size=%d", self._name, size)
  18. if size > self._max_threshold:
  19. self._batch()
  20. else:
  21. self._single()
  22. if self._stopped and self._messages.empty():
  23. break
  24. def _batch(self):
  25. log.debug("%s start collect batch count", self._name)
  26. try:
  27. batches = dict()
  28. count = 0;
  29. while count < self._max_batch_size:
  30. method, params = self._messages.get(False)
  31. self._messages.task_done()
  32. path = self._handler.get_path(method, params)
  33. if path is None:
  34. continue
  35. if path not in batches:
  36. batches[path] = list()
  37. batches[path].append((method, params))
  38. count += 1
  39. except Empty as ex:
  40. log.info("%s consumer queue is empty", self._name)
  41. finally:
  42. log.info("%s collect batch count=%d", self._name, count)
  43. if count > 0:
  44. self._handler.write_batch(batches)
  45. log.info("%s write batch count=%d", self._name, count)
  46. def _single(self):
  47. try:
  48. method, params = self._messages.get(timeout=1)
  49. self._messages.task_done()
  50. self._handler.write(method, params)
  51. except Empty as ex:
  52. log.info("%s consumer queue is empty", self._name)
  53. def put(self, method,params):
  54. self._messages.put((method, params))
  55. def quit(self):
  56. log.debug("%s thread has quit", self._name)
  57. self._stopped = True
  58. self._messages.join()