"""Helpers and base classes for reading/writing day-partitioned HDF5 streams."""
import logging
import threading
import time
from abc import ABCMeta, abstractmethod, ABC
from datetime import timedelta
from enum import IntEnum

from mpi4py import MPI
import h5py

__all__ = ['DataWriteStream', 'DataReadStream', 'day_stamp', 'EMchPosmap',
           'EChPosmap', 'ENetPosmap', 'open_hdf5', 'time_border',
           'calc_interval']

logger = logging.getLogger('stream')

# Day boundaries in this module are computed at a fixed UTC+8 offset.
_TZ_OFFSET_SECONDS = 8 * 3600
_SECONDS_PER_DAY = 86400

# Candidate bucket widths for calc_interval, in ascending order.
_INTERVAL_STEPS = (30, 60, 300, 600, 900, 1800, 3600, 7200, 10800, 14400,
                   18000, 21600, 25200, 28800, 32400, 36000, 39600, 43200,
                   86400, 172800)


def day_stamp(stamp):
    """Return the timestamp of the start of the UTC+8 day containing *stamp*.

    :param stamp: epoch timestamp; it is truncated with ``int()`` first.
    :return: integer epoch timestamp of 00:00:00 (UTC+8) of that day.
    """
    stamp = int(stamp)
    # Seconds elapsed since midnight once the stamp is shifted to UTC+8.
    elapsed = (stamp + _TZ_OFFSET_SECONDS) % _SECONDS_PER_DAY
    return stamp - elapsed


def open_hdf5(file, is_wirte):
    """Open an HDF5 file for appending or for reading.

    :param file: file name passed straight to ``h5py.File``.
    :param is_wirte: truthy opens in ``'a'`` mode with an enlarged chunk
        cache, falsy opens read-only. The misspelling is kept because the
        name is part of the public signature.
    :return: the opened ``h5py.File``.
    """
    if is_wirte:
        # The in-memory ('core') driver variants below were tried and found
        # to load slowly for an unknown reason:
        #   h5py.File(file, 'a', libver='latest', driver='core')
        #   h5py.File(file, driver='core', backing_store=True)
        # Opening with a 1 GiB raw-data chunk cache gave acceptable speed.
        # NOTE(review): the original comment points at plot/test_h5py.py,
        # presumably the script used for that comparison - confirm.
        return h5py.File(file, 'a', libver='latest', rdcc_nbytes=1024 ** 3,
                         rdcc_w0=0.25, rdcc_nslots=6151)
    return h5py.File(file, 'r', libver='latest')


def mktime(strtime):
    """Parse a ``'%Y-%m-%d %H:%M:%S'`` string into an integer timestamp.

    The conversion uses ``time.mktime`` and therefore the process's local
    time zone.
    """
    parsed = time.strptime(strtime, "%Y-%m-%d %H:%M:%S")
    return int(time.mktime(parsed))


def time_border(interval, time_stamp, lt):
    """Snap *time_stamp* onto an *interval* grid anchored at its day start.

    :param interval: grid width, in the same unit as *time_stamp*.
    :param time_stamp: epoch timestamp to align.
    :param lt: truthy rounds down to the grid, falsy rounds up; a stamp
        already on the grid is returned unchanged either way.
    :return: the aligned timestamp.
    """
    day = day_stamp(time_stamp)
    pos = time_stamp - day
    remainder = pos % interval
    if lt:
        pos -= remainder
    elif remainder > 0:
        pos += interval - remainder
    return pos + day


def calc_interval(start, end):
    """Choose a bucket width so that ``[start, end]`` splits into ~24 parts.

    :param start: start timestamp.
    :param end: end timestamp.
    :return: ``1`` when the span is shorter than 24 units, otherwise the
        first predefined step strictly greater than ``(end - start) / 24``.
        Spans too long for every step get the largest step; previously the
        function fell off the end and returned ``None`` in that case.
    """
    segment = int((end - start) / 24)
    if segment == 0:
        return 1
    for step in _INTERVAL_STEPS:
        if segment < step:
            return step
    return _INTERVAL_STEPS[-1]


def span_days(start_time, end_time):
    """List the day-start timestamps covered by ``[start_time, end_time]``.

    An *end_time* that falls exactly on a day boundary does not pull that
    day into the result.
    """
    first_day = day_stamp(start_time)
    last_day = day_stamp(end_time)
    if last_day == end_time:
        # Step just below the boundary so that day is excluded.
        last_day -= 1
    return list(range(first_day, last_day + 1, _SECONDS_PER_DAY))


def ch_calc_cfgs():
    """Return the fixed calculation periods as a 5-tuple.

    Order: ``(start, ratio, speed, monitor, cdf_speed)``.
    NOTE(review): the values look like seconds - confirm with callers.
    """
    start_period = 3600
    ratio_period = 900
    speed_period = 60
    monitor_period = 1800
    cdf_speed_period = 60
    return (start_period, ratio_period, speed_period, monitor_period,
            cdf_speed_period)


class DataWriteStream(metaclass=ABCMeta):
    """Abstract base for writers that wrap an open HDF5 file handle."""

    _version = 20200618
    # Class-level, hence shared by every instance and subclass.
    _paths = dict()
    _lock = threading.Lock()

    def __init__(self, hfive):
        """Store the already opened HDF5 file object *hfive*."""
        self._hfive = hfive

    @property
    def file(self):
        """The wrapped file object, or ``None`` after :meth:`close`."""
        return self._hfive

    @file.setter
    def file(self, value):
        self._hfive = value

    @abstractmethod
    def write(self, method, params):
        """Write *params* using *method*; defined by subclasses."""
        pass

    def flush(self):
        """Flush the wrapped file."""
        self.file.flush()

    def close(self):
        """Close the wrapped file once, under the class-wide lock.

        The lock is held through a ``with`` block so it is released even if
        closing the file raises.
        """
        with self._lock:
            if self._hfive is not None:
                self._hfive.close()
                self._hfive = None


class DataReadStream(metaclass=ABCMeta):
    """Base for readers over an HDF5 file laid out as ``/<version>/<day>/``."""

    _version = 20200618
    _days = list()

    def __init__(self, hfive):
        """Store *hfive* and cache the sorted day keys found under the root."""
        self._hfive = hfive
        self._days = self._getdays()

    def __del__(self):
        pass

    def days(self):
        """Return the day keys cached at construction time."""
        return self._days

    @property
    def file(self):
        """The wrapped file object, or ``None`` after :meth:`close`."""
        return self._hfive

    @file.setter
    def file(self, value):
        self._hfive = value

    def close(self):
        """Close the wrapped file if it is still open."""
        if self._hfive is not None:
            self._hfive.close()
            self._hfive = None

    def _root_path(self, day_stamp=None):
        """Return ``/<version>/`` or, given a day, ``/<version>/<day>/``."""
        if day_stamp is None:
            return f'/{self._version}/'
        return f'/{self._version}/{day_stamp}/'

    def datasets(self, day_stamp):
        """Return the full names of all non-group nodes stored under a day.

        :param day_stamp: day key used to build the group path.
        :return: list of node names, descending through nested groups; an
            empty list when the path is absent or any error occurs (the
            error is logged).
        """
        def collect(group):
            names = []
            for node in group.values():
                if isinstance(node, h5py.Group):
                    names.extend(collect(node))
                else:
                    names.append(node.name)
            return names

        try:
            path = self._root_path(day_stamp)
            if path not in self.file:
                return []
            return collect(self.file.require_group(path))
        except Exception as ex:
            logger.error(ex)
            return []

    def _getdays(self):
        """Return the sorted integer names of the groups under the root.

        Any failure is logged and yields an empty list.
        """
        def child_groups(root):
            names = []
            try:
                for name, node in root.items():
                    if isinstance(node, h5py.Group):
                        names.append(name)
            except Exception as ex:
                # Best effort: keep whatever was collected before the error.
                logger.error(ex)
            # Returned after the try block, not from ``finally``, so that
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            return names

        try:
            root = self.file.require_group(self._root_path())
            return sorted(int(name) for name in child_groups(root))
        except Exception as ex:
            logger.error(ex)
            return []


class EMchPosmap(IntEnum):
    """Named positions 0-6.

    NOTE(review): presumably column indexes of a per-merchant statistics
    row - confirm with callers.
    """

    submit_count = 0
    submit_amounts = 1
    succ_count = 2
    succ_mch_amounts = 3
    succ_ch_amounts = 4
    fail_count = 5
    fail_mch_amounts = 6

    @staticmethod
    def dim():
        """Return the number of positions (7)."""
        return 7


class EChPosmap(IntEnum):
    """Named positions 0-8.

    NOTE(review): presumably column indexes of a per-channel statistics
    row - confirm with callers.
    """

    commit_count = 0
    commit_amounts = 1
    succ_count = 2
    succ_amounts = 3
    succ_periods = 4
    fail_count = 5
    fail_amounts = 6
    fail_periods = 7
    succ_mch_amounts = 8

    @staticmethod
    def dim():
        """Return the number of positions (9)."""
        return 9

    @staticmethod
    def old_dim():
        """Return 8.

        NOTE(review): presumably the width before ``succ_mch_amounts`` was
        added - confirm.
        """
        return 8


class ENetPosmap(IntEnum):
    """Named positions 0-1 (success and failure counts)."""

    succ_count = 0
    fail_count = 1

    @staticmethod
    def dim():
        """Return the number of positions (2)."""
        return 2