123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- from abc import ABCMeta, abstractmethod, ABC
- from datetime import timedelta
- # from mpi4py import MPI
- import h5py
- from enum import IntEnum
- __all__ = ['DataWriteStream', 'DataReadStream', 'day_stamp', 'EMchPosmap', 'EChPosmap', 'ENetPosmap', 'open_hdf5']
- import logging
- log = logging.getLogger('stream')
def day_stamp(stamp, *, tz_offset_hours=8):
    """Return the Unix timestamp of local midnight for the day holding *stamp*.

    The day boundary is evaluated in a fixed-offset time zone. The default
    offset of +8 hours is the value this function previously hard-coded, so
    existing single-argument calls return exactly what they did before.

    Args:
        stamp: Unix timestamp in seconds. Anything ``int()`` accepts is
            allowed (int, float, numeric string); fractions are truncated.
        tz_offset_hours: Offset from UTC, in hours, of the zone whose
            midnight is wanted. Keyword-only. Fractional offsets such as
            ``5.5`` are supported.

    Returns:
        int: ``int(stamp)`` minus the hours, minutes and seconds elapsed
        since midnight in the chosen zone.
    """
    import time as stime

    stamp = int(stamp)
    # Shifting by the offset and reading the result as UTC yields the
    # wall-clock fields of the fixed-offset zone without any tz database.
    shifted = stime.gmtime(stamp + int(tz_offset_hours * 3600))
    elapsed = shifted.tm_hour * 3600 + shifted.tm_min * 60 + shifted.tm_sec
    return stamp - elapsed
def open_hdf5(file, is_wirte):
    """Open *file* with h5py, in append mode for writing or read-only.

    Args:
        file: Passed unchanged as the first argument of ``h5py.File``.
        is_wirte: Truthy selects mode ``'a'``; falsy selects mode ``'r'``.
            The misspelling is kept because callers may pass it by keyword.

    Returns:
        The ``h5py.File`` object returned by the constructor.
    """
    # The MPI-parallel variant (driver='mpio', comm=MPI.COMM_WORLD) is
    # disabled in this module; only the serial driver is used.
    mode = 'a' if is_wirte else 'r'
    return h5py.File(file, mode)
class DataWriteStream(metaclass=ABCMeta):
    """Abstract base class for writers that own an open file handle.

    Subclasses implement :meth:`write`. The handle is stored as given and is
    only required to provide ``close()``.
    NOTE(review): presumably an ``h5py.File`` from ``open_hdf5`` - confirm
    against the callers.

    Instances can be used as context managers; leaving the ``with`` block
    calls :meth:`close`, so the handle is released even when ``write``
    raises.
    """

    # Version tag of this stream class; no method in this class reads it.
    _version = 20200618

    def __init__(self, hfive):
        """Store *hfive* as the handle this stream writes to."""
        self._hfive = hfive

    @property
    def file(self):
        """The underlying handle, or ``None`` once the stream is closed."""
        return self._hfive

    @file.setter
    def file(self, value):
        self._hfive = value

    @abstractmethod
    def write(self, method, params):
        """Write data to the stream; the contract is defined by subclasses."""

    def close(self):
        """Close the underlying handle, if any, and drop the reference.

        Safe to call more than once: later calls are no-ops.
        """
        if self._hfive is None:
            return
        self._hfive.close()
        self._hfive = None

    def __enter__(self):
        """Return the stream itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the stream; never suppresses an in-flight exception."""
        self.close()
        return False
class DataReadStream(metaclass=ABCMeta):
    """Abstract base class for readers that own an open file handle.

    Subclasses implement :meth:`read`. The listing helpers look below the
    version-specific root group ``/<_version>/`` of the handle.
    NOTE(review): the handle is presumably an ``h5py.File`` from
    ``open_hdf5`` - confirm against the callers.

    Instances can be used as context managers; leaving the ``with`` block
    calls :meth:`close`.
    """

    # Version tag; it names the root group used by dirs() and days().
    _version = 20200618

    def __init__(self, hfive):
        """Store *hfive* as the handle this stream reads from."""
        self._hfive = hfive

    @property
    def file(self):
        """The underlying handle, or ``None`` once the stream is closed."""
        return self._hfive

    @file.setter
    def file(self, value):
        self._hfive = value

    @abstractmethod
    def read(self, path):
        """Read the data at *path*; the contract is defined by subclasses."""

    def close(self):
        """Close the underlying handle, if any, and drop the reference.

        Safe to call more than once: later calls are no-ops.
        """
        if self._hfive is None:
            return
        self._hfive.close()
        self._hfive = None

    def __enter__(self):
        """Return the stream itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the stream; never suppresses an in-flight exception."""
        self.close()
        return False

    def _sub_dirs(self, root):
        """Return the names of the direct children of *root* that are groups.

        Best effort: an ``Exception`` raised while iterating is logged and
        the names collected so far are returned. Interrupts and
        ``SystemExit`` propagate instead of being swallowed.
        """
        result = []
        try:
            for name, sub in root.items():
                if isinstance(sub, h5py.Group):
                    result.append(name)
        except Exception as ex:
            log.error(ex)
        # Return after the handler, not from a ``finally`` clause: a
        # ``return`` inside ``finally`` discards every in-flight exception,
        # including KeyboardInterrupt.
        return result

    def dir(self, group):
        """Recursively collect the ``name`` of every non-group under *group*.

        Sub-groups are descended into and are not listed themselves.
        """
        result = []
        for _, sub in group.items():
            if isinstance(sub, h5py.Group):
                result.extend(self.dir(sub))
            else:
                result.append(sub.name)
        return result

    def _root_path(self):
        """Return the path of the version-specific root group."""
        return f'/{self._version}/'

    def _list_root(self, lister):
        """Apply *lister* to the root group; log and return ``[]`` on error.

        NOTE(review): ``require_group`` is kept from the original code. On a
        writable h5py file it creates a missing group; on a read-only file
        the failure is presumably what ends up logged here - confirm whether
        a plain lookup was intended.
        """
        try:
            root = self.file.require_group(self._root_path())
            return lister(root)
        except Exception as ex:
            log.error(ex)
            return []

    def dirs(self):
        """Return the names of all non-group entries below the root group.

        Returns ``[]`` (after logging) if the root cannot be resolved or the
        traversal fails.
        """
        return self._list_root(self.dir)

    def days(self):
        """Return the names of the groups directly below the root group.

        Returns ``[]`` (after logging) if the root cannot be resolved.
        NOTE(review): the method name suggests one group per day - confirm
        against the writer.
        """
        return self._list_root(self._sub_dirs)
class EMchPosmap(IntEnum):
    """Named integer positions 0-6 for submit / success / failure figures.

    NOTE(review): presumably column indices into a per-merchant statistics
    vector of length ``dim()`` - confirm against the code that uses them.
    """

    submit_count = 0
    submit_amounts = 1
    succ_count = 2
    succ_mch_amounts = 3
    succ_ch_amounts = 4
    fail_count = 5
    fail_mch_amounts = 6

    @staticmethod
    def dim():
        """Return the number of positions defined by this enum (7)."""
        # Derived from the members so it cannot drift when one is added.
        return len(EMchPosmap)
class EChPosmap(IntEnum):
    """Named integer positions 0-7 for commit / success / failure figures.

    NOTE(review): presumably column indices into a per-channel statistics
    vector of length ``dim()`` - confirm against the code that uses them.
    """

    commit_count = 0
    commit_amounts = 1
    succ_count = 2
    succ_amounts = 3
    succ_periods = 4
    fail_count = 5
    fail_amounts = 6
    fail_periods = 7

    @staticmethod
    def dim():
        """Return the number of positions defined by this enum (8)."""
        # Derived from the members so it cannot drift when one is added.
        return len(EChPosmap)
class ENetPosmap(IntEnum):
    """Named integer positions 0-1 for success and failure counts.

    NOTE(review): presumably column indices into a network statistics
    vector of length ``dim()`` - confirm against the code that uses them.
    """

    succ_count = 0
    fail_count = 1

    @staticmethod
    def dim():
        """Return the number of positions defined by this enum (2)."""
        # Derived from the members so it cannot drift when one is added.
        return len(ENetPosmap)
|