import os import time as stime import redis import h5py from os import path import re from datetime import timedelta import numpy as np from matplotlib.figure import Figure from matplotlib import ticker from io import BytesIO import logging import time as time class DataCenter(object): pos_map = { 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4 } def __init__(self): self._mquit = False self._mRHost = '' self._mRPort = 6379 self._file_name = '/var/www/html/data/stdata/data.hdf5' def set_redis(self, rhost, rport): self._mRHost = rhost self._mRPort = rport def stop(self): self._mquit = True pass def prepare_data(self): while self._mquit == False: try: pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0) r = redis.Redis(connection_pool=pool) if path.exists(self._file_name): hfive = h5py.File(self._file_name, 'a') else: hfive = h5py.File(self._file_name, 'w') self.read_redis(hfive, r, 'nc_channel_monitor_commit', 'commit') self.read_redis(hfive, r, 'nc_channel_monitor_notify', 'notify') hfive.close() self.del_redis(r, 'nc_channel_monitor_commit') self.del_redis(r, 'nc_channel_monitor_notify') except Exception as ex: print(ex) finally: stime.sleep(10) def del_redis(self, redis, name): latest_time = int(stime.time()) - 120 for item in redis.hscan_iter(name): key = str(item[0], encoding="utf-8") items = re.split(r'-', key) fdel = True if len(items) == 6: (stype, chname, quality, card_type, amount, time) = items time = int(time) if latest_time <= time: fdel = False if fdel: redis.hdel(name, key) pass def read_redis(self, hfive, redis, name, prefix): i = 0 for item in redis.hscan_iter(name): key = str(item[0], encoding="utf-8") val = str(item[1], encoding="utf-8") print(f'{prefix}:{i}') i += 1 self.parase(hfive, key, val, prefix) def parase(self, hfive, text, val, prefix): items = re.split(r'-', text) if len(items) != 6: return False (stype, chname, quality, card_type, amount, time) = items if stype == 'succ': pos = self.pos_map[f'{prefix}-succ'] elif stype == 'fail': pos = self.pos_map[f'{prefix}-fail'] else: return False time = int(time) today = self.day_stamp(time) path = f'/{today}/{chname}/{quality}/{card_type}/{amount}' if path not in hfive: hfive[path] = np.zeros((5, 86400)) diff = time - today if diff < 0: print(diff) hfive[path][pos, diff] = val print(path, pos, diff, val, hfive[path][pos, diff]) pass def day_stamp(self, stamp): stamp = int(stamp) x = stime.gmtime(stamp + 8 * 3600) diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec) today = stamp - diff.total_seconds() return int(today) def _days(self, root): result = [] try: for name, sub in root.items(): if isinstance(sub, h5py.Group): result.append(name) except Exception as ex: print(ex) finally: return result def days(self): try: hfive = h5py.File(self._file_name, 'r') root = hfive.require_group('/') days = self._days(root) hfive.close() return days except Exception as ex: print(ex) return [] def paths(self, time_stamp): try: day_stamp = self.day_stamp(time_stamp) hfive = h5py.File(self._file_name, 'r') group = hfive.require_group(f'/{day_stamp}') paths = self.dir(group) hfive.close() return paths except Exception as ex: print(ex) return [] def dir(self, group): result = [] for name, sub in group.items(): if isinstance(sub, h5py.Group): result.extend(self.dir(sub)) else: result.append(sub.name) return result def draw_plot(self, start_time, interval=300, **kwargs): logger = logging.getLogger('app') hfive = h5py.File(self._file_name, 'r') try: day_stamp = self.day_stamp(start_time) start_pos = start_time - day_stamp cur_day = self.day_stamp(stime.time()) if day_stamp == cur_day: end_pos = int(stime.time()) - day_stamp else: end_pos = -1 fig = Figure(figsize=(16, 8)) ax = fig.subplots() x = np.arange(0, 86400, interval) filer_text, paths = self.datasets(hfive, start_time, **kwargs) sub_count = 0 predata = np.zeros((5, 86400)) for path, data in self.read_data(hfive, paths): data = np.array(data) predata = predata + data ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path) if ret: sub_count += 1 if sub_count > 1: self._draw_plot(ax, x, day_stamp, start_pos, end_pos, predata, interval, filer_text) ax.legend() ax.grid() ax.set_title('success ratio') ax.set(xlabel='time', ylabel='ratio') fig.autofmt_xdate() fig.subplots_adjust(left=0.05, right=0.999, top=0.95, bottom=0.1) buf = BytesIO() fig.savefig(buf, format="png") return buf except Exception as ex: print(ex) finally: hfive.close() def read_data(self, hfive, paths): for path in paths: yield path, hfive[path] def datasets(self, hfive, start_time, **kwargs): logger = logging.getLogger('app') day_stamp = self.day_stamp(start_time) sday = f'{day_stamp}' root = hfive.require_group('/') days = self._days(root) if sday not in days: return '', [] group = hfive.require_group(sday) dsets = self.dir(group) chname = quality = card_type = amount = None for key, val in kwargs.items(): if val is None: continue elif key == 'chname': chname = val elif key == 'quality': quality = f'{val}' elif key == 'card_type': card_type = f'{val}' elif key == 'amount': amount = f'{val}' else: continue return self._filter(dsets, chname=chname, quality=quality, card_type=card_type, amount=amount) def _filter(self, dsets, chname=None, quality=None, card_type=None, amount=None): filer_text = '' if chname is not None: filer_text = chname if quality is not None: filer_text = filer_text + f"-qua:{quality}" if card_type is not None: filer_text = filer_text + f"-type:{card_type}" if amount is not None: filer_text = filer_text + f"-amount:{amount}" paths = [] for text in dsets: items = re.split(r'/', text) if len(items) != 6: return False (_, _sday, _chname, _quality, _card_type, _amount) = items if (chname is not None) and (_chname != chname): continue if (quality is not None) and (_quality != quality): continue if (card_type is not None) and (_card_type != card_type): continue if (amount is not None) and (_amount != amount): continue paths.append(text) return filer_text, paths def _draw_plot(self, ax, x, day_stamp, start_pos, end_pos, data, interval=300, path=''): # 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4 logging.getLogger('app').debug("path=%s", path) all = data[2] + data[3] all = all.reshape((-1, interval)) all = np.sum(all, axis=1) ySucc = data[2] ySucc = ySucc.reshape((-1, interval)) ySucc = np.sum(ySucc, axis=1) if end_pos == -1: pos = np.where(x >= start_pos) x = x[pos] ySucc = ySucc[pos] all = all[pos] else: pos = np.where(start_pos <= x) x = x[pos] ySucc = ySucc[pos] all = all[pos] pos = np.where(x < end_pos) x = x[pos] ySucc = ySucc[pos] all = all[pos] succ_count = int(np.sum(ySucc)) all_count = int(np.sum(all)) opened = np.where(ySucc > 0.1) if len(opened[0]) == 0: logging.getLogger('app').debug("path=%s,opened=False", path) return False ySucc = ySucc / all xs = np.array([stime.strftime('%H:%M', stime.localtime(d + day_stamp)) for d in x]) ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=0)) ax.plot(xs, ySucc, ls='--', marker='o', label=self._label(path, succ_count, all_count)) return True def _label(self, path, count, all): ratio = 0.00 if all > 0: ratio = round(count * 100 / all, 2) items = re.split(r'/', path) if len(items) == 6: (_, _sday, _chname, _quality, _card_type, _amount) = items card_type = '' if _card_type == '1': card_type = 'SY' elif _card_type == '2': card_type = 'SH' elif _card_type == '4': card_type = 'YD' elif _card_type == '5': card_type = 'LT' elif _card_type == '6': card_type = 'DX' elif _card_type == '7': card_type = 'TH' return f"{_chname}-{_quality}-{card_type}-{_amount}:{count}/{all} = {ratio}%" else: if path == '' or path is None: path = 'average' return f"{path}:{count}/{all} = {ratio}%" def _cur_min(self): time_sec = int(time.time()) cur_min = time_sec - time_sec % 60 return cur_min def _calc_tuple(self, data, start_pos, end_pos): # 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4 logging.getLogger('app').debug("path=%s", path) cur_data = data[:, start_pos:end_pos] all = np.sum(cur_data, axis=1) all = all.tolist() result = [int(x) for x in all] return result[:4] def _repath(self, path): items = re.split(r'/', path) if len(items) == 6: (_, _sday, _chname, _quality, _card_type, _amount) = items return f"{_chname}-{_amount}-{_card_type}-{_quality}" else: return False def _calc_ratio(self, start_time, end_time): hfive = None try: hfive = h5py.File(self._file_name, 'r') day_stamp = self.day_stamp(start_time) start_pos = start_time - day_stamp end_pos = end_time - day_stamp result = {'time': end_time} ratios = {} filer_text, paths = self.datasets(hfive, start_time) if len(paths) == 0: return None for path, data in self.read_data(hfive, paths): data = np.array(data) ratio = self._calc_tuple(data, start_pos, end_pos) key = self._repath(path) ratios[key] = ratio print(path, ratio) result['ratios'] = ratios return result except Exception as ex: print(ex) finally: hfive.close() pass def calc_ratio(self): import json r = None try: pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0) r = redis.Redis(connection_pool=pool) except Exception as ex: print(ex) while True: try: time_sec = int(time.time()) day_stamp = self.day_stamp(time_sec) start_time = 0 if time_sec > day_stamp + 901: start_time = time_sec - 901 elif time_sec > day_stamp + 301: start_time = day_stamp else: pass self.log_time(day_stamp,time_sec,start_time) if start_time >= day_stamp: ratios = self._calc_ratio(start_time=start_time, end_time=time_sec - 1) print('ratios=',ratios) if ratios != None: r.set(f"nc_channel_ratios", json.dumps(ratios)) r.publish('refill',json.dumps({'type':'ratio','value':0})) except Exception as ex: print(ex) finally: stime.sleep(10) def log_time(self,day_stamp, time_sec, start_time): d = time.asctime(time.localtime(day_stamp)) a = time.asctime(time.localtime(time_sec)) b = time.asctime(time.localtime(start_time)) print('day_stamp=', d, 'cur_time=', a, 'start_time=', b) def pub_ratio(self): import json r = None try: pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0) r = redis.Redis(connection_pool=pool) r.publish('refill',json.dumps({'type':'ratio','value':0})) except Exception as ex: print(ex) dataCenter = DataCenter()