MerchantWriter.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. # from . import DataHandler #此时是导入文件
  2. from .DataStream import DataWriteStream, day_stamp
  3. from .DataStream import EMchPosmap as pos_map
  4. import numpy as np
  5. __all__ = ['MerchantWriter']
  6. import logging
  7. log = logging.getLogger('writer')
  8. class MerchantWriter(DataWriteStream):
  9. def write(self, method, params):
  10. flush = True
  11. if method == 'mch_submit':
  12. self._onSubmit(params)
  13. elif method == 'mch_succ':
  14. self._onSucc(params)
  15. elif method == 'mch_fail':
  16. self._onFail(params)
  17. else:
  18. flush = False
  19. # if flush:
  20. # hfive = self.file
  21. # hfive.flush()
  22. def _onSubmit(self, params):
  23. def parse(input):
  24. return params['mchid'], params['time'], params['spec'], params['card_type'], params['mch_amount']
  25. mchid, time, spec, card_type, mch_amount = parse(params)
  26. dset, pos = self.path_pos(mchid, time, spec, card_type)
  27. rows = [pos_map.submit_count, pos_map.submit_amounts]
  28. vals = [1, mch_amount]
  29. # dset[pos,rows] += vals
  30. dset[rows,pos] += vals
  31. # dset[pos_map.submit_count, pos] += 1
  32. # dset[pos_map.submit_amounts, pos] += mch_amount
  33. pass
  34. def _onSucc(self, params):
  35. def parse(input):
  36. return params['mchid'], params['time'], params['spec'], params['card_type'], params['mch_amount'], params['channel_amount']
  37. mchid, time, spec, card_type, mch_amount, channel_amount = parse(params)
  38. dset, pos = self.path_pos(mchid, time, spec, card_type)
  39. rows = [pos_map.succ_count, pos_map.succ_mch_amounts, pos_map.succ_ch_amounts]
  40. vals = [1, mch_amount, channel_amount]
  41. dset[rows, pos] += vals
  42. # dset[pos_map.succ_count, pos] += 1
  43. # dset[pos_map.succ_mch_amounts, pos] += mch_amount
  44. # dset[pos_map.succ_ch_amounts, pos] += channel_amount
  45. pass
  46. def _onFail(self, params):
  47. def parse(input):
  48. return params['mchid'], params['time'], params['spec'], params['card_type'], params['mch_amount']
  49. mchid, time, spec, card_type, mch_amount = parse(params)
  50. dset, pos = self.path_pos(mchid, time, spec, card_type)
  51. rows = [pos_map.fail_count, pos_map.fail_mch_amounts]
  52. vals = [1, mch_amount]
  53. dset[pos,rows] += vals
  54. # dset[pos_map.fail_count, pos] += 1
  55. # dset[pos_map.fail_mch_amounts, pos] += mch_amount
  56. pass
  57. def path_pos(self, mchid, time, spec, card_type):
  58. today = day_stamp(time)
  59. path = f'/{self._version}/{today}/{mchid}/{card_type}/{spec}'
  60. log.debug("%s,%s",'MerchantWriter',path)
  61. hfive = self.file
  62. if path not in hfive:
  63. dim = pos_map.dim()
  64. # dset = hfive.create_dataset(path, (86400,dim), chunks=(3600,dim))
  65. # dset[:, :] = np.zeros((86400,dim))
  66. dset = hfive.create_dataset(path, (dim,86400), chunks=(dim,3600))
  67. dset[:, :] = np.zeros((dim,86400))
  68. hfive.flush()
  69. else:
  70. dset = hfive[path]
  71. return dset, time - today