MerchantWriter.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. # from . import DataHandler  # (this form would import the module file itself)
  2. from .DataStream import DataWriteStream, day_stamp
  3. from .DataStream import EMchPosmap as pos_map
  4. import numpy as np
  5. __all__ = ['MerchantWriter']
  6. import logging
  7. log = logging.getLogger('writer')
  8. class MerchantWriter(DataWriteStream):
  9. def __init__(self, hfive):
  10. DataWriteStream.__init__(self, hfive)
  11. dim = pos_map.dim()
  12. self._cache = np.zeros((dim, 86400))
  13. def write(self, method, params):
  14. flush = True
  15. if method == 'mch_submit':
  16. self._onSubmit(params)
  17. elif method == 'mch_succ':
  18. self._onSucc(params)
  19. elif method == 'mch_fail':
  20. self._onFail(params)
  21. else:
  22. flush = False
  23. if flush:
  24. hfive = self.file
  25. hfive.flush()
  26. def write_batch(self, batches):
  27. for path, items in batches.items():
  28. self._write_batch(path, items)
  29. def _write_batch(self, path, messages):
  30. def _submit(params):
  31. return params['time'], params['mch_amount']
  32. def _succ(params):
  33. return params['time'], params['mch_amount'], params['channel_amount']
  34. def _fail(params):
  35. return params['time'], params['mch_amount']
  36. def _pos(time):
  37. today = day_stamp(time)
  38. return time - today
  39. dset = self._data_set(path=path)
  40. self._cache[:, :] = dset
  41. for method, params in messages:
  42. if method == 'mch_submit':
  43. time, mch_amount = _submit(params)
  44. pos = _pos(time)
  45. rows = [pos_map.submit_count, pos_map.submit_amounts]
  46. self._cache[rows, pos] += [1, mch_amount]
  47. elif method == 'mch_succ':
  48. time, mch_amount, channel_amount = _succ(params)
  49. pos = _pos(time)
  50. rows = [pos_map.succ_count, pos_map.succ_mch_amounts, pos_map.succ_ch_amounts]
  51. self._cache[rows, pos] += [1, mch_amount, channel_amount]
  52. elif method == 'mch_fail':
  53. time, mch_amount = _fail(params)
  54. pos = _pos(time)
  55. rows = [pos_map.fail_count, pos_map.fail_mch_amounts]
  56. self._cache[rows, pos] += [1, mch_amount]
  57. dset[:, :] = self._cache
  58. hfive = self.file
  59. hfive.flush()
  60. def _onSubmit(self, params):
  61. def parse(params):
  62. return params['mchid'], params['time'], params['spec'], params['card_type'], params['mch_amount']
  63. mchid, time, spec, card_type, mch_amount = parse(params)
  64. dset, pos = self.path_pos(mchid, time, spec, card_type)
  65. rows = [pos_map.submit_count, pos_map.submit_amounts]
  66. vals = [1, mch_amount]
  67. dset[rows, pos] += vals
  68. pass
  69. def _onSucc(self, params):
  70. def parse(params):
  71. return params['mchid'], params['time'], params['spec'], params['card_type'], params['mch_amount'], params['channel_amount']
  72. mchid, time, spec, card_type, mch_amount, channel_amount = parse(params)
  73. dset, pos = self.path_pos(mchid, time, spec, card_type)
  74. rows = [pos_map.succ_count, pos_map.succ_mch_amounts, pos_map.succ_ch_amounts]
  75. vals = [1, mch_amount, channel_amount]
  76. dset[rows, pos] += vals
  77. pass
  78. def _onFail(self, params):
  79. def parse(params):
  80. return params['mchid'], params['time'], params['spec'], params['card_type'], params['mch_amount']
  81. mchid, time, spec, card_type, mch_amount = parse(params)
  82. dset, pos = self.path_pos(mchid, time, spec, card_type)
  83. rows = [pos_map.fail_count, pos_map.fail_mch_amounts]
  84. vals = [1, mch_amount]
  85. dset[rows, pos] += vals
  86. pass
  87. def get_path(self, method, params):
  88. def _parse(input):
  89. return input['mchid'], input['time'], input['spec'], input['card_type']
  90. def _path(input):
  91. mchid, time, spec, card_type = _parse(input)
  92. today = day_stamp(time)
  93. path = f'/{self._version}/{today}/{mchid}/{card_type}/{spec}'
  94. return path
  95. if method in ['mch_submit','mch_succ', 'mch_fail']:
  96. path = _path(params)
  97. return path
  98. else:
  99. return None
  100. def path_pos(self, mchid, time, spec, card_type):
  101. today = day_stamp(time)
  102. path = f'/{self._version}/{today}/{mchid}/{card_type}/{spec}'
  103. # log.debug("%s,%s", 'MerchantWriter', path)
  104. dset = self._data_set(path=path)
  105. return dset, time - today
  106. def _data_set(self, path):
  107. hfive = self.file
  108. if path not in hfive:
  109. dim = pos_map.dim()
  110. dset = hfive.create_dataset(path, (dim, 86400), chunks=(dim, 900))
  111. dset[:, :] = np.zeros((dim, 86400))
  112. hfive.flush()
  113. else:
  114. dset = hfive[path]
  115. return dset