123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- import os
- import time as stime
- import redis
- import h5py
- from os import path
- import re
- from datetime import timedelta
- import numpy as np
class SpeedDataCenter(object):
    """Copy per-second counters from a Redis hash into an HDF5 file and publish speeds.

    Two long-running loops are provided:

    * ``prepare_data`` reads every field of the Redis hash
      ``nc_commit_speed_monitor`` and stores each value in the HDF5 file, then
      removes hash fields that are older than two seconds.
    * ``channl_speed`` sums the most recent 60 seconds of every dataset stored
      for the current day, stores the result as JSON under the Redis key
      ``nc_channels_speed`` and publishes a notification on channel ``refill``.

    Hash field names have the form ``chname-quality-card_type-spec-time`` where
    ``time`` is an integer Unix timestamp.  Datasets live at
    ``/<day start>/<chname>/<quality>/<card_type>/<spec>`` and have shape
    ``(1, 86400)``: one slot per second of the day.  Days start at midnight in
    a fixed UTC+8 offset.
    """

    _MONITOR_HASH = 'nc_commit_speed_monitor'
    _SECONDS_PER_DAY = 86400
    _TZ_OFFSET_SECONDS = 8 * 3600   # day boundaries are computed in UTC+8
    _SPEED_WINDOW_SECONDS = 60      # width of the window summed by channl_speed
    _KEEP_SECONDS = 2               # hash fields younger than this are not deleted
    _POLL_INTERVAL_SECONDS = 0.05

    def __init__(self):
        self._mquit = False
        self._mRHost = ''
        self._mRPort = 6379
        self._file_name = '/var/www/html/data/stdata/speed.hdf5'

    def set_redis(self, rhost, rport):
        """Set the Redis host and port used by clients created afterwards."""
        self._mRHost = rhost
        self._mRPort = rport

    def stop(self):
        """Ask the polling loops to exit after their current iteration."""
        self._mquit = True

    def _redis_client(self):
        """Return a Redis client (db 0) for the configured host and port."""
        pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
        return redis.Redis(connection_pool=pool)

    def prepare_data(self):
        """Poll the monitor hash and persist its contents until ``stop()`` is called.

        Every iteration copies the hash into the HDF5 file and then prunes old
        hash fields.  Any exception is printed and the loop carries on after a
        short sleep.
        """
        client = None
        while not self._mquit:
            try:
                if client is None:
                    # Built once and reused; creating a new connection pool on
                    # every 50 ms iteration leaks sockets.
                    client = self._redis_client()
                # Mode 'a' opens the file read/write and creates it when it is
                # missing; the context manager closes it even if a read fails.
                with h5py.File(self._file_name, 'a') as hfive:
                    self.read_redis(hfive, client, self._MONITOR_HASH)
                self.del_redis(client, self._MONITOR_HASH)
            except Exception as ex:
                print(ex)
            finally:
                stime.sleep(self._POLL_INTERVAL_SECONDS)

    def _should_delete(self, key, cutoff):
        """Return True when hash field ``key`` is malformed or older than ``cutoff``."""
        items = key.split('-')
        if len(items) != 5:
            return True
        try:
            return int(items[4]) < cutoff
        except ValueError:
            # A non-numeric timestamp can never be stored, so drop the field
            # instead of failing on it during every iteration.
            return True

    def del_redis(self, redis, name):
        """Delete fields of hash ``name`` that are malformed or older than two seconds.

        Args:
            redis: Redis client exposing ``hscan_iter`` and ``hdel``.
            name: Name of the Redis hash to prune.
        """
        cutoff = int(stime.time()) - self._KEEP_SECONDS
        for item in redis.hscan_iter(name):
            key = str(item[0], encoding="utf-8")
            if self._should_delete(key, cutoff):
                redis.hdel(name, key)

    def read_redis(self, hfive, redis, name):
        """Store every field/value pair of hash ``name`` into ``hfive``.

        Args:
            hfive: Open, writable HDF5 file (or a compatible mapping).
            redis: Redis client exposing ``hscan_iter``; values arrive as bytes.
            name: Name of the Redis hash to read.
        """
        for item in redis.hscan_iter(name):
            key = str(item[0], encoding="utf-8")
            val = str(item[1], encoding="utf-8")
            self.parase(hfive, key, val)

    def parase(self, hfive, text, val):
        """Write one counter value into its per-second slot.

        Args:
            hfive: Writable mapping from dataset path to a ``(1, 86400)`` array.
            text: Field name of the form ``chname-quality-card_type-spec-time``.
            val: Counter value as text.

        Returns:
            True when the value was stored, False when ``text`` does not have
            exactly five ``-`` separated parts or when the timestamp or the
            value is not numeric.
        """
        items = text.split('-')
        if len(items) != 5:
            return False
        chname, quality, card_type, spec, stamp_text = items
        try:
            stamp = int(stamp_text)
            count = float(val)
        except (TypeError, ValueError):
            # Skip the bad entry so it cannot abort the rest of the scan.
            return False
        today = self.day_stamp(stamp)
        dataset_path = f'/{today}/{chname}/{quality}/{card_type}/{spec}'
        if dataset_path not in hfive:
            hfive[dataset_path] = np.zeros((1, self._SECONDS_PER_DAY))
        offset = stamp - today  # seconds since the start of the day
        hfive[dataset_path][0, offset] = count
        print(dataset_path, 0, offset, val, hfive[dataset_path][0, offset])
        return True

    def day_stamp(self, stamp):
        """Return the Unix timestamp of the UTC+8 midnight at or before ``stamp``."""
        stamp = int(stamp)
        seconds_into_day = (stamp + self._TZ_OFFSET_SECONDS) % self._SECONDS_PER_DAY
        return stamp - seconds_into_day

    ####################################################################################################################
    def channl_speed(self):
        """Publish the per-channel sum of the last 60 seconds until ``stop()`` is called.

        Every iteration sums the current window of each dataset stored for
        today.  When at least one dataset exists, the result is saved as JSON
        under the Redis key ``nc_channels_speed`` and a message is published on
        channel ``refill``.  Any exception is printed and the loop carries on
        after a short sleep.
        """
        import json
        client = None
        while not self._mquit:
            try:
                if client is None:
                    client = self._redis_client()
                time_stamp = int(stime.time())
                paths = self.paths(time_stamp)
                pos_range = self._pos_range(time_stamp)
                result = {}
                if paths:
                    with h5py.File(self._file_name, 'r') as hfive:
                        for dataset_path in paths:
                            key = self._parse_path(dataset_path)
                            if not key:
                                # Unexpected layout: there is no key to report under.
                                continue
                            result[key] = self._calc_speed(hfive, dataset_path, pos_range)
                if result:
                    client.set("nc_channels_speed", json.dumps(result))
                    client.publish('refill', json.dumps({'type': 'channels_speed', 'value': 0}))
                    print('push msg=', result)
            except Exception as ex:
                print(ex)
            finally:
                stime.sleep(self._POLL_INTERVAL_SECONDS)

    def _parse_path(self, path):
        """Turn ``/<day>/<chname>/<quality>/<card_type>/<spec>`` into a result key.

        Returns:
            ``chname-spec-card_type-quality``, or False when ``path`` does not
            have exactly that layout.
        """
        items = path.split('/')
        if len(items) != 6:
            return False
        (_, _sday, _chname, _quality, _card_type, _spec) = items
        # Per the original author's note, the receiving side defines the key in this order.
        return f'{_chname}-{_spec}-{_card_type}-{_quality}'

    def _pos_range(self, end):
        """Return the per-second slot indices covered by the window ending at ``end``.

        The window is the 60 seconds up to and including ``end``, excluding its
        first second.  When that would reach back to or before the start of the
        day, the window instead begins at slot 0.

        Args:
            end: Unix timestamp of the last second to include.

        Returns:
            A one-element tuple holding the index array, as ``numpy.where``
            returns it, usable directly to index one day row.
        """
        day_start = self.day_stamp(end)
        # Slots are offsets from the start of the day, so both bounds must be
        # converted from absolute timestamps before comparing.
        end_offset = end - day_start
        positions = np.arange(0, self._SECONDS_PER_DAY, 1)
        if end - self._SPEED_WINDOW_SECONDS > day_start:
            in_window = positions > end_offset - self._SPEED_WINDOW_SECONDS
        else:
            in_window = positions >= 0
        return np.where(in_window & (positions <= end_offset))

    def _calc_speed(self, hFive, path, pos_range):
        """Return the integer sum of row 0 of dataset ``path`` over ``pos_range``."""
        day_row = hFive[path][0]
        return int(np.sum(day_row[pos_range]))

    def paths(self, time_stamp):
        """List the dataset paths stored for the day containing ``time_stamp``.

        Returns:
            A list of absolute dataset paths; empty when the day has no group
            yet or when the file cannot be read (the error is printed).
        """
        try:
            group_name = f'/{self.day_stamp(time_stamp)}'
            with h5py.File(self._file_name, 'r') as hfive:
                # A read-only file cannot create the group, so test for it.
                if group_name not in hfive:
                    return []
                return self.dir(hfive[group_name])
        except Exception as ex:
            print(ex)
            return []

    def dir(self, group):
        """Return the names of all non-group members below ``group``, recursively."""
        result = []
        for sub in group.values():
            if isinstance(sub, h5py.Group):
                result.extend(self.dir(sub))
            else:
                result.append(sub.name)
        return result
# Module-level shared instance, created at import time with the default settings.
speedDataCenter = SpeedDataCenter()
|