DataStream.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. import time
  2. from abc import ABCMeta, abstractmethod, ABC
  3. from datetime import timedelta
  4. from mpi4py import MPI
  5. import h5py
  6. from enum import IntEnum
  7. import threading
  8. __all__ = ['DataWriteStream', 'DataReadStream', 'day_stamp', 'EMchPosmap', 'EChPosmap', 'ENetPosmap', 'open_hdf5', 'time_border',
  9. 'calc_interval']
  10. import logging
  11. logger = logging.getLogger('stream')
  12. def day_stamp(stamp):
  13. import time as stime
  14. stamp = int(stamp)
  15. st_time = stime.gmtime(stamp + 8 * 3600)
  16. diff = timedelta(hours=st_time.tm_hour, minutes=st_time.tm_min, seconds=st_time.tm_sec)
  17. today = stamp - diff.total_seconds()
  18. return int(today)
  19. def open_hdf5(file, is_wirte):
  20. if is_wirte:
  21. # 不知道为何在内存中加载这么慢
  22. # return h5py.File(file, 'a', libver='latest', driver='core') # , swmr=True, backing_store=True
  23. # return h5py.File(file, driver='core',backing_store=True) # , swmr=True, backing_store=True
  24. #这种加载方式速度可以
  25. return h5py.File(file, 'a', libver='latest', rdcc_nbytes=1024 ** 3, rdcc_w0=0.25, rdcc_nslots=6151)
  26. else:
  27. return h5py.File(file, 'r') #, libver='latest') #plot/test_h5py.py
  28. def time_border(interval, time_stamp, lt):
  29. day = day_stamp(time_stamp)
  30. pos = time_stamp - day
  31. if lt:
  32. pos = pos - pos % interval
  33. elif pos % interval > 0:
  34. pos += interval - pos % interval
  35. else:
  36. pos = pos
  37. return pos + day
  38. def calc_interval(start, end):
  39. period = end - start
  40. segment = int(period / 24)
  41. if segment == 0:
  42. return 1
  43. else:
  44. intervals = [30, 60, 300, 600, 900, 1800, 3600, 5400, 7200, 10800, 14400, 21600, 28800, 36000, 43200, 86400, 172800]
  45. for i in intervals:
  46. if segment < i:
  47. return i
  48. pass
  49. def span_days(start_time, end_time):
  50. start_day = day_stamp(start_time)
  51. end_day = day_stamp(end_time)
  52. if end_day == end_time:
  53. end_day = end_day - 1
  54. days = list()
  55. while start_day <= end_day:
  56. days.append(start_day)
  57. start_day += 86400
  58. return days
  59. class DataWriteStream(metaclass=ABCMeta):
  60. _version = 20200618
  61. _paths = dict()
  62. _lock = threading.Lock()
  63. def __init__(self, hfive):
  64. self._hfive = hfive
  65. # Getter function
  66. @property
  67. def file(self):
  68. return self._hfive
  69. # Setter function
  70. @file.setter
  71. def file(self, value):
  72. self._hfive = value
  73. @abstractmethod
  74. def write(self, method, params):
  75. pass
  76. def flush(self):
  77. hfive = self.file
  78. hfive.flush()
  79. def close(self):
  80. self._lock.acquire()
  81. if self._hfive is not None:
  82. self._hfive.close()
  83. self._hfive = None
  84. self._lock.release()
  85. pass
  86. class DataReadStream(metaclass=ABCMeta):
  87. _version = 20200618
  88. _days = list()
  89. def __init__(self, hfive):
  90. self._hfive = hfive
  91. self._days = self._getdays()
  92. stime = lambda t: time.strftime('%y-%m-%d %H:%M:%S', time.localtime(t))
  93. sdays = [stime(day) for day in self._days]
  94. logger.debug(sdays)
  95. def __del__(self):
  96. pass
  97. def days(self):
  98. return self._days
  99. # Getter function
  100. @property
  101. def file(self):
  102. return self._hfive
  103. # Setter function
  104. @file.setter
  105. def file(self, value):
  106. self._hfive = value
  107. def close(self):
  108. if self._hfive is not None:
  109. self._hfive.close()
  110. self._hfive = None
  111. def _root_path(self, day_stamp=None):
  112. if day_stamp is None:
  113. return f'/{self._version}/'
  114. else:
  115. return f'/{self._version}/{day_stamp}/'
  116. def datasets(self, day_stamp):
  117. def dir(group):
  118. result = []
  119. for name, sub in group.items():
  120. if isinstance(sub, h5py.Group):
  121. result.extend(dir(sub))
  122. else:
  123. result.append(sub.name)
  124. return result
  125. try:
  126. path = self._root_path(day_stamp)
  127. if path in self.file:
  128. group = self.file.require_group(path)
  129. days = dir(group)
  130. return days
  131. else:
  132. return []
  133. except Exception as ex:
  134. logger.error(ex)
  135. return []
  136. def _getdays(self):
  137. def sub(root):
  138. result = []
  139. try:
  140. for name, sub in root.items():
  141. if isinstance(sub, h5py.Group):
  142. result.append(name)
  143. except Exception as ex:
  144. logger.error(ex)
  145. finally:
  146. return result
  147. try:
  148. root_ptah = self._root_path()
  149. root = self.file.require_group(root_ptah)
  150. tmp = sub(root)
  151. days = [int(day) for day in tmp]
  152. days.sort()
  153. return days
  154. except Exception as ex:
  155. logger.error(ex)
  156. return []
  157. class EMchPosmap(IntEnum):
  158. submit_count = 0
  159. submit_amounts = 1
  160. succ_count = 2
  161. succ_mch_amounts = 3
  162. succ_ch_amounts = 4
  163. fail_count = 5
  164. fail_mch_amounts = 6
  165. @staticmethod
  166. def dim():
  167. return 7
  168. pass
  169. class EChPosmap(IntEnum):
  170. commit_count = 0
  171. commit_amounts = 1
  172. succ_count = 2
  173. succ_amounts = 3
  174. succ_periods = 4
  175. fail_count = 5
  176. fail_amounts = 6
  177. fail_periods = 7
  178. @staticmethod
  179. def dim():
  180. return 8
  181. pass
  182. class ENetPosmap(IntEnum):
  183. succ_count = 0
  184. fail_count = 1
  185. @staticmethod
  186. def dim():
  187. return 2
  188. pass