DataCenter.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. import os
  2. import time as stime
  3. import redis
  4. import h5py
  5. from os import path
  6. import re
  7. from datetime import timedelta
  8. import numpy as np
  9. from matplotlib.figure import Figure
  10. from matplotlib import ticker
  11. from io import BytesIO
  12. import logging
  13. class DataCenter(object):
  14. pos_map = {
  15. 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4
  16. }
  17. def __init__(self):
  18. self._mquit = False
  19. self._mRHost = ''
  20. self._mRPort = 6379
  21. self._file_name = '/var/www/html/data/stdata/data.hdf5'
  22. def set_redis(self, rhost, rport):
  23. self._mRHost = rhost
  24. self._mRPort = rport
  25. def stop(self):
  26. self._mquit = True
  27. pass
  28. def prepare_data(self):
  29. while True:
  30. try:
  31. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  32. r = redis.Redis(connection_pool=pool)
  33. if path.exists(self._file_name):
  34. hfive = h5py.File(self._file_name, 'a')
  35. else:
  36. hfive = h5py.File(self._file_name, 'w')
  37. self.read_redis(hfive, r, 'nc_channel_monitor_commit', 'commit')
  38. self.read_redis(hfive, r, 'nc_channel_monitor_notify', 'notify')
  39. hfive.close()
  40. self.del_redis(r, 'nc_channel_monitor_commit')
  41. self.del_redis(r, 'nc_channel_monitor_notify')
  42. except Exception as ex:
  43. print(ex)
  44. finally:
  45. for i in range(60):
  46. if self._mquit == True:
  47. break
  48. else:
  49. stime.sleep(1)
  50. def del_redis(self, redis, name):
  51. latest_time = int(stime.time()) - 300
  52. for item in redis.hscan_iter(name):
  53. key = str(item[0], encoding="utf-8")
  54. items = re.split(r'-', key)
  55. fdel = True
  56. if len(items) == 6:
  57. (stype, chname, quality, card_type, amount, time) = items
  58. time = int(time)
  59. if latest_time <= time:
  60. fdel = False
  61. if fdel:
  62. redis.hdel(name, key)
  63. pass
  64. def read_redis(self, hfive, redis, name, prefix):
  65. i = 0
  66. for item in redis.hscan_iter(name):
  67. key = str(item[0], encoding="utf-8")
  68. val = str(item[1], encoding="utf-8")
  69. print(f'{prefix}:{i}')
  70. i += 1
  71. self.parase(hfive, key, val, prefix)
  72. def parase(self, hfive, text, val, prefix):
  73. items = re.split(r'-', text)
  74. if len(items) != 6:
  75. return False
  76. (stype, chname, quality, card_type, amount, time) = items
  77. if stype == 'succ':
  78. pos = self.pos_map[f'{prefix}-succ']
  79. elif stype == 'fail':
  80. pos = self.pos_map[f'{prefix}-fail']
  81. else:
  82. return False
  83. time = int(time)
  84. today = self.day_stamp(time)
  85. path = f'/{today}/{chname}/{quality}/{card_type}/{amount}'
  86. if path not in hfive:
  87. hfive[path] = np.zeros((5, 86400))
  88. diff = time - today
  89. if diff < 0:
  90. print(diff)
  91. hfive[path][pos, diff] = val
  92. print(path, pos, diff, val, hfive[path][pos, diff])
  93. pass
  94. def day_stamp(self, stamp):
  95. stamp = int(stamp)
  96. x = stime.gmtime(stamp + 8 * 3600)
  97. diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
  98. today = stamp - diff.total_seconds()
  99. return int(today)
  100. def _days(self, root):
  101. result = []
  102. try:
  103. for name, sub in root.items():
  104. if isinstance(sub, h5py.Group):
  105. result.append(name)
  106. except Exception as ex:
  107. print(ex)
  108. finally:
  109. return result
  110. def days(self):
  111. try:
  112. hfive = h5py.File(self._file_name, 'r')
  113. root = hfive.require_group('/')
  114. days = self._days(root)
  115. hfive.close()
  116. return days
  117. except Exception as ex:
  118. print(ex)
  119. return []
  120. def paths(self, time_stamp):
  121. try:
  122. day_stamp = self.day_stamp(time_stamp)
  123. hfive = h5py.File(self._file_name, 'r')
  124. group = hfive.require_group(f'/{day_stamp}')
  125. paths = self.dir(group)
  126. hfive.close()
  127. return paths
  128. except Exception as ex:
  129. print(ex)
  130. return []
  131. def dir(self, group):
  132. result = []
  133. for name, sub in group.items():
  134. if isinstance(sub, h5py.Group):
  135. result.extend(self.dir(sub))
  136. else:
  137. result.append(sub.name)
  138. return result
  139. def draw_plot(self, start_time, interval=300, **kwargs):
  140. logger = logging.getLogger('app')
  141. hfive = h5py.File(self._file_name, 'r')
  142. try:
  143. day_stamp = self.day_stamp(start_time)
  144. start_pos = start_time - day_stamp
  145. cur_day = self.day_stamp(stime.time())
  146. if day_stamp == cur_day:
  147. end_pos = int(stime.time()) - day_stamp
  148. else:
  149. end_pos = -1
  150. fig = Figure(figsize=(16, 8))
  151. ax = fig.subplots()
  152. x = np.arange(0, 86400, interval)
  153. filer_text, paths = self.datasets(hfive, start_time, **kwargs)
  154. sub_count = 0
  155. predata = np.zeros((5, 86400))
  156. for path, data in self.read_data(hfive, paths):
  157. data = np.array(data)
  158. predata = predata + data
  159. ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
  160. if ret:
  161. sub_count += 1
  162. if sub_count > 1:
  163. self._draw_plot(ax, x, day_stamp, start_pos, end_pos, predata, interval, filer_text)
  164. ax.legend()
  165. ax.grid()
  166. ax.set_title('success ratio')
  167. ax.set(xlabel='time', ylabel='ratio')
  168. fig.autofmt_xdate()
  169. fig.subplots_adjust(left=0.05, right=0.999, top=0.95, bottom=0.1)
  170. buf = BytesIO()
  171. fig.savefig(buf, format="png")
  172. return buf
  173. except Exception as ex:
  174. print(ex)
  175. finally:
  176. hfive.close()
  177. def read_data(self, hfive, paths):
  178. for path in paths:
  179. yield path, hfive[path]
  180. def datasets(self, hfive, start_time, **kwargs):
  181. logger = logging.getLogger('app')
  182. day_stamp = self.day_stamp(start_time)
  183. sday = f'{day_stamp}'
  184. root = hfive.require_group('/')
  185. days = self._days(root)
  186. if sday not in days:
  187. return False
  188. group = hfive.require_group(sday)
  189. dsets = self.dir(group)
  190. chname = quality = card_type = amount = None
  191. for key, val in kwargs.items():
  192. if val is None:
  193. continue
  194. elif key == 'chname':
  195. chname = val
  196. elif key == 'quality':
  197. quality = f'{val}'
  198. elif key == 'card_type':
  199. card_type = f'{val}'
  200. elif key == 'amount':
  201. amount = f'{val}'
  202. else:
  203. continue
  204. return self._filter(dsets, chname=chname, quality=quality, card_type=card_type, amount=amount)
  205. def _filter(self, dsets, chname=None, quality=None, card_type=None, amount=None):
  206. filer_text = ''
  207. if chname is not None:
  208. filer_text = chname
  209. if quality is not None:
  210. filer_text = filer_text + f"-qua:{quality}"
  211. if card_type is not None:
  212. filer_text = filer_text + f"-type:{card_type}"
  213. if amount is not None:
  214. filer_text = filer_text + f"-amount:{amount}"
  215. paths = []
  216. for text in dsets:
  217. items = re.split(r'/', text)
  218. if len(items) != 6:
  219. return False
  220. (_, _sday, _chname, _quality, _card_type, _amount) = items
  221. if (chname is not None) and (_chname != chname):
  222. continue
  223. if (quality is not None) and (_quality != quality):
  224. continue
  225. if (card_type is not None) and (_card_type != card_type):
  226. continue
  227. if (amount is not None) and (_amount != amount):
  228. continue
  229. paths.append(text)
  230. return filer_text, paths
  231. def _draw_plot(self, ax, x, day_stamp, start_pos, end_pos, data, interval=300, path=''):
  232. # 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4
  233. logging.getLogger('app').debug("path=%s", path)
  234. all = data[2] + data[3]
  235. all = all.reshape((-1, interval))
  236. all = np.sum(all, axis=1)
  237. ySucc = data[2]
  238. ySucc = ySucc.reshape((-1, interval))
  239. ySucc = np.sum(ySucc, axis=1)
  240. if end_pos == -1:
  241. pos = np.where(x >= start_pos)
  242. x = x[pos]
  243. ySucc = ySucc[pos]
  244. all = all[pos]
  245. else:
  246. pos = np.where(start_pos <= x)
  247. x = x[pos]
  248. ySucc = ySucc[pos]
  249. all = all[pos]
  250. pos = np.where(x < end_pos)
  251. x = x[pos]
  252. ySucc = ySucc[pos]
  253. all = all[pos]
  254. succ_count = int(np.sum(ySucc))
  255. all_count = int(np.sum(all))
  256. opened = np.where(ySucc > 0.1)
  257. if len(opened[0]) == 0:
  258. logging.getLogger('app').debug("path=%s,opened=False", path)
  259. return False
  260. ySucc = ySucc / all
  261. xs = np.array([stime.strftime('%H:%M', stime.localtime(d + day_stamp)) for d in x])
  262. ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=0))
  263. ax.plot(xs, ySucc, ls='--', marker='o', label=self._label(path, succ_count, all_count))
  264. return True
  265. def _label(self, path, count, all):
  266. ratio = 0.00
  267. if all > 0:
  268. ratio = round(count * 100 / all, 2)
  269. items = re.split(r'/', path)
  270. if len(items) == 6:
  271. (_, _sday, _chname, _quality, _card_type, _amount) = items
  272. card_type = ''
  273. if _card_type == '1':
  274. card_type = 'SY'
  275. elif _card_type == '2':
  276. card_type = 'SH'
  277. elif _card_type == '4':
  278. card_type = 'YD'
  279. elif _card_type == '5':
  280. card_type = 'LT'
  281. elif _card_type == '6':
  282. card_type = 'DX'
  283. elif _card_type == '7':
  284. card_type = 'TH'
  285. return f"{_chname}-{_quality}-{card_type}-{_amount}:{count}/{all} = {ratio}%"
  286. else:
  287. if path == '' or path is None:
  288. path = 'average'
  289. return f"{path}:{count}/{all} = {ratio}%"
  290. def calc_ratio(self,start_time,end_time):
  291. pass
  292. dataCenter = DataCenter()