import time from abc import ABCMeta, abstractmethod, ABC from datetime import timedelta from mpi4py import MPI import h5py from enum import IntEnum import threading __all__ = ['DataWriteStream', 'DataReadStream', 'day_stamp', 'EMchPosmap', 'EChPosmap', 'ENetPosmap', 'open_hdf5', 'time_border', 'calc_interval'] import logging logger = logging.getLogger('stream') def day_stamp(stamp): import time as stime stamp = int(stamp) st_time = stime.gmtime(stamp + 8 * 3600) diff = timedelta(hours=st_time.tm_hour, minutes=st_time.tm_min, seconds=st_time.tm_sec) today = stamp - diff.total_seconds() return int(today) def open_hdf5(file, is_wirte): if is_wirte: # 不知道为何在内存中加载这么慢 # return h5py.File(file, 'a', libver='latest', driver='core') # , swmr=True, backing_store=True # return h5py.File(file, driver='core',backing_store=True) # , swmr=True, backing_store=True #这种加载方式速度可以 return h5py.File(file, 'a', libver='latest', rdcc_nbytes=1024 ** 3, rdcc_w0=0.25, rdcc_nslots=6151) else: return h5py.File(file, 'r', libver='latest') #plot/test_h5py.py def time_border(interval, time_stamp, lt): day = day_stamp(time_stamp) pos = time_stamp - day if lt: pos = pos - pos % interval elif pos % interval > 0: pos += interval - pos % interval else: pos = pos return pos + day def calc_interval(start, end): period = end - start segment = int(period / 24) if segment == 0: return 1 else: intervals = [30, 60, 300, 600, 900, 1800, 3600, 7200, 14400, 21600, 28800, 36000, 43200, 86400, 172800] for i in intervals: if segment < i: return i pass def span_days(start_time, end_time): start_day = day_stamp(start_time) end_day = day_stamp(end_time) days = list() while start_day <= end_day: days.append(start_day) start_day += 86400 return days class DataWriteStream(metaclass=ABCMeta): _version = 20200618 _paths = dict() _lock = threading.Lock() def __init__(self, hfive): self._hfive = hfive # Getter function @property def file(self): return self._hfive # Setter function @file.setter def file(self, value): self._hfive = value @abstractmethod def write(self, method, params): pass def flush(self): hfive = self.file hfive.flush() def close(self): self._lock.acquire() if self._hfive is not None: self._hfive.close() self._hfive = None self._lock.release() pass class DataReadStream(metaclass=ABCMeta): _version = 20200618 _days = list() def __init__(self, hfive): self._hfive = hfive self._days = self._getdays() stime = lambda t: time.strftime('%d-%H:%M:%S', time.localtime(t)) sdays = [stime(day) for day in self._days] logger.debug(sdays) logger.debug(self._days) def __del__(self): pass def days(self): return self._days # Getter function @property def file(self): return self._hfive # Setter function @file.setter def file(self, value): self._hfive = value def close(self): if self._hfive is not None: self._hfive.close() self._hfive = None def _root_path(self, day_stamp=None): if day_stamp is None: return f'/{self._version}/' else: return f'/{self._version}/{day_stamp}/' def datasets(self, day_stamp): def dir(group): result = [] for name, sub in group.items(): if isinstance(sub, h5py.Group): result.extend(dir(sub)) else: result.append(sub.name) return result try: path = self._root_path(day_stamp) if path in self.file: group = self.file.require_group(path) days = dir(group) return days else: return [] except Exception as ex: logger.error(ex) return [] def _getdays(self): def sub(root): result = [] try: for name, sub in root.items(): if isinstance(sub, h5py.Group): result.append(name) except Exception as ex: logger.error(ex) finally: return result try: root_ptah = self._root_path() root = self.file.require_group(root_ptah) tmp = sub(root) days = [int(day) for day in tmp] days.sort() return days except Exception as ex: logger.error(ex) return [] def near_stamp(self, time_stamp, left=True): if len(self._days) == 0: return None if time_stamp > int(time.time()): time_stamp = int(time.time()) if left: min = self._days[0] time_stamp = min if time_stamp < min else time_stamp else: max = self._days[-1] + 86400 - 1 time_stamp = max if time_stamp > max else time_stamp return time_stamp class EMchPosmap(IntEnum): submit_count = 0 submit_amounts = 1 succ_count = 2 succ_mch_amounts = 3 succ_ch_amounts = 4 fail_count = 5 fail_mch_amounts = 6 @staticmethod def dim(): return 7 pass class EChPosmap(IntEnum): commit_count = 0 commit_amounts = 1 succ_count = 2 succ_amounts = 3 succ_periods = 4 fail_count = 5 fail_amounts = 6 fail_periods = 7 @staticmethod def dim(): return 8 pass class ENetPosmap(IntEnum): succ_count = 0 fail_count = 1 @staticmethod def dim(): return 2 pass