"""Base classes for HDF5-backed read/write streams plus index enums.

The module exposes two abstract stream types that wrap an open ``h5py``
file handle, a couple of small helpers (``day_stamp``, ``open_hdf5``) and
three ``IntEnum`` classes that give names to integer positions.
"""
from abc import ABCMeta, abstractmethod, ABC
from datetime import timedelta
from enum import IntEnum
import logging

# NOTE: MPI-parallel I/O is currently disabled.  Re-enabling it would need
# ``from mpi4py import MPI`` here and the 'mpio' driver in ``open_hdf5``.
import h5py

__all__ = ['DataWriteStream', 'DataReadStream', 'day_stamp', 'EMchPosmap',
           'EChPosmap', 'ENetPosmap', 'open_hdf5']

log = logging.getLogger('stream')

_SECONDS_PER_DAY = 24 * 3600


def day_stamp(stamp, utc_offset_hours=8):
    """Return the epoch second at which the day containing *stamp* began.

    The day boundary is evaluated in a fixed-offset time zone, by default
    UTC+8 (the offset the original implementation hard-coded).

    Args:
        stamp: Epoch timestamp in seconds; anything ``int()`` accepts.
            Fractional seconds are truncated.
        utc_offset_hours: Offset of the target zone from UTC, in hours.

    Returns:
        int: Epoch timestamp of 00:00:00 of that day in the target zone.
    """
    stamp = int(stamp)
    offset_seconds = int(utc_offset_hours * 3600)
    # Seconds elapsed since local midnight == local time-of-day.  Pure
    # integer arithmetic avoids the float round-trip through timedelta and
    # also works for timestamps the platform's gmtime() would reject.
    return stamp - (stamp + offset_seconds) % _SECONDS_PER_DAY


def open_hdf5(file, is_wirte):
    """Open an HDF5 file with h5py.

    Args:
        file: Path (or whatever ``h5py.File`` accepts) of the file to open.
        is_wirte: Truthy to open in append mode (``'a'``), falsy for
            read-only (``'r'``).  The misspelling is kept on purpose so
            existing keyword callers do not break.

    Returns:
        h5py.File: The opened file handle; the caller is responsible for
        closing it.
    """
    # A parallel variant would be:
    #   h5py.File(file, 'a', driver='mpio', comm=MPI.COMM_WORLD)
    mode = 'a' if is_wirte else 'r'
    return h5py.File(file, mode)


class _HdfStream(metaclass=ABCMeta):
    """Plumbing shared by the read and write streams.

    Holds the file handle passed to the constructor, exposes it through the
    ``file`` property and closes it on ``close()`` or when leaving a
    ``with`` block.
    """

    def __init__(self, hfive):
        # hfive: an open file-like handle exposing ``close()``
        # (an ``h5py.File`` in practice).
        self._hfive = hfive

    @property
    def file(self):
        """The underlying file handle, or ``None`` once closed."""
        return self._hfive

    @file.setter
    def file(self, value):
        self._hfive = value

    def close(self):
        """Close the underlying handle; calling it again is a no-op."""
        if self._hfive is not None:
            self._hfive.close()
            self._hfive = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Never suppress an exception raised inside the ``with`` body.
        return False


class DataWriteStream(_HdfStream):
    """Abstract base for writers; subclasses must implement ``write``."""

    # Version tag of the on-disk layout.
    _version = 20200618

    @abstractmethod
    def write(self, method, params):
        """Write data; the meaning of *method* and *params* is left to the
        concrete subclass."""


class DataReadStream(_HdfStream):
    """Abstract base for readers; subclasses must implement ``read``.

    Besides the abstract ``read`` it offers helpers to list what is stored
    beneath the version root group ``/<_version>/``.
    """

    # Version tag of the on-disk layout; also the name of the root group.
    _version = 20200618

    @abstractmethod
    def read(self, path):
        """Read the object stored at *path*; defined by the subclass."""

    def _sub_dirs(self, root):
        """Return the names of the immediate child groups of *root*.

        Errors raised while iterating are logged and whatever was collected
        up to that point is returned (best effort).
        """
        result = []
        try:
            for name, sub in root.items():
                if isinstance(sub, h5py.Group):
                    result.append(name)
        except Exception as ex:
            log.exception(ex)
        # Returning here rather than from a ``finally`` clause lets
        # KeyboardInterrupt / SystemExit propagate instead of being
        # silently discarded.
        return result

    def dir(self, group):
        """Recursively list the ``.name`` of every non-group object under
        *group*.

        Args:
            group: An ``h5py.Group`` (or compatible mapping) to walk.

        Returns:
            list: Names of all leaf objects, in traversal order.
        """
        result = []
        for sub in group.values():
            if isinstance(sub, h5py.Group):
                result.extend(self.dir(sub))
            else:
                result.append(sub.name)
        return result

    def _root_path(self):
        """Return the path of the version root group, e.g. ``/20200618/``."""
        return f'/{self._version}/'

    def _root_group(self):
        """Return the version root group.

        NOTE(review): ``require_group`` creates the group when it is
        missing; on a read-only file that presumably raises, and the public
        callers below turn that into an empty result -- confirm this is the
        intended behaviour.
        """
        return self.file.require_group(self._root_path())

    def dirs(self):
        """Return the names of all leaf objects below the version root.

        Returns an empty list (after logging) if anything goes wrong.
        """
        try:
            return self.dir(self._root_group())
        except Exception as ex:
            log.exception(ex)
            return []

    def days(self):
        """Return the names of the groups directly below the version root.

        Returns an empty list (after logging) if anything goes wrong.
        """
        try:
            return self._sub_dirs(self._root_group())
        except Exception as ex:
            log.exception(ex)
            return []


class EMchPosmap(IntEnum):
    """Named integer positions 0..6.

    NOTE(review): presumably column indices of a per-merchant statistics
    vector -- confirm against the callers.
    """

    submit_count = 0
    submit_amounts = 1
    succ_count = 2
    succ_mch_amounts = 3
    succ_ch_amounts = 4
    fail_count = 5
    fail_mch_amounts = 6

    @staticmethod
    def dim():
        """Return the number of positions defined by this enum (7)."""
        # Derived from the members so it cannot drift out of sync.
        return len(EMchPosmap)


class EChPosmap(IntEnum):
    """Named integer positions 0..7.

    NOTE(review): presumably column indices of a per-channel statistics
    vector -- confirm against the callers.
    """

    commit_count = 0
    commit_amounts = 1
    succ_count = 2
    succ_amounts = 3
    succ_periods = 4
    fail_count = 5
    fail_amounts = 6
    fail_periods = 7

    @staticmethod
    def dim():
        """Return the number of positions defined by this enum (8)."""
        return len(EChPosmap)


class ENetPosmap(IntEnum):
    """Named integer positions 0..1 (success / failure counters)."""

    succ_count = 0
    fail_count = 1

    @staticmethod
    def dim():
        """Return the number of positions defined by this enum (2)."""
        return len(ENetPosmap)