DataStream.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. from abc import ABCMeta, abstractmethod, ABC
  2. from datetime import timedelta
  3. # from mpi4py import MPI
  4. import h5py
  5. from enum import IntEnum
  6. __all__ = ['DataWriteStream', 'DataReadStream', 'day_stamp', 'EMchPosmap', 'EChPosmap', 'ENetPosmap', 'open_hdf5']
  7. import logging
  8. log = logging.getLogger('stream')
  9. def day_stamp(stamp):
  10. import time as stime
  11. stamp = int(stamp)
  12. st_time = stime.gmtime(stamp + 8 * 3600)
  13. diff = timedelta(hours=st_time.tm_hour, minutes=st_time.tm_min, seconds=st_time.tm_sec)
  14. today = stamp - diff.total_seconds()
  15. return int(today)
  16. def open_hdf5(file, is_wirte):
  17. if is_wirte:
  18. # return h5py.File(file, 'a', driver='mpio', comm=MPI.COMM_WORLD)
  19. return h5py.File(file, 'a')
  20. else:
  21. return h5py.File(file, 'r')
  22. class DataWriteStream(metaclass=ABCMeta):
  23. _version = 20200618
  24. def __init__(self, hfive):
  25. self._hfive = hfive
  26. # Getter function
  27. @property
  28. def file(self):
  29. return self._hfive
  30. # Setter function
  31. @file.setter
  32. def file(self, value):
  33. self._hfive = value
  34. @abstractmethod
  35. def write(self, method, params):
  36. pass
  37. def close(self):
  38. if self._hfive is not None:
  39. self._hfive.close()
  40. self._hfive = None
  41. pass
  42. class DataReadStream(metaclass=ABCMeta):
  43. _version = 20200618
  44. def __init__(self, hfive):
  45. self._hfive = hfive
  46. # Getter function
  47. @property
  48. def file(self):
  49. return self._hfive
  50. # Setter function
  51. @file.setter
  52. def file(self, value):
  53. self._hfive = value
  54. @abstractmethod
  55. def read(self, path):
  56. pass
  57. def close(self):
  58. if self._hfive is not None:
  59. self._hfive.close()
  60. self._hfive = None
  61. def _sub_dirs(self, root):
  62. result = []
  63. try:
  64. for name, sub in root.items():
  65. if isinstance(sub, h5py.Group):
  66. result.append(name)
  67. except Exception as ex:
  68. log.error(ex)
  69. finally:
  70. return result
  71. def dir(self, group):
  72. result = []
  73. for name, sub in group.items():
  74. if isinstance(sub, h5py.Group):
  75. result.extend(self.dir(sub))
  76. else:
  77. result.append(sub.name)
  78. return result
  79. def _root_path(self):
  80. return f'/{self._version}/'
  81. def dirs(self):
  82. try:
  83. root_ptah = self._root_path()
  84. root = self.file.require_group(root_ptah)
  85. days = self.dir(root)
  86. return days
  87. except Exception as ex:
  88. log.error(ex)
  89. return []
  90. def days(self):
  91. try:
  92. root_ptah = self._root_path()
  93. root = self.file.require_group(root_ptah)
  94. days = self._sub_dirs(root)
  95. return days
  96. except Exception as ex:
  97. log.error(ex)
  98. return []
  99. pass
  100. class EMchPosmap(IntEnum):
  101. submit_count = 0
  102. submit_amounts = 1
  103. succ_count = 2
  104. succ_mch_amounts = 3
  105. succ_ch_amounts = 4
  106. fail_count = 5
  107. fail_mch_amounts = 6
  108. @staticmethod
  109. def dim():
  110. return 7
  111. pass
  112. class EChPosmap(IntEnum):
  113. commit_count = 0
  114. commit_amounts = 1
  115. succ_count = 2
  116. succ_amounts = 3
  117. succ_periods = 4
  118. fail_count = 5
  119. fail_amounts = 6
  120. fail_periods = 7
  121. @staticmethod
  122. def dim():
  123. return 8
  124. pass
  125. class ENetPosmap(IntEnum):
  126. succ_count = 0
  127. fail_count = 1
  128. @staticmethod
  129. def dim():
  130. return 2
  131. pass