ChannelWriter.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. # from . import DataHandler  # note: this form would import the module (file) itself
  2. from .DataStream import DataWriteStream, day_stamp
  3. from .DataStream import EChPosmap as pos_map
  4. import numpy as np
  5. __all__ = ['ChannelWriter']
  6. import logging
  7. log = logging.getLogger('writer')
  8. class ChannelWriter(DataWriteStream):
  9. def __init__(self, hfive):
  10. DataWriteStream.__init__(self, hfive)
  11. dim = pos_map.dim()
  12. self._cache = np.zeros((dim, 86400))
  13. def write(self, method, params):
  14. flush = True
  15. if method == 'ch_commit':
  16. self._onCommit(params)
  17. elif method == 'ch_succ':
  18. self._onSucc(params)
  19. elif method == 'ch_fail':
  20. self._onFail(params)
  21. else:
  22. flush = False
  23. if flush:
  24. hfive = self.file
  25. hfive.flush()
  26. pass
  27. def write_batch(self, batches):
  28. for path, items in batches.items():
  29. self._write_batch(path, items)
  30. def _write_batch(self, path, messages):
  31. def _commit(input):
  32. return input['time'], input['channel_amount']
  33. def _succ(input):
  34. return input['time'], input['channel_amount'], input['period']
  35. def _fail(input):
  36. return input['time'], input['channel_amount'], input['period']
  37. def _pos(time):
  38. today = day_stamp(time)
  39. return time - today
  40. dset = self._data_set(path=path)
  41. self._cache[:, :] = dset
  42. for method, params in messages:
  43. if method == 'ch_commit':
  44. time, channel_amount = _commit(params)
  45. pos = _pos(time)
  46. rows = [pos_map.commit_count, pos_map.commit_amounts]
  47. self._cache[rows, pos] += [1, channel_amount]
  48. elif method == 'ch_succ':
  49. time, channel_amount, period = _succ(params)
  50. pos = _pos(time)
  51. rows = [pos_map.succ_count, pos_map.succ_amounts, pos_map.succ_periods]
  52. self._cache[rows, pos] += [1, channel_amount, period]
  53. elif method == 'ch_fail':
  54. time, channel_amount, period = _fail(params)
  55. pos = _pos(time)
  56. rows = [pos_map.fail_count, pos_map.fail_amounts, pos_map.fail_periods]
  57. self._cache[rows, pos] += [1, channel_amount, period]
  58. dset[:, :] = self._cache
  59. hfive = self.file
  60. hfive.flush()
  61. def _onCommit(self, params):
  62. def parse(input):
  63. return input['channel_name'], input['time'], input['spec'], input['card_type'], input['channel_amount']
  64. chname, time, spec, card_type, channel_amount = parse(params)
  65. dset, pos = self.path_pos(chname, time, spec, card_type)
  66. rows = [pos_map.commit_count, pos_map.commit_amounts]
  67. vals = [1, channel_amount]
  68. dset[rows, pos] += vals
  69. pass
  70. def _onSucc(self, params):
  71. def parse(input):
  72. return input['channel_name'], input['time'], input['spec'], input['card_type'], input['channel_amount'], input['period']
  73. chname, time, spec, card_type, channel_amount, period = parse(params)
  74. dset, pos = self.path_pos(chname, time, spec, card_type)
  75. rows = [pos_map.succ_count, pos_map.succ_amounts, pos_map.succ_periods]
  76. vals = [1, channel_amount, period]
  77. dset[rows, pos] += vals
  78. pass
  79. def _onFail(self, params):
  80. def parse(input):
  81. return input['channel_name'], input['time'], input['spec'], input['card_type'], input['channel_amount'], input['period']
  82. chname, time, spec, card_type, channel_amount, period = parse(params)
  83. dset, pos = self.path_pos(chname, time, spec, card_type)
  84. rows = [pos_map.fail_count, pos_map.fail_amounts, pos_map.fail_periods]
  85. vals = [1, channel_amount, period]
  86. dset[rows, pos] += vals
  87. pass
  88. def get_path(self, method, params):
  89. def _parse(input):
  90. return input['channel_name'], input['time'], input['spec'], input['card_type']
  91. def _path(input):
  92. chname, time, spec, card_type = _parse(input)
  93. today = day_stamp(time)
  94. path = f'/{self._version}/{today}/{chname}/{card_type}/{spec}'
  95. return path
  96. if method in ['ch_commit','ch_succ','ch_fail']:
  97. path = _path(params)
  98. return path
  99. else:
  100. return None
  101. def path_pos(self, chname, time, spec, card_type):
  102. today = day_stamp(time)
  103. path = f'/{self._version}/{today}/{chname}/{card_type}/{spec}'
  104. # log.debug("%s,%s", 'ChannelWriter', path)
  105. dset = self._data_set(path=path)
  106. return dset, time - today
  107. def _data_set(self, path):
  108. hfive = self.file
  109. if path not in hfive:
  110. dim = pos_map.dim()
  111. dset = hfive.create_dataset(path, (dim, 86400), chunks=(dim, 900))
  112. dset[:, :] = np.zeros((dim, 86400))
  113. hfive.flush()
  114. else:
  115. dset = hfive[path]
  116. return dset