NetchkWriter.py 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. # from . import DataHandler #此时是导入文件
  2. from .DataStream import DataWriteStream, day_stamp
  3. from .DataStream import ENetPosmap as pos_map
  4. import numpy as np
  5. __all__ = ['NetchkWriter']
  6. import logging
  7. log = logging.getLogger('writer')
  8. class NetchkWriter(DataWriteStream):
  9. def write(self, method, params):
  10. self._lock.acquire()
  11. flush = True
  12. if method == 'net_succ':
  13. self._onSucc(params)
  14. elif method == 'net_fail':
  15. self._onFail(params)
  16. else:
  17. flush = False
  18. if flush:
  19. hfive = self.file
  20. hfive.flush()
  21. self._lock.release()
  22. def _onSucc(self, params):
  23. def parse(input):
  24. return input['channel_name'], input['time']
  25. chname, time = parse(params)
  26. dset, pos = self.path_pos(chname, time)
  27. dset[pos_map.succ_count, pos] += 1
  28. pass
  29. def _onFail(self, params):
  30. def parse(input):
  31. return input['channel_name'], input['time']
  32. chname, time = parse(params)
  33. dset, pos = self.path_pos(chname, time)
  34. dset[pos_map.fail_count, pos] += 1
  35. pass
  36. def path_pos(self, chname, time):
  37. today = day_stamp(time)
  38. path = f'/{self._version}/{today}/{chname}'
  39. log.debug("%s,%s", 'NetchkWriter', path)
  40. hfive = self.file
  41. if path not in hfive:
  42. dim = pos_map.dim()
  43. dset = hfive.create_dataset(path, (dim, 86400), chunks=(dim, 3600))
  44. dset[:, :] = np.zeros((dim, 86400))
  45. hfive.flush()
  46. else:
  47. dset = hfive[path]
  48. return dset, time - today
  49. pass