# MchDataCenter.py (15 KB)

import json
import logging
import os
import re
import time as stime
from datetime import timedelta
from io import BytesIO
from os import path

import h5py
import numpy as np
import redis
from matplotlib import ticker
from matplotlib.figure import Figure
  13. class MchDataCenter(object):
  14. pos_map = {
  15. 'commit': 0, 'success': 1, 'fail': 2
  16. }
  17. def __init__(self):
  18. self._mquit = False
  19. self._mRHost = ''
  20. self._mRPort = 6379
  21. self._file_name = '/var/www/html/data/stdata/user.hdf5'
  22. def set_redis(self, rhost, rport):
  23. self._mRHost = rhost
  24. self._mRPort = rport
  25. def stop(self):
  26. self._mquit = True
  27. pass
  28. def prepare_data(self):
  29. while self._mquit == False:
  30. try:
  31. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  32. r = redis.Redis(connection_pool=pool)
  33. if path.exists(self._file_name):
  34. hfive = h5py.File(self._file_name, 'a')
  35. else:
  36. hfive = h5py.File(self._file_name, 'w')
  37. self.read_redis(hfive, r, 'nc_user_monitor_commit', 'commit')
  38. self.read_redis(hfive, r, 'nc_user_monitor_success', 'success')
  39. self.read_redis(hfive, r, 'nc_user_monitor_fail', 'fail')
  40. hfive.close()
  41. self.del_redis(r, 'nc_user_monitor_commit')
  42. self.del_redis(r, 'nc_user_monitor_success')
  43. self.del_redis(r, 'nc_user_monitor_fail')
  44. except Exception as ex:
  45. print(ex)
  46. finally:
  47. for i in range(60):
  48. if self._mquit:
  49. break
  50. else:
  51. stime.sleep(1)
  52. def del_redis(self, redis, name):
  53. latest_time = int(stime.time()) - 300
  54. for item in redis.hscan_iter(name):
  55. key = str(item[0], encoding="utf-8")
  56. items = re.split(r'-', key)
  57. fdel = True
  58. if len(items) == 5:
  59. (mchid, quality, card_type, amount, time) = items
  60. time = int(time)
  61. if latest_time <= time:
  62. fdel = False
  63. if fdel:
  64. redis.hdel(name, key)
  65. pass
  66. def read_redis(self, hfive, redis, name, prefix):
  67. i = 0
  68. for item in redis.hscan_iter(name):
  69. key = str(item[0], encoding="utf-8")
  70. val = str(item[1], encoding="utf-8")
  71. print(f'{prefix}:{i}')
  72. i += 1
  73. self.parase(hfive, key, val, prefix)
  74. def parase(self, hfive, text, val, prefix):
  75. items = re.split(r'-', text)
  76. if len(items) != 5:
  77. return False
  78. (mchid, quality, card_type, amount, time) = items
  79. pos = self.pos_map[f'{prefix}']
  80. time = int(time)
  81. today = self.day_stamp(time)
  82. path = f'/{today}/{mchid}/{quality}/{card_type}/{amount}'
  83. if path not in hfive:
  84. dim = len(self.pos_map)
  85. hfive[path] = np.zeros((dim, 86400))
  86. diff = time - today
  87. if diff < 0:
  88. print(diff)
  89. hfive[path][pos, diff] = val
  90. print(path, pos, diff, val, hfive[path][pos, diff])
  91. pass
  92. def day_stamp(self, stamp):
  93. stamp = int(stamp)
  94. x = stime.gmtime(stamp + 8 * 3600)
  95. diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
  96. today = stamp - diff.total_seconds()
  97. return int(today)
  98. def _days(self, root):
  99. result = []
  100. try:
  101. for name, sub in root.items():
  102. if isinstance(sub, h5py.Group):
  103. result.append(name)
  104. except Exception as ex:
  105. print(ex)
  106. finally:
  107. return result
  108. def days(self):
  109. try:
  110. hfive = h5py.File(self._file_name, 'r')
  111. root = hfive.require_group('/')
  112. days = self._days(root)
  113. hfive.close()
  114. return days
  115. except Exception as ex:
  116. print(ex)
  117. return []
  118. def paths(self, time_stamp):
  119. try:
  120. day_stamp = self.day_stamp(time_stamp)
  121. hfive = h5py.File(self._file_name, 'r')
  122. group = hfive.require_group(f'/{day_stamp}')
  123. paths = self.dir(group)
  124. hfive.close()
  125. return paths
  126. except Exception as ex:
  127. print(ex)
  128. return []
  129. def dir(self, group):
  130. result = []
  131. for name, sub in group.items():
  132. if isinstance(sub, h5py.Group):
  133. result.extend(self.dir(sub))
  134. else:
  135. result.append(sub.name)
  136. return result
  137. def _all_none(self, **kwargs):
  138. for key, val in kwargs.items():
  139. if val is not None:
  140. return False
  141. return True
  142. def _merge_path(self,paths):
  143. result = {}
  144. for path in paths:
  145. items = re.split(r'/', path)
  146. if len(items) != 6:
  147. continue
  148. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  149. _mchid = int(_mchid)
  150. if _mchid not in result:
  151. result[_mchid] = []
  152. result[_mchid].append(path)
  153. return result
  154. def draw_plot(self, start_time, interval=300, **kwargs):
  155. logger = logging.getLogger('app')
  156. hfive = h5py.File(self._file_name, 'r')
  157. try:
  158. day_stamp = self.day_stamp(start_time)
  159. start_pos = start_time - day_stamp
  160. cur_day = self.day_stamp(stime.time())
  161. if day_stamp == cur_day:
  162. end_pos = int(stime.time()) - day_stamp
  163. else:
  164. end_pos = -1
  165. fig = Figure(figsize=(16, 8))
  166. ax = fig.subplots()
  167. dim = len(self.pos_map)
  168. predata = np.zeros((dim, 86400))
  169. x = np.arange(0, 86400, interval)
  170. sub_count = 0
  171. filer_text, paths = self.datasets(hfive, start_time, **kwargs)
  172. if self._all_none(**kwargs):
  173. paths = self._merge_path(paths)
  174. for mchid, data in self._read_dict_data(hfive, paths):
  175. predata = predata + data
  176. path = f'{mchid}'
  177. ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
  178. if ret:
  179. sub_count += 1
  180. pass
  181. else:
  182. for path, data in self.read_data(hfive, paths):
  183. data = np.array(data)
  184. predata = predata + data
  185. ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
  186. if ret:
  187. sub_count += 1
  188. if sub_count > 1:
  189. self._draw_plot(ax, x, day_stamp, start_pos, end_pos, predata, interval, filer_text)
  190. ax.legend()
  191. ax.grid()
  192. ax.set_title('success ratio')
  193. ax.set(xlabel='time', ylabel='ratio')
  194. fig.autofmt_xdate()
  195. fig.subplots_adjust(left=0.05, right=0.999, top=0.95, bottom=0.1)
  196. buf = BytesIO()
  197. fig.savefig(buf, format="png")
  198. return buf
  199. except Exception as ex:
  200. print(ex)
  201. finally:
  202. hfive.close()
  203. def read_data(self, hfive, paths):
  204. for path in paths:
  205. yield path, hfive[path]
  206. def _read_dict_data(self, hfive, mchPaths):
  207. for mchid, paths in mchPaths.items():
  208. dim = len(self.pos_map)
  209. predata = np.zeros((dim, 86400))
  210. for path in paths:
  211. predata += hfive[path]
  212. yield mchid, predata
  213. def datasets(self, hfive, start_time, **kwargs):
  214. logger = logging.getLogger('app')
  215. day_stamp = self.day_stamp(start_time)
  216. sday = f'{day_stamp}'
  217. root = hfive.require_group('/')
  218. days = self._days(root)
  219. if sday not in days:
  220. return False
  221. group = hfive.require_group(sday)
  222. dsets = self.dir(group)
  223. mchid = quality = card_type = amount = None
  224. for key, val in kwargs.items():
  225. if val is None:
  226. continue
  227. if key == 'mchid':
  228. mchid = val
  229. elif key == 'quality':
  230. quality = f'{val}'
  231. elif key == 'card_type':
  232. card_type = f'{val}'
  233. elif key == 'amount':
  234. amount = f'{val}'
  235. else:
  236. continue
  237. return self._filter(dsets, mchid=mchid, quality=quality, card_type=card_type, amount=amount)
  238. def _filter(self, dsets, mchid=None, quality=None, card_type=None, amount=None):
  239. filer_text = ''
  240. if mchid is not None:
  241. filer_text = mchid
  242. if quality is not None:
  243. filer_text = filer_text + f"-qua:{quality}"
  244. if card_type is not None:
  245. filer_text = filer_text + f"-type:{card_type}"
  246. if amount is not None:
  247. filer_text = filer_text + f"-amount:{amount}"
  248. paths = []
  249. for text in dsets:
  250. items = re.split(r'/', text)
  251. if len(items) != 6:
  252. return False
  253. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  254. if (mchid is not None) and (_mchid != mchid):
  255. continue
  256. if (quality is not None) and (_quality != quality):
  257. continue
  258. if (card_type is not None) and (_card_type != card_type):
  259. continue
  260. if (amount is not None) and (_amount != amount):
  261. continue
  262. paths.append(text)
  263. return filer_text, paths
  264. def _draw_plot(self, ax, x, day_stamp, start_pos, end_pos, data, interval=300, path=''):
  265. # 'commit-succ': 0, 'notify-succ': 1, 'notify-fail': 2
  266. logging.getLogger('app').debug("path=%s", path)
  267. all = data[1] + data[2]
  268. all = all.reshape((-1, interval))
  269. all = np.sum(all, axis=1)
  270. ySucc = data[1]
  271. ySucc = ySucc.reshape((-1, interval))
  272. ySucc = np.sum(ySucc, axis=1)
  273. if end_pos == -1:
  274. pos = np.where(x >= start_pos)
  275. x = x[pos]
  276. ySucc = ySucc[pos]
  277. all = all[pos]
  278. else:
  279. pos = np.where(start_pos <= x)
  280. x = x[pos]
  281. ySucc = ySucc[pos]
  282. all = all[pos]
  283. pos = np.where(x < end_pos)
  284. x = x[pos]
  285. ySucc = ySucc[pos]
  286. all = all[pos]
  287. succ_count = int(np.sum(ySucc))
  288. all_count = int(np.sum(all))
  289. if all_count < 1:
  290. return False
  291. pos = np.where(ySucc > all)
  292. ySucc[pos] = all[pos]
  293. ySucc = ySucc / (all + 0.00000001)
  294. xs = np.array([stime.strftime('%H:%M', stime.localtime(d + day_stamp)) for d in x])
  295. ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=0))
  296. ax.plot(xs, ySucc, ls='--', marker='o', label=self._label(path, succ_count, all_count))
  297. return True
  298. def _label(self, path, count, all):
  299. ratio = 0.00
  300. if all > 0:
  301. ratio = round(count * 100 / all, 2)
  302. items = re.split(r'/', path)
  303. if len(items) == 6:
  304. (_, _sday, _chname, _quality, _card_type, _amount) = items
  305. card_type = ''
  306. if _card_type == '1':
  307. card_type = 'SY'
  308. elif _card_type == '2':
  309. card_type = 'SH'
  310. elif _card_type == '4':
  311. card_type = 'YD'
  312. elif _card_type == '5':
  313. card_type = 'LT'
  314. elif _card_type == '6':
  315. card_type = 'DX'
  316. elif _card_type == '7':
  317. card_type = 'TH'
  318. return f"{_chname}-{_quality}-{card_type}-{_amount}:{count}/{all} = {ratio}%"
  319. else:
  320. if path == '' or path is None:
  321. path = 'average'
  322. return f"{path}:{count}/{all} = {ratio}%"
  323. pass
  324. #统计机构当前时间之前序列时间成功率
  325. def _merge_mobile_path(self,paths):
  326. result = {}
  327. for path in paths:
  328. items = re.split(r'/', path)
  329. if len(items) != 6:
  330. continue
  331. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  332. _card_type = int(_card_type)
  333. if _card_type not in [4, 5, 6]:
  334. continue
  335. _mchid = int(_mchid)
  336. if _mchid not in result:
  337. result[_mchid] = []
  338. result[_mchid].append(path)
  339. return result
  340. def _merge_data(self, hfive, paths):
  341. dim = len(self.pos_map)
  342. predata = np.zeros((dim, 86400))
  343. for path in paths:
  344. predata += hfive[path]
  345. return predata
  346. def _calc_mratio(self,data,startes,end):
  347. all = data[1] + data[2]
  348. ySucc = data[1]
  349. x = np.arange(0, 86400, 1)
  350. result = {}
  351. for start in startes:
  352. if end - start < 0:
  353. start_pos = 0
  354. else:
  355. start_pos = end - start
  356. pos = np.where(x >= start_pos)
  357. t = x[pos]
  358. _all = all[pos]
  359. _succ = ySucc[pos]
  360. pos = np.where(t < end)
  361. _all = _all[pos]
  362. _succ = _succ[pos]
  363. succ_count = np.sum(_succ)
  364. all_count = np.sum(_all) + 0.00001
  365. ratio = round(succ_count / all_count, 2)
  366. if ratio > 1.0:
  367. ratio = 1.0
  368. result[start] = ratio
  369. return result
  370. def mratios(self, time_stamp,presecs):
  371. paths = self.paths(time_stamp)
  372. mchid_paths = self._merge_mobile_path(paths)
  373. day_stamp = self.day_stamp(time_stamp)
  374. mratios = {}
  375. hfive = h5py.File(self._file_name, 'r')
  376. for mchid, paths in mchid_paths.items():
  377. mdata = self._merge_data(hfive,paths)
  378. result = self._calc_mratio(mdata,presecs,time_stamp - day_stamp)
  379. mratios[mchid] = result
  380. hfive.close()
  381. return mratios
  382. def calc_ratio(self):
  383. import json
  384. r = None
  385. try:
  386. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  387. r = redis.Redis(connection_pool=pool)
  388. except Exception as ex:
  389. print(ex)
  390. while True:
  391. try:
  392. time_sec = int(stime.time())
  393. presecs = [900, 1800, 3600, 7200, 86400]
  394. mratios = self.mratios(time_sec, presecs)
  395. if len(mratios) != 0:
  396. r.set(f"nc_merchant_ratios", json.dumps(mratios))
  397. r.publish('refill',json.dumps({'type':'mch_ratio','value':0}))
  398. print('push msg=',mratios)
  399. except Exception as ex:
  400. print(ex)
  401. finally:
  402. stime.sleep(2)
  403. mchDataCenter = MchDataCenter()