DataStream.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. import time
  2. from abc import ABCMeta, abstractmethod, ABC
  3. from datetime import timedelta
  4. from mpi4py import MPI
  5. import h5py
  6. from enum import IntEnum
  7. import threading
  8. __all__ = ['DataWriteStream', 'DataReadStream', 'day_stamp', 'EMchPosmap', 'EChPosmap', 'ENetPosmap', 'open_hdf5', 'time_border',
  9. 'calc_interval']
  10. import logging
  11. logger = logging.getLogger('stream')
  12. def day_stamp(stamp):
  13. import time as stime
  14. stamp = int(stamp)
  15. st_time = stime.gmtime(stamp + 8 * 3600)
  16. diff = timedelta(hours=st_time.tm_hour, minutes=st_time.tm_min, seconds=st_time.tm_sec)
  17. today = stamp - diff.total_seconds()
  18. return int(today)
  19. def open_hdf5(file, is_wirte):
  20. if is_wirte:
  21. return h5py.File(file, 'a', libver='latest') #, swmr=True
  22. else:
  23. return h5py.File(file, 'r', libver='latest') #plot/test_h5py.py
  24. def time_border(interval, time_stamp, lt):
  25. day = day_stamp(time_stamp)
  26. pos = time_stamp - day
  27. if lt:
  28. pos = pos - pos % interval
  29. elif pos % interval > 0:
  30. pos += interval - pos % interval
  31. else:
  32. pos = pos
  33. return pos + day
  34. def calc_interval(start, end):
  35. period = end - start
  36. segment = int(period / 24)
  37. if segment == 0:
  38. return 1
  39. else:
  40. intervals = [30, 60, 300, 600, 900, 1800, 3600, 7200, 14400, 21600, 28800, 36000, 43200, 86400, 172800]
  41. for i in intervals:
  42. if segment < i:
  43. return i
  44. pass
  45. def span_days(start_time, end_time):
  46. start_day = day_stamp(start_time)
  47. end_day = day_stamp(end_time)
  48. days = list()
  49. while start_day <= end_day:
  50. days.append(start_day)
  51. start_day += 86400
  52. return days
  53. class DataWriteStream(metaclass=ABCMeta):
  54. _version = 20200618
  55. _paths = dict()
  56. _lock = threading.Lock()
  57. def __init__(self, hfive):
  58. self._hfive = hfive
  59. # Getter function
  60. @property
  61. def file(self):
  62. return self._hfive
  63. # Setter function
  64. @file.setter
  65. def file(self, value):
  66. self._hfive = value
  67. @abstractmethod
  68. def write(self, method, params):
  69. pass
  70. def close(self):
  71. self._lock.acquire()
  72. if self._hfive is not None:
  73. self._hfive.close()
  74. self._hfive = None
  75. self._lock.release()
  76. pass
  77. class DataReadStream(metaclass=ABCMeta):
  78. _version = 20200618
  79. _days = list()
  80. def __init__(self, hfive):
  81. self._hfive = hfive
  82. self._days = self._getdays()
  83. stime = lambda t: time.strftime('%d-%H:%M:%S', time.localtime(t))
  84. sdays = [stime(day) for day in self._days]
  85. logger.debug(sdays)
  86. logger.debug(self._days)
  87. def __del__(self):
  88. pass
  89. def days(self):
  90. return self._days
  91. # Getter function
  92. @property
  93. def file(self):
  94. return self._hfive
  95. # Setter function
  96. @file.setter
  97. def file(self, value):
  98. self._hfive = value
  99. def close(self):
  100. if self._hfive is not None:
  101. self._hfive.close()
  102. self._hfive = None
  103. def _root_path(self, day_stamp=None):
  104. if day_stamp is None:
  105. return f'/{self._version}/'
  106. else:
  107. return f'/{self._version}/{day_stamp}/'
  108. def datasets(self, day_stamp):
  109. def dir(group):
  110. result = []
  111. for name, sub in group.items():
  112. if isinstance(sub, h5py.Group):
  113. result.extend(dir(sub))
  114. else:
  115. result.append(sub.name)
  116. return result
  117. try:
  118. path = self._root_path(day_stamp)
  119. if path in self.file:
  120. group = self.file.require_group(path)
  121. days = dir(group)
  122. return days
  123. else:
  124. return []
  125. except Exception as ex:
  126. logger.error(ex)
  127. return []
  128. def _getdays(self):
  129. def sub(root):
  130. result = []
  131. try:
  132. for name, sub in root.items():
  133. if isinstance(sub, h5py.Group):
  134. result.append(name)
  135. except Exception as ex:
  136. logger.error(ex)
  137. finally:
  138. return result
  139. try:
  140. root_ptah = self._root_path()
  141. root = self.file.require_group(root_ptah)
  142. tmp = sub(root)
  143. days = [int(day) for day in tmp]
  144. days.sort()
  145. return days
  146. except Exception as ex:
  147. logger.error(ex)
  148. return []
  149. def near_stamp(self, time_stamp, left=True):
  150. if len(self._days) == 0:
  151. return None
  152. if time_stamp > int(time.time()):
  153. time_stamp = int(time.time())
  154. if left:
  155. min = self._days[0]
  156. time_stamp = min if time_stamp < min else time_stamp
  157. else:
  158. max = self._days[-1] + 86400 - 1
  159. time_stamp = max if time_stamp > max else time_stamp
  160. return time_stamp
  161. class EMchPosmap(IntEnum):
  162. submit_count = 0
  163. submit_amounts = 1
  164. succ_count = 2
  165. succ_mch_amounts = 3
  166. succ_ch_amounts = 4
  167. fail_count = 5
  168. fail_mch_amounts = 6
  169. @staticmethod
  170. def dim():
  171. return 7
  172. pass
  173. class EChPosmap(IntEnum):
  174. commit_count = 0
  175. commit_amounts = 1
  176. succ_count = 2
  177. succ_amounts = 3
  178. succ_periods = 4
  179. fail_count = 5
  180. fail_amounts = 6
  181. fail_periods = 7
  182. @staticmethod
  183. def dim():
  184. return 8
  185. pass
  186. class ENetPosmap(IntEnum):
  187. succ_count = 0
  188. fail_count = 1
  189. @staticmethod
  190. def dim():
  191. return 2
  192. pass