WriterConsumerEx.py 977 B

123456789101112131415161718192021222324252627282930313233
  1. from threading import Thread, Condition
  2. from queue import Queue
  3. class WriterConsumerEx(Thread):
  4. def __init__(self, handler):
  5. Thread.__init__(self)
  6. self._cond = Condition()
  7. self._messages = Queue()
  8. self._stopped = False
  9. self._handler = handler
  10. def run(self):
  11. while True:
  12. with self._cond:
  13. while self._messages.empty():
  14. self._cond.wait()
  15. while self._messages.empty() is False:
  16. method, params = self._messages.get()
  17. self._handler.write(method, params)
  18. self._messages.task_done()
  19. if self._stopped == True:
  20. break
  21. def put(self, method,params):
  22. with self._cond:
  23. self._messages.put((method, params))
  24. self._cond.notify()
  25. def quit(self):
  26. with self._cond:
  27. self._stopped = True
  28. self._cond.notify()