DataCenter.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. import os
  2. import time as stime
  3. import redis
  4. import h5py
  5. from os import path
  6. import re
  7. from datetime import timedelta
  8. import numpy as np
  9. from matplotlib.figure import Figure
  10. from matplotlib import ticker
  11. from io import BytesIO
  12. import logging
  13. class DataCenter(object):
  14. pos_map = {
  15. 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4
  16. }
  17. def __init__(self):
  18. self._mquit = False
  19. self._mRHost = ''
  20. self._mRPort = 6379
  21. self._file_name = '/var/www/html/data/stdata/data.hdf5'
  22. def set_redis(self, rhost, rport):
  23. self._mRHost = rhost
  24. self._mRPort = rport
  25. def stop(self):
  26. self._mquit = True
  27. pass
  28. def prepare_data(self):
  29. while True:
  30. try:
  31. # pool = redis.ConnectionPool(host='121.89.223.81', port=57649, db=0)
  32. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  33. r = redis.Redis(connection_pool=pool)
  34. if path.exists(self._file_name):
  35. hfive = h5py.File(self._file_name, 'a')
  36. else:
  37. hfive = h5py.File(self._file_name, 'w')
  38. self.read_redis(hfive, r, 'nc_channel_monitor_commit', 'commit')
  39. self.read_redis(hfive, r, 'nc_channel_monitor_notify', 'notify')
  40. hfive.close()
  41. self.del_redis(r, 'nc_channel_monitor_commit')
  42. self.del_redis(r, 'nc_channel_monitor_notify')
  43. except Exception as ex:
  44. print(ex)
  45. finally:
  46. for i in range(60):
  47. if self._mquit == True:
  48. break
  49. else:
  50. stime.sleep(1)
  51. def del_redis(self, redis, name):
  52. latest_time = int(stime.time()) - 300
  53. for item in redis.hscan_iter(name):
  54. key = str(item[0], encoding="utf-8")
  55. items = re.split(r'-', key)
  56. fdel = True
  57. if len(items) == 6:
  58. (stype, chname, quality, card_type, amount, time) = items
  59. time = int(time)
  60. if latest_time <= time:
  61. fdel = False
  62. if fdel:
  63. redis.hdel(name, key)
  64. pass
  65. def read_redis(self, hfive, redis, name, prefix):
  66. i = 0
  67. for item in redis.hscan_iter(name):
  68. key = str(item[0], encoding="utf-8")
  69. val = str(item[1], encoding="utf-8")
  70. print(f'{prefix}:{i}')
  71. i += 1
  72. self.parase(hfive, key, val, prefix)
  73. def parase(self, hfive, text, val, prefix):
  74. items = re.split(r'-', text)
  75. if len(items) != 6:
  76. return False
  77. (stype, chname, quality, card_type, amount, time) = items
  78. if stype == 'succ':
  79. pos = self.pos_map[f'{prefix}-succ']
  80. elif stype == 'fail':
  81. pos = self.pos_map[f'{prefix}-fail']
  82. else:
  83. return False
  84. time = int(time)
  85. today = self.day_stamp(time)
  86. path = f'/{today}/{chname}/{quality}/{card_type}/{amount}'
  87. if path not in hfive:
  88. hfive[path] = np.zeros((5, 86400))
  89. diff = time - today
  90. if diff < 0:
  91. print(diff)
  92. hfive[path][pos, diff] = val
  93. print(path, pos, diff, val, hfive[path][pos, diff])
  94. pass
  95. def day_stamp(self, stamp):
  96. stamp = int(stamp)
  97. x = stime.gmtime(stamp + 8 * 3600)
  98. diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
  99. today = stamp - diff.total_seconds()
  100. return int(today)
  101. def _days(self, root):
  102. result = []
  103. try:
  104. for name, sub in root.items():
  105. if isinstance(sub, h5py.Group):
  106. result.append(name)
  107. except Exception as ex:
  108. print(ex)
  109. finally:
  110. return result
  111. def days(self):
  112. try:
  113. hfive = h5py.File(self._file_name, 'r')
  114. root = hfive.require_group('/')
  115. days = self._days(root)
  116. hfive.close()
  117. return days
  118. except Exception as ex:
  119. print(ex)
  120. return []
  121. def paths(self, time_stamp):
  122. try:
  123. day_stamp = self.day_stamp(time_stamp)
  124. hfive = h5py.File(self._file_name, 'r')
  125. group = hfive.require_group(f'/{day_stamp}')
  126. paths = self.dir(group)
  127. hfive.close()
  128. return paths
  129. except Exception as ex:
  130. print(ex)
  131. return []
  132. def dir(self, group):
  133. result = []
  134. for name, sub in group.items():
  135. if isinstance(sub, h5py.Group):
  136. result.extend(self.dir(sub))
  137. else:
  138. result.append(sub.name)
  139. return result
  140. def draw_plot(self, start_time, interval=300, **kwargs):
  141. logger = logging.getLogger('app')
  142. hfive = h5py.File(self._file_name, 'r')
  143. try:
  144. filer_text, paths = self.datasets(hfive, start_time, **kwargs)
  145. day_stamp = self.day_stamp(start_time)
  146. start_pos = start_time - day_stamp
  147. cur_day = self.day_stamp(stime.time())
  148. if day_stamp == cur_day:
  149. end_pos = int(stime.time()) - day_stamp
  150. else:
  151. end_pos = -1
  152. fig = Figure(figsize=(16, 8))
  153. ax = fig.subplots()
  154. predata = np.zeros((5, 86400))
  155. x = np.arange(0, 86400, interval)
  156. sub_count = 0
  157. for path, data in self.read_data(hfive, paths):
  158. data = np.array(data)
  159. predata = predata + data
  160. ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
  161. if ret:
  162. sub_count += 1
  163. if sub_count > 1:
  164. self._draw_plot(ax, x, day_stamp, start_pos, end_pos, predata, interval, filer_text)
  165. ax.legend()
  166. ax.grid()
  167. ax.set_title('success ratio')
  168. ax.set(xlabel='time', ylabel='ratio')
  169. fig.autofmt_xdate()
  170. fig.subplots_adjust(left=0.05, right=0.999, top=0.95, bottom=0.1)
  171. buf = BytesIO()
  172. fig.savefig(buf, format="png")
  173. return buf
  174. except Exception as ex:
  175. print(ex)
  176. finally:
  177. hfive.close()
  178. def read_data(self, hfive, paths):
  179. for path in paths:
  180. yield path, hfive[path]
  181. def datasets(self, hfive, start_time, **kwargs):
  182. logger = logging.getLogger('app')
  183. day_stamp = self.day_stamp(start_time)
  184. sday = f'{day_stamp}'
  185. root = hfive.require_group('/')
  186. days = self._days(root)
  187. if sday not in days:
  188. return False
  189. group = hfive.require_group(sday)
  190. dsets = self.dir(group)
  191. chname = quality = card_type = amount = None
  192. for key, val in kwargs.items():
  193. if val is None:
  194. continue
  195. if key == 'chname':
  196. chname = val
  197. elif key == 'quality':
  198. quality = f'{val}'
  199. elif key == 'card_type':
  200. card_type = f'{val}'
  201. elif key == 'amount':
  202. amount = f'{val}'
  203. else:
  204. continue
  205. return self._filter(dsets, chname=chname, quality=quality, card_type=card_type, amount=amount)
  206. def _filter(self, dsets, chname=None, quality=None, card_type=None, amount=None):
  207. filer_text = ''
  208. if chname is not None:
  209. filer_text = chname
  210. if quality is not None:
  211. filer_text = filer_text + f"-qua:{quality}"
  212. if card_type is not None:
  213. filer_text = filer_text + f"-type:{card_type}"
  214. if amount is not None:
  215. filer_text = filer_text + f"-amount:{amount}"
  216. paths = []
  217. for text in dsets:
  218. items = re.split(r'/', text)
  219. if len(items) != 6:
  220. return False
  221. (_, _sday, _chname, _quality, _card_type, _amount) = items
  222. if (chname is not None) and (_chname != chname):
  223. continue
  224. if (quality is not None) and (_quality != quality):
  225. continue
  226. if (card_type is not None) and (_card_type != card_type):
  227. continue
  228. if (amount is not None) and (_amount != amount):
  229. continue
  230. paths.append(text)
  231. return filer_text, paths
  232. def _draw_plot(self, ax, x, day_stamp, start_pos, end_pos, data, interval=300, path=''):
  233. # 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4
  234. logging.getLogger('app').debug("path=%s", path)
  235. all = data[2] + data[3]
  236. all = all.reshape((-1, interval))
  237. all = np.sum(all, axis=1)
  238. ySucc = data[2]
  239. ySucc = ySucc.reshape((-1, interval))
  240. ySucc = np.sum(ySucc, axis=1)
  241. if end_pos == -1:
  242. pos = np.where(x >= start_pos)
  243. x = x[pos]
  244. ySucc = ySucc[pos]
  245. all = all[pos]
  246. else:
  247. pos = np.where(start_pos <= x)
  248. x = x[pos]
  249. ySucc = ySucc[pos]
  250. all = all[pos]
  251. pos = np.where(x < end_pos)
  252. x = x[pos]
  253. ySucc = ySucc[pos]
  254. all = all[pos]
  255. succ_count = int(np.sum(ySucc))
  256. all_count = int(np.sum(all))
  257. opened = np.where(ySucc > 0.000001)
  258. if len(opened[0]) == 0:
  259. logging.getLogger('app').debug("path=%s,opened=False", path)
  260. return False
  261. ySucc = ySucc / (all + 0.00000001)
  262. xs = np.array([stime.strftime('%H:%M', stime.localtime(d + day_stamp)) for d in x])
  263. ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=0))
  264. ax.plot(xs, ySucc, ls='--', marker='o', label=self._label(path, succ_count, all_count))
  265. return True
  266. def _label(self, path, count, all):
  267. ratio = 0.00
  268. if all > 0:
  269. ratio = round(count * 100 / all, 2)
  270. items = re.split(r'/', path)
  271. if len(items) == 6:
  272. (_, _sday, _chname, _quality, _card_type, _amount) = items
  273. card_type = ''
  274. if _card_type == '1':
  275. card_type = 'SY'
  276. elif _card_type == '2':
  277. card_type = 'SH'
  278. elif _card_type == '4':
  279. card_type = 'YD'
  280. elif _card_type == '5':
  281. card_type = 'LT'
  282. elif _card_type == '6':
  283. card_type = 'DX'
  284. return f"{_chname}-{_quality}-{card_type}-{_amount}:{count}/{all} = {ratio}%"
  285. else:
  286. if path == '' or path is None:
  287. path = 'average'
  288. return f"{path}:{count}/{all} = {ratio}%"
  289. dataCenter = DataCenter()