DataCenter.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. from threading import Thread
  2. import os
  3. import time as stime
  4. from Singleton import Singleton
  5. import redis
  6. import h5py
  7. from os import path
  8. import re
  9. from datetime import timedelta
  10. import numpy as np
  11. from matplotlib.figure import Figure
  12. from matplotlib import ticker
  13. from io import BytesIO
  14. import logging
  15. class DataCenter(object):
  16. pos_map = {
  17. 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4
  18. }
  19. def __init__(self):
  20. self._mquit = False
  21. self._file_name = '/var/www/html/data/stdata/data.hdf5'
  22. def stop(self):
  23. self._mquit = True
  24. pass
  25. def prepare_data(self):
  26. while True:
  27. try:
  28. # pool = redis.ConnectionPool(host='121.89.223.81', port=57649, db=0)
  29. pool = redis.ConnectionPool(host='172.26.105.125', port=6379, db=0)
  30. r = redis.Redis(connection_pool=pool)
  31. if path.exists(self._file_name):
  32. hfive = h5py.File(self._file_name, 'a')
  33. else:
  34. hfive = h5py.File(self._file_name, 'w')
  35. self.read_redis(hfive, r, 'nc_channel_monitor_commit', 'commit')
  36. self.read_redis(hfive, r, 'nc_channel_monitor_notify', 'notify')
  37. hfive.close()
  38. self.del_redis(r, 'nc_channel_monitor_commit')
  39. self.del_redis(r, 'nc_channel_monitor_notify')
  40. except Exception as ex:
  41. print(ex)
  42. finally:
  43. for i in range(60):
  44. if self._mquit == True:
  45. break
  46. else:
  47. stime.sleep(1)
  48. def del_redis(self, redis, name):
  49. latest_time = int(stime.time()) - 300
  50. for item in redis.hscan_iter(name):
  51. key = str(item[0], encoding="utf-8")
  52. items = re.split(r'-', key)
  53. fdel = True
  54. if len(items) == 6:
  55. (stype, chname, quality, card_type, amount, time) = items
  56. time = int(time)
  57. if latest_time <= time:
  58. fdel = False
  59. if fdel:
  60. redis.hdel(name, key)
  61. pass
  62. def read_redis(self, hfive, redis, name, prefix):
  63. i = 0
  64. for item in redis.hscan_iter(name):
  65. key = str(item[0], encoding="utf-8")
  66. val = str(item[1], encoding="utf-8")
  67. print(f'{prefix}:{i}')
  68. i += 1
  69. self.parase(hfive, key, val, prefix)
  70. def parase(self, hfive, text, val, prefix):
  71. items = re.split(r'-', text)
  72. if len(items) != 6:
  73. return False
  74. (stype, chname, quality, card_type, amount, time) = items
  75. if stype == 'succ':
  76. pos = self.pos_map[f'{prefix}-succ']
  77. elif stype == 'fail':
  78. pos = self.pos_map[f'{prefix}-fail']
  79. else:
  80. return False
  81. time = int(time)
  82. today = self.day_stamp(time)
  83. path = f'/{today}/{chname}/{quality}/{card_type}/{amount}'
  84. if path not in hfive:
  85. hfive[path] = np.zeros((5, 86400))
  86. diff = time - today
  87. if diff < 0:
  88. print(diff)
  89. hfive[path][pos, diff] = val
  90. print(path, pos, diff, val, hfive[path][pos, diff])
  91. pass
  92. def day_stamp(self, stamp):
  93. stamp = int(stamp)
  94. x = stime.gmtime(stamp + 8 * 3600)
  95. diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
  96. today = stamp - diff.total_seconds()
  97. return int(today)
  98. def _days(self, root):
  99. result = []
  100. try:
  101. for name, sub in root.items():
  102. if isinstance(sub, h5py.Group):
  103. result.append(name)
  104. except Exception as ex:
  105. print(ex)
  106. finally:
  107. return result
  108. def days(self):
  109. try:
  110. hfive = h5py.File(self._file_name, 'r')
  111. root = hfive.require_group('/')
  112. days = self._days(root)
  113. hfive.close()
  114. return days
  115. except Exception as ex:
  116. print(ex)
  117. return []
  118. def paths(self,time_stamp):
  119. try:
  120. day_stamp = self.day_stamp(time_stamp)
  121. hfive = h5py.File(self._file_name, 'r')
  122. group = hfive.require_group(f'/{day_stamp}')
  123. paths = self.dir(group)
  124. hfive.close()
  125. return paths
  126. except Exception as ex:
  127. print(ex)
  128. return []
  129. def dir(self, group):
  130. result = []
  131. for name, sub in group.items():
  132. if isinstance(sub, h5py.Group):
  133. result.extend(self.dir(sub))
  134. else:
  135. result.append(sub.name)
  136. return result
  137. def draw_plot(self, start_time, interval=300, **kwargs):
  138. logger = logging.getLogger('app')
  139. hfive = h5py.File(self._file_name, 'r')
  140. try:
  141. filer_text, paths = self.datasets(hfive, start_time, **kwargs)
  142. day_stamp = self.day_stamp(start_time)
  143. start_pos = start_time - day_stamp
  144. cur_day = self.day_stamp(stime.time())
  145. if day_stamp == cur_day:
  146. end_pos = int(stime.time()) - day_stamp
  147. else:
  148. end_pos = -1
  149. fig = Figure(figsize=(16, 9))
  150. ax = fig.subplots()
  151. predata = np.zeros((5, 86400))
  152. x = np.arange(0, 86400, interval)
  153. for path, data in self.read_data(hfive, paths):
  154. data = np.array(data)
  155. predata = predata + data
  156. self._draw_plot(ax, x, day_stamp, start_pos,end_pos, data, interval, path)
  157. logger.info("path=%s", path)
  158. self._draw_plot(ax, x, day_stamp, start_pos,end_pos, predata, interval, filer_text)
  159. ax.legend()
  160. ax.grid()
  161. ax.set_title('success ratio')
  162. ax.set(xlabel='time', ylabel='ratio')
  163. fig.autofmt_xdate()
  164. fig.subplots_adjust(left=0.05, right=0.999, top=0.95, bottom=0.1)
  165. buf = BytesIO()
  166. fig.savefig(buf, format="png")
  167. return buf
  168. except Exception as ex:
  169. print(ex)
  170. finally:
  171. hfive.close()
  172. def read_data(self, hfive, paths):
  173. for path in paths:
  174. yield path, hfive[path]
  175. def datasets(self, hfive, start_time, **kwargs):
  176. logger = logging.getLogger('app')
  177. day_stamp = self.day_stamp(start_time)
  178. sday = f'{day_stamp}'
  179. root = hfive.require_group('/')
  180. days = self._days(root)
  181. if sday not in days:
  182. return False
  183. group = hfive.require_group(sday)
  184. dsets = self.dir(group)
  185. chname = quality = card_type = amount = None
  186. for key, val in kwargs.items():
  187. if val is None:
  188. continue
  189. if key == 'chname':
  190. chname = val
  191. elif key == 'quality':
  192. quality = f'{val}'
  193. elif key == 'card_type':
  194. card_type = f'{val}'
  195. elif key == 'amount':
  196. amount = f'{val}'
  197. else:
  198. continue
  199. return self._filter(dsets, chname=chname, quality=quality, card_type=card_type, amount=amount)
  200. def _filter(self, dsets, chname=None, quality=None, card_type=None, amount=None):
  201. filer_text = ''
  202. if chname is not None:
  203. filer_text = chname
  204. if quality is not None:
  205. filer_text = filer_text + f"-qua:{quality}"
  206. if card_type is not None:
  207. filer_text = filer_text + f"-type:{card_type}"
  208. if amount is not None:
  209. filer_text = filer_text + f"-amount:{amount}"
  210. paths = []
  211. for text in dsets:
  212. items = re.split(r'/', text)
  213. if len(items) != 6:
  214. return False
  215. (_, _sday, _chname, _quality, _card_type, _amount) = items
  216. if (chname is not None) and (_chname != chname):
  217. continue
  218. if (quality is not None) and (_quality != quality):
  219. continue
  220. if (card_type is not None) and (_card_type != card_type):
  221. continue
  222. if (amount is not None) and (_amount != amount):
  223. continue
  224. paths.append(text)
  225. return filer_text, paths
  226. def _draw_plot(self, ax, x, day_stamp, start_pos,end_pos, data, interval=300, path=''):
  227. import matplotlib.dates as mdate
  228. # 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4
  229. all = data[2] + data[3]
  230. all = all.reshape((-1, interval))
  231. all = np.sum(all, axis=1)
  232. ySucc = data[2]
  233. ySucc = ySucc.reshape((-1, interval))
  234. ySucc = np.sum(ySucc, axis=1)
  235. ySucc = ySucc / (all + 0.00000001)
  236. if end_pos == -1:
  237. pos = np.where(x >= start_pos)
  238. x = x[pos]
  239. ySucc = ySucc[pos]
  240. else:
  241. pos = np.where(start_pos <= x)
  242. x = x[pos]
  243. ySucc = ySucc[pos]
  244. pos = np.where(x < end_pos)
  245. x = x[pos]
  246. ySucc = ySucc[pos]
  247. xs = np.array([stime.strftime('%H:%M', stime.localtime(d + day_stamp)) for d in x])
  248. ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=0))
  249. ax.plot(xs, ySucc, ls='--', marker='o', label=path)
  250. dataCenter = DataCenter()