WriterConsumerDeque.py 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. from threading import Thread
  2. from collections import deque
  3. import logging
  4. import time
  5. log = logging.getLogger('consumer')
  6. class WriterConsumer(Thread):
  7. def __init__(self, handler, name):
  8. Thread.__init__(self)
  9. self._messages = deque()
  10. self._stopped = False
  11. self._handler = handler
  12. self._max_threshold = 10000
  13. self._max_batch_size = 200000
  14. self._name = name
  15. def run(self):
  16. while True:
  17. size = len(self._messages)
  18. log.debug("%s messages size=%d", self._name, size)
  19. if size > self._max_threshold:
  20. self._batch()
  21. else:
  22. self._single()
  23. if self._stopped and len(self._messages) == 0:
  24. break
  25. def _batch(self):
  26. log.debug("%s start collect batch count", self._name)
  27. try:
  28. batches = dict()
  29. count = 0;
  30. while count < self._max_batch_size:
  31. method, params = self._messages.pop()
  32. path = self._handler.get_path(method, params)
  33. if path is None:
  34. continue
  35. if path not in batches:
  36. batches[path] = list()
  37. batches[path].append((method, params))
  38. count += 1
  39. except IndexError as ex:
  40. log.info("%s consumer queue is empty", self._name)
  41. finally:
  42. log.info("%s collect batch count=%d", self._name, count)
  43. if count > 0:
  44. self._handler.write_batch(batches)
  45. log.info("%s write batch count=%d", self._name, count)
  46. def _single(self):
  47. try:
  48. method, params = self._messages.pop()
  49. self._handler.write(method, params)
  50. except IndexError as ex:
  51. log.info("%s consumer queue is empty", self._name)
  52. time.sleep(1)
  53. def put(self, method, params):
  54. if self._stopped == False:
  55. self._messages.appendleft((method, params))
  56. def quit(self):
  57. log.debug("%s thread has quit", self._name)
  58. self._stopped = True