SpeedDataCenter.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. import os
  2. import time as stime
  3. import redis
  4. import h5py
  5. from os import path
  6. import re
  7. from datetime import timedelta
  8. import numpy as np
  9. class SpeedDataCenter(object):
  10. def __init__(self):
  11. self._mquit = False
  12. self._mRHost = ''
  13. self._mRPort = 6379
  14. self._file_name = '/var/www/html/data/stdata/speed.hdf5'
  15. def set_redis(self, rhost, rport):
  16. self._mRHost = rhost
  17. self._mRPort = rport
  18. def stop(self):
  19. self._mquit = True
  20. pass
  21. def prepare_data(self):
  22. while self._mquit == False:
  23. try:
  24. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  25. r = redis.Redis(connection_pool=pool)
  26. if path.exists(self._file_name):
  27. hfive = h5py.File(self._file_name, 'a')
  28. else:
  29. hfive = h5py.File(self._file_name, 'w')
  30. self.read_redis(hfive, r, 'nc_commit_speed_monitor')
  31. hfive.close()
  32. self.del_redis(r, 'nc_commit_speed_monitor')
  33. except Exception as ex:
  34. print(ex)
  35. finally:
  36. stime.sleep(0.05)
  37. def del_redis(self, redis, name):
  38. latest_time = int(stime.time()) - 2
  39. for item in redis.hscan_iter(name):
  40. key = str(item[0], encoding="utf-8")
  41. items = re.split(r'-', key)
  42. fdel = True
  43. if len(items) == 5:
  44. (chname, quality, card_type, amount, time) = items
  45. time = int(time)
  46. if latest_time <= time:
  47. fdel = False
  48. if fdel:
  49. redis.hdel(name, key)
  50. pass
  51. def read_redis(self, hfive, redis, name):
  52. for item in redis.hscan_iter(name):
  53. key = str(item[0], encoding="utf-8")
  54. val = str(item[1], encoding="utf-8")
  55. self.parase(hfive, key, val)
  56. def parase(self, hfive, text, val):
  57. items = re.split(r'-', text)
  58. if len(items) != 5:
  59. return False
  60. (chname, quality, card_type, spec, time) = items
  61. time = int(time)
  62. today = self.day_stamp(time)
  63. path = f'/{today}/{chname}/{quality}/{card_type}/{spec}'
  64. if path not in hfive:
  65. hfive[path] = np.zeros((1, 86400))
  66. diff = time - today
  67. hfive[path][0, diff] = val
  68. print(path, 0, diff, val, hfive[path][0, diff])
  69. pass
  70. def day_stamp(self, stamp):
  71. stamp = int(stamp)
  72. x = stime.gmtime(stamp + 8 * 3600)
  73. diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
  74. today = stamp - diff.total_seconds()
  75. return int(today)
  76. ########################################################################################################################
  77. def channl_speed(self):
  78. import json
  79. r = None
  80. try:
  81. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  82. r = redis.Redis(connection_pool=pool)
  83. except Exception as ex:
  84. print(ex)
  85. while True:
  86. try:
  87. time_stamp = int(stime.time())
  88. paths = self.paths(time_stamp)
  89. pos_range = self._pos_range(time_stamp)
  90. hfive = h5py.File(self._file_name, 'r')
  91. result = {}
  92. for path in paths:
  93. key = self._parse_path(path)
  94. speed = self._calc_speed(hfive, path, pos_range)
  95. result[key] = speed
  96. pass
  97. hfive.close()
  98. if len(result) != 0:
  99. r.set(f"nc_channels_speed", json.dumps(result))
  100. r.publish('refill',json.dumps({'type':'channels_speed','value':0}))
  101. print('push msg=', result)
  102. except Exception as ex:
  103. print(ex)
  104. finally:
  105. stime.sleep(0.05)
  106. def _parse_path(self,path):
  107. items = re.split(r'/', path)
  108. if len(items) != 6:
  109. return False
  110. (_, _sday, _chname, _quality, _card_type, _spec) = items
  111. key = f'{_chname}-{_spec}-{_card_type}-{_quality}' #接收端的key,是如此定义的
  112. return key
  113. def _pos_range(self,end):
  114. pos_day = np.arange(0, 86400, 1)
  115. day_stamp = self.day_stamp(end)
  116. period = 60
  117. if end - period > day_stamp:
  118. start = end - period
  119. pos_range = np.where(pos_day > start)
  120. else:
  121. start = day_stamp
  122. pos_range = np.where(pos_day >= start)
  123. pos_range = np.where(pos_range <= end)
  124. return pos_range
  125. def _calc_speed(self,hFive,path,pos_range):
  126. datas = hFive[path]
  127. commit = datas[0]
  128. commit = commit[pos_range]
  129. speed = int(np.sum(commit))
  130. return speed
  131. def paths(self, time_stamp):
  132. try:
  133. day_stamp = self.day_stamp(time_stamp)
  134. hfive = h5py.File(self._file_name, 'r')
  135. group = hfive.require_group(f'/{day_stamp}')
  136. paths = self.dir(group)
  137. hfive.close()
  138. return paths
  139. except Exception as ex:
  140. print(ex)
  141. return []
  142. def dir(self, group):
  143. result = []
  144. for name, sub in group.items():
  145. if isinstance(sub, h5py.Group):
  146. result.extend(self.dir(sub))
  147. else:
  148. result.append(sub.name)
  149. return result
  150. speedDataCenter = SpeedDataCenter()