123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- from threading import Thread
- import os
- import time
- from Singleton import Singleton
- import redis
- import h5py
- from os import path
- import re
- from datetime import timedelta
- import numpy as np
class DataCenter(object):
    """Copy per-second channel monitor counters from Redis into an HDF5 file.

    Two Redis hashes are read (``nc_channel_monitor_commit`` and
    ``nc_channel_monitor_notify``).  Every hash field is expected to look like
    ``stype-chname-quality-card_type-amount-time`` where ``stype`` is ``succ``
    or ``fail`` and ``time`` is a Unix timestamp in seconds.  Each distinct
    ``/day/chname/quality/card_type/amount`` path is stored as one
    ``(ROWS, SECONDS_PER_DAY)`` dataset: the row is chosen through ``pos_map``
    and the column is the number of seconds since the start of the day.
    """

    # Row index inside each dataset for every "<prefix>-<stype>" counter kind.
    pos_map = {
        'commit-succ': 0,
        'commit-fail': 1,
        'notify-succ': 2,
        'notify_succ': 2,  # original (misspelled) key, kept for existing lookups
        'notify-fail': 3,
        'user_succ': 4,
    }

    HDF5_PATH = '/var/www/html/data/stdata/data.hdf5'
    REDIS_HOST = '172.26.105.125'
    REDIS_PORT = 6379
    REDIS_DB = 0

    ROWS = 5
    SECONDS_PER_DAY = 86400
    # Day boundaries are computed with a fixed +8 hour shift from UTC.
    TZ_OFFSET = 8 * 3600

    # Class-level defaults so the methods stay usable on a partially
    # initialised instance (e.g. when ``__init__`` failed half-way).
    _mquit = False
    _mTrdReader = None

    def __init__(self, file_name=None):
        """Open (or create) the HDF5 store.

        Args:
            file_name: Path of the HDF5 file; defaults to ``HDF5_PATH``.
        """
        self._mquit = False
        self._mTrdReader = None
        if file_name is None:
            file_name = self.HDF5_PATH
        # Mode 'a' opens an existing file read/write and creates it otherwise,
        # which is what the former exists()/'r+'/'a' branching amounted to.
        self._mHfive = h5py.File(file_name, 'a')

    def __del__(self):
        # The attribute is missing when opening the file failed in __init__.
        store = getattr(self, '_mHfive', None)
        if store is not None:
            store.close()

    def start(self):
        """Run ``prepare_data`` in a worker thread and wait for it to finish.

        The worker is joined immediately, so this call blocks until the
        import is done; ``stop`` can be called from another thread to cut
        it short.
        """
        self._mTrdReader = Thread(target=self.prepare_data)
        self._mTrdReader.start()
        self._mTrdReader.join()

    def stop(self):
        """Ask a running import to stop after the record being processed."""
        self._mquit = True

    def wait(self):
        """Block until the worker thread has finished; no-op if never started."""
        worker = self._mTrdReader
        if worker is not None:
            worker.join()

    def prepare_data(self):
        """Import the commit and notify hashes from Redis (best effort).

        Any exception is printed and swallowed so the worker thread ends
        quietly; there is no retry.
        """
        pool = None
        try:
            pool = redis.ConnectionPool(
                host=self.REDIS_HOST, port=self.REDIS_PORT, db=self.REDIS_DB
            )
            client = redis.Redis(connection_pool=pool)
            self.read_redis(client, 'nc_channel_monitor_commit', 'commit')
            self.read_redis(client, 'nc_channel_monitor_notify', 'notify')
        except Exception as ex:
            print(ex)
        finally:
            # Release the pooled sockets once the import is over.
            if pool is not None:
                pool.disconnect()

    @staticmethod
    def _to_text(raw):
        """Decode ``bytes`` as UTF-8; pass already-decoded text through."""
        if isinstance(raw, bytes):
            return raw.decode('utf-8')
        return raw

    def read_redis(self, redis, name, prefix):
        """Feed every field/value pair of Redis hash ``name`` to ``parase``.

        Args:
            redis: Client object providing ``hscan_iter(name)``.
            name: Name of the Redis hash to scan.
            prefix: Counter family (``'commit'`` or ``'notify'``) used to
                pick the ``pos_map`` row.
        """
        for i, (raw_key, raw_val) in enumerate(redis.hscan_iter(name)):
            if self._mquit:
                break
            print(f'{prefix}:{i}')
            self.parase(self._to_text(raw_key), self._to_text(raw_val), prefix)

    def parase(self, text, val, prefix):
        """Parse one hash entry and store its value in the HDF5 dataset.

        Args:
            text: Field name, ``stype-chname-quality-card_type-amount-time``.
            val: Counter value as text; converted with ``int``.
            prefix: Counter family combined with ``stype`` for ``pos_map``.

        Returns:
            ``True`` when the value was stored, ``False`` when the entry was
            skipped (wrong field count, unknown ``stype``, non-numeric
            timestamp or value, or a second-of-day outside the dataset).

        Raises:
            KeyError: if ``prefix`` has no matching entry in ``pos_map``.
        """
        fields = text.split('-')
        if len(fields) != 6:
            return False
        stype, chname, quality, card_type, amount, stamp_text = fields
        if stype not in ('succ', 'fail'):
            return False
        pos = self.pos_map[f'{prefix}-{stype}']

        try:
            stamp = int(stamp_text)
            value = int(val)
        except ValueError:
            # One malformed record must not abort the whole scan.
            return False

        today = self.day_stamp(stamp)
        node = f'/{today}/{chname}/{quality}/{card_type}/{amount}'
        diff = stamp - today
        if not 0 <= diff < self.SECONDS_PER_DAY:
            # A negative offset would silently wrap to the end of the row.
            print(diff)
            return False

        if node not in self._mHfive:
            self._mHfive[node] = np.zeros((self.ROWS, self.SECONDS_PER_DAY))
        # Index in a single step: with h5py, ``dataset[pos]`` reads the row
        # into an in-memory copy, so ``dataset[pos][diff] = x`` never reaches
        # the file.
        self._mHfive[node][pos, diff] = value
        print(node, pos, diff)
        return True

    def day_stamp(self, stamp):
        """Return the Unix timestamp at which the day containing ``stamp`` began.

        The day boundary is midnight in the fixed ``TZ_OFFSET`` (+8 h) zone.

        Args:
            stamp: Unix timestamp in seconds (anything ``int`` accepts).

        Returns:
            The timestamp of that day's start, as an ``int``.
        """
        stamp = int(stamp)
        seconds_into_day = (stamp + self.TZ_OFFSET) % self.SECONDS_PER_DAY
        return stamp - seconds_into_day
|