DataStream.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. import time
  2. from abc import ABCMeta, abstractmethod, ABC
  3. from datetime import timedelta
  4. from mpi4py import MPI
  5. import h5py
  6. from enum import IntEnum
  7. import threading
  8. __all__ = ['DataWriteStream', 'DataReadStream', 'day_stamp', 'EMchPosmap', 'EChPosmap', 'ENetPosmap', 'open_hdf5', 'time_border',
  9. 'calc_interval']
  10. import logging
  11. logger = logging.getLogger('stream')
  12. def day_stamp(stamp):
  13. import time as stime
  14. stamp = int(stamp)
  15. st_time = stime.gmtime(stamp + 8 * 3600)
  16. diff = timedelta(hours=st_time.tm_hour, minutes=st_time.tm_min, seconds=st_time.tm_sec)
  17. today = stamp - diff.total_seconds()
  18. return int(today)
  19. def open_hdf5(file, is_wirte):
  20. if is_wirte:
  21. # 不知道为何在内存中加载这么慢
  22. # return h5py.File(file, 'a', libver='latest', driver='core') # , swmr=True, backing_store=True
  23. # return h5py.File(file, driver='core',backing_store=True) # , swmr=True, backing_store=True
  24. #这种加载方式速度可以
  25. return h5py.File(file, 'a', libver='latest', rdcc_nbytes=1024 ** 3, rdcc_w0=0.25, rdcc_nslots=6151)
  26. else:
  27. return h5py.File(file, 'r', libver='latest') #plot/test_h5py.py
  28. def mktime(strtime):
  29. tdata = time.strptime(strtime, "%Y-%m-%d %H:%M:%S")
  30. time_stamp = int(time.mktime(tdata))
  31. return time_stamp
  32. def time_border(interval, time_stamp, lt):
  33. day = day_stamp(time_stamp)
  34. pos = time_stamp - day
  35. if lt:
  36. pos = pos - pos % interval
  37. elif pos % interval > 0:
  38. pos += interval - pos % interval
  39. else:
  40. pos = pos
  41. return pos + day
  42. def calc_interval(start, end):
  43. period = end - start
  44. segment = int(period / 24)
  45. if segment == 0:
  46. return 1
  47. else:
  48. intervals = [30, 60, 300, 600, 900, 1800, 3600, 7200, 10800, 14400, 18000, 21600, 25200, 28800, 32400, 36000, 39600, 43200,
  49. 86400, 172800]
  50. for i in intervals:
  51. if segment < i:
  52. return i
  53. pass
  54. def span_days(start_time, end_time):
  55. start_day = day_stamp(start_time)
  56. end_day = day_stamp(end_time)
  57. if end_day == end_time:
  58. end_day = end_day - 1
  59. days = list()
  60. while start_day <= end_day:
  61. days.append(start_day)
  62. start_day += 86400
  63. return days
  64. def ch_calc_cfgs():
  65. start_period = 3600
  66. ratio_period = 900
  67. speed_period = 60
  68. monitor_period = 1800
  69. cdf_speed_period = 60
  70. return start_period, ratio_period, speed_period, monitor_period, cdf_speed_period
  71. class DataWriteStream(metaclass=ABCMeta):
  72. _version = 20200618
  73. _paths = dict()
  74. _lock = threading.Lock()
  75. def __init__(self, hfive):
  76. self._hfive = hfive
  77. # Getter function
  78. @property
  79. def file(self):
  80. return self._hfive
  81. # Setter function
  82. @file.setter
  83. def file(self, value):
  84. self._hfive = value
  85. @abstractmethod
  86. def write(self, method, params):
  87. pass
  88. def flush(self):
  89. hfive = self.file
  90. hfive.flush()
  91. def close(self):
  92. self._lock.acquire()
  93. if self._hfive is not None:
  94. self._hfive.close()
  95. self._hfive = None
  96. self._lock.release()
  97. pass
  98. class DataReadStream(metaclass=ABCMeta):
  99. _version = 20200618
  100. _days = list()
  101. def __init__(self, hfive):
  102. self._hfive = hfive
  103. self._days = self._getdays()
  104. # stime = lambda t: time.strftime('%y-%m-%d', time.localtime(t))
  105. # sdays = [stime(day) for day in self._days]
  106. # logger.debug(sdays)
  107. def __del__(self):
  108. pass
  109. def days(self):
  110. return self._days
  111. # Getter function
  112. @property
  113. def file(self):
  114. return self._hfive
  115. # Setter function
  116. @file.setter
  117. def file(self, value):
  118. self._hfive = value
  119. def close(self):
  120. if self._hfive is not None:
  121. self._hfive.close()
  122. self._hfive = None
  123. def _root_path(self, day_stamp=None):
  124. if day_stamp is None:
  125. return f'/{self._version}/'
  126. else:
  127. return f'/{self._version}/{day_stamp}/'
  128. def datasets(self, day_stamp):
  129. def dir(group):
  130. result = []
  131. for name, sub in group.items():
  132. if isinstance(sub, h5py.Group):
  133. result.extend(dir(sub))
  134. else:
  135. result.append(sub.name)
  136. return result
  137. try:
  138. path = self._root_path(day_stamp)
  139. if path in self.file:
  140. group = self.file.require_group(path)
  141. days = dir(group)
  142. return days
  143. else:
  144. return []
  145. except Exception as ex:
  146. logger.error(ex)
  147. return []
  148. def _getdays(self):
  149. def sub(root):
  150. result = []
  151. try:
  152. for name, sub in root.items():
  153. if isinstance(sub, h5py.Group):
  154. result.append(name)
  155. except Exception as ex:
  156. logger.error(ex)
  157. finally:
  158. return result
  159. try:
  160. root_ptah = self._root_path()
  161. root = self.file.require_group(root_ptah)
  162. tmp = sub(root)
  163. days = [int(day) for day in tmp]
  164. days.sort()
  165. return days
  166. except Exception as ex:
  167. logger.error(ex)
  168. return []
  169. class EMchPosmap(IntEnum):
  170. submit_count = 0
  171. submit_amounts = 1
  172. succ_count = 2
  173. succ_mch_amounts = 3
  174. succ_ch_amounts = 4
  175. fail_count = 5
  176. fail_mch_amounts = 6
  177. @staticmethod
  178. def dim():
  179. return 7
  180. pass
  181. class EChPosmap(IntEnum):
  182. commit_count = 0
  183. commit_amounts = 1
  184. succ_count = 2
  185. succ_amounts = 3
  186. succ_periods = 4
  187. fail_count = 5
  188. fail_amounts = 6
  189. fail_periods = 7
  190. succ_mch_amounts = 8
  191. @staticmethod
  192. def dim():
  193. return 9
  194. @staticmethod
  195. def old_dim():
  196. return 8
  197. pass
  198. class ENetPosmap(IntEnum):
  199. succ_count = 0
  200. fail_count = 1
  201. @staticmethod
  202. def dim():
  203. return 2
  204. pass