# ChannelWriter.py 2.3 KB

  1. # from . import DataHandler #此时是导入文件
  2. from .DataStream import DataWriteStream, day_stamp
  3. from .DataStream import EChPosmap as pos_map
  4. import numpy as np
  5. __all__ = ['ChannelWriter']
  6. import logging
  7. log = logging.getLogger('writer')
  8. class ChannelWriter(DataWriteStream):
  9. def write(self, method, params):
  10. if method == 'ch_commit':
  11. self._onCommit(params)
  12. elif method == 'ch_succ':
  13. self._onSucc(params)
  14. elif method == 'ch_fail':
  15. self._onFail(params)
  16. else:
  17. pass
  18. def _onCommit(self, params):
  19. def parse(input):
  20. return input['channel_name'], input['time'], input['spec'], input['card_type'], input['channel_amount']
  21. chname, time, spec, card_type, channel_amount = parse(params)
  22. path, pos = self.path_pos(chname, time, spec, card_type)
  23. self.file[path][pos_map.commit_count, pos] += 1
  24. self.file[path][pos_map.commit_amounts, pos] += channel_amount
  25. pass
  26. def _onSucc(self, params):
  27. def parse(input):
  28. return input['channel_name'], input['time'], input['spec'], input['card_type'], input['channel_amount'], input['period']
  29. chname, time, spec, card_type, channel_amount, period = parse(params)
  30. path, pos = self.path_pos(chname, time, spec, card_type)
  31. self.file[path][pos_map.succ_count, pos] += 1
  32. self.file[path][pos_map.succ_amounts, pos] += channel_amount
  33. self.file[path][pos_map.succ_periods, pos] += period
  34. pass
  35. def _onFail(self, params):
  36. def parse(input):
  37. return input['channel_name'], input['time'], input['spec'], input['card_type'], input['channel_amount'], input['period']
  38. chname, time, spec, card_type, channel_amount, period = parse(params)
  39. path, pos = self.path_pos(chname, time, spec, card_type)
  40. self.file[path][pos_map.fail_count, pos] += 1
  41. self.file[path][pos_map.fail_amounts, pos] += channel_amount
  42. self.file[path][pos_map.fail_periods, pos] += period
  43. pass
  44. def path_pos(self, chname, time, spec, card_type):
  45. today = day_stamp(time)
  46. path = f'/{self._version}/{today}/{chname}/{card_type}/{spec}'
  47. hfive = self.file
  48. if path not in hfive:
  49. dim = pos_map.dim()
  50. hfive[path] = np.zeros((dim, 86400))
  51. return path, time - today