DataStream.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. import time
  2. from abc import ABCMeta, abstractmethod, ABC
  3. from datetime import timedelta
  4. from mpi4py import MPI
  5. import h5py
  6. from enum import IntEnum
  7. import threading
  8. __all__ = ['DataWriteStream', 'DataReadStream', 'day_stamp', 'EMchPosmap', 'EChPosmap', 'ENetPosmap', 'open_hdf5', 'time_border',
  9. 'calc_interval']
  10. import logging
  11. logger = logging.getLogger('stream')
  12. def day_stamp(stamp):
  13. import time as stime
  14. stamp = int(stamp)
  15. st_time = stime.gmtime(stamp + 8 * 3600)
  16. diff = timedelta(hours=st_time.tm_hour, minutes=st_time.tm_min, seconds=st_time.tm_sec)
  17. today = stamp - diff.total_seconds()
  18. return int(today)
  19. def open_hdf5(file, is_wirte):
  20. if is_wirte:
  21. # 不知道为何在内存中加载这么慢
  22. # return h5py.File(file, 'a', libver='latest', driver='core') # , swmr=True, backing_store=True
  23. # return h5py.File(file, driver='core',backing_store=True) # , swmr=True, backing_store=True
  24. #这种加载方式速度可以
  25. return h5py.File(file, 'a', libver='latest', rdcc_nbytes=1024 ** 3, rdcc_w0=0.25, rdcc_nslots=6151)
  26. else:
  27. return h5py.File(file, 'r') #, libver='latest') #plot/test_h5py.py
  28. def time_border(interval, time_stamp, lt):
  29. day = day_stamp(time_stamp)
  30. pos = time_stamp - day
  31. if lt:
  32. pos = pos - pos % interval
  33. elif pos % interval > 0:
  34. pos += interval - pos % interval
  35. else:
  36. pos = pos
  37. return pos + day
  38. def calc_interval(start, end):
  39. period = end - start
  40. segment = int(period / 24)
  41. if segment == 0:
  42. return 1
  43. else:
  44. intervals = [30, 60, 300, 600, 900, 1800, 3600, 7200, 10800, 14400, 18000, 21600, 25200, 28800, 32400, 36000, 39600, 43200,
  45. 86400, 172800]
  46. for i in intervals:
  47. if segment < i:
  48. return i
  49. pass
  50. def span_days(start_time, end_time):
  51. start_day = day_stamp(start_time)
  52. end_day = day_stamp(end_time)
  53. if end_day == end_time:
  54. end_day = end_day - 1
  55. days = list()
  56. while start_day <= end_day:
  57. days.append(start_day)
  58. start_day += 86400
  59. return days
  60. def ch_calc_cfgs():
  61. ratio_period = 1800
  62. speed_period = 300
  63. monitor_period = 600
  64. start_period = 1800
  65. cdf_speed_period = 60
  66. return start_period, ratio_period, speed_period, monitor_period, cdf_speed_period
  67. class DataWriteStream(metaclass=ABCMeta):
  68. _version = 20200618
  69. _paths = dict()
  70. _lock = threading.Lock()
  71. def __init__(self, hfive):
  72. self._hfive = hfive
  73. # Getter function
  74. @property
  75. def file(self):
  76. return self._hfive
  77. # Setter function
  78. @file.setter
  79. def file(self, value):
  80. self._hfive = value
  81. @abstractmethod
  82. def write(self, method, params):
  83. pass
  84. def flush(self):
  85. hfive = self.file
  86. hfive.flush()
  87. def close(self):
  88. self._lock.acquire()
  89. if self._hfive is not None:
  90. self._hfive.close()
  91. self._hfive = None
  92. self._lock.release()
  93. pass
  94. class DataReadStream(metaclass=ABCMeta):
  95. _version = 20200618
  96. _days = list()
  97. def __init__(self, hfive):
  98. self._hfive = hfive
  99. self._days = self._getdays()
  100. # stime = lambda t: time.strftime('%y-%m-%d', time.localtime(t))
  101. # sdays = [stime(day) for day in self._days]
  102. # logger.debug(sdays)
  103. def __del__(self):
  104. pass
  105. def days(self):
  106. return self._days
  107. # Getter function
  108. @property
  109. def file(self):
  110. return self._hfive
  111. # Setter function
  112. @file.setter
  113. def file(self, value):
  114. self._hfive = value
  115. def close(self):
  116. if self._hfive is not None:
  117. self._hfive.close()
  118. self._hfive = None
  119. def _root_path(self, day_stamp=None):
  120. if day_stamp is None:
  121. return f'/{self._version}/'
  122. else:
  123. return f'/{self._version}/{day_stamp}/'
  124. def datasets(self, day_stamp):
  125. def dir(group):
  126. result = []
  127. for name, sub in group.items():
  128. if isinstance(sub, h5py.Group):
  129. result.extend(dir(sub))
  130. else:
  131. result.append(sub.name)
  132. return result
  133. try:
  134. path = self._root_path(day_stamp)
  135. if path in self.file:
  136. group = self.file.require_group(path)
  137. days = dir(group)
  138. return days
  139. else:
  140. return []
  141. except Exception as ex:
  142. logger.error(ex)
  143. return []
  144. def _getdays(self):
  145. def sub(root):
  146. result = []
  147. try:
  148. for name, sub in root.items():
  149. if isinstance(sub, h5py.Group):
  150. result.append(name)
  151. except Exception as ex:
  152. logger.error(ex)
  153. finally:
  154. return result
  155. try:
  156. root_ptah = self._root_path()
  157. root = self.file.require_group(root_ptah)
  158. tmp = sub(root)
  159. days = [int(day) for day in tmp]
  160. days.sort()
  161. return days
  162. except Exception as ex:
  163. logger.error(ex)
  164. return []
  165. class EMchPosmap(IntEnum):
  166. submit_count = 0
  167. submit_amounts = 1
  168. succ_count = 2
  169. succ_mch_amounts = 3
  170. succ_ch_amounts = 4
  171. fail_count = 5
  172. fail_mch_amounts = 6
  173. @staticmethod
  174. def dim():
  175. return 7
  176. pass
  177. class EChPosmap(IntEnum):
  178. commit_count = 0
  179. commit_amounts = 1
  180. succ_count = 2
  181. succ_amounts = 3
  182. succ_periods = 4
  183. fail_count = 5
  184. fail_amounts = 6
  185. fail_periods = 7
  186. succ_mch_amounts = 8
  187. @staticmethod
  188. def dim():
  189. return 9
  190. @staticmethod
  191. def old_dim():
  192. return 8
  193. pass
  194. class ENetPosmap(IntEnum):
  195. succ_count = 0
  196. fail_count = 1
  197. @staticmethod
  198. def dim():
  199. return 2
  200. pass