|
@@ -6,16 +6,8 @@ from os import path
|
|
|
import re
|
|
|
from datetime import timedelta
|
|
|
import numpy as np
|
|
|
-from matplotlib.figure import Figure
|
|
|
-from matplotlib import ticker
|
|
|
-from io import BytesIO
|
|
|
-import logging
|
|
|
|
|
|
class SpeedDataCenter(object):
|
|
|
- pos_map = {
|
|
|
- 'commit': 0, 'success': 1, 'fail': 2
|
|
|
- }
|
|
|
-
|
|
|
def __init__(self):
|
|
|
self._mquit = False
|
|
|
self._mRHost = ''
|
|
@@ -35,70 +27,56 @@ class SpeedDataCenter(object):
|
|
|
try:
|
|
|
pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
|
|
|
r = redis.Redis(connection_pool=pool)
|
|
|
-
|
|
|
if path.exists(self._file_name):
|
|
|
hfive = h5py.File(self._file_name, 'a')
|
|
|
else:
|
|
|
hfive = h5py.File(self._file_name, 'w')
|
|
|
-
|
|
|
- self.read_redis(hfive, r, 'nc_channel_monitor_commit')
|
|
|
+ self.read_redis(hfive, r, 'nc_commit_speed_monitor')
|
|
|
hfive.close()
|
|
|
- self.del_redis(r, 'nc_channel_monitor_commit')
|
|
|
+
|
|
|
+ self.del_redis(r, 'nc_commit_speed_monitor')
|
|
|
except Exception as ex:
|
|
|
print(ex)
|
|
|
finally:
|
|
|
- for i in range(60):
|
|
|
- if self._mquit:
|
|
|
- break
|
|
|
- else:
|
|
|
- stime.sleep(1)
|
|
|
+ stime.sleep(0.05)
|
|
|
|
|
|
def del_redis(self, redis, name):
|
|
|
- latest_time = int(stime.time()) - 300
|
|
|
+ latest_time = int(stime.time()) - 2
|
|
|
for item in redis.hscan_iter(name):
|
|
|
key = str(item[0], encoding="utf-8")
|
|
|
items = re.split(r'-', key)
|
|
|
-
|
|
|
fdel = True
|
|
|
if len(items) == 5:
|
|
|
- (mchid, quality, card_type, amount, time) = items
|
|
|
+ (chname, quality, card_type, amount, time) = items
|
|
|
time = int(time)
|
|
|
if latest_time <= time:
|
|
|
fdel = False
|
|
|
-
|
|
|
if fdel:
|
|
|
redis.hdel(name, key)
|
|
|
pass
|
|
|
|
|
|
- def read_redis(self, hfive, redis, name, prefix):
|
|
|
- i = 0
|
|
|
+ def read_redis(self, hfive, redis, name):
|
|
|
for item in redis.hscan_iter(name):
|
|
|
key = str(item[0], encoding="utf-8")
|
|
|
val = str(item[1], encoding="utf-8")
|
|
|
- print(f'{prefix}:{i}')
|
|
|
- i += 1
|
|
|
- self.parase(hfive, key, val, prefix)
|
|
|
+ self.parase(hfive, key, val)
|
|
|
|
|
|
- def parase(self, hfive, text, val, prefix):
|
|
|
+ def parase(self, hfive, text, val):
|
|
|
items = re.split(r'-', text)
|
|
|
if len(items) != 5:
|
|
|
return False
|
|
|
|
|
|
- (mchid, quality, card_type, amount, time) = items
|
|
|
- pos = self.pos_map[f'{prefix}']
|
|
|
+ (chname, quality, card_type, spec, time) = items
|
|
|
|
|
|
time = int(time)
|
|
|
today = self.day_stamp(time)
|
|
|
- path = f'/{today}/{mchid}/{quality}/{card_type}/{amount}'
|
|
|
+ path = f'/{today}/{chname}/{quality}/{card_type}/{spec}'
|
|
|
if path not in hfive:
|
|
|
- dim = len(self.pos_map)
|
|
|
- hfive[path] = np.zeros((dim, 86400))
|
|
|
+ hfive[path] = np.zeros((1, 86400))
|
|
|
|
|
|
diff = time - today
|
|
|
- if diff < 0:
|
|
|
- print(diff)
|
|
|
- hfive[path][pos, diff] = val
|
|
|
- print(path, pos, diff, val, hfive[path][pos, diff])
|
|
|
+ hfive[path][0, diff] = val
|
|
|
+ print(path, 0, diff, val, hfive[path][0, diff])
|
|
|
pass
|
|
|
|
|
|
def day_stamp(self, stamp):
|
|
@@ -107,336 +85,9 @@ class SpeedDataCenter(object):
|
|
|
diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
|
|
|
today = stamp - diff.total_seconds()
|
|
|
return int(today)
|
|
|
+########################################################################################################################
|
|
|
|
|
|
- def _days(self, root):
|
|
|
- result = []
|
|
|
- try:
|
|
|
- for name, sub in root.items():
|
|
|
- if isinstance(sub, h5py.Group):
|
|
|
- result.append(name)
|
|
|
- except Exception as ex:
|
|
|
- print(ex)
|
|
|
- finally:
|
|
|
- return result
|
|
|
-
|
|
|
- def days(self):
|
|
|
- try:
|
|
|
- hfive = h5py.File(self._file_name, 'r')
|
|
|
- root = hfive.require_group('/')
|
|
|
- days = self._days(root)
|
|
|
- hfive.close()
|
|
|
- return days
|
|
|
- except Exception as ex:
|
|
|
- print(ex)
|
|
|
- return []
|
|
|
-
|
|
|
- def paths(self, time_stamp):
|
|
|
- try:
|
|
|
- day_stamp = self.day_stamp(time_stamp)
|
|
|
- hfive = h5py.File(self._file_name, 'r')
|
|
|
- group = hfive.require_group(f'/{day_stamp}')
|
|
|
- paths = self.dir(group)
|
|
|
- hfive.close()
|
|
|
- return paths
|
|
|
- except Exception as ex:
|
|
|
- print(ex)
|
|
|
- return []
|
|
|
-
|
|
|
- def dir(self, group):
|
|
|
- result = []
|
|
|
- for name, sub in group.items():
|
|
|
- if isinstance(sub, h5py.Group):
|
|
|
- result.extend(self.dir(sub))
|
|
|
- else:
|
|
|
- result.append(sub.name)
|
|
|
- return result
|
|
|
-
|
|
|
- def _all_none(self, **kwargs):
|
|
|
- for key, val in kwargs.items():
|
|
|
- if val is not None:
|
|
|
- return False
|
|
|
- return True
|
|
|
-
|
|
|
- def _merge_path(self,paths):
|
|
|
- result = {}
|
|
|
- for path in paths:
|
|
|
- items = re.split(r'/', path)
|
|
|
- if len(items) != 6:
|
|
|
- continue
|
|
|
- (_, _sday, _mchid, _quality, _card_type, _amount) = items
|
|
|
-
|
|
|
- _mchid = int(_mchid)
|
|
|
- if _mchid not in result:
|
|
|
- result[_mchid] = []
|
|
|
- result[_mchid].append(path)
|
|
|
-
|
|
|
- return result
|
|
|
-
|
|
|
- def draw_plot(self, start_time, interval=300, **kwargs):
|
|
|
- logger = logging.getLogger('app')
|
|
|
- hfive = h5py.File(self._file_name, 'r')
|
|
|
- try:
|
|
|
- day_stamp = self.day_stamp(start_time)
|
|
|
- start_pos = start_time - day_stamp
|
|
|
-
|
|
|
- cur_day = self.day_stamp(stime.time())
|
|
|
- if day_stamp == cur_day:
|
|
|
- end_pos = int(stime.time()) - day_stamp
|
|
|
- else:
|
|
|
- end_pos = -1
|
|
|
-
|
|
|
- fig = Figure(figsize=(16, 8))
|
|
|
- ax = fig.subplots()
|
|
|
-
|
|
|
- dim = len(self.pos_map)
|
|
|
- predata = np.zeros((dim, 86400))
|
|
|
- x = np.arange(0, 86400, interval)
|
|
|
-
|
|
|
- sub_count = 0
|
|
|
- filer_text, paths = self.datasets(hfive, start_time, **kwargs)
|
|
|
- if self._all_none(**kwargs):
|
|
|
- paths = self._merge_path(paths)
|
|
|
- for mchid, data in self._read_dict_data(hfive, paths):
|
|
|
- predata = predata + data
|
|
|
- path = f'{mchid}'
|
|
|
- ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
|
|
|
- if ret:
|
|
|
- sub_count += 1
|
|
|
- pass
|
|
|
- else:
|
|
|
- for path, data in self.read_data(hfive, paths):
|
|
|
- data = np.array(data)
|
|
|
- predata = predata + data
|
|
|
- ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
|
|
|
- if ret:
|
|
|
- sub_count += 1
|
|
|
- if sub_count > 1:
|
|
|
- self._draw_plot(ax, x, day_stamp, start_pos, end_pos, predata, interval, filer_text)
|
|
|
-
|
|
|
- ax.legend()
|
|
|
- ax.grid()
|
|
|
- ax.set_title('success ratio')
|
|
|
- ax.set(xlabel='time', ylabel='ratio')
|
|
|
- fig.autofmt_xdate()
|
|
|
- fig.subplots_adjust(left=0.05, right=0.999, top=0.95, bottom=0.1)
|
|
|
-
|
|
|
- buf = BytesIO()
|
|
|
- fig.savefig(buf, format="png")
|
|
|
- return buf
|
|
|
- except Exception as ex:
|
|
|
- print(ex)
|
|
|
- finally:
|
|
|
- hfive.close()
|
|
|
-
|
|
|
- def read_data(self, hfive, paths):
|
|
|
- for path in paths:
|
|
|
- yield path, hfive[path]
|
|
|
-
|
|
|
- def _read_dict_data(self, hfive, mchPaths):
|
|
|
- for mchid, paths in mchPaths.items():
|
|
|
- dim = len(self.pos_map)
|
|
|
- predata = np.zeros((dim, 86400))
|
|
|
- for path in paths:
|
|
|
- predata += hfive[path]
|
|
|
- yield mchid, predata
|
|
|
-
|
|
|
- def datasets(self, hfive, start_time, **kwargs):
|
|
|
- logger = logging.getLogger('app')
|
|
|
-
|
|
|
- day_stamp = self.day_stamp(start_time)
|
|
|
- sday = f'{day_stamp}'
|
|
|
- root = hfive.require_group('/')
|
|
|
- days = self._days(root)
|
|
|
- if sday not in days:
|
|
|
- return False
|
|
|
-
|
|
|
- group = hfive.require_group(sday)
|
|
|
- dsets = self.dir(group)
|
|
|
-
|
|
|
- mchid = quality = card_type = amount = None
|
|
|
- for key, val in kwargs.items():
|
|
|
- if val is None:
|
|
|
- continue
|
|
|
- if key == 'mchid':
|
|
|
- mchid = val
|
|
|
- elif key == 'quality':
|
|
|
- quality = f'{val}'
|
|
|
- elif key == 'card_type':
|
|
|
- card_type = f'{val}'
|
|
|
- elif key == 'amount':
|
|
|
- amount = f'{val}'
|
|
|
- else:
|
|
|
- continue
|
|
|
- return self._filter(dsets, mchid=mchid, quality=quality, card_type=card_type, amount=amount)
|
|
|
-
|
|
|
- def _filter(self, dsets, mchid=None, quality=None, card_type=None, amount=None):
|
|
|
- filer_text = ''
|
|
|
- if mchid is not None:
|
|
|
- filer_text = mchid
|
|
|
- if quality is not None:
|
|
|
- filer_text = filer_text + f"-qua:{quality}"
|
|
|
- if card_type is not None:
|
|
|
- filer_text = filer_text + f"-type:{card_type}"
|
|
|
- if amount is not None:
|
|
|
- filer_text = filer_text + f"-amount:{amount}"
|
|
|
-
|
|
|
- paths = []
|
|
|
- for text in dsets:
|
|
|
- items = re.split(r'/', text)
|
|
|
- if len(items) != 6:
|
|
|
- return False
|
|
|
- (_, _sday, _mchid, _quality, _card_type, _amount) = items
|
|
|
- if (mchid is not None) and (_mchid != mchid):
|
|
|
- continue
|
|
|
- if (quality is not None) and (_quality != quality):
|
|
|
- continue
|
|
|
- if (card_type is not None) and (_card_type != card_type):
|
|
|
- continue
|
|
|
- if (amount is not None) and (_amount != amount):
|
|
|
- continue
|
|
|
- paths.append(text)
|
|
|
-
|
|
|
- return filer_text, paths
|
|
|
-
|
|
|
- def _draw_plot(self, ax, x, day_stamp, start_pos, end_pos, data, interval=300, path=''):
|
|
|
- # 'commit-succ': 0, 'notify-succ': 1, 'notify-fail': 2
|
|
|
- logging.getLogger('app').debug("path=%s", path)
|
|
|
-
|
|
|
- all = data[1] + data[2]
|
|
|
- all = all.reshape((-1, interval))
|
|
|
- all = np.sum(all, axis=1)
|
|
|
-
|
|
|
- ySucc = data[1]
|
|
|
- ySucc = ySucc.reshape((-1, interval))
|
|
|
- ySucc = np.sum(ySucc, axis=1)
|
|
|
-
|
|
|
- if end_pos == -1:
|
|
|
- pos = np.where(x >= start_pos)
|
|
|
- x = x[pos]
|
|
|
- ySucc = ySucc[pos]
|
|
|
- all = all[pos]
|
|
|
- else:
|
|
|
- pos = np.where(start_pos <= x)
|
|
|
- x = x[pos]
|
|
|
- ySucc = ySucc[pos]
|
|
|
- all = all[pos]
|
|
|
-
|
|
|
- pos = np.where(x < end_pos)
|
|
|
- x = x[pos]
|
|
|
- ySucc = ySucc[pos]
|
|
|
- all = all[pos]
|
|
|
-
|
|
|
- succ_count = int(np.sum(ySucc))
|
|
|
- all_count = int(np.sum(all))
|
|
|
-
|
|
|
- if all_count < 1:
|
|
|
- return False
|
|
|
-
|
|
|
- pos = np.where(ySucc > all)
|
|
|
- ySucc[pos] = all[pos]
|
|
|
-
|
|
|
- ySucc = ySucc / (all + 0.00000001)
|
|
|
- xs = np.array([stime.strftime('%H:%M', stime.localtime(d + day_stamp)) for d in x])
|
|
|
- ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=0))
|
|
|
- ax.plot(xs, ySucc, ls='--', marker='o', label=self._label(path, succ_count, all_count))
|
|
|
- return True
|
|
|
-
|
|
|
- def _label(self, path, count, all):
|
|
|
- ratio = 0.00
|
|
|
- if all > 0:
|
|
|
- ratio = round(count * 100 / all, 2)
|
|
|
-
|
|
|
- items = re.split(r'/', path)
|
|
|
- if len(items) == 6:
|
|
|
- (_, _sday, _chname, _quality, _card_type, _amount) = items
|
|
|
- card_type = ''
|
|
|
- if _card_type == '1':
|
|
|
- card_type = 'SY'
|
|
|
- elif _card_type == '2':
|
|
|
- card_type = 'SH'
|
|
|
- elif _card_type == '4':
|
|
|
- card_type = 'YD'
|
|
|
- elif _card_type == '5':
|
|
|
- card_type = 'LT'
|
|
|
- elif _card_type == '6':
|
|
|
- card_type = 'DX'
|
|
|
- elif _card_type == '7':
|
|
|
- card_type = 'TH'
|
|
|
- return f"{_chname}-{_quality}-{card_type}-{_amount}:{count}/{all} = {ratio}%"
|
|
|
- else:
|
|
|
- if path == '' or path is None:
|
|
|
- path = 'average'
|
|
|
- return f"{path}:{count}/{all} = {ratio}%"
|
|
|
- pass
|
|
|
-
|
|
|
- #统计机构当前时间之前序列时间成功率
|
|
|
- def _merge_mobile_path(self,paths):
|
|
|
- result = {}
|
|
|
- for path in paths:
|
|
|
- items = re.split(r'/', path)
|
|
|
- if len(items) != 6:
|
|
|
- continue
|
|
|
- (_, _sday, _mchid, _quality, _card_type, _amount) = items
|
|
|
-
|
|
|
- _card_type = int(_card_type)
|
|
|
- if _card_type not in [4, 5, 6]:
|
|
|
- continue
|
|
|
-
|
|
|
- _mchid = int(_mchid)
|
|
|
- if _mchid not in result:
|
|
|
- result[_mchid] = []
|
|
|
- result[_mchid].append(path)
|
|
|
- return result
|
|
|
-
|
|
|
- def _merge_data(self, hfive, paths):
|
|
|
- dim = len(self.pos_map)
|
|
|
- predata = np.zeros((dim, 86400))
|
|
|
- for path in paths:
|
|
|
- predata += hfive[path]
|
|
|
- return predata
|
|
|
-
|
|
|
- def _calc_mratio(self,data,startes,end):
|
|
|
- succ = data[1]
|
|
|
- fail = data[2]
|
|
|
- x = np.arange(0, 86400, 1)
|
|
|
-
|
|
|
- result = {}
|
|
|
- for start in startes:
|
|
|
- if end - start < 0:
|
|
|
- start_pos = 0
|
|
|
- else:
|
|
|
- start_pos = end - start
|
|
|
-
|
|
|
- pos = np.where(x >= start_pos)
|
|
|
- t = x[pos]
|
|
|
- _fail = fail[pos]
|
|
|
- _succ = succ[pos]
|
|
|
-
|
|
|
- pos = np.where(t < end)
|
|
|
- _fail = _fail[pos]
|
|
|
- _succ = _succ[pos]
|
|
|
-
|
|
|
- succs = int(np.sum(_succ))
|
|
|
- fails = int(np.sum(_fail))
|
|
|
- ratio = round((succs + 0.00001) / (succs + fails + 0.00001), 4)
|
|
|
- result[start] = [succs, fails, ratio]
|
|
|
- return result
|
|
|
-
|
|
|
- def mratios(self, time_stamp,presecs):
|
|
|
- paths = self.paths(time_stamp)
|
|
|
- mchid_paths = self._merge_mobile_path(paths)
|
|
|
- day_stamp = self.day_stamp(time_stamp)
|
|
|
-
|
|
|
- mratios = {}
|
|
|
- hfive = h5py.File(self._file_name, 'r')
|
|
|
- for mchid, paths in mchid_paths.items():
|
|
|
- mdata = self._merge_data(hfive,paths)
|
|
|
- result = self._calc_mratio(mdata,presecs,time_stamp - day_stamp)
|
|
|
- mratios[mchid] = result
|
|
|
- hfive.close()
|
|
|
- return mratios
|
|
|
-
|
|
|
- def calc_ratio(self):
|
|
|
+ def channl_speed(self):
|
|
|
import json
|
|
|
|
|
|
r = None
|
|
@@ -448,212 +99,76 @@ class SpeedDataCenter(object):
|
|
|
|
|
|
while True:
|
|
|
try:
|
|
|
- time_sec = int(stime.time())
|
|
|
- presecs = [900, 1800, 3600, 7200, 86400]
|
|
|
- mratios = self.mratios(time_sec, presecs)
|
|
|
+ time_stamp = int(stime.time())
|
|
|
+ paths = self.paths(time_stamp)
|
|
|
+ pos_range = self._pos_range(time_stamp)
|
|
|
+
|
|
|
+ hfive = h5py.File(self._file_name, 'r')
|
|
|
+ result = {}
|
|
|
+ for path in paths:
|
|
|
+ key = self._parse_path(path)
|
|
|
+ speed = self._calc_speed(hfive, path, pos_range)
|
|
|
+ result[key] = speed
|
|
|
+ pass
|
|
|
+ hfive.close()
|
|
|
|
|
|
- if len(mratios) != 0:
|
|
|
- r.set(f"nc_merchant_ratios", json.dumps(mratios))
|
|
|
- r.publish('refill',json.dumps({'type':'mch_ratio','value':0}))
|
|
|
- print('push msg=',mratios)
|
|
|
+ if len(result) != 0:
|
|
|
+ r.set(f"nc_channels_speed", json.dumps(result))
|
|
|
+ r.publish('refill',json.dumps({'type':'channels_speed','value':0}))
|
|
|
+ print('push msg=', result)
|
|
|
except Exception as ex:
|
|
|
print(ex)
|
|
|
finally:
|
|
|
- stime.sleep(2)
|
|
|
-
|
|
|
- #以下为按照卡类型计算成功率代码
|
|
|
- def _merge_mobile_type_path(self,paths,card_type=None):
|
|
|
- result = {}
|
|
|
- for path in paths:
|
|
|
- items = re.split(r'/', path)
|
|
|
- if len(items) != 6:
|
|
|
- continue
|
|
|
- (_, _sday, _mchid, _quality, _card_type, _amount) = items
|
|
|
-
|
|
|
- _card_type = int(_card_type)
|
|
|
- if _card_type not in [4, 5, 6]:
|
|
|
- continue
|
|
|
- if card_type is not None and _card_type != card_type:
|
|
|
- continue
|
|
|
-
|
|
|
- _mchid = int(_mchid)
|
|
|
- if _mchid not in result:
|
|
|
- result[_mchid] = []
|
|
|
- result[_mchid].append(path)
|
|
|
- return result
|
|
|
+ stime.sleep(0.05)
|
|
|
|
|
|
- def mratio_types(self, time_stamp,presecs):
|
|
|
- paths = self.paths(time_stamp)
|
|
|
-
|
|
|
- mchid_paths = self._merge_mobile_type_path(paths)
|
|
|
- day_stamp = self.day_stamp(time_stamp)
|
|
|
-
|
|
|
- card_types = {None: 'ALL', 4: 'YD', 5: 'LT', 6: 'DX'}
|
|
|
+ def _parse_path(self,path):
|
|
|
+ items = re.split(r'/', path)
|
|
|
+ if len(items) != 6:
|
|
|
+ return False
|
|
|
+ (_, _sday, _chname, _quality, _card_type, _spec) = items
|
|
|
+ key = f'{_chname}-{_spec}-{_card_type}-{_quality}' #接收端的key,是如此定义的
|
|
|
+ return key
|
|
|
+
|
|
|
+ def _pos_range(self,end):
|
|
|
+ pos_day = np.arange(0, 86400, 1)
|
|
|
+ day_stamp = self.day_stamp(end)
|
|
|
+ period = 60
|
|
|
+ if end - period > day_stamp:
|
|
|
+ start = end - period
|
|
|
+ pos_range = np.where(pos_day > start)
|
|
|
+ else:
|
|
|
+ start = day_stamp
|
|
|
+ pos_range = np.where(pos_day >= start)
|
|
|
+ pos_range = np.where(pos_range <= end)
|
|
|
|
|
|
- mratios = {}
|
|
|
- hfive = h5py.File(self._file_name, 'r')
|
|
|
- for mchid, paths in mchid_paths.items():
|
|
|
- mch_ratios = {}
|
|
|
- for card_type, name in card_types.items():
|
|
|
- print('card_type=', card_type, 'name=', name)
|
|
|
- if card_type is None:
|
|
|
- cur_paths = paths
|
|
|
- else:
|
|
|
- cur_paths = self._merge_mobile_type_path(paths, card_type)
|
|
|
- if len(cur_paths) == 0:
|
|
|
- continue
|
|
|
- cur_paths = cur_paths[mchid]
|
|
|
- mdata = self._merge_data(hfive,cur_paths)
|
|
|
- result = self._calc_mratio(mdata,presecs,time_stamp - day_stamp)
|
|
|
- mch_ratios[name] = result
|
|
|
- mratios[mchid] = mch_ratios
|
|
|
- hfive.close()
|
|
|
- return mratios
|
|
|
+ return pos_range
|
|
|
|
|
|
- def calc_ratios(self):
|
|
|
- import json
|
|
|
+ def _calc_speed(self,hFive,path,pos_range):
|
|
|
+ datas = hFive[path]
|
|
|
+ commit = datas[0]
|
|
|
+ commit = commit[pos_range]
|
|
|
+ speed = int(np.sum(commit))
|
|
|
+ return speed
|
|
|
|
|
|
- r = None
|
|
|
+ def paths(self, time_stamp):
|
|
|
try:
|
|
|
- pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
|
|
|
- r = redis.Redis(connection_pool=pool)
|
|
|
+ day_stamp = self.day_stamp(time_stamp)
|
|
|
+ hfive = h5py.File(self._file_name, 'r')
|
|
|
+ group = hfive.require_group(f'/{day_stamp}')
|
|
|
+ paths = self.dir(group)
|
|
|
+ hfive.close()
|
|
|
+ return paths
|
|
|
except Exception as ex:
|
|
|
print(ex)
|
|
|
+ return []
|
|
|
|
|
|
- while True:
|
|
|
- try:
|
|
|
- time_sec = int(stime.time())
|
|
|
- presecs = [900, 1800, 3600, 7200, 86400]
|
|
|
- mratios = self.mratio_types(time_sec, presecs)
|
|
|
-
|
|
|
- if len(mratios) != 0:
|
|
|
- r.set(f"nc_merchant_card_type_ratios", json.dumps(mratios))
|
|
|
- # r.publish('refill',json.dumps({'type':'mch_ratio','value':0}))
|
|
|
- print('push msg=', mratios)
|
|
|
- except Exception as ex:
|
|
|
- print(ex)
|
|
|
- finally:
|
|
|
- stime.sleep(2)
|
|
|
-
|
|
|
- ####################################################################################################################
|
|
|
- ####
|
|
|
- def _calc_mcount(self,data,startes,end):
|
|
|
- succ = data[1]
|
|
|
- fail = data[2]
|
|
|
- x = np.arange(0, 86400, 1)
|
|
|
-
|
|
|
- result = {}
|
|
|
- for start in startes:
|
|
|
- if end - start < 0:
|
|
|
- start_pos = 0
|
|
|
- else:
|
|
|
- start_pos = end - start
|
|
|
-
|
|
|
- pos = np.where(x >= start_pos)
|
|
|
- t = x[pos]
|
|
|
- _succ = succ[pos]
|
|
|
- _fail = fail[pos]
|
|
|
-
|
|
|
- pos = np.where(t < end)
|
|
|
- _succ = _succ[pos]
|
|
|
- _fail = _fail[pos]
|
|
|
-
|
|
|
- succs = int(np.sum(_succ))
|
|
|
- fails = int(np.sum(_fail))
|
|
|
- ratio = round((succs + 0.00001) / (succs + fails + 0.00001), 4)
|
|
|
- result[start] = [succs, fails, ratio]
|
|
|
- return result
|
|
|
-
|
|
|
- def merchant_rmobile_path(self, paths):
|
|
|
- result = {}
|
|
|
- for path in paths:
|
|
|
- items = re.split(r'/', path)
|
|
|
- if len(items) != 6:
|
|
|
- continue
|
|
|
- (_, _sday, _mchid, _quality, _card_type, _amount) = items
|
|
|
-
|
|
|
- _card_type = int(_card_type)
|
|
|
- if _card_type not in [4, 5, 6]:
|
|
|
- continue
|
|
|
-
|
|
|
- _mchid = int(_mchid)
|
|
|
- if _mchid not in result:
|
|
|
- result[_mchid] = []
|
|
|
- result[_mchid].append(path)
|
|
|
- return result
|
|
|
-
|
|
|
- def rmobile_path(self, paths):
|
|
|
+ def dir(self, group):
|
|
|
result = []
|
|
|
- for path in paths:
|
|
|
- items = re.split(r'/', path)
|
|
|
- if len(items) != 6:
|
|
|
- continue
|
|
|
- (_, _sday, _mchid, _quality, _card_type, _amount) = items
|
|
|
-
|
|
|
- _card_type = int(_card_type)
|
|
|
- if _card_type not in [4, 5, 6]:
|
|
|
- continue
|
|
|
- result.append(path)
|
|
|
- return result
|
|
|
-
|
|
|
- def mch_count(self, paths, presecs,time_stamp):
|
|
|
- hfive = h5py.File(self._file_name, 'r')
|
|
|
- mdata = self._merge_data(hfive,paths)
|
|
|
- day_stamp = self.day_stamp(time_stamp)
|
|
|
- result = self._calc_mcount(mdata,presecs,time_stamp - day_stamp)
|
|
|
- hfive.close()
|
|
|
-
|
|
|
- return result
|
|
|
-
|
|
|
- def mch_detail_count(self, paths, presecs,time_stamp):
|
|
|
- hfive = h5py.File(self._file_name, 'r')
|
|
|
-
|
|
|
- result = {}
|
|
|
- for path in paths:
|
|
|
- items = re.split(r'/', path)
|
|
|
- if len(items) != 6:
|
|
|
- continue
|
|
|
- (_, _sday, _mchid, _quality, _card_type, _amount) = items
|
|
|
- key = f"{_mchid}-{_quality}-{_card_type}-{_amount}"
|
|
|
- mdata = hfive[path]
|
|
|
- day_stamp = self.day_stamp(time_stamp)
|
|
|
- result[key] = self._calc_mcount(mdata,presecs,time_stamp - day_stamp)
|
|
|
- hfive.close()
|
|
|
+ for name, sub in group.items():
|
|
|
+ if isinstance(sub, h5py.Group):
|
|
|
+ result.extend(self.dir(sub))
|
|
|
+ else:
|
|
|
+ result.append(sub.name)
|
|
|
return result
|
|
|
|
|
|
- def mch_counts(self):
|
|
|
- import json
|
|
|
-
|
|
|
- r = None
|
|
|
- try:
|
|
|
- pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
|
|
|
- r = redis.Redis(connection_pool=pool)
|
|
|
- except Exception as ex:
|
|
|
- print(ex)
|
|
|
-
|
|
|
- while True:
|
|
|
- try:
|
|
|
- time_stamp = int(stime.time())
|
|
|
- presecs = [900, 1800, 3600, 7200, 86400]
|
|
|
- all_paths = self.paths(time_stamp)
|
|
|
- mchid_paths = self.merchant_rmobile_path(all_paths)
|
|
|
-
|
|
|
- gross = {}
|
|
|
- for mchid, paths in mchid_paths.items():
|
|
|
- counts = self.mch_count(paths, presecs, time_stamp)
|
|
|
- gross[mchid] = counts
|
|
|
-
|
|
|
- paths = self.rmobile_path(all_paths)
|
|
|
- detail = self.mch_detail_count(paths, presecs, time_stamp)
|
|
|
-
|
|
|
- result = {'gross': gross, 'detail': detail}
|
|
|
- if len(result) != 0:
|
|
|
- r.set(f"nc_merchant_refill_counts", json.dumps(result))
|
|
|
- r.publish('refill',json.dumps({'type':'mch_counts','value':0}))
|
|
|
- print('push msg=', result)
|
|
|
- except Exception as ex:
|
|
|
- print(ex)
|
|
|
- finally:
|
|
|
- stime.sleep(2)
|
|
|
-
|
|
|
-mchDataCenter = SpeedDataCenter()
|
|
|
+speedDataCenter = SpeedDataCenter()
|