ChannelWriter.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. # from . import DataHandler  # note: this form would import the module (file) itself
  2. from .DataStream import DataWriteStream, day_stamp
  3. from .DataStream import EChPosmap as pos_map
  4. import numpy as np
  5. __all__ = ['ChannelWriter']
  6. import logging
  7. log = logging.getLogger('writer')
  8. class ChannelWriter(DataWriteStream):
  9. def write(self, method, params):
  10. self._lock.acquire()
  11. flush = True
  12. if method == 'ch_commit':
  13. self._onCommit(params)
  14. elif method == 'ch_succ':
  15. self._onSucc(params)
  16. elif method == 'ch_fail':
  17. self._onFail(params)
  18. else:
  19. flush = False
  20. if flush:
  21. hfive = self.file
  22. hfive.flush()
  23. self._lock.release()
  24. pass
  25. def _onCommit(self, params):
  26. def parse(input):
  27. return input['channel_name'], input['time'], input['spec'], input['card_type'], input['channel_amount']
  28. chname, time, spec, card_type, channel_amount = parse(params)
  29. dset, pos = self.path_pos(chname, time, spec, card_type)
  30. rows = [pos_map.commit_count, pos_map.commit_amounts]
  31. vals = [1, channel_amount]
  32. dset[rows, pos] += vals
  33. # dset[pos_map.commit_count, pos] += 1
  34. # dset[pos_map.commit_amounts, pos] += channel_amount
  35. pass
  36. def _onSucc(self, params):
  37. def parse(input):
  38. return input['channel_name'], input['time'], input['spec'], input['card_type'], input['channel_amount'], input['period']
  39. chname, time, spec, card_type, channel_amount, period = parse(params)
  40. dset, pos = self.path_pos(chname, time, spec, card_type)
  41. rows = [pos_map.succ_count, pos_map.succ_amounts, pos_map.succ_periods]
  42. vals = [1, channel_amount, period]
  43. dset[rows, pos] += vals
  44. # dset[pos_map.succ_count, pos] += 1
  45. # dset[pos_map.succ_amounts, pos] += channel_amount
  46. # dset[pos_map.succ_periods, pos] += period
  47. pass
  48. def _onFail(self, params):
  49. def parse(input):
  50. return input['channel_name'], input['time'], input['spec'], input['card_type'], input['channel_amount'], input['period']
  51. chname, time, spec, card_type, channel_amount, period = parse(params)
  52. dset, pos = self.path_pos(chname, time, spec, card_type)
  53. rows = [pos_map.fail_count, pos_map.fail_amounts, pos_map.fail_periods]
  54. vals = [1, channel_amount, period]
  55. dset[rows, pos] += vals
  56. # dset[pos_map.fail_count, pos] += 1
  57. # dset[pos_map.fail_amounts, pos] += channel_amount
  58. # dset[pos_map.fail_periods, pos] += period
  59. pass
  60. def path_pos(self, chname, time, spec, card_type):
  61. today = day_stamp(time)
  62. path = f'/{self._version}/{today}/{chname}/{card_type}/{spec}'
  63. hfive = self.file
  64. if path not in hfive:
  65. dim = pos_map.dim()
  66. dset = hfive.create_dataset(path, (dim, 86400), chunks=(dim, 3600))
  67. dset[:, :] = np.zeros((dim, 86400))
  68. hfive.flush()
  69. else:
  70. dset = hfive[path]
  71. return dset, time - today