MchDataCenter.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. import os
  2. import time as stime
  3. import redis
  4. import h5py
  5. from os import path
  6. import re
  7. from datetime import timedelta
  8. import numpy as np
  9. from matplotlib.figure import Figure
  10. from matplotlib import ticker
  11. from io import BytesIO
  12. import logging
  13. class MchDataCenter(object):
  14. pos_map = {
  15. 'commit': 0, 'notify': 1
  16. }
  17. def __init__(self):
  18. self._mquit = False
  19. self._mRHost = ''
  20. self._mRPort = 6379
  21. self._file_name = '/var/www/html/data/stdata/user.hdf5'
  22. def set_redis(self, rhost, rport):
  23. self._mRHost = rhost
  24. self._mRPort = rport
  25. def stop(self):
  26. self._mquit = True
  27. pass
  28. def prepare_data(self):
  29. while True:
  30. try:
  31. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  32. r = redis.Redis(connection_pool=pool)
  33. if path.exists(self._file_name):
  34. hfive = h5py.File(self._file_name, 'a')
  35. else:
  36. hfive = h5py.File(self._file_name, 'w')
  37. self.read_redis(hfive, r, 'nc_user_monitor_commit', 'commit')
  38. self.read_redis(hfive, r, 'nc_user_monitor_success', 'notify')
  39. hfive.close()
  40. self.del_redis(r, 'nc_user_monitor_commit')
  41. self.del_redis(r, 'nc_user_monitor_success')
  42. except Exception as ex:
  43. print(ex)
  44. finally:
  45. for i in range(60):
  46. if self._mquit == True:
  47. break
  48. else:
  49. stime.sleep(1)
  50. def del_redis(self, redis, name):
  51. latest_time = int(stime.time()) - 300
  52. for item in redis.hscan_iter(name):
  53. key = str(item[0], encoding="utf-8")
  54. items = re.split(r'-', key)
  55. fdel = True
  56. if len(items) == 5:
  57. (mchid, quality, card_type, amount, time) = items
  58. time = int(time)
  59. if latest_time <= time:
  60. fdel = False
  61. if fdel:
  62. redis.hdel(name, key)
  63. pass
  64. def read_redis(self, hfive, redis, name, prefix):
  65. i = 0
  66. for item in redis.hscan_iter(name):
  67. key = str(item[0], encoding="utf-8")
  68. val = str(item[1], encoding="utf-8")
  69. print(f'{prefix}:{i}')
  70. i += 1
  71. self.parase(hfive, key, val, prefix)
  72. def parase(self, hfive, text, val, prefix):
  73. items = re.split(r'-', text)
  74. if len(items) != 5:
  75. return False
  76. (mchid, quality, card_type, amount, time) = items
  77. pos = self.pos_map[f'{prefix}']
  78. time = int(time)
  79. today = self.day_stamp(time)
  80. path = f'/{today}/{mchid}/{quality}/{card_type}/{amount}'
  81. if path not in hfive:
  82. hfive[path] = np.zeros((2, 86400))
  83. diff = time - today
  84. if diff < 0:
  85. print(diff)
  86. hfive[path][pos, diff] = val
  87. print(path, pos, diff, val, hfive[path][pos, diff])
  88. pass
  89. def day_stamp(self, stamp):
  90. stamp = int(stamp)
  91. x = stime.gmtime(stamp + 8 * 3600)
  92. diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
  93. today = stamp - diff.total_seconds()
  94. return int(today)
  95. def _days(self, root):
  96. result = []
  97. try:
  98. for name, sub in root.items():
  99. if isinstance(sub, h5py.Group):
  100. result.append(name)
  101. except Exception as ex:
  102. print(ex)
  103. finally:
  104. return result
  105. def days(self):
  106. try:
  107. hfive = h5py.File(self._file_name, 'r')
  108. root = hfive.require_group('/')
  109. days = self._days(root)
  110. hfive.close()
  111. return days
  112. except Exception as ex:
  113. print(ex)
  114. return []
  115. def paths(self, time_stamp):
  116. try:
  117. day_stamp = self.day_stamp(time_stamp)
  118. hfive = h5py.File(self._file_name, 'r')
  119. group = hfive.require_group(f'/{day_stamp}')
  120. paths = self.dir(group)
  121. hfive.close()
  122. return paths
  123. except Exception as ex:
  124. print(ex)
  125. return []
  126. def dir(self, group):
  127. result = []
  128. for name, sub in group.items():
  129. if isinstance(sub, h5py.Group):
  130. result.extend(self.dir(sub))
  131. else:
  132. result.append(sub.name)
  133. return result
  134. def _all_none(self, **kwargs):
  135. for key, val in kwargs.items():
  136. if val is not None:
  137. return False
  138. return True
  139. def _merge_path(self,paths):
  140. result = {}
  141. for path in paths:
  142. items = re.split(r'/', path)
  143. if len(items) != 6:
  144. continue
  145. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  146. _mchid = int(_mchid)
  147. if _mchid not in result:
  148. result[_mchid] = []
  149. result[_mchid].append(path)
  150. return result
  151. def draw_plot(self, start_time, interval=300, **kwargs):
  152. logger = logging.getLogger('app')
  153. hfive = h5py.File(self._file_name, 'r')
  154. try:
  155. day_stamp = self.day_stamp(start_time)
  156. start_pos = start_time - day_stamp
  157. cur_day = self.day_stamp(stime.time())
  158. if day_stamp == cur_day:
  159. end_pos = int(stime.time()) - day_stamp
  160. else:
  161. end_pos = -1
  162. fig = Figure(figsize=(16, 8))
  163. ax = fig.subplots()
  164. predata = np.zeros((2, 86400))
  165. x = np.arange(0, 86400, interval)
  166. sub_count = 0
  167. filer_text, paths = self.datasets(hfive, start_time, **kwargs)
  168. if self._all_none(**kwargs):
  169. paths = self._merge_path(paths)
  170. for mchid, data in self._read_dict_data(hfive, paths):
  171. predata = predata + data
  172. path = f'{mchid}'
  173. ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
  174. if ret:
  175. sub_count += 1
  176. pass
  177. else:
  178. for path, data in self.read_data(hfive, paths):
  179. data = np.array(data)
  180. predata = predata + data
  181. ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
  182. if ret:
  183. sub_count += 1
  184. if sub_count > 1:
  185. self._draw_plot(ax, x, day_stamp, start_pos, end_pos, predata, interval, filer_text)
  186. ax.legend()
  187. ax.grid()
  188. ax.set_title('success ratio')
  189. ax.set(xlabel='time', ylabel='ratio')
  190. fig.autofmt_xdate()
  191. fig.subplots_adjust(left=0.05, right=0.999, top=0.95, bottom=0.1)
  192. buf = BytesIO()
  193. fig.savefig(buf, format="png")
  194. return buf
  195. except Exception as ex:
  196. print(ex)
  197. finally:
  198. hfive.close()
  199. def read_data(self, hfive, paths):
  200. for path in paths:
  201. yield path, hfive[path]
  202. def _read_dict_data(self, hfive, mchPaths):
  203. for mchid, paths in mchPaths.items():
  204. predata = np.zeros((2, 86400))
  205. for path in paths:
  206. predata += hfive[path]
  207. yield mchid, predata
  208. def datasets(self, hfive, start_time, **kwargs):
  209. logger = logging.getLogger('app')
  210. day_stamp = self.day_stamp(start_time)
  211. sday = f'{day_stamp}'
  212. root = hfive.require_group('/')
  213. days = self._days(root)
  214. if sday not in days:
  215. return False
  216. group = hfive.require_group(sday)
  217. dsets = self.dir(group)
  218. mchid = quality = card_type = amount = None
  219. for key, val in kwargs.items():
  220. if val is None:
  221. continue
  222. if key == 'mchid':
  223. mchid = val
  224. elif key == 'quality':
  225. quality = f'{val}'
  226. elif key == 'card_type':
  227. card_type = f'{val}'
  228. elif key == 'amount':
  229. amount = f'{val}'
  230. else:
  231. continue
  232. return self._filter(dsets, mchid=mchid, quality=quality, card_type=card_type, amount=amount)
  233. def _filter(self, dsets, mchid=None, quality=None, card_type=None, amount=None):
  234. filer_text = ''
  235. if mchid is not None:
  236. filer_text = mchid
  237. if quality is not None:
  238. filer_text = filer_text + f"-qua:{quality}"
  239. if card_type is not None:
  240. filer_text = filer_text + f"-type:{card_type}"
  241. if amount is not None:
  242. filer_text = filer_text + f"-amount:{amount}"
  243. paths = []
  244. for text in dsets:
  245. items = re.split(r'/', text)
  246. if len(items) != 6:
  247. return False
  248. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  249. if (mchid is not None) and (_mchid != mchid):
  250. continue
  251. if (quality is not None) and (_quality != quality):
  252. continue
  253. if (card_type is not None) and (_card_type != card_type):
  254. continue
  255. if (amount is not None) and (_amount != amount):
  256. continue
  257. paths.append(text)
  258. return filer_text, paths
  259. def _draw_plot(self, ax, x, day_stamp, start_pos, end_pos, data, interval=300, path=''):
  260. # 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4
  261. logging.getLogger('app').debug("path=%s", path)
  262. all = data[0]
  263. all = all.reshape((-1, interval))
  264. all = np.sum(all, axis=1)
  265. ySucc = data[1]
  266. ySucc = ySucc.reshape((-1, interval))
  267. ySucc = np.sum(ySucc, axis=1)
  268. if end_pos == -1:
  269. pos = np.where(x >= start_pos)
  270. x = x[pos]
  271. ySucc = ySucc[pos]
  272. all = all[pos]
  273. else:
  274. pos = np.where(start_pos <= x)
  275. x = x[pos]
  276. ySucc = ySucc[pos]
  277. all = all[pos]
  278. pos = np.where(x < end_pos)
  279. x = x[pos]
  280. ySucc = ySucc[pos]
  281. all = all[pos]
  282. succ_count = int(np.sum(ySucc))
  283. all_count = int(np.sum(all))
  284. if all_count < 1:
  285. return False
  286. pos = np.where(ySucc > all)
  287. ySucc[pos] = all[pos]
  288. ySucc = ySucc / (all + 0.00000001)
  289. xs = np.array([stime.strftime('%H:%M', stime.localtime(d + day_stamp)) for d in x])
  290. ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=0))
  291. ax.plot(xs, ySucc, ls='--', marker='o', label=self._label(path, succ_count, all_count))
  292. return True
  293. def _label(self, path, count, all):
  294. ratio = 0.00
  295. if all > 0:
  296. ratio = round(count * 100 / all, 2)
  297. items = re.split(r'/', path)
  298. if len(items) == 6:
  299. (_, _sday, _chname, _quality, _card_type, _amount) = items
  300. card_type = ''
  301. if _card_type == '1':
  302. card_type = 'SY'
  303. elif _card_type == '2':
  304. card_type = 'SH'
  305. elif _card_type == '4':
  306. card_type = 'YD'
  307. elif _card_type == '5':
  308. card_type = 'LT'
  309. elif _card_type == '6':
  310. card_type = 'DX'
  311. elif _card_type == '7':
  312. card_type = 'TH'
  313. return f"{_chname}-{_quality}-{card_type}-{_amount}:{count}/{all} = {ratio}%"
  314. else:
  315. if path == '' or path is None:
  316. path = 'average'
  317. return f"{path}:{count}/{all} = {ratio}%"
# Module-level instance created at import time.
# NOTE(review): presumably shared by the modules that import this file — confirm against callers.
mchDataCenter = MchDataCenter()