123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- from redis.client import Redis
- from .DataStream import EMchPosmap as pos_map, span_days
- from .MerchantReader import MerchantReader
- from .algorithm import calc_morder_lack
- import time as time
- import logging
- import json
- import redis
- logger = logging.getLogger('merchantcalc')
- def detail_paths(reader: MerchantReader, tuple_pathes: dict, days: list):
- count = len(days)
- for mchid, tup in tuple_pathes.items():
- for _card_type, _spec in tup:
- detail_datas = reader.init_data(count)
- for i, day in enumerate(days):
- data = reader.read(day, mchid, _card_type, _spec)
- if data is not None:
- column_pos = i * 86400
- view = detail_datas[:, column_pos:column_pos + 86400]
- view += data
- yield mchid, _card_type, _spec, detail_datas
- def earliest_time(rclient: redis.client):
- result = {}
- min_time = int(time.time())
- data = rclient.get('nc_refill-stat-earliest-ordertime')
- if data is not None:
- mchid_times = json.loads(data)
- for mchid,order_time in mchid_times.items():
- mchid = int(mchid)
- order_time = int(order_time)
- result[mchid] = order_time
- if order_time < min_time:
- min_time = order_time
- return result, min_time
- else:
- return result, min_time
- def mixed_ratio(rclient: redis.client):
- result = list()
- data = rclient.get('nc_refill-stat-merchant-mixed')
- if data is not None:
- mch_cfgs = json.loads(data)
- for mchid,val in mch_cfgs.items():
- mchid = int(mchid)
- period = int(val['period'])
- formula = val['profit_formula'].strip()
- result.append((mchid, period, formula))
- return result
- class MerchantCalc(object):
- def __init__(self):
- self._mQuit = False
- self._mRHost = ''
- self._mRPort = 6379
- pass
- def set_redis(self, rhost, rport):
- self._mRHost = rhost
- self._mRPort = rport
- def run(self):
- def redis_client():
- pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
- client = redis.Redis(connection_pool=pool)
- return client
- def calc_send_amounts(client):
- send_amounts = self._send_amounts(client)
- val = json.dumps({'send_amounts': send_amounts, 'time': int(time.time())})
- client.set('nc_refill-stat-merchant-sendamount',val)
- pass
- def calc_profit_ratio(client):
- profit_ratio = self._profit_ratio(client)
- pass
- client = None
- loop = 0;
- while self._mQuit == False:
- try:
- if client is None:
- client = redis_client()
- #每10秒计算一次欠费金额
- # if loop % 10 == 0:
- # calc_send_amounts(client)
- if loop % 2 == 0:
- calc_profit_ratio(client)
- pass
- except redis.RedisError as ex:
- logger.error(ex)
- except Exception as ex:
- logger.error(ex)
- finally:
- time.sleep(1)
- loop += 1
- def calc_time(self, start_time: int, end_time: int):
- reader = MerchantReader()
- end_time = reader.near_stamp(end_time, False)
- if end_time is None:
- raise Exception('end_time data is empty')
- start_time = reader.near_stamp(start_time, True)
- if start_time is None:
- raise Exception('start_time data is empty')
- strtime = lambda t: time.strftime('%d-%H:%M:%S', time.localtime(t))
- logger.debug("near_stamp start_time %s end_time=%s", strtime(start_time), strtime(end_time))
- if start_time >= end_time:
- raise Exception('start_time equal endtime')
- days = span_days(start_time, end_time)
- strtime = lambda t: time.strftime('%d-%H:%M:%S', time.localtime(t))
- sdays = [strtime(day) for day in days]
- logger.debug(sdays)
- return reader, days, start_time, end_time
- pass
- def _send_amounts(self, rclient):
- mchid_times,earliest = earliest_time(rclient)
- end_time = int(time.time())
- reader, days, start_time, end_time = self.calc_time(earliest, end_time)
- day_stamp = days[0]
- tuple_pathes = reader.many_tuple_path(days)
- gen = detail_paths(reader, tuple_pathes, days)
- mamounts = dict()
- for _mchid, _card_type, _spec, _data in gen:
- if _mchid not in mchid_times:
- continue
- else:
- _start_time = mchid_times[_mchid]
- send_amounts, lack_amounts = calc_morder_lack(_data, pos_map, start_time - day_stamp, end_time - day_stamp)
- if _mchid not in mamounts:
- mamounts[_mchid] = {'send_amounts': send_amounts, 'lack_amounts': lack_amounts}
- else:
- mamounts[_mchid]['send_amounts'] += send_amounts
- mamounts[_mchid]['lack_amounts'] += lack_amounts
- result = dict()
- for _mchid, _val in mamounts.items():
- _send_amounts = round(_val['send_amounts'], 2)
- _lack_amounts = round(_val['lack_amounts'], 2)
- result[_mchid] = {'send_amounts': _send_amounts, 'lack_amounts': _lack_amounts}
- logger.debug(result)
- return result
- def _profit_ratio(self, rclient):
- mixed_ratios = mixed_ratio(rclient)
- for mchid, period, formula in mixed_ratios:
- end_time = int(time.time())
- reader, days, start_time, end_time = self.calc_time(end_time - period, end_time)
- day_stamp = days[0]
- tuple_pathes = reader.many_tuple_path(days, mchids={mchid})
- gen = detail_paths(reader, tuple_pathes, days)
- pass
|