# SpeedDataCenter.py (5.8 KB)
  1. import os
  2. import time as stime
  3. import redis
  4. import h5py
  5. from os import path
  6. import re
  7. from datetime import timedelta
  8. from datetime import datetime
  9. import numpy as np
  10. class SpeedDataCenter(object):
  11. def __init__(self):
  12. self._mquit = False
  13. self._mRHost = ''
  14. self._mRPort = 6379
  15. self._file_name = '/var/www/html/data/stdata/speed.hdf5'
  16. def set_redis(self, rhost, rport):
  17. self._mRHost = rhost
  18. self._mRPort = rport
  19. def stop(self):
  20. self._mquit = True
  21. pass
  22. def prepare_data(self):
  23. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  24. r = redis.Redis(connection_pool=pool)
  25. while self._mquit == False:
  26. try:
  27. print('open file:', datetime.now())
  28. if path.exists(self._file_name):
  29. hfive = h5py.File(self._file_name, 'a')
  30. else:
  31. hfive = h5py.File(self._file_name, 'w')
  32. print('read redis:', datetime.now())
  33. self.read_redis(hfive, r, 'nc_commit_speed_monitor')
  34. hfive.close()
  35. print('del redis:', datetime.now())
  36. self.del_redis(r, 'nc_commit_speed_monitor')
  37. print('del redis:', datetime.now())
  38. except Exception as ex:
  39. print(ex)
  40. finally:
  41. stime.sleep(0.1)
  42. def del_redis(self, redis, name):
  43. latest_time = int(stime.time()) - 2
  44. for item in redis.hscan_iter(name):
  45. key = str(item[0], encoding="utf-8")
  46. items = re.split(r'-', key)
  47. fdel = True
  48. if len(items) == 5:
  49. (chname, quality, card_type, amount, time) = items
  50. time = int(time)
  51. if time > latest_time:
  52. fdel = False
  53. if fdel:
  54. redis.hdel(name, key)
  55. pass
  56. def read_redis(self, hfive, redis, name):
  57. for item in redis.hscan_iter(name):
  58. key = str(item[0], encoding="utf-8")
  59. val = str(item[1], encoding="utf-8")
  60. self.parase(hfive, key, val)
  61. def parase(self, hfive, text, val):
  62. items = re.split(r'-', text)
  63. if len(items) != 5:
  64. return False
  65. (chname, quality, card_type, spec, time) = items
  66. time = int(time)
  67. today = self.day_stamp(time)
  68. path = f'/{today}/{chname}/{quality}/{card_type}/{spec}'
  69. if path not in hfive:
  70. hfive[path] = np.zeros((1, 86400))
  71. diff = time - today
  72. hfive[path][0, diff] = val
  73. print(path, 0, diff, val, hfive[path][0, diff])
  74. pass
  75. def day_stamp(self, stamp):
  76. stamp = int(stamp)
  77. x = stime.gmtime(stamp + 8 * 3600)
  78. diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
  79. today = stamp - diff.total_seconds()
  80. return int(today)
  81. ########################################################################################################################
  82. def channl_speed(self):
  83. import json
  84. r = None
  85. try:
  86. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  87. r = redis.Redis(connection_pool=pool)
  88. except Exception as ex:
  89. print(ex)
  90. while True:
  91. try:
  92. time_stamp = int(stime.time())
  93. paths = self.paths(time_stamp)
  94. day_stamp = self.day_stamp(time_stamp)
  95. (pos_start, pos_end) = self._pos_range(time_stamp - day_stamp)
  96. hfive = h5py.File(self._file_name, 'r')
  97. result = {}
  98. for path in paths:
  99. key = self._parse_path(path)
  100. speed = self._calc_speed(hfive, path, pos_start, pos_end)
  101. result[key] = speed
  102. pass
  103. hfive.close()
  104. if len(result) != 0:
  105. r.set(f"nc_channels_speed", json.dumps(result))
  106. r.publish('refill', json.dumps({'type': 'channels_speed', 'value': 0}))
  107. print(datetime.now(), 'push msg=', result)
  108. except Exception as ex:
  109. print(ex)
  110. finally:
  111. stime.sleep(0.1)
  112. def _parse_path(self,path):
  113. items = re.split(r'/', path)
  114. if len(items) != 6:
  115. return False
  116. (_, _sday, _chname, _quality, _card_type, _spec) = items
  117. key = f'{_chname}-{_spec}-{_card_type}-{_quality}' #接收端的key,是如此定义的
  118. return key
  119. def _pos_range(self,end):
  120. pos_day = np.arange(0, 86400, 1)
  121. period = 60
  122. if end - period > 0:
  123. start = end - period
  124. pos_start = np.where(pos_day > start)
  125. else:
  126. start = 0
  127. pos_start = np.where(pos_day >= start)
  128. pos_day = pos_day[pos_start]
  129. pos_end = np.where(pos_day <= end)
  130. return pos_start, pos_end
  131. def _calc_speed(self,hFive,path,pos_start, pos_end):
  132. datas = hFive[path]
  133. commit = datas[0]
  134. commit = commit[pos_start]
  135. commit = commit[pos_end]
  136. speed = int(np.sum(commit))
  137. return speed
  138. def paths(self, time_stamp):
  139. try:
  140. day_stamp = self.day_stamp(time_stamp)
  141. hfive = h5py.File(self._file_name, 'r')
  142. group = hfive.require_group(f'/{day_stamp}')
  143. paths = self.dir(group)
  144. hfive.close()
  145. return paths
  146. except Exception as ex:
  147. print(ex)
  148. return []
  149. def dir(self, group):
  150. result = []
  151. for name, sub in group.items():
  152. if isinstance(sub, h5py.Group):
  153. result.extend(self.dir(sub))
  154. else:
  155. result.append(sub.name)
  156. return result
# Module-level shared instance, created at import time with the defaults set in __init__.
speedDataCenter = SpeedDataCenter()