from threading import Thread from queue import Queue,Empty import logging log = logging.getLogger('consumer') class WriterConsumer(Thread): def __init__(self, handler, name,threshold): Thread.__init__(self) self._messages = Queue(1000000) self._stopped = False self._handler = handler self._max_threshold = threshold self._max_batch_size = 200000 self._name = name def run(self): while True: size = self._messages.qsize() log.debug("%s messages size=%d", self._name, size) if size > self._max_threshold: self._batch() else: self._single() if self._stopped and self._messages.empty(): break def _batch(self): log.debug("%s start collect batch count", self._name) try: batches = dict() count = 0; while count < self._max_batch_size: method, params = self._messages.get(False) self._messages.task_done() path = self._handler.get_path(method, params) if path is None: continue if path not in batches: batches[path] = list() batches[path].append((method, params)) count += 1 except Empty as ex: log.info("%s consumer queue is empty", self._name) finally: log.info("%s collect batch count=%d", self._name, count) if count > 0: self._handler.write_batch(batches) log.info("%s write batch count=%d", self._name, count) def _single(self): try: method, params = self._messages.get(timeout=1) self._messages.task_done() self._handler.write(method, params) except Empty as ex: log.info("%s consumer queue is empty", self._name) def put(self, method,params): self._messages.put((method, params)) def quit(self): log.debug("%s thread has quit", self._name) self._stopped = True self._messages.join()