DataStream.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. from abc import ABCMeta, abstractmethod, ABC
  2. from datetime import timedelta
  3. from mpi4py import MPI
  4. import h5py
  5. from enum import IntEnum
  6. import threading
  7. __all__ = ['DataWriteStream', 'DataReadStream', 'day_stamp', 'EMchPosmap', 'EChPosmap', 'ENetPosmap', 'open_hdf5','time_border']
  8. import logging
  9. logger = logging.getLogger('stream')
  10. def day_stamp(stamp):
  11. import time as stime
  12. stamp = int(stamp)
  13. st_time = stime.gmtime(stamp + 8 * 3600)
  14. diff = timedelta(hours=st_time.tm_hour, minutes=st_time.tm_min, seconds=st_time.tm_sec)
  15. today = stamp - diff.total_seconds()
  16. return int(today)
  17. def open_hdf5(file, is_wirte):
  18. if is_wirte:
  19. return h5py.File(file, 'a')
  20. else:
  21. return h5py.File(file, 'r')
  22. def time_border(interval, time_stamp, lt):
  23. day = day_stamp(time_stamp)
  24. pos = time_stamp - day
  25. if lt:
  26. pos = pos - pos % interval
  27. elif pos % interval > 0:
  28. pos += interval - pos % interval
  29. else:
  30. pos = pos
  31. return pos + day
  32. def calc_interval(start,end):
  33. period = end - start
  34. segment = int(period / 24)
  35. if segment == 0:
  36. return 1
  37. else:
  38. intervals = [30, 60, 300, 600, 900, 1800, 3600, 7200, 14400, 21600, 28800, 36000, 43200, 86400, 172800]
  39. for i in intervals:
  40. if segment < i:
  41. return i
  42. pass
  43. def span_days(start_time,end_time):
  44. start_day = day_stamp(start_time)
  45. end_day = day_stamp(end_time)
  46. days = list()
  47. while start_day <= end_day:
  48. days.append(start_day)
  49. start_day += 86400
  50. return days
  51. class DataWriteStream(metaclass=ABCMeta):
  52. _version = 20200618
  53. _paths = dict()
  54. _lock = threading.Lock()
  55. def __init__(self, hfive):
  56. self._hfive = hfive
  57. # Getter function
  58. @property
  59. def file(self):
  60. return self._hfive
  61. # Setter function
  62. @file.setter
  63. def file(self, value):
  64. self._hfive = value
  65. @abstractmethod
  66. def write(self, method, params):
  67. pass
  68. def close(self):
  69. self._lock.acquire()
  70. if self._hfive is not None:
  71. self._hfive.close()
  72. self._hfive = None
  73. self._lock.release()
  74. pass
  75. class DataReadStream(metaclass=ABCMeta):
  76. _version = 20200618
  77. _days = list()
  78. def __init__(self, hfive):
  79. self._hfive = hfive
  80. self._days = self._getdays()
  81. def __del__(self):
  82. pass
  83. def days(self):
  84. return self._days
  85. # Getter function
  86. @property
  87. def file(self):
  88. return self._hfive
  89. # Setter function
  90. @file.setter
  91. def file(self, value):
  92. self._hfive = value
  93. def close(self):
  94. if self._hfive is not None:
  95. self._hfive.close()
  96. self._hfive = None
  97. def _root_path(self,day_stamp = None):
  98. if day_stamp is None:
  99. return f'/{self._version}/'
  100. else:
  101. return f'/{self._version}/{day_stamp}/'
  102. def datasets(self, day_stamp):
  103. def dir(group):
  104. result = []
  105. for name, sub in group.items():
  106. if isinstance(sub, h5py.Group):
  107. result.extend(dir(sub))
  108. else:
  109. result.append(sub.name)
  110. return result
  111. try:
  112. path = self._root_path(day_stamp)
  113. group = self.file.require_group(path)
  114. days = dir(group)
  115. return days
  116. except Exception as ex:
  117. logger.error(ex)
  118. return []
  119. def _getdays(self):
  120. def sub(root):
  121. result = []
  122. try:
  123. for name, sub in root.items():
  124. if isinstance(sub, h5py.Group):
  125. result.append(name)
  126. except Exception as ex:
  127. logger.error(ex)
  128. finally:
  129. return result
  130. try:
  131. root_ptah = self._root_path()
  132. root = self.file.require_group(root_ptah)
  133. tmp = sub(root)
  134. days = [int(day) for day in tmp]
  135. days.sort()
  136. return days
  137. except Exception as ex:
  138. logger.error(ex)
  139. return []
  140. def near_stamp(self,time_stamp,left = True):
  141. if len(self._days) == 0:
  142. return None
  143. day = day_stamp(time_stamp)
  144. pos = time_stamp - day
  145. if left:
  146. while True:
  147. if day >= self._days[0]:
  148. break
  149. else:
  150. day += 86400
  151. else:
  152. while True:
  153. if day <= self._days[-1]:
  154. break
  155. else:
  156. day -= 86400
  157. return pos + day
  158. class EMchPosmap(IntEnum):
  159. submit_count = 0
  160. submit_amounts = 1
  161. succ_count = 2
  162. succ_mch_amounts = 3
  163. succ_ch_amounts = 4
  164. fail_count = 5
  165. fail_mch_amounts = 6
  166. @staticmethod
  167. def dim():
  168. return 7
  169. pass
  170. class EChPosmap(IntEnum):
  171. commit_count = 0
  172. commit_amounts = 1
  173. succ_count = 2
  174. succ_amounts = 3
  175. succ_periods = 4
  176. fail_count = 5
  177. fail_amounts = 6
  178. fail_periods = 7
  179. @staticmethod
  180. def dim():
  181. return 8
  182. pass
  183. class ENetPosmap(IntEnum):
  184. succ_count = 0
  185. fail_count = 1
  186. @staticmethod
  187. def dim():
  188. return 2
  189. pass