MerchantWriter.py 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. # from . import DataHandler #此时是导入文件
  2. from .DataStream import DataWriteStream, day_stamp
  3. from .DataStream import EMchPosmap as pos_map
  4. import numpy as np
  5. __all__ = ['MerchantWriter']
  6. import logging
  7. log = logging.getLogger('writer')
  class MerchantWriter(DataWriteStream):
      """Accumulate per-merchant submit/success/failure counters in HDF5.

      Each event is added into a 2-D dataset located at
      ``/{version}/{day}/{mchid}/{card_type}/{spec}``. Rows are selected
      through ``pos_map`` (``EMchPosmap``); the column is ``time - day_stamp(time)``.

      NOTE(review): ``self._lock``, ``self.file`` and ``self._version`` are not
      defined here and are presumably provided by ``DataWriteStream`` - confirm.
      """

      def write(self, method, params):
          """Apply one merchant event to the store and flush the file.

          Args:
              method: Event name. ``'mch_submit'``, ``'mch_succ'`` and
                  ``'mch_fail'`` are handled; any other value is ignored
                  and nothing is flushed.
              params: Mapping with the event fields. All events need
                  ``'mchid'``, ``'time'``, ``'spec'``, ``'card_type'`` and
                  ``'mch_amount'``; ``'mch_succ'`` also needs
                  ``'channel_amount'``.

          Raises:
              KeyError: If a required field is missing from ``params``.
          """
          self._lock.acquire()
          try:
              if method == 'mch_submit':
                  self._onSubmit(params)
              elif method == 'mch_succ':
                  self._onSucc(params)
              elif method == 'mch_fail':
                  self._onFail(params)
              else:
                  # Unknown event: nothing was written, so nothing to flush.
                  return
              self.file.flush()
          finally:
              # Always release, otherwise one failing event would leave the
              # lock held and block every later writer.
              self._lock.release()

      def _accumulate(self, params, rows, amount_keys):
          """Add ``1`` plus the named amounts to ``rows`` at the event's column.

          Args:
              params: Event mapping (see ``write``).
              rows: Row indices from ``pos_map``; the first one receives the
                  count increment, the rest receive the amounts in order.
              amount_keys: Keys of ``params`` holding the amounts, one per
                  remaining entry of ``rows``.
          """
          # Read every field before touching the file so that a missing key
          # fails before a dataset could be created.
          mchid = params['mchid']
          time = params['time']
          spec = params['spec']
          card_type = params['card_type']
          vals = [1] + [params[key] for key in amount_keys]

          dset, pos = self.path_pos(mchid, time, spec, card_type)
          # Read-modify-write of the selected rows in a single column.
          dset[rows, pos] += vals

      def _onSubmit(self, params):
          """Record a submitted order: submit count and submitted amount."""
          self._accumulate(
              params,
              [pos_map.submit_count, pos_map.submit_amounts],
              ['mch_amount'],
          )

      def _onSucc(self, params):
          """Record a successful order: count, merchant and channel amounts."""
          self._accumulate(
              params,
              [pos_map.succ_count, pos_map.succ_mch_amounts, pos_map.succ_ch_amounts],
              ['mch_amount', 'channel_amount'],
          )

      def _onFail(self, params):
          """Record a failed order: failure count and failed merchant amount."""
          self._accumulate(
              params,
              [pos_map.fail_count, pos_map.fail_mch_amounts],
              ['mch_amount'],
          )

      def path_pos(self, mchid, time, spec, card_type):
          """Return the dataset for this event and the column index within it.

          The dataset is created, zero-filled and flushed the first time its
          path is seen.

          Args:
              mchid: Merchant identifier, used as a path component.
              time: Event timestamp; passed to ``day_stamp``.
              spec: Path component (last level).
              card_type: Path component.

          Returns:
              Tuple ``(dset, pos)`` where ``dset`` is the dataset at the
              computed path and ``pos`` is ``time - day_stamp(time)``.

          NOTE(review): the 86400-wide axis and the 3600-wide chunks suggest
          one column per second of the day in hourly chunks, which assumes
          ``time`` is an integer epoch in seconds - confirm against
          ``day_stamp``.
          """
          today = day_stamp(time)
          path = f'/{self._version}/{today}/{mchid}/{card_type}/{spec}'
          log.debug("%s,%s",'MerchantWriter',path)
          hfive = self.file
          if path in hfive:
              dset = hfive[path]
          else:
              dim = pos_map.dim()
              dset = hfive.create_dataset(path, (dim, 86400), chunks=(dim, 3600))
              # Explicit zero fill kept from the original implementation.
              dset[:, :] = np.zeros((dim, 86400))
              hfive.flush()
          return dset, time - today