DataStream.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. from abc import ABCMeta, abstractmethod, ABC
  2. from datetime import timedelta
  3. from mpi4py import MPI
  4. import h5py
  5. from enum import IntEnum
  6. import threading
  7. __all__ = ['DataWriteStream', 'DataReadStream', 'day_stamp', 'EMchPosmap', 'EChPosmap', 'ENetPosmap', 'open_hdf5', 'time_border']
  8. import logging
  9. logger = logging.getLogger('stream')
  10. def day_stamp(stamp):
  11. import time as stime
  12. stamp = int(stamp)
  13. st_time = stime.gmtime(stamp + 8 * 3600)
  14. diff = timedelta(hours=st_time.tm_hour, minutes=st_time.tm_min, seconds=st_time.tm_sec)
  15. today = stamp - diff.total_seconds()
  16. return int(today)
  17. def open_hdf5(file, is_wirte):
  18. if is_wirte:
  19. return h5py.File(file, 'a')
  20. else:
  21. return h5py.File(file, 'r')
  22. def time_border(interval, time_stamp, lt):
  23. day = day_stamp(time_stamp)
  24. pos = time_stamp - day
  25. if lt:
  26. pos = pos - pos % interval
  27. elif pos % interval > 0:
  28. pos += interval - pos % interval
  29. else:
  30. pos = pos
  31. return pos + day
  32. def calc_interval(start, end):
  33. period = end - start
  34. segment = int(period / 24)
  35. if segment == 0:
  36. return 1
  37. else:
  38. intervals = [30, 60, 300, 600, 900, 1800, 3600, 7200, 14400, 21600, 28800, 36000, 43200, 86400, 172800]
  39. for i in intervals:
  40. if segment < i:
  41. return i
  42. pass
  43. def span_days(start_time, end_time):
  44. start_day = day_stamp(start_time)
  45. end_day = day_stamp(end_time)
  46. days = list()
  47. while start_day <= end_day:
  48. days.append(start_day)
  49. start_day += 86400
  50. return days
  51. class DataWriteStream(metaclass=ABCMeta):
  52. _version = 20200618
  53. _paths = dict()
  54. _lock = threading.Lock()
  55. def __init__(self, hfive):
  56. self._hfive = hfive
  57. # Getter function
  58. @property
  59. def file(self):
  60. return self._hfive
  61. # Setter function
  62. @file.setter
  63. def file(self, value):
  64. self._hfive = value
  65. @abstractmethod
  66. def write(self, method, params):
  67. pass
  68. def close(self):
  69. self._lock.acquire()
  70. if self._hfive is not None:
  71. self._hfive.close()
  72. self._hfive = None
  73. self._lock.release()
  74. pass
  75. class DataReadStream(metaclass=ABCMeta):
  76. _version = 20200618
  77. _days = list()
  78. def __init__(self, hfive):
  79. self._hfive = hfive
  80. self._days = self._getdays()
  81. logger.debug(self._days)
  82. def __del__(self):
  83. pass
  84. def days(self):
  85. return self._days
  86. # Getter function
  87. @property
  88. def file(self):
  89. return self._hfive
  90. # Setter function
  91. @file.setter
  92. def file(self, value):
  93. self._hfive = value
  94. def close(self):
  95. if self._hfive is not None:
  96. self._hfive.close()
  97. self._hfive = None
  98. def _root_path(self, day_stamp=None):
  99. if day_stamp is None:
  100. return f'/{self._version}/'
  101. else:
  102. return f'/{self._version}/{day_stamp}/'
  103. def datasets(self, day_stamp):
  104. def dir(group):
  105. result = []
  106. for name, sub in group.items():
  107. if isinstance(sub, h5py.Group):
  108. result.extend(dir(sub))
  109. else:
  110. result.append(sub.name)
  111. return result
  112. try:
  113. path = self._root_path(day_stamp)
  114. if path in self.file:
  115. group = self.file.require_group(path)
  116. days = dir(group)
  117. return days
  118. else:
  119. return []
  120. except Exception as ex:
  121. logger.error(ex)
  122. return []
  123. def _getdays(self):
  124. def sub(root):
  125. result = []
  126. try:
  127. for name, sub in root.items():
  128. if isinstance(sub, h5py.Group):
  129. result.append(name)
  130. except Exception as ex:
  131. logger.error(ex)
  132. finally:
  133. return result
  134. try:
  135. root_ptah = self._root_path()
  136. root = self.file.require_group(root_ptah)
  137. tmp = sub(root)
  138. days = [int(day) for day in tmp]
  139. days.sort()
  140. return days
  141. except Exception as ex:
  142. logger.error(ex)
  143. return []
  144. def near_stamp(self, time_stamp, left=True):
  145. if len(self._days) == 0:
  146. return None
  147. if left:
  148. min = self._days[0]
  149. time_stamp = min if time_stamp < min else time_stamp
  150. else:
  151. max = self._days[-1] + 86400 - 1
  152. time_stamp = max if time_stamp > max else time_stamp
  153. return time_stamp
  154. class EMchPosmap(IntEnum):
  155. submit_count = 0
  156. submit_amounts = 1
  157. succ_count = 2
  158. succ_mch_amounts = 3
  159. succ_ch_amounts = 4
  160. fail_count = 5
  161. fail_mch_amounts = 6
  162. @staticmethod
  163. def dim():
  164. return 7
  165. pass
  166. class EChPosmap(IntEnum):
  167. commit_count = 0
  168. commit_amounts = 1
  169. succ_count = 2
  170. succ_amounts = 3
  171. succ_periods = 4
  172. fail_count = 5
  173. fail_amounts = 6
  174. fail_periods = 7
  175. @staticmethod
  176. def dim():
  177. return 8
  178. pass
  179. class ENetPosmap(IntEnum):
  180. succ_count = 0
  181. fail_count = 1
  182. @staticmethod
  183. def dim():
  184. return 2
  185. pass