NetchkWriter.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. # from . import DataHandler #此时是导入文件
  2. from .DataStream import DataWriteStream, day_stamp
  3. from .DataStream import ENetPosmap as pos_map
  4. import numpy as np
  5. __all__ = ['NetchkWriter']
  6. import logging
  7. log = logging.getLogger('writer')
  8. class NetchkWriter(DataWriteStream):
  9. def __init__(self, hfive):
  10. DataWriteStream.__init__(self, hfive)
  11. dim = pos_map.dim()
  12. self._cache = np.zeros((dim, 86400))
  13. def write(self, method, params):
  14. flush = True
  15. if method == 'net_succ':
  16. self._onSucc(params)
  17. elif method == 'net_fail':
  18. self._onFail(params)
  19. else:
  20. flush = False
  21. if flush:
  22. hfive = self.file
  23. hfive.flush()
  24. def write_batch(self, batches):
  25. for path, items in batches.items():
  26. self._write_batch(path, items)
  27. def _write_batch(self, path, messages):
  28. def _succ(input):
  29. return input['time']
  30. def _fail(input):
  31. return input['time']
  32. def _pos(time):
  33. today = day_stamp(time)
  34. return time - today
  35. dset = self._data_set(path=path)
  36. self._cache[:, :] = dset
  37. for method, params in messages:
  38. if method == 'net_succ':
  39. time = _succ(params)
  40. pos = _pos(time)
  41. self._cache[pos_map.succ_count, pos] += 1
  42. elif method == 'net_fail':
  43. time = _fail(params)
  44. pos = _pos(time)
  45. self._cache[pos_map.fail_count, pos] += 1
  46. dset[:, :] = self._cache
  47. hfive = self.file
  48. hfive.flush()
  49. def _onSucc(self, params):
  50. def parse(input):
  51. return input['channel_name'], input['time']
  52. chname, time = parse(params)
  53. dset, pos = self.path_pos(chname, time)
  54. dset[pos_map.succ_count, pos] += 1
  55. pass
  56. def _onFail(self, params):
  57. def parse(input):
  58. return input['channel_name'], input['time']
  59. chname, time = parse(params)
  60. dset, pos = self.path_pos(chname, time)
  61. dset[pos_map.fail_count, pos] += 1
  62. pass
  63. def get_path(self, method, params):
  64. def _parse(input):
  65. return input['channel_name'], input['time']
  66. def _path(input):
  67. chname, time = _parse(input)
  68. today = day_stamp(time)
  69. path = f'/{self._version}/{today}/{chname}'
  70. return path
  71. if method in ['net_succ', 'net_fail']:
  72. path = _path(params)
  73. return path
  74. else:
  75. return None
  76. def path_pos(self, chname, time):
  77. today = day_stamp(time)
  78. path = f'/{self._version}/{today}/{chname}'
  79. dset = self._data_set(path=path)
  80. return dset, time - today
  81. def _data_set(self, path):
  82. hfive = self.file
  83. if path not in hfive:
  84. dim = pos_map.dim()
  85. dset = hfive.create_dataset(path, (dim, 86400), chunks=(dim, 3600))
  86. dset[:, :] = np.zeros((dim, 86400))
  87. hfive.flush()
  88. else:
  89. dset = hfive[path]
  90. return dset