DataStream.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. import time
  2. from abc import ABCMeta, abstractmethod, ABC
  3. from datetime import timedelta
  4. from mpi4py import MPI
  5. import h5py
  6. from enum import IntEnum
  7. import threading
  8. __all__ = ['DataWriteStream', 'DataReadStream', 'day_stamp', 'EMchPosmap', 'EChPosmap', 'ENetPosmap', 'open_hdf5', 'time_border']
  9. import logging
  10. logger = logging.getLogger('stream')
  11. def day_stamp(stamp):
  12. import time as stime
  13. stamp = int(stamp)
  14. st_time = stime.gmtime(stamp + 8 * 3600)
  15. diff = timedelta(hours=st_time.tm_hour, minutes=st_time.tm_min, seconds=st_time.tm_sec)
  16. today = stamp - diff.total_seconds()
  17. return int(today)
  18. def open_hdf5(file, is_wirte):
  19. if is_wirte:
  20. return h5py.File(file, 'a')
  21. else:
  22. return h5py.File(file, 'r')
  23. def time_border(interval, time_stamp, lt):
  24. day = day_stamp(time_stamp)
  25. pos = time_stamp - day
  26. if lt:
  27. pos = pos - pos % interval
  28. elif pos % interval > 0:
  29. pos += interval - pos % interval
  30. else:
  31. pos = pos
  32. return pos + day
  33. def calc_interval(start, end):
  34. period = end - start
  35. segment = int(period / 24)
  36. if segment == 0:
  37. return 1
  38. else:
  39. intervals = [30, 60, 300, 600, 900, 1800, 3600, 7200, 14400, 21600, 28800, 36000, 43200, 86400, 172800]
  40. for i in intervals:
  41. if segment < i:
  42. return i
  43. pass
  44. def span_days(start_time, end_time):
  45. start_day = day_stamp(start_time)
  46. end_day = day_stamp(end_time)
  47. days = list()
  48. while start_day <= end_day:
  49. days.append(start_day)
  50. start_day += 86400
  51. return days
  52. class DataWriteStream(metaclass=ABCMeta):
  53. _version = 20200618
  54. _paths = dict()
  55. _lock = threading.Lock()
  56. def __init__(self, hfive):
  57. self._hfive = hfive
  58. # Getter function
  59. @property
  60. def file(self):
  61. return self._hfive
  62. # Setter function
  63. @file.setter
  64. def file(self, value):
  65. self._hfive = value
  66. @abstractmethod
  67. def write(self, method, params):
  68. pass
  69. def close(self):
  70. self._lock.acquire()
  71. if self._hfive is not None:
  72. self._hfive.close()
  73. self._hfive = None
  74. self._lock.release()
  75. pass
  76. class DataReadStream(metaclass=ABCMeta):
  77. _version = 20200618
  78. _days = list()
  79. def __init__(self, hfive):
  80. self._hfive = hfive
  81. self._days = self._getdays()
  82. logger.debug(self._days)
  83. def __del__(self):
  84. pass
  85. def days(self):
  86. return self._days
  87. # Getter function
  88. @property
  89. def file(self):
  90. return self._hfive
  91. # Setter function
  92. @file.setter
  93. def file(self, value):
  94. self._hfive = value
  95. def close(self):
  96. if self._hfive is not None:
  97. self._hfive.close()
  98. self._hfive = None
  99. def _root_path(self, day_stamp=None):
  100. if day_stamp is None:
  101. return f'/{self._version}/'
  102. else:
  103. return f'/{self._version}/{day_stamp}/'
  104. def datasets(self, day_stamp):
  105. def dir(group):
  106. result = []
  107. for name, sub in group.items():
  108. if isinstance(sub, h5py.Group):
  109. result.extend(dir(sub))
  110. else:
  111. result.append(sub.name)
  112. return result
  113. try:
  114. path = self._root_path(day_stamp)
  115. if path in self.file:
  116. group = self.file.require_group(path)
  117. days = dir(group)
  118. return days
  119. else:
  120. return []
  121. except Exception as ex:
  122. logger.error(ex)
  123. return []
  124. def _getdays(self):
  125. def sub(root):
  126. result = []
  127. try:
  128. for name, sub in root.items():
  129. if isinstance(sub, h5py.Group):
  130. result.append(name)
  131. except Exception as ex:
  132. logger.error(ex)
  133. finally:
  134. return result
  135. try:
  136. root_ptah = self._root_path()
  137. root = self.file.require_group(root_ptah)
  138. tmp = sub(root)
  139. days = [int(day) for day in tmp]
  140. days.sort()
  141. return days
  142. except Exception as ex:
  143. logger.error(ex)
  144. return []
  145. def near_stamp(self, time_stamp, left=True):
  146. if len(self._days) == 0:
  147. return None
  148. if time_stamp > int(time.time()):
  149. time_stamp = int(time.time())
  150. if left:
  151. min = self._days[0]
  152. time_stamp = min if time_stamp < min else time_stamp
  153. else:
  154. max = self._days[-1] + 86400 - 1
  155. time_stamp = max if time_stamp > max else time_stamp
  156. return time_stamp
  157. class EMchPosmap(IntEnum):
  158. submit_count = 0
  159. submit_amounts = 1
  160. succ_count = 2
  161. succ_mch_amounts = 3
  162. succ_ch_amounts = 4
  163. fail_count = 5
  164. fail_mch_amounts = 6
  165. @staticmethod
  166. def dim():
  167. return 7
  168. pass
  169. class EChPosmap(IntEnum):
  170. commit_count = 0
  171. commit_amounts = 1
  172. succ_count = 2
  173. succ_amounts = 3
  174. succ_periods = 4
  175. fail_count = 5
  176. fail_amounts = 6
  177. fail_periods = 7
  178. @staticmethod
  179. def dim():
  180. return 8
  181. pass
  182. class ENetPosmap(IntEnum):
  183. succ_count = 0
  184. fail_count = 1
  185. @staticmethod
  186. def dim():
  187. return 2
  188. pass