import json
import re
import time as stime
from datetime import datetime, timedelta

import h5py
import numpy as np
import redis


class SpeedDataCenter(object):
    # Redis entries newer than this many seconds are kept when purging.
    latest_delta = 10

    def __init__(self):
        self._mquit = False
        self._mRHost = ''
        self._mRPort = 6379
        self._file_name = '/var/www/html/data/stdata/speed.hdf5'

    def set_redis(self, rhost, rport):
        self._mRHost = rhost
        self._mRPort = rport

    def stop(self):
        self._mquit = True

    def prepare_data(self):
        """Copy per-second samples from Redis into the HDF5 file, then purge old Redis entries."""
        pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
        r = redis.Redis(connection_pool=pool)
        while not self._mquit:
            try:
                print('open file:', datetime.now())
                # Mode 'a' opens the file read/write and creates it if missing;
                # the context manager closes it even if read_redis raises.
                with h5py.File(self._file_name, 'a') as hfive:
                    latest_time = int(stime.time()) - self.latest_delta
                    print('read redis:', datetime.now())
                    self.read_redis(hfive, r, 'nc_commit_speed_monitor')
                print('del redis:', datetime.now())
                self.del_redis(r, 'nc_commit_speed_monitor', latest_time)
                print('del redis done:', datetime.now())
            except Exception as ex:
                print(ex)
            finally:
                stime.sleep(1)

    def del_redis(self, client, name, latest_time):
        """Delete hash fields that are malformed or not newer than latest_time."""
        for item in client.hscan_iter(name):
            key = str(item[0], encoding="utf-8")
            items = re.split(r'-', key)
            fdel = True
            if len(items) == 5:
                # Key layout: chname-quality-card_type-amount-timestamp
                stamp = int(items[4])
                if stamp > latest_time:
                    fdel = False
            if fdel:
                client.hdel(name, key)

    def read_redis(self, hfive, client, name):
        for item in client.hscan_iter(name):
            key = str(item[0], encoding="utf-8")
            val = str(item[1], encoding="utf-8")
            self.parase(hfive, key, val)

    def parase(self, hfive, text, val):
        """Store one 'chname-quality-card_type-spec-timestamp' sample in the HDF5 file."""
        items = re.split(r'-', text)
        if len(items) != 5:
            return False
        (chname, quality, card_type, spec, stamp) = items
        stamp = int(stamp)
        today = self.day_stamp(stamp)
        dset_path = f'/{today}/{chname}/{quality}/{card_type}/{spec}'
        if dset_path not in hfive:
            # One slot per second of the day.
            hfive[dset_path] = np.zeros((1, 86400))
        diff = stamp - today
        hfive[dset_path][0, diff] = float(val)
        print(dset_path, 0, diff, val, hfive[dset_path][0, diff])

    def day_stamp(self, stamp):
        """Return the Unix timestamp of midnight (UTC+8) for the day containing stamp."""
        stamp = int(stamp)
        x = stime.gmtime(stamp + 8 * 3600)
        diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
        today = stamp - diff.total_seconds()
        return int(today)

    ########################################################################################################################
    def channl_speed(self):
        """Publish the per-channel sum of the most recent samples to Redis in a loop."""
        r = None
        try:
            pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
            r = redis.Redis(connection_pool=pool)
        except Exception as ex:
            print(ex)

        while True:
            try:
                time_stamp = int(stime.time())
                paths = self.paths(time_stamp)
                day_stamp = self.day_stamp(time_stamp)
                (pos_start, pos_end) = self._pos_range(time_stamp - day_stamp)
                result = {}
                with h5py.File(self._file_name, 'r') as hfive:
                    for dset_path in paths:
                        key = self._parse_path(dset_path)
                        if not key:
                            # Skip datasets whose path does not match the expected layout.
                            continue
                        result[key] = self._calc_speed(hfive, dset_path, pos_start, pos_end)
                if len(result) != 0:
                    r.set("nc_channels_speed", json.dumps(result))
                    r.publish('refill', json.dumps({'type': 'channels_speed', 'value': 0}))
                    print(datetime.now(), 'push msg=', result)
            except Exception as ex:
                print(ex)
            finally:
                stime.sleep(0.1)

    def _parse_path(self, dset_path):
        items = re.split(r'/', dset_path)
        if len(items) != 6:
            return False
        (_, _sday, _chname, _quality, _card_type, _spec) = items
        # The receiving side defines the key in this format.
        key = f'{_chname}-{_spec}-{_card_type}-{_quality}'
        return key

    def _pos_range(self, end):
        """Return index tuples selecting roughly the last `period` seconds up to `end`."""
        pos_day = np.arange(0, 86400, 1)
        period = 60
        if end - period > 0:
            start = end - period
            pos_start = np.where(pos_day > start)
        else:
            start = 0
            pos_start = np.where(pos_day >= start)
        pos_day = pos_day[pos_start]
        # pos_end indexes into the array already narrowed by pos_start.
        pos_end = np.where(pos_day <= end)
        return pos_start, pos_end

    def _calc_speed(self, hFive, dset_path, pos_start, pos_end):
        datas = hFive[dset_path]
        commit = datas[0]
        commit = commit[pos_start]
        commit = commit[pos_end]
        speed = int(np.sum(commit))
        return speed

    def paths(self, time_stamp):
        """Return the names of all datasets stored under today's group."""
        try:
            day_stamp = self.day_stamp(time_stamp)
            with h5py.File(self._file_name, 'r') as hfive:
                group_name = f'/{day_stamp}'
                # require_group would try to create a missing group, which
                # fails on a read-only file, so check for it explicitly.
                if group_name not in hfive:
                    return []
                return self.dir(hfive[group_name])
        except Exception as ex:
            print(ex)
            return []

    def dir(self, group):
        """Recursively collect the full names of every dataset below group."""
        result = []
        for name, sub in group.items():
            if isinstance(sub, h5py.Group):
                result.extend(self.dir(sub))
            else:
                result.append(sub.name)
        return result


speedDataCenter = SpeedDataCenter()