DataStream.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. import time
  2. from abc import ABCMeta, abstractmethod, ABC
  3. from datetime import timedelta
  4. from mpi4py import MPI
  5. import h5py
  6. from enum import IntEnum
  7. import threading
  8. __all__ = ['DataWriteStream', 'DataReadStream', 'day_stamp', 'EMchPosmap', 'EChPosmap', 'ENetPosmap', 'open_hdf5', 'time_border',
  9. 'calc_interval']
  10. import logging
  11. logger = logging.getLogger('stream')
  12. def day_stamp(stamp):
  13. import time as stime
  14. stamp = int(stamp)
  15. st_time = stime.gmtime(stamp + 8 * 3600)
  16. diff = timedelta(hours=st_time.tm_hour, minutes=st_time.tm_min, seconds=st_time.tm_sec)
  17. today = stamp - diff.total_seconds()
  18. return int(today)
  19. def open_hdf5(file, is_wirte):
  20. if is_wirte:
  21. # 不知道为何在内存中加载这么慢
  22. # return h5py.File(file, 'a', libver='latest', driver='core') # , swmr=True, backing_store=True
  23. # return h5py.File(file, driver='core',backing_store=True) # , swmr=True, backing_store=True
  24. #这种加载方式速度可以
  25. return h5py.File(file, 'a', libver='latest', rdcc_nbytes=1024 ** 3, rdcc_w0=0.25, rdcc_nslots=6151)
  26. else:
  27. return h5py.File(file, 'r', libver='latest') #plot/test_h5py.py
  28. def time_border(interval, time_stamp, lt):
  29. day = day_stamp(time_stamp)
  30. pos = time_stamp - day
  31. if lt:
  32. pos = pos - pos % interval
  33. elif pos % interval > 0:
  34. pos += interval - pos % interval
  35. else:
  36. pos = pos
  37. return pos + day
  38. def calc_interval(start, end):
  39. period = end - start
  40. segment = int(period / 24)
  41. if segment == 0:
  42. return 1
  43. else:
  44. intervals = [30, 60, 300, 600, 900, 1800, 3600, 7200, 14400, 21600, 28800, 36000, 43200, 86400, 172800]
  45. for i in intervals:
  46. if segment < i:
  47. return i
  48. pass
  49. def span_days(start_time, end_time):
  50. start_day = day_stamp(start_time)
  51. end_day = day_stamp(end_time)
  52. days = list()
  53. while start_day <= end_day:
  54. days.append(start_day)
  55. start_day += 86400
  56. return days
  57. class DataWriteStream(metaclass=ABCMeta):
  58. _version = 20200618
  59. _paths = dict()
  60. _lock = threading.Lock()
  61. def __init__(self, hfive):
  62. self._hfive = hfive
  63. # Getter function
  64. @property
  65. def file(self):
  66. return self._hfive
  67. # Setter function
  68. @file.setter
  69. def file(self, value):
  70. self._hfive = value
  71. @abstractmethod
  72. def write(self, method, params):
  73. pass
  74. def flush(self):
  75. hfive = self.file
  76. hfive.flush()
  77. def close(self):
  78. self._lock.acquire()
  79. if self._hfive is not None:
  80. self._hfive.close()
  81. self._hfive = None
  82. self._lock.release()
  83. pass
  84. class DataReadStream(metaclass=ABCMeta):
  85. _version = 20200618
  86. _days = list()
  87. def __init__(self, hfive):
  88. self._hfive = hfive
  89. self._days = self._getdays()
  90. stime = lambda t: time.strftime('%d-%H:%M:%S', time.localtime(t))
  91. sdays = [stime(day) for day in self._days]
  92. logger.debug(sdays)
  93. logger.debug(self._days)
  94. def __del__(self):
  95. pass
  96. def days(self):
  97. return self._days
  98. # Getter function
  99. @property
  100. def file(self):
  101. return self._hfive
  102. # Setter function
  103. @file.setter
  104. def file(self, value):
  105. self._hfive = value
  106. def close(self):
  107. if self._hfive is not None:
  108. self._hfive.close()
  109. self._hfive = None
  110. def _root_path(self, day_stamp=None):
  111. if day_stamp is None:
  112. return f'/{self._version}/'
  113. else:
  114. return f'/{self._version}/{day_stamp}/'
  115. def datasets(self, day_stamp):
  116. def dir(group):
  117. result = []
  118. for name, sub in group.items():
  119. if isinstance(sub, h5py.Group):
  120. result.extend(dir(sub))
  121. else:
  122. result.append(sub.name)
  123. return result
  124. try:
  125. path = self._root_path(day_stamp)
  126. if path in self.file:
  127. group = self.file.require_group(path)
  128. days = dir(group)
  129. return days
  130. else:
  131. return []
  132. except Exception as ex:
  133. logger.error(ex)
  134. return []
  135. def _getdays(self):
  136. def sub(root):
  137. result = []
  138. try:
  139. for name, sub in root.items():
  140. if isinstance(sub, h5py.Group):
  141. result.append(name)
  142. except Exception as ex:
  143. logger.error(ex)
  144. finally:
  145. return result
  146. try:
  147. root_ptah = self._root_path()
  148. root = self.file.require_group(root_ptah)
  149. tmp = sub(root)
  150. days = [int(day) for day in tmp]
  151. days.sort()
  152. return days
  153. except Exception as ex:
  154. logger.error(ex)
  155. return []
  156. def near_stamp(self, time_stamp, left=True):
  157. if len(self._days) == 0:
  158. return None
  159. if time_stamp > int(time.time()):
  160. time_stamp = int(time.time())
  161. if left:
  162. min = self._days[0]
  163. time_stamp = min if time_stamp < min else time_stamp
  164. else:
  165. max = self._days[-1] + 86400 - 1
  166. time_stamp = max if time_stamp > max else time_stamp
  167. return time_stamp
  168. class EMchPosmap(IntEnum):
  169. submit_count = 0
  170. submit_amounts = 1
  171. succ_count = 2
  172. succ_mch_amounts = 3
  173. succ_ch_amounts = 4
  174. fail_count = 5
  175. fail_mch_amounts = 6
  176. @staticmethod
  177. def dim():
  178. return 7
  179. pass
  180. class EChPosmap(IntEnum):
  181. commit_count = 0
  182. commit_amounts = 1
  183. succ_count = 2
  184. succ_amounts = 3
  185. succ_periods = 4
  186. fail_count = 5
  187. fail_amounts = 6
  188. fail_periods = 7
  189. @staticmethod
  190. def dim():
  191. return 8
  192. pass
  193. class ENetPosmap(IntEnum):
  194. succ_count = 0
  195. fail_count = 1
  196. @staticmethod
  197. def dim():
  198. return 2
  199. pass