|
@@ -1,30 +1,28 @@
|
|
|
from threading import Thread
|
|
|
-from collections import deque
|
|
|
+from queue import Queue,Empty
|
|
|
import logging
|
|
|
-import time
|
|
|
|
|
|
log = logging.getLogger('consumer')
|
|
|
|
|
|
-
|
|
|
class WriterConsumer(Thread):
|
|
|
def __init__(self, handler, name):
|
|
|
Thread.__init__(self)
|
|
|
- self._messages = deque()
|
|
|
+ self._messages = Queue()
|
|
|
self._stopped = False
|
|
|
self._handler = handler
|
|
|
- self._max_threshold = 10000
|
|
|
+ self._max_threshold = 3000
|
|
|
self._max_batch_size = 200000
|
|
|
self._name = name
|
|
|
|
|
|
def run(self):
|
|
|
while True:
|
|
|
- size = len(self._messages)
|
|
|
+ size = self._messages.qsize()
|
|
|
log.debug("%s messages size=%d", self._name, size)
|
|
|
if size > self._max_threshold:
|
|
|
self._batch()
|
|
|
else:
|
|
|
self._single()
|
|
|
- if self._stopped and len(self._messages) == 0:
|
|
|
+ if self._stopped and self._messages.empty():
|
|
|
break
|
|
|
|
|
|
def _batch(self):
|
|
@@ -33,15 +31,16 @@ class WriterConsumer(Thread):
|
|
|
batches = dict()
|
|
|
count = 0;
|
|
|
while count < self._max_batch_size:
|
|
|
- method, params = self._messages.pop()
|
|
|
+ method, params = self._messages.get(False)
|
|
|
+ self._messages.task_done()
|
|
|
path = self._handler.get_path(method, params)
|
|
|
if path is None:
|
|
|
continue
|
|
|
if path not in batches:
|
|
|
batches[path] = list()
|
|
|
- batches[path].append((method, params))
|
|
|
+ batches[path].append((method,params))
|
|
|
count += 1
|
|
|
- except IndexError as ex:
|
|
|
+ except Empty as ex:
|
|
|
log.info("%s consumer queue is empty", self._name)
|
|
|
finally:
|
|
|
log.info("%s collect batch count=%d", self._name, count)
|
|
@@ -51,16 +50,17 @@ class WriterConsumer(Thread):
|
|
|
|
|
|
def _single(self):
|
|
|
try:
|
|
|
- method, params = self._messages.pop()
|
|
|
+ method, params = self._messages.get(timeout=1)
|
|
|
self._handler.write(method, params)
|
|
|
- except IndexError as ex:
|
|
|
+ self._messages.task_done()
|
|
|
+ except Empty as ex:
|
|
|
log.info("%s consumer queue is empty", self._name)
|
|
|
- time.sleep(1)
|
|
|
|
|
|
- def put(self, method, params):
|
|
|
- if self._stopped == False:
|
|
|
- self._messages.append((method, params))
|
|
|
+ def put(self, method,params):
|
|
|
+ pass
|
|
|
+ # self._messages.put((method, params))
|
|
|
|
|
|
def quit(self):
|
|
|
log.debug("%s thread has quit", self._name)
|
|
|
- self._stopped = True
|
|
|
+ self._stopped = True
|
|
|
+ self._messages.join()
|