DataStream.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. import time
  2. from abc import ABCMeta, abstractmethod, ABC
  3. from datetime import timedelta
  4. from mpi4py import MPI
  5. import h5py
  6. from enum import IntEnum
  7. import threading
  8. __all__ = ['DataWriteStream', 'DataReadStream', 'day_stamp', 'EMchPosmap', 'EChPosmap', 'ENetPosmap', 'open_hdf5', 'time_border',
  9. 'calc_interval']
  10. import logging
  11. logger = logging.getLogger('stream')
  12. def day_stamp(stamp):
  13. import time as stime
  14. stamp = int(stamp)
  15. st_time = stime.gmtime(stamp + 8 * 3600)
  16. diff = timedelta(hours=st_time.tm_hour, minutes=st_time.tm_min, seconds=st_time.tm_sec)
  17. today = stamp - diff.total_seconds()
  18. return int(today)
  19. def open_hdf5(file, is_wirte):
  20. if is_wirte:
  21. # 不知道为何在内存中加载这么慢
  22. # return h5py.File(file, 'a', libver='latest', driver='core') # , swmr=True, backing_store=True
  23. # return h5py.File(file, driver='core',backing_store=True) # , swmr=True, backing_store=True
  24. #这种加载方式速度可以
  25. return h5py.File(file, 'a', libver='latest', rdcc_nbytes=1024 ** 3, rdcc_w0=0.25, rdcc_nslots=6151)
  26. else:
  27. return h5py.File(file, 'r') #, libver='latest') #plot/test_h5py.py
  28. def time_border(interval, time_stamp, lt):
  29. day = day_stamp(time_stamp)
  30. pos = time_stamp - day
  31. if lt:
  32. pos = pos - pos % interval
  33. elif pos % interval > 0:
  34. pos += interval - pos % interval
  35. else:
  36. pos = pos
  37. return pos + day
  38. def calc_interval(start, end):
  39. period = end - start
  40. segment = int(period / 24)
  41. if segment == 0:
  42. return 1
  43. else:
  44. intervals = [30, 60, 300, 600, 900, 1800, 3600, 7200, 10800, 14400, 18000, 21600, 25200, 28800, 32400, 36000, 39600, 43200,
  45. 57600, 86400, 129600, 172800]
  46. for i in intervals:
  47. if segment < i:
  48. return i
  49. pass
  50. def span_days(start_time, end_time):
  51. start_day = day_stamp(start_time)
  52. end_day = day_stamp(end_time)
  53. if end_day == end_time:
  54. end_day = end_day - 1
  55. days = list()
  56. while start_day <= end_day:
  57. days.append(start_day)
  58. start_day += 86400
  59. return days
  60. class DataWriteStream(metaclass=ABCMeta):
  61. _version = 20200618
  62. _paths = dict()
  63. _lock = threading.Lock()
  64. def __init__(self, hfive):
  65. self._hfive = hfive
  66. # Getter function
  67. @property
  68. def file(self):
  69. return self._hfive
  70. # Setter function
  71. @file.setter
  72. def file(self, value):
  73. self._hfive = value
  74. @abstractmethod
  75. def write(self, method, params):
  76. pass
  77. def flush(self):
  78. hfive = self.file
  79. hfive.flush()
  80. def close(self):
  81. self._lock.acquire()
  82. if self._hfive is not None:
  83. self._hfive.close()
  84. self._hfive = None
  85. self._lock.release()
  86. pass
  87. class DataReadStream(metaclass=ABCMeta):
  88. _version = 20200618
  89. _days = list()
  90. def __init__(self, hfive):
  91. self._hfive = hfive
  92. self._days = self._getdays()
  93. stime = lambda t: time.strftime('%y-%m-%d %H:%M:%S', time.localtime(t))
  94. sdays = [stime(day) for day in self._days]
  95. logger.debug(sdays)
  96. def __del__(self):
  97. pass
  98. def days(self):
  99. return self._days
  100. # Getter function
  101. @property
  102. def file(self):
  103. return self._hfive
  104. # Setter function
  105. @file.setter
  106. def file(self, value):
  107. self._hfive = value
  108. def close(self):
  109. if self._hfive is not None:
  110. self._hfive.close()
  111. self._hfive = None
  112. def _root_path(self, day_stamp=None):
  113. if day_stamp is None:
  114. return f'/{self._version}/'
  115. else:
  116. return f'/{self._version}/{day_stamp}/'
  117. def datasets(self, day_stamp):
  118. def dir(group):
  119. result = []
  120. for name, sub in group.items():
  121. if isinstance(sub, h5py.Group):
  122. result.extend(dir(sub))
  123. else:
  124. result.append(sub.name)
  125. return result
  126. try:
  127. path = self._root_path(day_stamp)
  128. if path in self.file:
  129. group = self.file.require_group(path)
  130. days = dir(group)
  131. return days
  132. else:
  133. return []
  134. except Exception as ex:
  135. logger.error(ex)
  136. return []
  137. def _getdays(self):
  138. def sub(root):
  139. result = []
  140. try:
  141. for name, sub in root.items():
  142. if isinstance(sub, h5py.Group):
  143. result.append(name)
  144. except Exception as ex:
  145. logger.error(ex)
  146. finally:
  147. return result
  148. try:
  149. root_ptah = self._root_path()
  150. root = self.file.require_group(root_ptah)
  151. tmp = sub(root)
  152. days = [int(day) for day in tmp]
  153. days.sort()
  154. return days
  155. except Exception as ex:
  156. logger.error(ex)
  157. return []
  158. class EMchPosmap(IntEnum):
  159. submit_count = 0
  160. submit_amounts = 1
  161. succ_count = 2
  162. succ_mch_amounts = 3
  163. succ_ch_amounts = 4
  164. fail_count = 5
  165. fail_mch_amounts = 6
  166. @staticmethod
  167. def dim():
  168. return 7
  169. pass
  170. class EChPosmap(IntEnum):
  171. commit_count = 0
  172. commit_amounts = 1
  173. succ_count = 2
  174. succ_amounts = 3
  175. succ_periods = 4
  176. fail_count = 5
  177. fail_amounts = 6
  178. fail_periods = 7
  179. succ_mch_amounts = 8
  180. @staticmethod
  181. def dim():
  182. return 9
  183. @staticmethod
  184. def old_dim():
  185. return 8
  186. pass
  187. class ENetPosmap(IntEnum):
  188. succ_count = 0
  189. fail_count = 1
  190. @staticmethod
  191. def dim():
  192. return 2
  193. pass