MerchantCalc.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. from redis.client import Redis
  2. from .DataStream import EMchPosmap as pos_map, span_days
  3. from .MerchantReader import MerchantReader
  4. from .algorithm import calc_morder_lack
  5. import time as time
  6. import logging
  7. import json
  8. import redis
  9. logger = logging.getLogger('merchantcalc')
  10. def detail_paths(reader: MerchantReader, tuple_pathes: dict, days: list):
  11. count = len(days)
  12. for mchid, tup in tuple_pathes.items():
  13. for _card_type, _spec in tup:
  14. detail_datas = reader.init_data(count)
  15. for i, day in enumerate(days):
  16. data = reader.read(day, mchid, _card_type, _spec)
  17. if data is not None:
  18. column_pos = i * 86400
  19. view = detail_datas[:, column_pos:column_pos + 86400]
  20. view += data
  21. yield mchid, _card_type, _spec, detail_datas
  22. def earliest_time(rclient: redis.client):
  23. result = {}
  24. min_time = int(time.time())
  25. data = rclient.get('nc_refill-stat-earliest-ordertime')
  26. if data is not None:
  27. mchid_times = json.loads(data)
  28. for mchid,order_time in mchid_times.items():
  29. mchid = int(mchid)
  30. order_time = int(order_time)
  31. result[mchid] = order_time
  32. if order_time < min_time:
  33. min_time = order_time
  34. return result, min_time
  35. else:
  36. return result, min_time
  37. def mixed_ratio(rclient: redis.client):
  38. result = list()
  39. data = rclient.get('nc_refill-stat-merchant-mixed')
  40. if data is not None:
  41. mch_cfgs = json.loads(data)
  42. for mchid,val in mch_cfgs.items():
  43. mchid = int(mchid)
  44. period = int(val['period'])
  45. formula = val['profit_formula'].strip()
  46. result.append((mchid, period, formula))
  47. return result
  48. class MerchantCalc(object):
  49. def __init__(self):
  50. self._mQuit = False
  51. self._mRHost = ''
  52. self._mRPort = 6379
  53. pass
  54. def set_redis(self, rhost, rport):
  55. self._mRHost = rhost
  56. self._mRPort = rport
  57. def run(self):
  58. def redis_client():
  59. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  60. client = redis.Redis(connection_pool=pool)
  61. return client
  62. def calc_send_amounts(client):
  63. send_amounts = self._send_amounts(client)
  64. val = json.dumps({'send_amounts': send_amounts, 'time': int(time.time())})
  65. client.set('nc_refill-stat-merchant-sendamount',val)
  66. pass
  67. def calc_profit_ratio(client):
  68. profit_ratio = self._profit_ratio(client)
  69. pass
  70. client = None
  71. loop = 0;
  72. while self._mQuit == False:
  73. try:
  74. if client is None:
  75. client = redis_client()
  76. #每10秒计算一次欠费金额
  77. # if loop % 10 == 0:
  78. # calc_send_amounts(client)
  79. if loop % 2 == 0:
  80. calc_profit_ratio(client)
  81. pass
  82. except redis.RedisError as ex:
  83. logger.error(ex)
  84. except Exception as ex:
  85. logger.error(ex)
  86. finally:
  87. time.sleep(1)
  88. loop += 1
  89. def calc_time(self, start_time: int, end_time: int):
  90. reader = MerchantReader()
  91. end_time = reader.near_stamp(end_time, False)
  92. if end_time is None:
  93. raise Exception('end_time data is empty')
  94. start_time = reader.near_stamp(start_time, True)
  95. if start_time is None:
  96. raise Exception('start_time data is empty')
  97. strtime = lambda t: time.strftime('%d-%H:%M:%S', time.localtime(t))
  98. logger.debug("near_stamp start_time %s end_time=%s", strtime(start_time), strtime(end_time))
  99. if start_time >= end_time:
  100. raise Exception('start_time equal endtime')
  101. days = span_days(start_time, end_time)
  102. strtime = lambda t: time.strftime('%d-%H:%M:%S', time.localtime(t))
  103. sdays = [strtime(day) for day in days]
  104. logger.debug(sdays)
  105. return reader, days, start_time, end_time
  106. pass
  107. def _send_amounts(self, rclient):
  108. mchid_times,earliest = earliest_time(rclient)
  109. end_time = int(time.time())
  110. reader, days, start_time, end_time = self.calc_time(earliest, end_time)
  111. day_stamp = days[0]
  112. tuple_pathes = reader.many_tuple_path(days)
  113. gen = detail_paths(reader, tuple_pathes, days)
  114. mamounts = dict()
  115. for _mchid, _card_type, _spec, _data in gen:
  116. if _mchid not in mchid_times:
  117. continue
  118. else:
  119. _start_time = mchid_times[_mchid]
  120. send_amounts, lack_amounts = calc_morder_lack(_data, pos_map, start_time - day_stamp, end_time - day_stamp)
  121. if _mchid not in mamounts:
  122. mamounts[_mchid] = {'send_amounts': send_amounts, 'lack_amounts': lack_amounts}
  123. else:
  124. mamounts[_mchid]['send_amounts'] += send_amounts
  125. mamounts[_mchid]['lack_amounts'] += lack_amounts
  126. result = dict()
  127. for _mchid, _val in mamounts.items():
  128. _send_amounts = round(_val['send_amounts'], 2)
  129. _lack_amounts = round(_val['lack_amounts'], 2)
  130. result[_mchid] = {'send_amounts': _send_amounts, 'lack_amounts': _lack_amounts}
  131. logger.debug(result)
  132. return result
  133. def _profit_ratio(self, rclient):
  134. mixed_ratios = mixed_ratio(rclient)
  135. for mchid, period, formula in mixed_ratios:
  136. end_time = int(time.time())
  137. reader, days, start_time, end_time = self.calc_time(end_time - period, end_time)
  138. day_stamp = days[0]
  139. tuple_pathes = reader.many_tuple_path(days, mchids={mchid})
  140. gen = detail_paths(reader, tuple_pathes, days)
  141. pass