import json
import logging
import os
import re
import time as stime
from datetime import timedelta
from io import BytesIO
from os import path

import h5py
import numpy as np
import redis
from matplotlib import ticker
from matplotlib.figure import Figure


class MchDataCenter(object):
    """Move per-second merchant counters from Redis into HDF5 and report on them.

    Redis hashes hold fields named ``mchid-quality-card_type-amount-timestamp``.
    Each field is copied into the HDF5 dataset
    ``/<day>/<mchid>/<quality>/<card_type>/<amount>``, an array of shape
    ``(len(pos_map), 86400)``: one row per counter kind (see ``pos_map``) and
    one column per second of the day.  ``<day>`` is the timestamp of midnight
    in UTC+8 (see ``day_stamp``).
    """

    # Redis fields younger than this many seconds are kept after being read.
    latest_delta = 2
    # Row index of each counter kind inside a dataset.
    pos_map = {
        'commit': 0,
        'success': 1,
        'fail': 2
    }
    # (redis hash name, counter kind) pairs consumed by prepare_data().
    _monitor_hashes = (
        ('nc_user_monitor_commit', 'commit'),
        ('nc_user_monitor_success', 'success'),
        ('nc_user_monitor_fail', 'fail'),
    )
    # Card types accepted by the "mobile" path helpers.
    _mobile_card_types = (4, 5, 6)
    # Look-back windows, in seconds, used by the publisher loops.
    _ratio_windows = (900, 1800, 3600, 7200, 86400)
    # Short legend names for the card type component of a dataset path.
    _card_type_labels = {
        '1': 'SY',
        '2': 'SH',
        '4': 'YD',
        '5': 'LT',
        '6': 'DX',
        '7': 'TH',
    }

    def __init__(self):
        self._mquit = False
        self._mRHost = ''
        self._mRPort = 6379
        self._file_name = '/var/www/html/data/stdata/user.hdf5'

    def set_redis(self, rhost, rport):
        """Set the Redis host and port used by every later connection."""
        self._mRHost = rhost
        self._mRPort = rport

    def stop(self):
        """Ask prepare_data() to leave its loop at the next check."""
        self._mquit = True

    def _redis_client(self):
        """Return a Redis client (db 0) for the configured host and port."""
        pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
        return redis.Redis(connection_pool=pool)

    def _wait(self, seconds):
        """Sleep up to `seconds` seconds, returning early once stop() was called."""
        for _ in range(seconds):
            if self._mquit:
                break
            stime.sleep(1)

    def prepare_data(self):
        """Copy the monitor hashes from Redis into HDF5 until stop() is called.

        Each pass reads all three hashes into the file, closes the file, then
        deletes the Redis fields older than `latest_delta` seconds.  Errors
        are printed and the pass is retried after the usual 60 second wait.
        """
        while not self._mquit:
            try:
                client = self._redis_client()
                # Mode 'a' opens the file read/write and creates it if missing.
                with h5py.File(self._file_name, 'a') as hfive:
                    # Taken before the reads; del_redis() keeps every field
                    # whose timestamp is >= this value.
                    latest_time = int(stime.time()) - self.latest_delta
                    for name, prefix in self._monitor_hashes:
                        self.read_redis(hfive, client, name, prefix)
                for name, _prefix in self._monitor_hashes:
                    self.del_redis(client, name, latest_time)
            except Exception as ex:
                print(ex)
            # Outside `finally` so KeyboardInterrupt/SystemExit are not delayed.
            self._wait(60)

    def del_redis(self, redis, name, latest_time):
        """Delete fields of hash `name` that are malformed or older than `latest_time`.

        A field is kept only when it has exactly five '-' separated parts and
        its last part is an integer timestamp >= `latest_time`.
        """
        for item in redis.hscan_iter(name):
            key = str(item[0], encoding="utf-8")
            fields = re.split(r'-', key)
            keep = False
            if len(fields) == 5:
                try:
                    keep = latest_time <= int(fields[4])
                except ValueError:
                    # Non-numeric timestamp: treated like any other malformed
                    # field so it cannot linger in the hash forever.
                    keep = False
            if not keep:
                redis.hdel(name, key)

    def read_redis(self, hfive, redis, name, prefix):
        """Feed every field/value pair of hash `name` to parase()."""
        for item in redis.hscan_iter(name):
            key = str(item[0], encoding="utf-8")
            val = str(item[1], encoding="utf-8")
            self.parase(hfive, key, val, prefix)

    def parase(self, hfive, text, val, prefix):
        """Store one Redis field in the HDF5 file.

        `text` is ``mchid-quality-card_type-amount-timestamp``; `val` is
        written at row ``pos_map[prefix]`` and at the column given by the
        second of the day.  The dataset is created zero-filled on first use.

        Returns False when `text` is malformed, otherwise None.
        """
        fields = re.split(r'-', text)
        if len(fields) != 5:
            return False
        mchid, quality, card_type, amount, stamp_text = fields
        try:
            stamp = int(stamp_text)
        except ValueError:
            # Skip the field instead of aborting the whole import pass.
            return False
        pos = self.pos_map[f'{prefix}']
        today = self.day_stamp(stamp)
        dset_path = f'/{today}/{mchid}/{quality}/{card_type}/{amount}'
        if dset_path not in hfive:
            hfive[dset_path] = np.zeros((len(self.pos_map), 86400))
        diff = stamp - today
        if diff < 0:
            print(diff)
        dataset = hfive[dset_path]
        dataset[pos, diff] = val
        print(dset_path, pos, diff, val, dataset[pos, diff])

    def day_stamp(self, stamp):
        """Return the timestamp of the most recent midnight in UTC+8."""
        stamp = int(stamp)
        shifted = stime.gmtime(stamp + 8 * 3600)
        since_midnight = timedelta(hours=shifted.tm_hour,
                                   minutes=shifted.tm_min,
                                   seconds=shifted.tm_sec)
        return int(stamp - since_midnight.total_seconds())

    def _days(self, root):
        """Return the names of the groups directly under `root`.

        Errors are printed; whatever was collected so far is returned.
        """
        result = []
        try:
            for name, sub in root.items():
                if isinstance(sub, h5py.Group):
                    result.append(name)
        except Exception as ex:
            print(ex)
        return result

    def days(self):
        """Return the day group names stored in the file ([] on any error)."""
        try:
            with h5py.File(self._file_name, 'r') as hfive:
                return self._days(hfive.require_group('/'))
        except Exception as ex:
            print(ex)
        return []

    def paths(self, time_stamp):
        """Return all dataset paths stored for the day containing `time_stamp`.

        Returns [] (after printing the error) when the file or the day group
        cannot be read.
        """
        try:
            day_stamp = self.day_stamp(time_stamp)
            with h5py.File(self._file_name, 'r') as hfive:
                return self.dir(hfive.require_group(f'/{day_stamp}'))
        except Exception as ex:
            print(ex)
        return []

    def dir(self, group):
        """Return the full names of all non-group members below `group`, recursively."""
        result = []
        for _name, sub in group.items():
            if isinstance(sub, h5py.Group):
                result.extend(self.dir(sub))
            else:
                result.append(sub.name)
        return result

    def _all_none(self, **kwargs):
        """Return True when every keyword value is None (or none were given)."""
        return all(val is None for val in kwargs.values())

    def _merge_path(self, paths):
        """Group dataset paths by integer merchant id, skipping malformed paths."""
        result = {}
        for dset_path in paths:
            items = re.split(r'/', dset_path)
            if len(items) != 6:
                continue
            result.setdefault(int(items[2]), []).append(dset_path)
        return result

    def draw_plot(self, start_time, interval=300, **kwargs):
        """Render the success ratio of one day as a PNG.

        `start_time` selects the day and the first second plotted; for the
        current day the plot ends at the current time.  `interval` is the bin
        width in seconds and must divide 86400, because the per-second rows
        are reshaped to ``(-1, interval)``.  Keyword filters (`mchid`,
        `quality`, `card_type`, `amount`) are passed to datasets(); when all
        are None one line per merchant is drawn, otherwise one line per
        matching dataset.  When more than one line was drawn, a line for the
        summed data is added.

        Returns a BytesIO holding the PNG, or None when the day has no data
        or an error occurred (the error is printed).  Failure to open the
        HDF5 file propagates to the caller.
        """
        with h5py.File(self._file_name, 'r') as hfive:
            try:
                day_stamp = self.day_stamp(start_time)
                start_pos = start_time - day_stamp
                if day_stamp == self.day_stamp(stime.time()):
                    end_pos = int(stime.time()) - day_stamp
                else:
                    end_pos = -1  # sentinel understood by _draw_plot: no upper bound

                found = self.datasets(hfive, start_time, **kwargs)
                if not found:
                    # datasets() returns False when the day has no group.
                    print(f'no data stored for day {day_stamp}')
                    return None
                filer_text, dset_paths = found

                fig = Figure(figsize=(16, 8))
                ax = fig.subplots()
                total = np.zeros((len(self.pos_map), 86400))
                x = np.arange(0, 86400, interval)

                if self._all_none(**kwargs):
                    merged = self._merge_path(dset_paths)
                    series = ((f'{mchid}', data) for mchid, data
                              in self._read_dict_data(hfive, merged))
                else:
                    series = ((name, np.array(data)) for name, data
                              in self.read_data(hfive, dset_paths))

                sub_count = 0
                for label, data in series:
                    total = total + data
                    if self._draw_plot(ax, x, day_stamp, start_pos, end_pos,
                                       data, interval, label):
                        sub_count += 1
                if sub_count > 1:
                    self._draw_plot(ax, x, day_stamp, start_pos, end_pos,
                                    total, interval, filer_text)

                ax.legend()
                ax.grid()
                ax.set_title('success ratio')
                ax.set(xlabel='time', ylabel='ratio')
                fig.autofmt_xdate()
                fig.subplots_adjust(left=0.05, right=0.999, top=0.95, bottom=0.1)
                buf = BytesIO()
                fig.savefig(buf, format="png")
                return buf
            except Exception as ex:
                print(ex)
                return None

    def read_data(self, hfive, paths):
        """Yield ``(path, dataset)`` for every path in `paths`."""
        for dset_path in paths:
            yield dset_path, hfive[dset_path]

    def _read_dict_data(self, hfive, mchPaths):
        """Yield ``(mchid, summed array)`` for a ``{mchid: [paths]}`` mapping."""
        for mchid, dset_paths in mchPaths.items():
            yield mchid, self._merge_data(hfive, dset_paths)

    def datasets(self, hfive, start_time, **kwargs):
        """Select the dataset paths of one day that match the keyword filters.

        Recognised keywords are `mchid`, `quality`, `card_type` and `amount`;
        None values and unknown keywords are ignored.  Values are compared as
        strings against the path components.

        Returns ``(filter_text, paths)``, or False when the day of
        `start_time` has no group in the file.
        """
        sday = f'{self.day_stamp(start_time)}'
        if sday not in self._days(hfive.require_group('/')):
            return False
        dsets = self.dir(hfive.require_group(sday))
        # Path components are strings, so every filter (mchid included) is
        # stringified before comparing.
        selected = {
            key: f'{kwargs[key]}'
            for key in ('mchid', 'quality', 'card_type', 'amount')
            if kwargs.get(key) is not None
        }
        return self._filter(dsets, **selected)

    def _filter(self, dsets, mchid=None, quality=None, card_type=None, amount=None):
        """Return ``(filter_text, paths)`` for the paths matching every given filter.

        `filter_text` describes the active filters and is used as a legend
        label.  Paths that do not have six '/' separated parts are skipped.
        """
        filer_text = ''
        if mchid is not None:
            filer_text = f'{mchid}'
        if quality is not None:
            filer_text = filer_text + f"-qua:{quality}"
        if card_type is not None:
            filer_text = filer_text + f"-type:{card_type}"
        if amount is not None:
            filer_text = filer_text + f"-amount:{amount}"

        wanted = (mchid, quality, card_type, amount)
        matches = []
        for text in dsets:
            items = re.split(r'/', text)
            if len(items) != 6:
                continue
            # items[2:] is (mchid, quality, card_type, amount).
            if all(want is None or f'{want}' == have
                   for want, have in zip(wanted, items[2:])):
                matches.append(text)
        return filer_text, matches

    def _draw_plot(self, ax, x, day_stamp, start_pos, end_pos, data, interval=300, path=''):
        """Plot one success-ratio line on `ax`.

        `data` is indexed by the rows of `pos_map`; only the 'success' and
        'fail' rows are used.  Both are summed into bins of `interval`
        seconds, restricted to ``start_pos <= x`` and, unless `end_pos` is
        -1, ``x < end_pos``.

        Returns False, drawing nothing, when the selected range holds no
        success or fail counts; otherwise True.
        """
        logging.getLogger('app').debug("path=%s", path)
        success = np.asarray(data[self.pos_map['success']])
        failure = np.asarray(data[self.pos_map['fail']])
        total_bins = (success + failure).reshape((-1, interval)).sum(axis=1)
        success_bins = success.reshape((-1, interval)).sum(axis=1)

        keep = x >= start_pos
        if end_pos != -1:
            keep = keep & (x < end_pos)
        x = x[keep]
        success_bins = success_bins[keep]
        total_bins = total_bins[keep]

        succ_count = int(np.sum(success_bins))
        all_count = int(np.sum(total_bins))
        if all_count < 1:
            return False

        # Clamp to the total, then add a tiny epsilon to avoid dividing by 0.
        ratio = np.minimum(success_bins, total_bins) / (total_bins + 0.00000001)
        labels = np.array([stime.strftime('%H:%M', stime.localtime(d + day_stamp))
                           for d in x])
        ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=0))
        ax.plot(labels, ratio, ls='--', marker='o',
                label=self._label(path, succ_count, all_count))
        return True

    def _label(self, path, count, all):
        """Build the legend text ``<name>:<count>/<all> = <ratio>%``.

        For a six-part dataset path the name is
        ``mchid-quality-<card label>-amount``; otherwise `path` itself is
        used, or 'average' when it is empty or None.
        """
        ratio = round(count * 100 / all, 2) if all > 0 else 0.00
        if path is None or path == '':
            return f"average:{count}/{all} = {ratio}%"
        items = re.split(r'/', path)
        if len(items) != 6:
            return f"{path}:{count}/{all} = {ratio}%"
        (_, _sday, _chname, _quality, _card_type, _amount) = items
        card_type = self._card_type_labels.get(_card_type, '')
        return f"{_chname}-{_quality}-{card_type}-{_amount}:{count}/{all} = {ratio}%"

    # Per-merchant success ratio over a series of look-back windows that end
    # at the current time.
    def _merge_mobile_path(self, paths):
        """Group paths with card type 4, 5 or 6 by integer merchant id."""
        return self._merge_mobile_type_path(paths)

    def _merge_data(self, hfive, paths):
        """Return the element-wise sum of the datasets at `paths`."""
        merged = np.zeros((len(self.pos_map), 86400))
        for dset_path in paths:
            merged += hfive[dset_path]
        return merged

    def _calc_mratio(self, data, startes, end):
        """Compute success/fail totals for several look-back windows.

        `data[1]` and `data[2]` are the per-second success and fail rows.
        For each `start` in `startes` the window covers the seconds in
        ``[max(end - start, 0), end)``.

        Returns ``{start: [successes, failures, ratio]}`` where ratio is
        successes / (successes + failures) rounded to 4 places; a tiny
        epsilon on both sides makes an empty window yield 1.0.
        """
        succ = np.asarray(data[1])
        fail = np.asarray(data[2])
        seconds = np.arange(0, 86400, 1)
        result = {}
        for start in startes:
            start_pos = max(end - start, 0)
            window = (seconds >= start_pos) & (seconds < end)
            succs = int(np.sum(succ[window]))
            fails = int(np.sum(fail[window]))
            ratio = round((succs + 0.00001) / (succs + fails + 0.00001), 4)
            result[start] = [succs, fails, ratio]
        return result

    def mratios(self, time_stamp, presecs):
        """Return ``{mchid: _calc_mratio(...)}`` for the day of `time_stamp`.

        Only datasets with card type 4, 5 or 6 are used; windows end at
        `time_stamp`.
        """
        mchid_paths = self._merge_mobile_path(self.paths(time_stamp))
        end = time_stamp - self.day_stamp(time_stamp)
        mratios = {}
        with h5py.File(self._file_name, 'r') as hfive:
            for mchid, mch_paths in mchid_paths.items():
                mdata = self._merge_data(hfive, mch_paths)
                mratios[mchid] = self._calc_mratio(mdata, presecs, end)
        return mratios

    def _publish_loop(self, compute, key, event_type=None):
        """Every 2 seconds store ``compute(now)`` as JSON under Redis key `key`.

        When `event_type` is given, a message of that type is also published
        on the 'refill' channel.  Empty results are not stored.  Errors are
        printed and the loop continues.

        NOTE(review): this loop never checks stop(); it runs until the
        process ends.
        """
        client = None
        try:
            client = self._redis_client()
        except Exception as ex:
            print(ex)
        while True:
            try:
                payload = compute(int(stime.time()))
                if len(payload) != 0:
                    client.set(key, json.dumps(payload))
                    if event_type is not None:
                        client.publish(
                            'refill',
                            json.dumps({'type': event_type, 'value': 0}))
                    print('push msg=', payload)
            except Exception as ex:
                print(ex)
            # Outside `finally` so KeyboardInterrupt/SystemExit are not delayed.
            stime.sleep(2)

    def calc_ratio(self):
        """Publish mratios() under 'nc_merchant_ratios' forever (never returns)."""
        self._publish_loop(
            lambda now: self.mratios(now, list(self._ratio_windows)),
            "nc_merchant_ratios",
            event_type='mch_ratio')

    # The following code computes success ratios per card type.
    def _merge_mobile_type_path(self, paths, card_type=None):
        """Group paths with card type 4, 5 or 6 by integer merchant id.

        When `card_type` is given, only paths of exactly that card type are
        kept.  Paths without six '/' separated parts are skipped.
        """
        result = {}
        for dset_path in paths:
            items = re.split(r'/', dset_path)
            if len(items) != 6:
                continue
            (_, _sday, _mchid, _quality, _card_type, _amount) = items
            _card_type = int(_card_type)
            if _card_type not in self._mobile_card_types:
                continue
            if card_type is not None and _card_type != card_type:
                continue
            result.setdefault(int(_mchid), []).append(dset_path)
        return result

    def mratio_types(self, time_stamp, presecs):
        """Return ``{mchid: {name: _calc_mratio(...)}}`` per card type.

        `name` is 'ALL' for every card type 4/5/6 combined, or 'YD', 'LT',
        'DX' for card type 4, 5, 6 alone.  A card type with no datasets for a
        merchant is left out of that merchant's mapping.
        """
        mchid_paths = self._merge_mobile_type_path(self.paths(time_stamp))
        end = time_stamp - self.day_stamp(time_stamp)
        card_types = {None: 'ALL', 4: 'YD', 5: 'LT', 6: 'DX'}
        mratios = {}
        with h5py.File(self._file_name, 'r') as hfive:
            for mchid, mch_paths in mchid_paths.items():
                mch_ratios = {}
                for card_type, name in card_types.items():
                    print('card_type=', card_type, 'name=', name)
                    if card_type is None:
                        cur_paths = mch_paths
                    else:
                        by_mch = self._merge_mobile_type_path(mch_paths, card_type)
                        if len(by_mch) == 0:
                            continue
                        cur_paths = by_mch[mchid]
                    mdata = self._merge_data(hfive, cur_paths)
                    mch_ratios[name] = self._calc_mratio(mdata, presecs, end)
                mratios[mchid] = mch_ratios
        return mratios

    def calc_ratios(self):
        """Publish mratio_types() under 'nc_merchant_card_type_ratios' forever.

        NOTE(review): the original had the 'refill' publish call commented
        out here, so no message is published.
        """
        self._publish_loop(
            lambda now: self.mratio_types(now, list(self._ratio_windows)),
            "nc_merchant_card_type_ratios")

    ###########################################################################
    def _calc_mcount(self, data, startes, end):
        """Same computation and result layout as _calc_mratio()."""
        return self._calc_mratio(data, startes, end)

    def merchant_rmobile_path(self, paths):
        """Group paths with card type 4, 5 or 6 by integer merchant id."""
        return self._merge_mobile_type_path(paths)

    def rmobile_path(self, paths):
        """Return the six-part paths whose card type is 4, 5 or 6, in order."""
        result = []
        for dset_path in paths:
            items = re.split(r'/', dset_path)
            if len(items) != 6:
                continue
            if int(items[4]) not in self._mobile_card_types:
                continue
            result.append(dset_path)
        return result

    def mch_count(self, paths, presecs, time_stamp):
        """Return _calc_mcount() over the sum of the datasets at `paths`."""
        end = time_stamp - self.day_stamp(time_stamp)
        with h5py.File(self._file_name, 'r') as hfive:
            mdata = self._merge_data(hfive, paths)
            return self._calc_mcount(mdata, presecs, end)

    def mch_detail_count(self, paths, presecs, time_stamp):
        """Return ``{'mchid-quality-card_type-amount': _calc_mcount(...)}`` per dataset."""
        end = time_stamp - self.day_stamp(time_stamp)
        result = {}
        with h5py.File(self._file_name, 'r') as hfive:
            for dset_path in paths:
                items = re.split(r'/', dset_path)
                if len(items) != 6:
                    continue
                (_, _sday, _mchid, _quality, _card_type, _amount) = items
                key = f"{_mchid}-{_quality}-{_card_type}-{_amount}"
                result[key] = self._calc_mcount(hfive[dset_path], presecs, end)
        return result

    def _refill_counts(self, time_stamp):
        """Build the ``{'gross': ..., 'detail': ...}`` payload for mch_counts()."""
        presecs = list(self._ratio_windows)
        all_paths = self.paths(time_stamp)
        gross = {}
        for mchid, mch_paths in self.merchant_rmobile_path(all_paths).items():
            gross[mchid] = self.mch_count(mch_paths, presecs, time_stamp)
        detail = self.mch_detail_count(self.rmobile_path(all_paths),
                                       presecs, time_stamp)
        return {'gross': gross, 'detail': detail}

    def mch_counts(self):
        """Publish per-merchant and per-dataset counts forever (never returns).

        The payload is stored under 'nc_merchant_refill_counts' and announced
        on the 'refill' channel with type 'mch_counts'.
        """
        self._publish_loop(self._refill_counts,
                           "nc_merchant_refill_counts",
                           event_type='mch_counts')


mchDataCenter = MchDataCenter()