# MerchantWriter.py
  1. # from . import DataHandler #此时是导入文件
  2. from .DataStream import DataWriteStream, day_stamp
  3. from .DataStream import EMchPosmap as pos_map
  4. import numpy as np
  5. __all__ = ['MerchantWriter']
  6. import logging
  7. log = logging.getLogger('writer')
  8. class MerchantWriter(DataWriteStream):
  9. def write(self, method, params):
  10. flush = True
  11. if method == 'mch_submit':
  12. self._onSubmit(params)
  13. elif method == 'mch_succ':
  14. self._onSucc(params)
  15. elif method == 'mch_fail':
  16. self._onFail(params)
  17. else:
  18. flush = False
  19. if flush:
  20. hfive = self.file
  21. hfive.flush()
  22. def _onSubmit(self, params):
  23. def parse(input):
  24. return params['mchid'], params['time'], params['spec'], params['card_type'], params['mch_amount']
  25. mchid, time, spec, card_type, mch_amount = parse(params)
  26. dset, pos = self.path_pos(mchid, time, spec, card_type)
  27. rows = [pos_map.submit_count, pos_map.submit_amounts]
  28. vals = [1, mch_amount]
  29. dset[rows, pos] += vals
  30. #数据维度调换试验
  31. # dset[pos,rows] += vals
  32. # dset[pos_map.submit_count, pos] += 1
  33. # dset[pos_map.submit_amounts, pos] += mch_amount
  34. pass
  35. def _onSucc(self, params):
  36. def parse(input):
  37. return params['mchid'], params['time'], params['spec'], params['card_type'], params['mch_amount'], params['channel_amount']
  38. mchid, time, spec, card_type, mch_amount, channel_amount = parse(params)
  39. dset, pos = self.path_pos(mchid, time, spec, card_type)
  40. rows = [pos_map.succ_count, pos_map.succ_mch_amounts, pos_map.succ_ch_amounts]
  41. vals = [1, mch_amount, channel_amount]
  42. dset[rows, pos] += vals
  43. # dset[pos_map.succ_count, pos] += 1
  44. # dset[pos_map.succ_mch_amounts, pos] += mch_amount
  45. # dset[pos_map.succ_ch_amounts, pos] += channel_amount
  46. pass
  47. def _onFail(self, params):
  48. def parse(input):
  49. return params['mchid'], params['time'], params['spec'], params['card_type'], params['mch_amount']
  50. mchid, time, spec, card_type, mch_amount = parse(params)
  51. dset, pos = self.path_pos(mchid, time, spec, card_type)
  52. rows = [pos_map.fail_count, pos_map.fail_mch_amounts]
  53. vals = [1, mch_amount]
  54. dset[rows, pos] += vals
  55. # dset[pos_map.fail_count, pos] += 1
  56. # dset[pos_map.fail_mch_amounts, pos] += mch_amount
  57. pass
  58. def path_pos(self, mchid, time, spec, card_type):
  59. today = day_stamp(time)
  60. path = f'/{self._version}/{today}/{mchid}/{card_type}/{spec}'
  61. log.debug("%s,%s", 'MerchantWriter', path)
  62. hfive = self.file
  63. if path not in hfive:
  64. dim = pos_map.dim()
  65. # 数据维度调换试验
  66. # dset = hfive.create_dataset(path, (86400,dim), chunks=(3600,dim))
  67. # dset[:, :] = np.zeros((86400,dim))
  68. dset = hfive.create_dataset(path, (dim,86400), chunks=(dim,3600))
  69. dset[:, :] = np.zeros((dim,86400))
  70. hfive.flush()
  71. else:
  72. dset = hfive[path]
  73. return dset, time - today