from threading import Thread
from collections import deque
import logging
import time

log = logging.getLogger('consumer')


class WriterConsumer(Thread):
    """Background thread that drains queued write requests into a handler.

    Producers call :meth:`put` to enqueue ``(method, params)`` pairs, which
    the thread consumes oldest-first.  While the backlog is at or below
    ``max_threshold`` each message is written individually with
    ``handler.write(method, params)``.  Once the backlog exceeds
    ``max_threshold``, up to ``max_batch_size`` messages are grouped by
    ``handler.get_path(method, params)`` (messages whose path is ``None`` are
    skipped) and flushed with one ``handler.write_batch(batches)`` call, where
    ``batches`` maps each path to a list of ``(method, params)`` tuples.

    After :meth:`quit` is called no new messages are accepted, and :meth:`run`
    returns once the remaining backlog has been drained.  Exceptions raised by
    the handler are not caught here and propagate out of :meth:`run`.

    :param handler: object providing ``write``, ``get_path`` and
        ``write_batch`` as described above.
    :param name: thread name, also used as the prefix of log messages.
    :param max_threshold: backlog size above which batch mode is used.
    :param max_batch_size: maximum number of messages collected per batch.
    :param idle_sleep: seconds to sleep when the queue is found empty.
    """

    def __init__(self, handler, name, max_threshold=10000,
                 max_batch_size=200000, idle_sleep=1):
        # Give the name to Thread itself rather than overwriting Thread's
        # private ``_name`` attribute after construction.
        Thread.__init__(self, name=name)
        # put() appends on the left and the consumer pops from the right, so
        # messages are processed in FIFO order.  deque appends/pops from
        # either end are documented as thread-safe, so no lock is used.
        self._messages = deque()
        self._stopped = False
        self._handler = handler
        self._max_threshold = max_threshold
        self._max_batch_size = max_batch_size
        self._idle_sleep = idle_sleep

    def run(self):
        """Consume messages until quit() was called and the queue is empty."""
        while True:
            size = len(self._messages)
            log.debug("%s messages size=%d", self.name, size)
            if size > self._max_threshold:
                self._batch()
            else:
                self._single()
            if self._stopped and not self._messages:
                break

    def _batch(self):
        """Collect up to max_batch_size messages grouped by path and flush them.

        Whatever has been collected is flushed in ``finally`` so that a
        failure part-way through collection still writes the gathered
        messages before the exception propagates.
        """
        log.debug("%s start collect batch count", self.name)
        batches = dict()
        count = 0
        try:
            while count < self._max_batch_size:
                # Only the pop() signals "queue empty"; keep the handler call
                # outside this try so its own IndexErrors are not mistaken
                # for an empty queue.
                try:
                    method, params = self._messages.pop()
                except IndexError:
                    log.info("%s consumer queue is empty", self.name)
                    break
                path = self._handler.get_path(method, params)
                if path is None:
                    # Messages without a path are dropped and do not count
                    # toward the batch size.
                    continue
                batches.setdefault(path, []).append((method, params))
                count += 1
        finally:
            log.info("%s collect batch count=%d", self.name, count)
            if count > 0:
                self._handler.write_batch(batches)
                log.info("%s write batch count=%d", self.name, count)

    def _single(self):
        """Write the oldest queued message, or back off if the queue is empty."""
        try:
            method, params = self._messages.pop()
        except IndexError:
            log.info("%s consumer queue is empty", self.name)
            # Nothing to do: wait before polling again.
            time.sleep(self._idle_sleep)
            return
        self._handler.write(method, params)

    def put(self, method, params):
        """Enqueue a write request; requests made after quit() are dropped."""
        if not self._stopped:
            self._messages.appendleft((method, params))
        else:
            log.debug("%s is stopped, dropping message", self.name)

    def quit(self):
        """Ask the thread to stop once the queued messages have been written.

        This only sets a flag and does not wait for the thread to finish;
        use ``join()`` for that.
        """
        log.debug("%s thread has quit", self.name)
        self._stopped = True