123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239 |
- from abc import ABCMeta, abstractmethod, ABC
- from datetime import timedelta
- from mpi4py import MPI
- import h5py
- from enum import IntEnum
- import threading
- __all__ = ['DataWriteStream', 'DataReadStream', 'day_stamp', 'EMchPosmap', 'EChPosmap', 'ENetPosmap', 'open_hdf5','time_border']
- import logging
- logger = logging.getLogger('stream')
def day_stamp(stamp, utc_offset_hours=8):
    """Return the epoch second at which the local day containing *stamp* starts.

    The "local day" is the calendar day of a fixed-offset zone that is
    ``utc_offset_hours`` ahead of UTC. The default of 8 keeps the offset
    that used to be hard-coded as ``8 * 3600``.

    Args:
        stamp: Seconds since the Unix epoch; anything ``int()`` accepts.
            Fractional seconds are truncated.
        utc_offset_hours: Offset of the local zone from UTC, in hours.

    Returns:
        int: ``stamp`` rounded down to local midnight, still expressed in
        seconds since the epoch.
    """
    stamp = int(stamp)
    offset = int(utc_offset_hours * 3600)
    # (stamp + offset) % 86400 is the number of seconds elapsed since local
    # midnight. Python's % is non-negative for a positive modulus, so stamps
    # before the epoch work too, and no platform time call can fail here.
    return stamp - (stamp + offset) % 86400
def open_hdf5(file, is_wirte):
    """Open *file* through ``h5py.File`` and return the handle.

    Args:
        file: Passed straight to ``h5py.File`` as its first argument.
        is_wirte: Truthy selects mode ``'a'``, falsy selects mode ``'r'``.
            (The misspelling is part of the public signature and is kept
            so keyword callers continue to work.)

    Returns:
        The object returned by ``h5py.File``.
    """
    mode = 'a' if is_wirte else 'r'
    return h5py.File(file, mode)
def time_border(interval, time_stamp, lt):
    """Snap *time_stamp* onto an *interval*-second grid anchored at day start.

    The grid origin is ``day_stamp(time_stamp)``.

    Args:
        interval: Grid spacing in seconds.
        time_stamp: Timestamp (epoch seconds) to align.
        lt: Truthy rounds down to the grid line at or before the timestamp;
            falsy rounds up to the grid line at or after it.

    Returns:
        The aligned timestamp; a stamp already on the grid is returned
        unchanged in either direction.
    """
    day_start = day_stamp(time_stamp)
    offset = time_stamp - day_start
    overshoot = offset % interval  # distance past the previous grid line
    if lt:
        offset -= overshoot
    elif overshoot > 0:
        offset += interval - overshoot
    return offset + day_start
def calc_interval(start, end):
    """Choose a preset step size for the span from *start* to *end*.

    One twenty-fourth of the span (truncated to an integer) is compared
    against a fixed ladder of step sizes, and the first step strictly
    greater than that value is returned.

    Args:
        start: Beginning of the span.
        end: End of the span, in the same unit as *start*.

    Returns:
        int: ``1`` when a twenty-fourth of the span truncates to zero;
        otherwise the first preset step larger than it. Spans too long for
        any preset get the largest step (previously the function fell off
        the end of its loop and returned ``None``).
    """
    period = end - start
    segment = int(period / 24)
    if segment == 0:
        return 1

    intervals = (30, 60, 300, 600, 900, 1800, 3600, 7200, 14400,
                 21600, 28800, 36000, 43200, 86400, 172800)
    # Fall back to the coarsest step instead of returning None implicitly.
    return next((step for step in intervals if segment < step), intervals[-1])
def span_days(start_time, end_time):
    """List the day-start stamps of every day from *start_time* to *end_time*.

    Both ends are normalised with ``day_stamp`` and the result steps by
    86400 seconds from the first day to the last, inclusive.

    Args:
        start_time: Timestamp inside the first day.
        end_time: Timestamp inside the last day.

    Returns:
        list: Day-start stamps in ascending order; empty when the end day
        precedes the start day.
    """
    first_day = day_stamp(start_time)
    last_day = day_stamp(end_time)
    # +1 makes the upper bound inclusive, mirroring a `<=` loop condition.
    return list(range(first_day, last_day + 1, 86400))
class DataWriteStream(metaclass=ABCMeta):
    """Abstract base for writers that wrap an open file handle.

    Subclasses must implement :meth:`write`. The wrapped handle is exposed
    through the :attr:`file` property and released by :meth:`close`.
    """

    # Not referenced inside this class — presumably the on-disk layout
    # version used by subclasses; TODO confirm.
    _version = 20200618
    # Class-level dict shared by all instances; unused in this class
    # (presumably a cache maintained by subclasses — verify).
    _paths = dict()
    # One lock shared by every writer instance and subclass.
    _lock = threading.Lock()

    def __init__(self, hfive):
        """Store the handle to write to.

        Args:
            hfive: The open file handle (presumably an ``h5py.File``); this
                class only ever calls ``close()`` on it.
        """
        self._hfive = hfive

    @property
    def file(self):
        """The wrapped file handle, or ``None`` once closed."""
        return self._hfive

    @file.setter
    def file(self, value):
        self._hfive = value

    @abstractmethod
    def write(self, method, params):
        """Write data; the contract is defined by concrete subclasses."""
        pass

    def close(self):
        """Close the wrapped handle, if any, and forget it.

        Safe to call repeatedly. The shared lock is held for the duration
        and is always released, even when the handle's ``close()`` raises —
        otherwise one failed close would block every later ``close()`` call
        on any writer.
        """
        with self._lock:
            if self._hfive is not None:
                self._hfive.close()
                self._hfive = None
class DataReadStream(metaclass=ABCMeta):
    """Base class for readers over a file laid out as ``/<version>/<day>/...``.

    On construction the reader lists the groups directly under
    ``/<version>/`` and keeps their names, converted to ``int`` and sorted,
    as the available days.
    """

    # Name of the root group all paths are built under.
    _version = 20200618
    # Class-level default; every instance replaces it in __init__.
    _days = list()

    def __init__(self, hfive):
        """Wrap *hfive* and index the days it contains.

        Args:
            hfive: Open file handle (presumably an ``h5py.File``); it must
                provide ``require_group`` and ``close``.
        """
        self._hfive = hfive
        self._days = self._getdays()

    def __del__(self):
        # Intentionally a no-op; kept so subclasses calling
        # super().__del__() keep working.
        pass

    def days(self):
        """Return the sorted list of day stamps found at construction."""
        return self._days

    @property
    def file(self):
        """The wrapped file handle, or ``None`` once closed."""
        return self._hfive

    @file.setter
    def file(self, value):
        self._hfive = value

    def close(self):
        """Close the wrapped handle, if any, and forget it."""
        if self._hfive is not None:
            self._hfive.close()
            self._hfive = None

    def _root_path(self, day_stamp=None):
        """Build the group path for the version root or for one day.

        Args:
            day_stamp: Day identifier appended under the version root;
                ``None`` yields the version root itself. (The parameter
                shadows the module-level ``day_stamp`` function, which is
                not called here.)

        Returns:
            str: ``'/<version>/'`` or ``'/<version>/<day_stamp>/'``.
        """
        if day_stamp is None:
            return f'/{self._version}/'
        return f'/{self._version}/{day_stamp}/'

    def datasets(self, day_stamp):
        """Return the full names of all non-group objects stored under a day.

        Sub-groups are walked recursively. Any exception is logged and an
        empty list is returned instead.

        Args:
            day_stamp: Day identifier, as accepted by :meth:`_root_path`.

        Returns:
            list: The ``.name`` of every non-group object below the day group.
        """
        def collect(group):
            # Depth-first walk: descend into groups, record everything else.
            found = []
            for child in group.values():
                if isinstance(child, h5py.Group):
                    found.extend(collect(child))
                else:
                    found.append(child.name)
            return found

        try:
            path = self._root_path(day_stamp)
            group = self.file.require_group(path)
            return collect(group)
        except Exception as ex:
            logger.error(ex)
            return []

    def _getdays(self):
        """List the day groups under the version root as sorted integers.

        Returns:
            list: Sorted ``int`` day stamps; empty when the root cannot be
            opened or a group name is not an integer (the error is logged).
        """
        def child_group_names(root):
            names = []
            try:
                for name, child in root.items():
                    if isinstance(child, h5py.Group):
                        names.append(name)
            except Exception as ex:
                # Best effort: keep whatever was gathered before the failure.
                logger.error(ex)
            # Returned after the try block rather than from a `finally`
            # clause: a `return` in `finally` would also swallow
            # KeyboardInterrupt / SystemExit.
            return names

        try:
            root_path = self._root_path()
            root = self.file.require_group(root_path)
            return sorted(int(name) for name in child_group_names(root))
        except Exception as ex:
            logger.error(ex)
            return []

    def near_stamp(self, time_stamp, left=True):
        """Shift *time_stamp* by whole days towards the range of known days.

        The time-of-day part of the stamp is preserved.

        Args:
            time_stamp: Timestamp (epoch seconds) to adjust.
            left: When true, a stamp whose day lies before the first known
                day is moved forward in 86400 s steps until it is not.
                When false, a stamp whose day lies after the last known day
                is moved backward likewise. Stamps beyond the opposite
                bound are returned unchanged.

        Returns:
            The adjusted timestamp, or ``None`` when no days are known.
        """
        if not self._days:
            return None

        day = day_stamp(time_stamp)
        pos = time_stamp - day  # offset within the day
        if left:
            first = self._days[0]
            while day < first:
                day += 86400
        else:
            last = self._days[-1]
            while day > last:
                day -= 86400
        return pos + day
class EMchPosmap(IntEnum):
    """Integer positions 0-6 for the named counters.

    Presumably column indices into a per-merchant statistics array —
    TODO confirm against the writer that fills it.
    """

    # Submitted
    submit_count = 0
    submit_amounts = 1
    # Succeeded
    succ_count = 2
    succ_mch_amounts = 3
    succ_ch_amounts = 4
    # Failed
    fail_count = 5
    fail_mch_amounts = 6

    @staticmethod
    def dim():
        """Return the number of positions defined by this enum (7)."""
        return len(EMchPosmap)
class EChPosmap(IntEnum):
    """Integer positions 0-7 for the named counters.

    Presumably column indices into a per-channel statistics array —
    TODO confirm against the writer that fills it.
    """

    # Committed
    commit_count = 0
    commit_amounts = 1
    # Succeeded
    succ_count = 2
    succ_amounts = 3
    succ_periods = 4
    # Failed
    fail_count = 5
    fail_amounts = 6
    fail_periods = 7

    @staticmethod
    def dim():
        """Return the number of positions defined by this enum (8)."""
        return len(EChPosmap)
class ENetPosmap(IntEnum):
    """Integer positions 0-1 for the success and failure counters.

    Presumably column indices into a network statistics array —
    TODO confirm against the writer that fills it.
    """

    succ_count = 0
    fail_count = 1

    @staticmethod
    def dim():
        """Return the number of positions defined by this enum (2)."""
        return len(ENetPosmap)
|