# SpeedDataCenter.py (5.9 KB)

import json
import os
import re
import time as stime
from datetime import datetime
from datetime import timedelta
from os import path

import h5py
import numpy as np
import redis
  10. class SpeedDataCenter(object):
  11. latest_delta = 10
  12. def __init__(self):
  13. self._mquit = False
  14. self._mRHost = ''
  15. self._mRPort = 6379
  16. self._file_name = '/var/www/html/data/stdata/speed.hdf5'
  17. def set_redis(self, rhost, rport):
  18. self._mRHost = rhost
  19. self._mRPort = rport
  20. def stop(self):
  21. self._mquit = True
  22. pass
  23. def prepare_data(self):
  24. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  25. r = redis.Redis(connection_pool=pool)
  26. while self._mquit == False:
  27. try:
  28. print('open file:', datetime.now())
  29. if path.exists(self._file_name):
  30. hfive = h5py.File(self._file_name, 'a')
  31. else:
  32. hfive = h5py.File(self._file_name, 'w')
  33. latest_time = int(stime.time()) - self.latest_delta
  34. print('read redis:', datetime.now())
  35. self.read_redis(hfive, r, 'nc_commit_speed_monitor')
  36. hfive.close()
  37. print('del redis:', datetime.now())
  38. self.del_redis(r, 'nc_commit_speed_monitor',latest_time)
  39. print('del redis:', datetime.now())
  40. except Exception as ex:
  41. print(ex)
  42. finally:
  43. stime.sleep(1)
  44. def del_redis(self, redis, name,latest_time):
  45. for item in redis.hscan_iter(name):
  46. key = str(item[0], encoding="utf-8")
  47. items = re.split(r'-', key)
  48. fdel = True
  49. if len(items) == 5:
  50. (chname, quality, card_type, amount, time) = items
  51. time = int(time)
  52. if time > latest_time:
  53. fdel = False
  54. if fdel:
  55. redis.hdel(name, key)
  56. pass
  57. def read_redis(self, hfive, redis, name):
  58. for item in redis.hscan_iter(name):
  59. key = str(item[0], encoding="utf-8")
  60. val = str(item[1], encoding="utf-8")
  61. self.parase(hfive, key, val)
  62. def parase(self, hfive, text, val):
  63. items = re.split(r'-', text)
  64. if len(items) != 5:
  65. return False
  66. (chname, quality, card_type, spec, time) = items
  67. time = int(time)
  68. today = self.day_stamp(time)
  69. path = f'/{today}/{chname}/{quality}/{card_type}/{spec}'
  70. if path not in hfive:
  71. hfive[path] = np.zeros((1, 86400))
  72. diff = time - today
  73. hfive[path][0, diff] = val
  74. print(path, 0, diff, val, hfive[path][0, diff])
  75. pass
  76. def day_stamp(self, stamp):
  77. stamp = int(stamp)
  78. x = stime.gmtime(stamp + 8 * 3600)
  79. diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
  80. today = stamp - diff.total_seconds()
  81. return int(today)
  82. ########################################################################################################################
  83. def channl_speed(self):
  84. import json
  85. r = None
  86. try:
  87. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  88. r = redis.Redis(connection_pool=pool)
  89. except Exception as ex:
  90. print(ex)
  91. while True:
  92. try:
  93. time_stamp = int(stime.time())
  94. paths = self.paths(time_stamp)
  95. day_stamp = self.day_stamp(time_stamp)
  96. (pos_start, pos_end) = self._pos_range(time_stamp - day_stamp)
  97. hfive = h5py.File(self._file_name, 'r')
  98. result = {}
  99. for path in paths:
  100. key = self._parse_path(path)
  101. speed = self._calc_speed(hfive, path, pos_start, pos_end)
  102. result[key] = speed
  103. pass
  104. hfive.close()
  105. if len(result) != 0:
  106. r.set(f"nc_channels_speed", json.dumps(result))
  107. r.publish('refill', json.dumps({'type': 'channels_speed', 'value': 0}))
  108. print(datetime.now(), 'push msg=', result)
  109. except Exception as ex:
  110. print(ex)
  111. finally:
  112. stime.sleep(0.1)
  113. def _parse_path(self,path):
  114. items = re.split(r'/', path)
  115. if len(items) != 6:
  116. return False
  117. (_, _sday, _chname, _quality, _card_type, _spec) = items
  118. key = f'{_chname}-{_spec}-{_card_type}-{_quality}' #接收端的key,是如此定义的
  119. return key
  120. def _pos_range(self,end):
  121. pos_day = np.arange(0, 86400, 1)
  122. period = 60
  123. if end - period > 0:
  124. start = end - period
  125. pos_start = np.where(pos_day > start)
  126. else:
  127. start = 0
  128. pos_start = np.where(pos_day >= start)
  129. pos_day = pos_day[pos_start]
  130. pos_end = np.where(pos_day <= end)
  131. return pos_start, pos_end
  132. def _calc_speed(self,hFive,path,pos_start, pos_end):
  133. datas = hFive[path]
  134. commit = datas[0]
  135. commit = commit[pos_start]
  136. commit = commit[pos_end]
  137. speed = int(np.sum(commit))
  138. return speed
  139. def paths(self, time_stamp):
  140. try:
  141. day_stamp = self.day_stamp(time_stamp)
  142. hfive = h5py.File(self._file_name, 'r')
  143. group = hfive.require_group(f'/{day_stamp}')
  144. paths = self.dir(group)
  145. hfive.close()
  146. return paths
  147. except Exception as ex:
  148. print(ex)
  149. return []
  150. def dir(self, group):
  151. result = []
  152. for name, sub in group.items():
  153. if isinstance(sub, h5py.Group):
  154. result.extend(self.dir(sub))
  155. else:
  156. result.append(sub.name)
  157. return result
  158. speedDataCenter = SpeedDataCenter()