123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252 |
- import time
- from abc import ABCMeta, abstractmethod, ABC
- from datetime import timedelta
- from mpi4py import MPI
- import h5py
- from enum import IntEnum
- import threading
- __all__ = ['DataWriteStream', 'DataReadStream', 'day_stamp', 'EMchPosmap', 'EChPosmap', 'ENetPosmap', 'open_hdf5', 'time_border',
- 'calc_interval']
- import logging
- logger = logging.getLogger('stream')
def day_stamp(stamp):
    """Return the epoch second at which the UTC+8 day containing *stamp* began.

    The timestamp is shifted by eight hours before being broken down with
    ``gmtime``, so the day boundary is midnight at a fixed UTC+8 offset.

    Args:
        stamp: Unix timestamp in seconds; truncated with ``int`` first.

    Returns:
        int: ``stamp`` minus the hours, minutes and seconds that have elapsed
        since that midnight.
    """
    whole = int(stamp)
    shifted = time.gmtime(whole + 8 * 3600)
    since_midnight = timedelta(
        hours=shifted.tm_hour,
        minutes=shifted.tm_min,
        seconds=shifted.tm_sec,
    )
    return int(whole - since_midnight.total_seconds())
def open_hdf5(file, is_wirte):
    """Open *file* with h5py, in append mode for writers and read-only otherwise.

    Args:
        file: Path (or whatever ``h5py.File`` accepts) of the HDF5 file.
        is_wirte: Truthy to open for writing (mode ``'a'``), falsy to open
            read-only (mode ``'r'``). The misspelled name is part of the
            existing interface and is kept as is.

    Returns:
        The opened ``h5py.File`` object.
    """
    if not is_wirte:
        # The original left ``libver='latest'`` commented out for readers,
        # with a pointer to plot/test_h5py.py.
        return h5py.File(file, 'r')

    # NOTE: loading through the in-memory 'core' driver (with or without
    # backing_store / swmr) was slow for an unknown reason, so it is not used.
    # Opening with the options below gave acceptable speed.
    chunk_cache = dict(rdcc_nbytes=1024 ** 3, rdcc_w0=0.25, rdcc_nslots=6151)
    return h5py.File(file, 'a', libver='latest', **chunk_cache)
def time_border(interval, time_stamp, lt):
    """Snap *time_stamp* to a multiple of *interval* counted from its day start.

    The day start is obtained from ``day_stamp``; the offset of *time_stamp*
    within that day is then rounded to an ``interval`` boundary.

    Args:
        interval: Grid spacing, in the same unit as *time_stamp*.
        time_stamp: Timestamp to align.
        lt: When truthy, round down to the boundary at or before the
            timestamp; otherwise round up to the boundary at or after it.

    Returns:
        The aligned timestamp (day start plus the rounded offset).
    """
    day = day_stamp(time_stamp)
    offset = time_stamp - day
    remainder = offset % interval
    if lt:
        offset = offset - remainder
    elif remainder > 0:
        offset += interval - remainder
    # An offset already on a boundary is left unchanged when rounding up.
    return offset + day
def calc_interval(start, end):
    """Choose a bucket width for the range from *start* to *end*.

    The length of the range is divided by 24, and the first predefined width
    that is strictly greater than that quotient is returned. The widths are
    presumably seconds (30 s up to two days) -- confirm against callers.

    Args:
        start: Beginning of the range.
        end: End of the range.

    Returns:
        int: ``1`` when the quotient truncates to zero; otherwise the first
        predefined width larger than the quotient. When the quotient is at or
        beyond the largest width, that largest width (172800) is returned;
        previously the function fell through and returned ``None``.
    """
    intervals = (
        30, 60, 300, 600, 900, 1800, 3600, 7200, 10800, 14400, 18000, 21600,
        25200, 28800, 32400, 36000, 39600, 43200, 86400, 172800,
    )
    # int() truncates toward zero, matching the original rounding for
    # negative ranges as well.
    segment = int((end - start) / 24)
    if segment == 0:
        return 1
    for width in intervals:
        if segment < width:
            return width
    # Range is longer than any predefined width can cover in ~24 buckets:
    # clamp to the coarsest width instead of returning None.
    return intervals[-1]
def span_days(start_time, end_time):
    """List the day-start stamps of every day touched by a time range.

    Day starts are computed with ``day_stamp``. When *end_time* falls exactly
    on a day start, that final day is excluded from the result.

    Args:
        start_time: Timestamp at which the range begins.
        end_time: Timestamp at which the range ends.

    Returns:
        list: Day-start stamps spaced 86400 apart, from the day of
        *start_time* onwards; empty if the range covers no day.
    """
    first_day = day_stamp(start_time)
    last_day = day_stamp(end_time)
    if last_day == end_time:
        # Range ends exactly on a day boundary: do not include that day.
        last_day -= 1
    return list(range(first_day, last_day + 1, 86400))
def ch_calc_cfgs():
    """Return the fixed calculation periods as a 3-tuple.

    Returns:
        tuple: ``(ratio_period, speed_period, monitor_period)``, i.e.
        ``(1800, 300, 600)``.
    """
    periods = {
        'ratio': 1800,
        'speed': 300,
        'monitor': 600,
    }
    return periods['ratio'], periods['speed'], periods['monitor']
class DataWriteStream(metaclass=ABCMeta):
    """Abstract base class for writers that wrap an open file handle.

    Subclasses must implement :meth:`write`. The wrapped handle only needs to
    provide ``flush()`` and ``close()``; callers presumably pass the object
    returned by ``h5py.File`` -- confirm against callers.
    """

    _version = 20200618
    # Class-level attributes: shared by every instance and subclass.
    _paths = dict()
    _lock = threading.Lock()

    def __init__(self, hfive):
        """Store the already opened file handle *hfive*."""
        self._hfive = hfive

    @property
    def file(self):
        """The wrapped file handle, or ``None`` once the stream is closed."""
        return self._hfive

    @file.setter
    def file(self, value):
        self._hfive = value

    @abstractmethod
    def write(self, method, params):
        """Write data; the meaning of *method* and *params* is subclass-defined."""

    def flush(self):
        """Call ``flush()`` on the wrapped file handle."""
        self.file.flush()

    def close(self):
        """Close the wrapped handle once and drop the reference to it.

        The shared class-level lock is held while closing. It is taken with a
        ``with`` statement so that it is released even when the handle's
        ``close()`` raises; otherwise every later ``close()`` on any stream
        would block forever. The exception itself still propagates.
        """
        with self._lock:
            if self._hfive is not None:
                self._hfive.close()
                self._hfive = None
class DataReadStream(metaclass=ABCMeta):
    """Base class for readers over a file laid out as ``/<version>/<day>/...``.

    On construction the names of the groups directly under ``/<version>/``
    are read, converted to ``int`` and cached as the list of available days.
    """

    _version = 20200618
    # Class-level default; every instance replaces it in __init__.
    _days = list()

    def __init__(self, hfive):
        """Store the opened file *hfive* and cache the days it contains."""
        self._hfive = hfive
        self._days = self._getdays()
        # Log the days as local-time strings for debugging.
        sdays = [
            time.strftime('%y-%m-%d %H:%M:%S', time.localtime(day))
            for day in self._days
        ]
        logger.debug(sdays)

    def __del__(self):
        # Intentionally a no-op; the file is only closed through close().
        pass

    def days(self):
        """Return the sorted list of day stamps found when the stream was built."""
        return self._days

    @property
    def file(self):
        """The wrapped file handle, or ``None`` once the stream is closed."""
        return self._hfive

    @file.setter
    def file(self, value):
        self._hfive = value

    def close(self):
        """Close the wrapped handle if it is still open and drop the reference."""
        if self._hfive is not None:
            self._hfive.close()
            self._hfive = None

    def _root_path(self, day_stamp=None):
        """Return ``/<version>/``, or ``/<version>/<day_stamp>/`` when a day is given."""
        if day_stamp is None:
            return f'/{self._version}/'
        return f'/{self._version}/{day_stamp}/'

    def datasets(self, day_stamp):
        """Return the full names of all non-group objects stored under a day.

        Args:
            day_stamp: Day identifier used as the group name under the
                version root.

        Returns:
            list: ``.name`` of every non-group object found by walking the
            day's group recursively; ``[]`` when the day is absent or any
            error occurs (the error is logged).
        """
        def collect(group):
            # Depth-first walk: descend into sub-groups, record everything else.
            names = []
            for _name, child in group.items():
                if isinstance(child, h5py.Group):
                    names.extend(collect(child))
                else:
                    names.append(child.name)
            return names

        try:
            path = self._root_path(day_stamp)
            if path not in self.file:
                return []
            return collect(self.file.require_group(path))
        except Exception as ex:
            logger.error(ex)
            return []

    def _getdays(self):
        """Return the sorted integer names of the groups under the version root.

        Returns:
            list: Sorted ``int`` day stamps; ``[]`` when the root cannot be
            opened or a group name is not an integer (the error is logged).
        """
        def group_names(root):
            names = []
            try:
                for name, child in root.items():
                    if isinstance(child, h5py.Group):
                        names.append(name)
            except Exception as ex:
                # Best effort: keep whatever was collected before the failure.
                logger.error(ex)
            # Returned after the try block rather than from ``finally`` so
            # that KeyboardInterrupt / SystemExit are no longer swallowed.
            return names

        try:
            root = self.file.require_group(self._root_path())
            return sorted(int(name) for name in group_names(root))
        except Exception as ex:
            logger.error(ex)
            return []
class EMchPosmap(IntEnum):
    """Positions of the merchant counters; each member's value is its index."""

    submit_count = 0
    submit_amounts = 1
    succ_count = 2
    succ_mch_amounts = 3
    succ_ch_amounts = 4
    fail_count = 5
    fail_mch_amounts = 6

    @staticmethod
    def dim():
        """Return the number of positions (7)."""
        return len(EMchPosmap)
class EChPosmap(IntEnum):
    """Positions of the channel counters; each member's value is its index."""

    commit_count = 0
    commit_amounts = 1
    succ_count = 2
    succ_amounts = 3
    succ_periods = 4
    fail_count = 5
    fail_amounts = 6
    fail_periods = 7
    succ_mch_amounts = 8

    @staticmethod
    def dim():
        """Return the number of positions (9)."""
        return len(EChPosmap)

    @staticmethod
    def old_dim():
        """Return 8.

        Presumably the width of an earlier layout that lacked
        ``succ_mch_amounts`` -- confirm against callers.
        """
        return 8
class ENetPosmap(IntEnum):
    """Positions of the network counters; each member's value is its index."""

    succ_count = 0
    fail_count = 1

    @staticmethod
    def dim():
        """Return the number of positions (2)."""
        return len(ENetPosmap)
|