123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681 |
- import os
- import time as stime
- import redis
- import h5py
- from os import path
- import re
- from datetime import timedelta
- from datetime import datetime
- import numpy as np
- from matplotlib.figure import Figure
- from matplotlib import ticker
- from io import BytesIO
- import logging
class MchDataCenter(object):
    """Collect per-second merchant counters from Redis into an HDF5 file.

    Counters arrive in three Redis hashes (commit / success / fail) whose
    field names look like ``mchid-quality-card_type-amount-timestamp``.
    They are stored as one ``(3, 86400)`` dataset per
    ``/<day>/<mchid>/<quality>/<card_type>/<amount>`` path, one column per
    second of the day.  The remaining methods read that file back to draw
    success-ratio plots and to publish aggregated ratios/counts to Redis.
    """

    # Seconds of slack: Redis fields newer than ``now - latest_delta`` are
    # kept so that counters still being written are not purged.
    latest_delta = 10
    # Row index of each counter inside a dataset.
    pos_map = {
        'commit': 0, 'success': 1, 'fail': 2
    }
    # Look-back windows (seconds) used by the publishing loops.
    ratio_windows = (900, 1800, 3600, 7200, 86400)
    # Card-type codes that the "mobile" aggregations keep.
    mobile_card_types = (4, 5, 6)
    # Short labels shown in plot legends, keyed by the card-type path part.
    card_type_names = {
        '1': 'SY', '2': 'SH', '4': 'YD', '5': 'LT', '6': 'DX', '7': 'TH'
    }

    def __init__(self):
        self._mquit = False
        self._mRHost = ''
        self._mRPort = 6379
        self._file_name = '/var/www/html/data/stdata/user.hdf5'

    def set_redis(self, rhost, rport):
        """Set the Redis host and port used by connections opened later."""
        self._mRHost = rhost
        self._mRPort = rport

    def stop(self):
        """Ask the long-running loops to exit after their current cycle."""
        self._mquit = True

    def _connect_redis(self):
        """Return a Redis client (db 0) for the configured host and port."""
        pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
        return redis.Redis(connection_pool=pool)

    def prepare_data(self):
        """Copy the Redis counters into the HDF5 file once per second.

        Runs until ``stop()`` is called.  Each cycle reads the three monitor
        hashes into the file and then deletes the Redis fields older than
        ``latest_delta`` seconds.  Errors are printed and the loop goes on.
        """
        sources = (
            ('nc_user_monitor_commit', 'commit'),
            ('nc_user_monitor_success', 'success'),
            ('nc_user_monitor_fail', 'fail'),
        )
        client = None
        while not self._mquit:
            try:
                if client is None:
                    client = self._connect_redis()
                latest_time = int(stime.time()) - self.latest_delta
                now_str = stime.strftime('%Y-%m-%d %H:%M:%S', stime.localtime(latest_time))
                print('start read', now_str)
                # Mode 'a' opens read/write and creates the file if missing;
                # the context manager closes it even when a read fails.
                with h5py.File(self._file_name, 'a') as hfive:
                    for name, prefix in sources:
                        self.read_redis(hfive, client, name, prefix)
                for name, _prefix in sources:
                    self.del_redis(client, name, latest_time)
            except Exception as ex:
                print(ex)
            finally:
                stime.sleep(1)

    def del_redis(self, redis, name, latest_time):
        """Delete the fields of hash ``name`` that are older than ``latest_time``.

        Fields that do not have five ``-`` separated parts, or whose last
        part is not an integer timestamp, are reported and deleted as well
        so that one bad field cannot block the purge forever.
        """
        for item in redis.hscan_iter(name):
            key = str(item[0], encoding="utf-8")
            items = re.split(r'-', key)
            stamp = None
            if len(items) == 5:
                try:
                    stamp = int(items[4])
                except ValueError:
                    stamp = None
            if stamp is None:
                print('delete one error key:', key)
            elif latest_time <= stamp:
                # Still inside the slack window: keep it.
                continue
            redis.hdel(name, key)

    def read_redis(self, hfive, redis, name, prefix):
        """Store every field of Redis hash ``name`` under counter ``prefix``."""
        for item in redis.hscan_iter(name):
            key = str(item[0], encoding="utf-8")
            val = str(item[1], encoding="utf-8")
            self.parase(hfive, key, val, prefix)

    def parase(self, hfive, text, val, prefix):
        """Write one counter value into its dataset cell.

        ``text`` is ``mchid-quality-card_type-amount-timestamp``.  The
        dataset is created (zero filled) on first use.  Returns ``True``
        when the value was stored and ``False`` when ``text`` or ``val`` is
        malformed.  An unknown ``prefix`` raises ``KeyError``.
        """
        items = re.split(r'-', text)
        if len(items) != 5:
            return False
        mchid, quality, card_type, amount, stamp = items
        pos = self.pos_map[f'{prefix}']
        try:
            stamp = int(stamp)
            value = float(val)
        except ValueError:
            print('skip one error key:', text, val)
            return False
        today = self.day_stamp(stamp)
        path = f'/{today}/{mchid}/{quality}/{card_type}/{amount}'
        if path not in hfive:
            hfive[path] = np.zeros((len(self.pos_map), 86400))
        diff = stamp - today
        hfive[path][pos, diff] = value
        logging.getLogger('app').debug(
            "%s pos=%s prefix=%s diff=%s time=%s val=%s", path, pos, prefix, diff, stamp, val)
        return True

    def day_stamp(self, stamp):
        """Return the Unix time of the UTC+8 midnight that starts stamp's day."""
        stamp = int(stamp)
        return stamp - (stamp + 8 * 3600) % 86400

    def _days(self, root):
        """Return the names of the groups directly under ``root``."""
        result = []
        try:
            for name, sub in root.items():
                if isinstance(sub, h5py.Group):
                    result.append(name)
        except Exception as ex:
            print(ex)
        return result

    def days(self):
        """Return the day keys stored in the file, or ``[]`` on any error."""
        try:
            with h5py.File(self._file_name, 'r') as hfive:
                return self._days(hfive['/'])
        except Exception as ex:
            print(ex)
            return []

    def paths(self, time_stamp):
        """Return all dataset paths of time_stamp's day, or ``[]`` if none."""
        try:
            day_key = f'/{self.day_stamp(time_stamp)}'
            with h5py.File(self._file_name, 'r') as hfive:
                # The file is read-only, so a missing day must be tested for
                # rather than created.
                if day_key not in hfive:
                    return []
                return self.dir(hfive[day_key])
        except Exception as ex:
            print(ex)
            return []

    def dir(self, group):
        """Return the full names of every dataset below ``group``, recursively."""
        result = []
        for _name, sub in group.items():
            if isinstance(sub, h5py.Group):
                result.extend(self.dir(sub))
            else:
                result.append(sub.name)
        return result

    def _all_none(self, **kwargs):
        """Return True when every keyword value is None (or none were given)."""
        return all(val is None for val in kwargs.values())

    def _group_by_mchid(self, paths, mobile_only=False, card_type=None):
        """Group dataset paths by integer merchant id.

        Paths that do not split into six ``/`` parts are skipped.  With
        ``mobile_only`` only card types in ``mobile_card_types`` are kept,
        optionally narrowed to the single integer ``card_type``.
        """
        result = {}
        for path in paths:
            items = re.split(r'/', path)
            if len(items) != 6:
                continue
            (_, _sday, _mchid, _quality, _card_type, _amount) = items
            if mobile_only:
                _card_type = int(_card_type)
                if _card_type not in self.mobile_card_types:
                    continue
                if card_type is not None and _card_type != card_type:
                    continue
            result.setdefault(int(_mchid), []).append(path)
        return result

    def _merge_path(self, paths):
        """Group every well-formed dataset path by merchant id."""
        return self._group_by_mchid(paths)

    def draw_plot(self, start_time, interval=300, **kwargs):
        """Render the success-ratio plot for start_time's day as a PNG.

        ``kwargs`` may carry ``mchid``, ``quality``, ``card_type`` and
        ``amount`` filters.  Without any filter one line per merchant is
        drawn, otherwise one line per matching dataset; when more than one
        line was drawn a summed line is added.  ``interval`` is the bin
        width in seconds and has to divide 86400.

        Returns a ``BytesIO`` holding the PNG, or ``None`` when the day has
        no data or drawing failed (the error is printed).
        """
        hfive = h5py.File(self._file_name, 'r')
        try:
            day_stamp = self.day_stamp(start_time)
            start_pos = start_time - day_stamp
            now = stime.time()
            # -1 means "no upper bound": only today's plot is cut at now.
            end_pos = int(now) - day_stamp if day_stamp == self.day_stamp(now) else -1

            selection = self.datasets(hfive, start_time, **kwargs)
            if not selection:
                print('no data for day', day_stamp)
                return None
            filer_text, paths = selection

            fig = Figure(figsize=(16, 8))
            ax = fig.subplots()
            total = np.zeros((len(self.pos_map), 86400))
            x = np.arange(0, 86400, interval)

            if self._all_none(**kwargs):
                series = self._read_dict_data(hfive, self._merge_path(paths))
            else:
                series = self.read_data(hfive, paths)

            sub_count = 0
            for label, data in series:
                data = np.array(data)
                total = total + data
                if self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, f'{label}'):
                    sub_count += 1
            if sub_count > 1:
                self._draw_plot(ax, x, day_stamp, start_pos, end_pos, total, interval, filer_text)

            ax.legend()
            ax.grid()
            ax.set_title('success ratio')
            ax.set(xlabel='time', ylabel='ratio')
            fig.autofmt_xdate()
            fig.subplots_adjust(left=0.05, right=0.999, top=0.95, bottom=0.1)
            buf = BytesIO()
            fig.savefig(buf, format="png")
            return buf
        except Exception as ex:
            print(ex)
            return None
        finally:
            hfive.close()

    def read_data(self, hfive, paths):
        """Yield ``(path, dataset)`` for each path."""
        for path in paths:
            yield path, hfive[path]

    def _read_dict_data(self, hfive, mchPaths):
        """Yield ``(mchid, summed array)`` for each merchant's path list."""
        for mchid, paths in mchPaths.items():
            yield mchid, self._merge_data(hfive, paths)

    def datasets(self, hfive, start_time, **kwargs):
        """Select the datasets of start_time's day that match the filters.

        Recognised keywords are ``mchid``, ``quality``, ``card_type`` and
        ``amount``; ``None`` values and other keywords are ignored.  Returns
        ``(filter_text, paths)``, or ``False`` when the day is not stored.
        """
        sday = f'{self.day_stamp(start_time)}'
        if sday not in self._days(hfive['/']):
            return False
        dsets = self.dir(hfive[sday])
        # Path components are text, so every filter is compared as text.
        filters = {
            key: f'{val}'
            for key, val in kwargs.items()
            if val is not None and key in ('mchid', 'quality', 'card_type', 'amount')
        }
        return self._filter(dsets, **filters)

    def _filter(self, dsets, mchid=None, quality=None, card_type=None, amount=None):
        """Return ``(filter_text, paths)`` for the dataset paths that match.

        A ``None`` filter matches anything.  Paths that do not split into
        six ``/`` parts are skipped.  ``filter_text`` describes the active
        filters and is used as the legend label of the summed line.
        """
        wanted = [
            None if value is None else f'{value}'
            for value in (mchid, quality, card_type, amount)
        ]
        filer_text = '' if wanted[0] is None else wanted[0]
        for tag, value in zip(('qua', 'type', 'amount'), wanted[1:]):
            if value is not None:
                filer_text = filer_text + f"-{tag}:{value}"

        paths = []
        for text in dsets:
            items = re.split(r'/', text)
            if len(items) != 6:
                continue
            if all(want is None or want == have for want, have in zip(wanted, items[2:])):
                paths.append(text)
        return filer_text, paths

    def _draw_plot(self, ax, x, day_stamp, start_pos, end_pos, data, interval=300, path=''):
        """Plot one success-ratio line; return False when there is nothing to plot.

        ``data`` rows follow ``pos_map`` (commit, success, fail).  Seconds
        are summed into ``interval`` wide bins and the bins whose start lies
        in ``[start_pos, end_pos)`` are kept (``end_pos == -1`` keeps all
        bins from ``start_pos`` on).
        """
        logging.getLogger('app').debug("path=%s", path)

        def binned(row):
            # Sum per-second counters into interval-wide bins.
            return np.sum(row.reshape((-1, interval)), axis=1)

        commit = binned(data[0])
        succ = binned(data[1])
        fail = binned(data[2])
        finished = succ + fail

        keep = x >= start_pos
        if end_pos != -1:
            keep = keep & (x < end_pos)
        x = x[keep]
        commit = commit[keep]
        succ = succ[keep]
        fail = fail[keep]
        finished = finished[keep]

        succ_count = int(np.sum(succ))
        all_count = int(np.sum(finished))
        commit_count = int(np.sum(commit))
        fail_count = int(np.sum(fail))
        if all_count < 1:
            return False

        # Cap at 100% and avoid dividing by zero in empty bins.
        ratio = np.minimum(succ, finished) / (finished + 0.00000001)
        # NOTE(review): labels use the server's local zone while day
        # boundaries are fixed at UTC+8 - confirm the server runs in UTC+8.
        xs = np.array([stime.strftime('%H:%M', stime.localtime(d + day_stamp)) for d in x])
        ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=0))
        ax.plot(xs, ratio, ls='--', marker='o',
                label=self._label(path, succ_count, all_count, commit_count, fail_count))
        return True

    def _label(self, path, succ_count, all, commit_count, fail_count):
        """Build the legend text for one plotted line.

        A full dataset path is shown as ``mchid-quality-cardname-amount``;
        any other text is shown as is, and an empty or ``None`` path is
        shown as ``average``.
        """
        ratio = 0.00
        if all > 0:
            ratio = round(succ_count * 100 / all, 2)
        if path is None or path == '':
            path = 'average'
        items = re.split(r'/', path)
        if len(items) == 6:
            (_, _sday, _chname, _quality, _card_type, _amount) = items
            card_type = self.card_type_names.get(_card_type, '')
            return f"{_chname}-{_quality}-{card_type}-{_amount}:{succ_count}/{all} = {ratio}% {commit_count}:{fail_count}"
        return f"{path}:{succ_count}/{all} = {ratio}% {commit_count}:{fail_count}"

    # Success ratio per merchant over a series of look-back windows that
    # end at the current time.
    def _merge_mobile_path(self, paths):
        """Group the mobile card-type dataset paths by merchant id."""
        return self._group_by_mchid(paths, mobile_only=True)

    def _merge_data(self, hfive, paths):
        """Return the element-wise sum of the datasets at ``paths``."""
        predata = np.zeros((len(self.pos_map), 86400))
        for path in paths:
            predata += hfive[path]
        return predata

    def _calc_mratio(self, data, startes, end):
        """Sum successes and failures over look-back windows ending at ``end``.

        For each ``start`` in ``startes`` the window covers the seconds of
        the day in ``[max(end - start, 0), end)``.  Returns
        ``{start: [successes, failures, ratio]}``; the ratio is rounded to
        four places and is 1.0 for an empty window.
        """
        succ = data[1]
        fail = data[2]
        seconds = np.arange(0, 86400, 1)
        result = {}
        for start in startes:
            start_pos = max(end - start, 0)
            window = (seconds >= start_pos) & (seconds < end)
            succs = int(np.sum(succ[window]))
            fails = int(np.sum(fail[window]))
            ratio = round((succs + 0.00001) / (succs + fails + 0.00001), 4)
            result[start] = [succs, fails, ratio]
        return result

    def mratios(self, time_stamp, presecs):
        """Return ``{mchid: {window: [succ, fail, ratio]}}`` for mobile card types."""
        mchid_paths = self._merge_mobile_path(self.paths(time_stamp))
        if not mchid_paths:
            return {}
        end = time_stamp - self.day_stamp(time_stamp)
        mratios = {}
        with h5py.File(self._file_name, 'r') as hfive:
            for mchid, paths in mchid_paths.items():
                mdata = self._merge_data(hfive, paths)
                mratios[mchid] = self._calc_mratio(mdata, presecs, end)
        return mratios

    def _publish_loop(self, build, key, notify_type=None):
        """Every two seconds store ``build(now)`` in Redis under ``key``.

        Empty results are not stored.  When ``notify_type`` is given a
        message of that type is published on the ``refill`` channel after
        each store.  Runs until ``stop()`` is called; errors are printed.
        """
        import json
        client = None
        while not self._mquit:
            try:
                if client is None:
                    client = self._connect_redis()
                payload = build(int(stime.time()))
                if payload:
                    client.set(key, json.dumps(payload))
                    if notify_type is not None:
                        client.publish('refill', json.dumps({'type': notify_type, 'value': 0}))
                    print('push msg=', payload)
            except Exception as ex:
                print(ex)
            finally:
                stime.sleep(2)

    def calc_ratio(self):
        """Loop publishing per-merchant ratios to ``nc_merchant_ratios``."""
        self._publish_loop(
            lambda now: self.mratios(now, list(self.ratio_windows)),
            'nc_merchant_ratios', 'mch_ratio')

    # The code below computes success ratios per card type.
    def _merge_mobile_type_path(self, paths, card_type=None):
        """Group mobile dataset paths by merchant id, optionally for one card type."""
        return self._group_by_mchid(paths, mobile_only=True, card_type=card_type)

    def mratio_types(self, time_stamp, presecs):
        """Return ``{mchid: {card_name: {window: [succ, fail, ratio]}}}``.

        ``card_name`` is ``ALL`` for the merchant's combined mobile traffic
        and ``YD`` / ``LT`` / ``DX`` for card types 4 / 5 / 6; card types
        without any dataset are left out.
        """
        mchid_paths = self._merge_mobile_type_path(self.paths(time_stamp))
        if not mchid_paths:
            return {}
        end = time_stamp - self.day_stamp(time_stamp)
        card_types = {None: 'ALL', 4: 'YD', 5: 'LT', 6: 'DX'}
        mratios = {}
        with h5py.File(self._file_name, 'r') as hfive:
            for mchid, paths in mchid_paths.items():
                mch_ratios = {}
                for card_type, name in card_types.items():
                    if card_type is None:
                        cur_paths = paths
                    else:
                        cur_paths = self._merge_mobile_type_path(paths, card_type).get(mchid, [])
                        if not cur_paths:
                            continue
                    mdata = self._merge_data(hfive, cur_paths)
                    mch_ratios[name] = self._calc_mratio(mdata, presecs, end)
                mratios[mchid] = mch_ratios
        return mratios

    def calc_ratios(self):
        """Loop publishing per-card-type ratios to ``nc_merchant_card_type_ratios``."""
        self._publish_loop(
            lambda now: self.mratio_types(now, list(self.ratio_windows)),
            'nc_merchant_card_type_ratios')

    def _calc_mcount(self, data, startes, end):
        """Same result as ``_calc_mratio``; kept as a separate entry point."""
        return self._calc_mratio(data, startes, end)

    def merchant_rmobile_path(self, paths):
        """Group the mobile card-type dataset paths by merchant id."""
        return self._group_by_mchid(paths, mobile_only=True)

    def rmobile_path(self, paths):
        """Return the well-formed paths whose card type is a mobile one."""
        result = []
        for path in paths:
            items = re.split(r'/', path)
            if len(items) != 6:
                continue
            if int(items[4]) in self.mobile_card_types:
                result.append(path)
        return result

    def mch_count(self, paths, presecs, time_stamp):
        """Return the windowed counts of the summed datasets at ``paths``."""
        with h5py.File(self._file_name, 'r') as hfive:
            mdata = self._merge_data(hfive, paths)
        return self._calc_mcount(mdata, presecs, time_stamp - self.day_stamp(time_stamp))

    def mch_detail_count(self, paths, presecs, time_stamp):
        """Return windowed counts per dataset, keyed ``mchid-quality-card_type-amount``."""
        result = {}
        if not paths:
            return result
        end = time_stamp - self.day_stamp(time_stamp)
        with h5py.File(self._file_name, 'r') as hfive:
            for path in paths:
                items = re.split(r'/', path)
                if len(items) != 6:
                    continue
                (_, _sday, _mchid, _quality, _card_type, _amount) = items
                key = f"{_mchid}-{_quality}-{_card_type}-{_amount}"
                result[key] = self._calc_mcount(hfive[path], presecs, end)
        return result

    def _build_counts(self, time_stamp):
        """Return ``{'gross': per-merchant counts, 'detail': per-dataset counts}``."""
        presecs = list(self.ratio_windows)
        all_paths = self.paths(time_stamp)
        gross = {}
        for mchid, paths in self.merchant_rmobile_path(all_paths).items():
            gross[mchid] = self.mch_count(paths, presecs, time_stamp)
        detail = self.mch_detail_count(self.rmobile_path(all_paths), presecs, time_stamp)
        return {'gross': gross, 'detail': detail}

    def mch_counts(self):
        """Loop publishing refill counts to ``nc_merchant_refill_counts``.

        The payload always has the ``gross`` and ``detail`` keys, so it is
        published every cycle even when both are empty.
        """
        self._publish_loop(self._build_counts, 'nc_merchant_refill_counts', 'mch_counts')


# Module-level shared instance.
mchDataCenter = MchDataCenter()
|