import logging
import os
import re
import time as stime
from datetime import timedelta
from io import BytesIO
from os import path
from threading import Thread

import h5py
import numpy as np
import redis
from matplotlib import ticker
from matplotlib.figure import Figure

from Singleton import Singleton

logger = logging.getLogger('app')


class DataCenter(object):
    """Copy per-second channel values from Redis into HDF5 and plot them.

    Redis hash fields are named ``stype-chname-quality-card_type-amount-time``
    where ``stype`` is ``succ`` or ``fail`` and ``time`` is a Unix timestamp.
    Each field value is stored in the dataset
    ``/<day>/<chname>/<quality>/<card_type>/<amount>``, a ``(5, 86400)`` array
    with one row per ``pos_map`` entry and one column per second of the day.
    ``<day>`` is the Unix timestamp of midnight in UTC+8.

    NOTE(review): ``prepare_data`` opens the file for writing while the
    query/plot methods open it read-only; whether these may run concurrently
    in one process is not visible from this file - confirm.
    """

    # Row index inside every (5, 86400) dataset.
    pos_map = {
        'commit-succ': 0,
        'commit-fail': 1,
        'notify-succ': 2,
        'notify-fail': 3,
        'user_succ': 4
    }

    SECONDS_PER_DAY = 86400
    # Day boundaries and plot labels are computed at this fixed UTC offset.
    TZ_OFFSET_SECONDS = 8 * 3600
    # Number of '-'-separated fields in a valid Redis hash field name.
    KEY_FIELD_COUNT = 6
    # Redis entries newer than this many seconds survive del_redis().
    RETENTION_SECONDS = 300
    # Seconds between two ingest passes in prepare_data().
    POLL_SECONDS = 60
    # Upper bound on the number of fields sent in one HDEL command.
    HDEL_BATCH = 1000

    def __init__(self, file_name='/var/www/html/data/stdata/data.hdf5',
                 redis_host='172.26.105.125', redis_port=6379, redis_db=0):
        """Remember where the HDF5 file and the Redis server live.

        The defaults reproduce the previously hard-coded locations, so
        ``DataCenter()`` behaves as before.
        """
        self._mquit = False
        self._file_name = file_name
        self._redis_host = redis_host
        self._redis_port = redis_port
        self._redis_db = redis_db

    def stop(self):
        """Ask ``prepare_data`` to return at its next check (within ~1 s of sleep)."""
        self._mquit = True

    def _pause(self, seconds):
        """Sleep up to ``seconds`` seconds, returning early once ``stop()`` was called."""
        for _ in range(seconds):
            if self._mquit:
                return
            stime.sleep(1)

    def prepare_data(self):
        """Run the ingest loop until ``stop()`` is called.

        Each pass copies both Redis hashes into the HDF5 file and, only if
        that succeeded, purges old entries from Redis. Any failure is logged
        and the pass is retried after the normal pause.
        """
        # The pool connects lazily, so building it once up front is cheap and
        # avoids creating a fresh pool on every pass.
        pool = redis.ConnectionPool(host=self._redis_host,
                                    port=self._redis_port,
                                    db=self._redis_db)
        client = redis.Redis(connection_pool=pool)
        while not self._mquit:
            try:
                # Mode 'a' opens read/write and creates the file when missing;
                # the context manager closes it even if ingestion fails.
                with h5py.File(self._file_name, 'a') as hfive:
                    self.read_redis(hfive, client, 'nc_channel_monitor_commit', 'commit')
                    self.read_redis(hfive, client, 'nc_channel_monitor_notify', 'notify')
                self.del_redis(client, 'nc_channel_monitor_commit')
                self.del_redis(client, 'nc_channel_monitor_notify')
            except Exception:
                logger.exception('ingest pass failed')
            self._pause(self.POLL_SECONDS)

    def _is_recent(self, key, cutoff):
        """Return True when ``key`` is well-formed and its timestamp is >= ``cutoff``."""
        fields = key.split('-')
        if len(fields) != self.KEY_FIELD_COUNT:
            return False
        try:
            return int(fields[-1]) >= cutoff
        except ValueError:
            # A non-numeric timestamp can never be ingested; treat it as stale.
            return False

    def del_redis(self, redis, name):
        """Delete fields of hash ``name`` that are malformed or older than the retention window.

        :param redis: Redis client returning ``bytes`` field names.
        :param name: name of the Redis hash to clean.
        """
        cutoff = int(stime.time()) - self.RETENTION_SECONDS
        stale = []
        for item in redis.hscan_iter(name):
            key = str(item[0], encoding="utf-8")
            if not self._is_recent(key, cutoff):
                stale.append(key)
        for start in range(0, len(stale), self.HDEL_BATCH):
            redis.hdel(name, *stale[start:start + self.HDEL_BATCH])

    def read_redis(self, hfive, redis, name, prefix):
        """Feed every field of Redis hash ``name`` to ``parase``.

        :param hfive: open, writable ``h5py.File``.
        :param redis: Redis client returning ``bytes`` names and values.
        :param name: name of the Redis hash to read.
        :param prefix: ``'commit'`` or ``'notify'``; selects the ``pos_map`` row.
        """
        for index, item in enumerate(redis.hscan_iter(name)):
            key = str(item[0], encoding="utf-8")
            val = str(item[1], encoding="utf-8")
            logger.debug('%s:%d', prefix, index)
            self.parase(hfive, key, val, prefix)

    def parase(self, hfive, text, val, prefix):
        """Store one Redis entry in the HDF5 file.

        :param hfive: open, writable ``h5py.File``.
        :param text: field name ``stype-chname-quality-card_type-amount-time``.
        :param val: numeric value as text.
        :param prefix: combined with ``stype`` to look up the row in ``pos_map``.
        :returns: ``False`` when the entry is rejected, ``None`` once stored.
        :raises KeyError: if ``prefix`` has no ``succ``/``fail`` row in ``pos_map``.
        """
        items = text.split('-')
        if len(items) != self.KEY_FIELD_COUNT:
            return False
        stype, chname, quality, card_type, amount, stamp = items
        if stype not in ('succ', 'fail'):
            return False
        pos = self.pos_map[f'{prefix}-{stype}']
        try:
            stamp = int(stamp)
            value = float(val)
        except ValueError:
            # One bad entry must not abort the whole ingest pass.
            logger.warning('skipping malformed entry %r=%r', text, val)
            return False
        today = self.day_stamp(stamp)
        offset = stamp - today
        if not 0 <= offset < self.SECONDS_PER_DAY:
            # A negative index would silently wrap around to the end of the day.
            logger.warning('second-of-day %d out of range for %r', offset, text)
            return False
        dset_path = f'/{today}/{chname}/{quality}/{card_type}/{amount}'
        if dset_path not in hfive:
            hfive[dset_path] = np.zeros((len(self.pos_map), self.SECONDS_PER_DAY))
        hfive[dset_path][pos, offset] = value
        logger.debug('%s %d %d %s', dset_path, pos, offset, val)

    def day_stamp(self, stamp):
        """Return the Unix timestamp of the UTC+8 midnight at or before ``stamp``."""
        stamp = int(stamp)
        seconds_into_day = (stamp + self.TZ_OFFSET_SECONDS) % self.SECONDS_PER_DAY
        return stamp - seconds_into_day

    def _days(self, root):
        """Return the names of the groups directly under ``root``.

        On error the names collected so far are returned and the error is logged.
        """
        result = []
        try:
            for name, sub in root.items():
                if isinstance(sub, h5py.Group):
                    result.append(name)
        except Exception:
            logger.exception('failed to list day groups')
        return result

    def days(self):
        """Return the day groups present in the file, or ``[]`` on any error."""
        try:
            with h5py.File(self._file_name, 'r') as hfive:
                return self._days(hfive['/'])
        except Exception:
            logger.exception('failed to read days from %s', self._file_name)
            return []

    def paths(self, time_stamp):
        """Return all dataset paths stored for the day containing ``time_stamp``.

        Returns ``[]`` when the day has no data or the file cannot be read.
        """
        try:
            day_key = f'/{self.day_stamp(time_stamp)}'
            with h5py.File(self._file_name, 'r') as hfive:
                # Plain lookup: require_group() would try to create the group,
                # which fails on a read-only file.
                if day_key not in hfive:
                    return []
                return self.dir(hfive[day_key])
        except Exception:
            logger.exception('failed to read paths from %s', self._file_name)
            return []

    def dir(self, group):
        """Return the absolute names of all non-group members below ``group``, recursively."""
        result = []
        for _name, sub in group.items():
            if isinstance(sub, h5py.Group):
                result.extend(self.dir(sub))
            else:
                result.append(sub.name)
        return result

    def draw_plot(self, start_time, interval=300, **kwargs):
        """Render the success-ratio plot for the day containing ``start_time``.

        One curve is drawn per matching dataset plus one for their sum.

        :param start_time: Unix timestamp; buckets before it are not plotted.
        :param interval: bucket width in seconds; must divide 86400.
        :param kwargs: optional ``chname``, ``quality``, ``card_type``,
            ``amount`` filters, passed to ``datasets``.
        :returns: ``BytesIO`` holding a PNG, or ``None`` when there is no data
            for that day or rendering fails.
        :raises OSError: if the HDF5 file cannot be opened.
        """
        with h5py.File(self._file_name, 'r') as hfive:
            try:
                if interval <= 0 or self.SECONDS_PER_DAY % interval:
                    raise ValueError(
                        f'interval must be a positive divisor of '
                        f'{self.SECONDS_PER_DAY}, got {interval!r}')
                selection = self.datasets(hfive, start_time, **kwargs)
                if not selection:
                    # datasets() returns False when the day has no data.
                    logger.info('no data for start_time=%s', start_time)
                    return None
                filer_text, paths = selection

                now = int(stime.time())
                day_stamp = self.day_stamp(start_time)
                start_pos = start_time - day_stamp
                # For the current day stop at "now"; -1 means plot to end of day.
                end_pos = now - day_stamp if day_stamp == self.day_stamp(now) else -1

                fig = Figure(figsize=(16, 9))
                ax = fig.subplots()
                total = np.zeros((len(self.pos_map), self.SECONDS_PER_DAY))
                x = np.arange(0, self.SECONDS_PER_DAY, interval)
                for name, dset in self.read_data(hfive, paths):
                    series = np.array(dset)
                    total = total + series
                    self._draw_plot(ax, x, day_stamp, start_pos, end_pos,
                                    series, interval, name)
                    logger.info("path=%s", name)
                self._draw_plot(ax, x, day_stamp, start_pos, end_pos,
                                total, interval, filer_text)

                ax.legend()
                ax.grid()
                ax.set_title('success ratio')
                ax.set(xlabel='time', ylabel='ratio')
                fig.autofmt_xdate()
                fig.subplots_adjust(left=0.05, right=0.999, top=0.95, bottom=0.1)
                buf = BytesIO()
                fig.savefig(buf, format="png")
                return buf
            except Exception:
                logger.exception('failed to draw plot')
                return None

    def read_data(self, hfive, paths):
        """Yield ``(path, dataset)`` for every path in ``paths``."""
        for dset_path in paths:
            yield dset_path, hfive[dset_path]

    def datasets(self, hfive, start_time, **kwargs):
        """Select the dataset paths of one day that match the given filters.

        :param hfive: open ``h5py.File``.
        :param start_time: Unix timestamp inside the wanted day.
        :param kwargs: optional ``chname``, ``quality``, ``card_type``,
            ``amount``; ``None`` values and unknown keys are ignored.
        :returns: ``False`` when the day has no group, otherwise the
            ``(label, paths)`` tuple produced by ``_filter``.
        """
        sday = f'{self.day_stamp(start_time)}'
        if sday not in self._days(hfive['/']):
            return False
        dsets = self.dir(hfive[sday])

        chname = kwargs.get('chname')
        # Path components are text, so the remaining filters are stringified.
        quality, card_type, amount = (
            None if kwargs.get(key) is None else f'{kwargs[key]}'
            for key in ('quality', 'card_type', 'amount'))
        return self._filter(dsets, chname=chname, quality=quality,
                            card_type=card_type, amount=amount)

    def _filter(self, dsets, chname=None, quality=None, card_type=None, amount=None):
        """Return ``(label, paths)`` for the dataset paths matching all given filters.

        ``label`` describes the active filters and is used as the legend entry
        of the aggregated curve. Paths that do not have the expected
        ``/day/chname/quality/card_type/amount`` shape are skipped.
        """
        filer_text = '' if chname is None else chname
        if quality is not None:
            filer_text = filer_text + f"-qua:{quality}"
        if card_type is not None:
            filer_text = filer_text + f"-type:{card_type}"
        if amount is not None:
            filer_text = filer_text + f"-amount:{amount}"

        wanted = (chname, quality, card_type, amount)
        paths = []
        for text in dsets:
            items = text.split('/')
            # '' + day + four components; skip anything of a different depth.
            if len(items) != 6:
                continue
            if all(want is None or want == got
                   for want, got in zip(wanted, items[2:])):
                paths.append(text)
        return filer_text, paths

    def _draw_plot(self, ax, x, day_stamp, start_pos, end_pos, data, interval=300, path=''):
        """Plot the notify success ratio of one ``(5, 86400)`` array onto ``ax``.

        :param x: bucket start offsets in seconds since ``day_stamp``.
        :param start_pos: buckets starting before this offset are dropped.
        :param end_pos: buckets starting at or after this offset are dropped;
            ``-1`` disables the upper bound.
        :param data: array indexed as ``pos_map``; only the notify rows are used.
        :param interval: bucket width in seconds; must divide the row length.
        :param path: legend label of the curve.
        """
        # Rows 2 and 3 are notify-succ and notify-fail (see pos_map).
        succeeded = data[2].reshape((-1, interval)).sum(axis=1)
        attempts = (data[2] + data[3]).reshape((-1, interval)).sum(axis=1)
        # The tiny epsilon keeps empty buckets at ratio 0 instead of dividing by zero.
        ratio = succeeded / (attempts + 0.00000001)

        keep = x >= start_pos
        if end_pos != -1:
            keep &= x < end_pos
        x = x[keep]
        ratio = ratio[keep]

        # Label with the same fixed UTC+8 offset that day_stamp() uses, so the
        # first bucket of a day always reads 00:00 whatever the host time zone.
        labels = np.array([
            stime.strftime(
                '%H:%M',
                stime.gmtime(int(d) + day_stamp + self.TZ_OFFSET_SECONDS))
            for d in x
        ])
        ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=0))
        ax.plot(labels, ratio, ls='--', marker='o', label=path)


dataCenter = DataCenter()