# NetchkWriter.py (3.4 KB)

  1. # from . import DataHandler #此时是导入文件
  2. from .DataStream import DataWriteStream, day_stamp
  3. from .DataStream import ENetPosmap as pos_map
  4. import numpy as np
  5. __all__ = ['NetchkWriter']
  6. import logging
  7. log = logging.getLogger('writer')
  8. class NetchkWriter(DataWriteStream):
  9. def __init__(self, hfive):
  10. DataWriteStream.__init__(self, hfive)
  11. dim = pos_map.dim()
  12. self._cache = np.zeros((dim, 86400))
  13. def write(self, method, params):
  14. flush = True
  15. if method == 'net_succ':
  16. self._onSucc(params)
  17. elif method == 'net_fail':
  18. self._onFail(params)
  19. else:
  20. flush = False
  21. if flush:
  22. hfive = self.file
  23. hfive.flush()
  24. def write_batch(self, batches):
  25. for path, items in batches.items():
  26. self._write_batch(path, items)
  27. def _write_batch(self, path, messages):
  28. def _succ(input):
  29. return input['time']
  30. def _fail(input):
  31. return input['time']
  32. def _pos(time):
  33. today = day_stamp(time)
  34. return time - today
  35. dset = self._data_set(path=path)
  36. self._cache[:, :] = dset
  37. for method, params in messages:
  38. if method == 'net_succ':
  39. time = _succ(params)
  40. pos = _pos(time)
  41. self._cache[pos_map.succ_count, pos] += 1
  42. elif method == 'net_fail':
  43. time = _fail(params)
  44. pos = _pos(time)
  45. self._cache[pos_map.fail_count, pos] += 1
  46. dset[:, :] = self._cache
  47. hfive = self.file
  48. hfive.flush()
  49. def _onSucc(self, params):
  50. def parse(input):
  51. return input['channel_name'], input['time']
  52. chname, time = parse(params)
  53. dset, pos = self.path_pos(chname, time)
  54. dset[pos_map.succ_count, pos] += 1
  55. pass
  56. def _onFail(self, params):
  57. def parse(input):
  58. return input['channel_name'], input['time']
  59. chname, time = parse(params)
  60. dset, pos = self.path_pos(chname, time)
  61. dset[pos_map.fail_count, pos] += 1
  62. pass
  63. def get_path(self, method, params):
  64. def _path(chname, time):
  65. today = day_stamp(time)
  66. path = f'/{self._version}/{today}/{chname}'
  67. return path
  68. def _succ(input):
  69. return input['channel_name'], input['time']
  70. def _fail(input):
  71. return input['channel_name'], input['time']
  72. if method == 'net_succ':
  73. func = _succ
  74. elif method == 'net_fail':
  75. func = _fail
  76. else:
  77. func = None
  78. if func is not None:
  79. chname, time = func(params)
  80. path = _path(chname, time)
  81. return path
  82. else:
  83. return None
  84. def path_pos(self, chname, time):
  85. today = day_stamp(time)
  86. path = f'/{self._version}/{today}/{chname}'
  87. log.debug("%s,%s", 'NetchkWriter', path)
  88. dset = self._data_set(path=path)
  89. return dset, time - today
  90. def _data_set(self, path):
  91. hfive = self.file
  92. if path not in hfive:
  93. dim = pos_map.dim()
  94. dset = hfive.create_dataset(path, (dim, 86400), chunks=(dim, 3600))
  95. dset[:, :] = np.zeros((dim, 86400))
  96. hfive.flush()
  97. else:
  98. dset = hfive[path]
  99. return dset