ChannelWriter.py 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. # from . import DataHandler #此时是导入文件
  2. from .DataStream import DataWriteStream, day_stamp
  3. from .DataStream import EChPosmap as pos_map
  4. import numpy as np
  5. __all__ = ['ChannelWriter']
  6. import logging
  7. log = logging.getLogger('writer')
  8. class ChannelWriter(DataWriteStream):
  9. def write(self, method, params):
  10. flush = True
  11. if method == 'ch_commit':
  12. self._onCommit(params)
  13. elif method == 'ch_succ':
  14. self._onSucc(params)
  15. elif method == 'ch_fail':
  16. self._onFail(params)
  17. else:
  18. flush = False
  19. if flush:
  20. hfive = self.file
  21. hfive.flush()
  22. pass
  23. def _onCommit(self, params):
  24. def parse(input):
  25. return input['channel_name'], input['time'], input['spec'], input['card_type'], input['channel_amount']
  26. chname, time, spec, card_type, channel_amount = parse(params)
  27. dset, pos = self.path_pos(chname, time, spec, card_type)
  28. rows = [pos_map.commit_count, pos_map.commit_amounts]
  29. vals = [1, channel_amount]
  30. dset[rows, pos] += vals
  31. # dset[pos_map.commit_count, pos] += 1
  32. # dset[pos_map.commit_amounts, pos] += channel_amount
  33. pass
  34. def _onSucc(self, params):
  35. def parse(input):
  36. return input['channel_name'], input['time'], input['spec'], input['card_type'], input['channel_amount'], input['period']
  37. chname, time, spec, card_type, channel_amount, period = parse(params)
  38. dset, pos = self.path_pos(chname, time, spec, card_type)
  39. rows = [pos_map.succ_count, pos_map.succ_amounts, pos_map.succ_periods]
  40. vals = [1, channel_amount, period]
  41. dset[rows, pos] += vals
  42. # dset[pos_map.succ_count, pos] += 1
  43. # dset[pos_map.succ_amounts, pos] += channel_amount
  44. # dset[pos_map.succ_periods, pos] += period
  45. pass
  46. def _onFail(self, params):
  47. def parse(input):
  48. return input['channel_name'], input['time'], input['spec'], input['card_type'], input['channel_amount'], input['period']
  49. chname, time, spec, card_type, channel_amount, period = parse(params)
  50. dset, pos = self.path_pos(chname, time, spec, card_type)
  51. rows = [pos_map.fail_count, pos_map.fail_amounts, pos_map.fail_periods]
  52. vals = [1, channel_amount, period]
  53. dset[rows, pos] += vals
  54. # dset[pos_map.fail_count, pos] += 1
  55. # dset[pos_map.fail_amounts, pos] += channel_amount
  56. # dset[pos_map.fail_periods, pos] += period
  57. pass
  58. def path_pos(self, chname, time, spec, card_type):
  59. today = day_stamp(time)
  60. path = f'/{self._version}/{today}/{chname}/{card_type}/{spec}'
  61. log.debug("%s,%s", 'ChannelWriter', path)
  62. hfive = self.file
  63. if path not in hfive:
  64. dim = pos_map.dim()
  65. dset = hfive.create_dataset(path, (dim, 86400), chunks=(dim, 3600))
  66. dset[:, :] = np.zeros((dim, 86400))
  67. hfive.flush()
  68. else:
  69. dset = hfive[path]
  70. return dset, time - today