123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566 |
- from threading import Thread
- from collections import deque
- import logging
- import time
log = logging.getLogger('consumer')


class WriterConsumer(Thread):
    """Background thread that drains queued ``(method, params)`` messages.

    Producers call :meth:`put`; the thread hands each message to ``handler``.
    While the backlog is small, messages are written one at a time through
    ``handler.write(method, params)``. Once the backlog exceeds
    ``max_threshold``, messages are grouped by ``handler.get_path(method,
    params)`` and flushed together through ``handler.write_batch(batches)``.

    Messages are delivered in the order they were put: ``put`` appends on the
    left of the deque and the consumer pops from the right.

    Args:
        handler: Object providing ``write(method, params)``,
            ``get_path(method, params)`` and ``write_batch(batches)``.
        name: Label used in log messages and as the thread name.
        max_threshold: Backlog size above which batch mode is used.
        max_batch_size: Maximum number of messages collected into one batch.
    """

    def __init__(self, handler, name, max_threshold=10000,
                 max_batch_size=200000):
        # Let Thread own its private ``_name``; the original assigned to it
        # directly, which reaches into threading.Thread internals.
        Thread.__init__(self, name=name)
        self._messages = deque()
        # Deliberately not called ``_stopped``: older Python 3 releases are
        # believed to use that attribute inside Thread for liveness tracking.
        self._stop_requested = False
        self._handler = handler
        self._max_threshold = max_threshold
        self._max_batch_size = max_batch_size
        self._consumer_name = name

    def run(self):
        """Consume messages until :meth:`quit` was called and the queue is empty."""
        while True:
            size = len(self._messages)
            log.debug("%s messages size=%d", self._consumer_name, size)
            if size > self._max_threshold:
                self._batch()
            else:
                self._single()
            # Checked after the work step so a pending backlog is always
            # drained before the thread exits.
            if self._stop_requested and not self._messages:
                break

    def _batch(self):
        """Collect up to ``max_batch_size`` messages and write them as one batch.

        Messages whose path is ``None`` are dropped and do not count towards
        the batch size. Whatever was collected is flushed in ``finally``, so
        messages already popped are still written if collection is cut short.
        """
        log.debug("%s start collect batch count", self._consumer_name)
        # Initialised before the ``try`` because the ``finally`` clause reads
        # both names.
        batches = {}
        count = 0
        try:
            while count < self._max_batch_size:
                method, params = self._messages.pop()
                path = self._handler.get_path(method, params)
                if path is None:
                    continue
                batches.setdefault(path, []).append((method, params))
                count += 1
        except IndexError:
            # deque.pop() raises IndexError when the queue has been drained.
            log.info("%s consumer queue is empty", self._consumer_name)
        finally:
            log.info("%s collect batch count=%d", self._consumer_name, count)
            if count > 0:
                self._handler.write_batch(batches)
                log.info("%s write batch count=%d", self._consumer_name, count)

    def _single(self):
        """Write one message, or sleep for a second when the queue is empty."""
        try:
            method, params = self._messages.pop()
            self._handler.write(method, params)
        except IndexError:
            log.info("%s consumer queue is empty", self._consumer_name)
            # Back off so an idle consumer does not spin.
            time.sleep(1)

    def put(self, method, params):
        """Enqueue a message; silently ignored once :meth:`quit` was called."""
        if not self._stop_requested:
            self._messages.appendleft((method, params))

    def quit(self):
        """Request shutdown; the thread exits after draining the queue."""
        log.debug("%s thread has quit", self._consumer_name)
        self._stop_requested = True
|