MchDataCenter.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665
  1. import os
  2. import time as stime
  3. import redis
  4. import h5py
  5. from os import path
  6. import re
  7. from datetime import timedelta
  8. import numpy as np
  9. from matplotlib.figure import Figure
  10. from matplotlib import ticker
  11. from io import BytesIO
  12. import logging
  13. class MchDataCenter(object):
  14. pos_map = {
  15. 'commit': 0, 'success': 1, 'fail': 2
  16. }
  17. def __init__(self):
  18. self._mquit = False
  19. self._mRHost = ''
  20. self._mRPort = 6379
  21. self._file_name = '/var/www/html/data/stdata/user.hdf5'
  22. def set_redis(self, rhost, rport):
  23. self._mRHost = rhost
  24. self._mRPort = rport
  25. def stop(self):
  26. self._mquit = True
  27. pass
  28. def prepare_data(self):
  29. while self._mquit == False:
  30. try:
  31. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  32. r = redis.Redis(connection_pool=pool)
  33. if path.exists(self._file_name):
  34. hfive = h5py.File(self._file_name, 'a')
  35. else:
  36. hfive = h5py.File(self._file_name, 'w')
  37. self.read_redis(hfive, r, 'nc_user_monitor_commit', 'commit')
  38. self.read_redis(hfive, r, 'nc_user_monitor_success', 'success')
  39. self.read_redis(hfive, r, 'nc_user_monitor_fail', 'fail')
  40. hfive.close()
  41. self.del_redis(r, 'nc_user_monitor_commit')
  42. self.del_redis(r, 'nc_user_monitor_success')
  43. self.del_redis(r, 'nc_user_monitor_fail')
  44. except Exception as ex:
  45. print(ex)
  46. finally:
  47. for i in range(60):
  48. if self._mquit:
  49. break
  50. else:
  51. stime.sleep(1)
  52. def del_redis(self, redis, name):
  53. latest_time = int(stime.time()) - 300
  54. for item in redis.hscan_iter(name):
  55. key = str(item[0], encoding="utf-8")
  56. items = re.split(r'-', key)
  57. fdel = True
  58. if len(items) == 5:
  59. (mchid, quality, card_type, amount, time) = items
  60. time = int(time)
  61. if latest_time <= time:
  62. fdel = False
  63. if fdel:
  64. redis.hdel(name, key)
  65. pass
  66. def read_redis(self, hfive, redis, name, prefix):
  67. i = 0
  68. for item in redis.hscan_iter(name):
  69. key = str(item[0], encoding="utf-8")
  70. val = str(item[1], encoding="utf-8")
  71. print(f'{prefix}:{i}')
  72. i += 1
  73. self.parase(hfive, key, val, prefix)
  74. def parase(self, hfive, text, val, prefix):
  75. items = re.split(r'-', text)
  76. if len(items) != 5:
  77. return False
  78. (mchid, quality, card_type, amount, time) = items
  79. pos = self.pos_map[f'{prefix}']
  80. time = int(time)
  81. today = self.day_stamp(time)
  82. path = f'/{today}/{mchid}/{quality}/{card_type}/{amount}'
  83. if path not in hfive:
  84. dim = len(self.pos_map)
  85. hfive[path] = np.zeros((dim, 86400))
  86. diff = time - today
  87. if diff < 0:
  88. print(diff)
  89. hfive[path][pos, diff] = val
  90. print(path, pos, diff, val, hfive[path][pos, diff])
  91. pass
  92. def day_stamp(self, stamp):
  93. stamp = int(stamp)
  94. x = stime.gmtime(stamp + 8 * 3600)
  95. diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
  96. today = stamp - diff.total_seconds()
  97. return int(today)
  98. def _days(self, root):
  99. result = []
  100. try:
  101. for name, sub in root.items():
  102. if isinstance(sub, h5py.Group):
  103. result.append(name)
  104. except Exception as ex:
  105. print(ex)
  106. finally:
  107. return result
  108. def days(self):
  109. try:
  110. hfive = h5py.File(self._file_name, 'r')
  111. root = hfive.require_group('/')
  112. days = self._days(root)
  113. hfive.close()
  114. return days
  115. except Exception as ex:
  116. print(ex)
  117. return []
  118. def paths(self, time_stamp):
  119. try:
  120. day_stamp = self.day_stamp(time_stamp)
  121. hfive = h5py.File(self._file_name, 'r')
  122. group = hfive.require_group(f'/{day_stamp}')
  123. paths = self.dir(group)
  124. hfive.close()
  125. return paths
  126. except Exception as ex:
  127. print(ex)
  128. return []
  129. def dir(self, group):
  130. result = []
  131. for name, sub in group.items():
  132. if isinstance(sub, h5py.Group):
  133. result.extend(self.dir(sub))
  134. else:
  135. result.append(sub.name)
  136. return result
  137. def _all_none(self, **kwargs):
  138. for key, val in kwargs.items():
  139. if val is not None:
  140. return False
  141. return True
  142. def _merge_path(self,paths):
  143. result = {}
  144. for path in paths:
  145. items = re.split(r'/', path)
  146. if len(items) != 6:
  147. continue
  148. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  149. _mchid = int(_mchid)
  150. if _mchid not in result:
  151. result[_mchid] = []
  152. result[_mchid].append(path)
  153. return result
  154. def draw_plot(self, start_time, interval=300, **kwargs):
  155. logger = logging.getLogger('app')
  156. hfive = h5py.File(self._file_name, 'r')
  157. try:
  158. day_stamp = self.day_stamp(start_time)
  159. start_pos = start_time - day_stamp
  160. cur_day = self.day_stamp(stime.time())
  161. if day_stamp == cur_day:
  162. end_pos = int(stime.time()) - day_stamp
  163. else:
  164. end_pos = -1
  165. fig = Figure(figsize=(16, 8))
  166. ax = fig.subplots()
  167. dim = len(self.pos_map)
  168. predata = np.zeros((dim, 86400))
  169. x = np.arange(0, 86400, interval)
  170. sub_count = 0
  171. filer_text, paths = self.datasets(hfive, start_time, **kwargs)
  172. if self._all_none(**kwargs):
  173. paths = self._merge_path(paths)
  174. for mchid, data in self._read_dict_data(hfive, paths):
  175. predata = predata + data
  176. path = f'{mchid}'
  177. ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
  178. if ret:
  179. sub_count += 1
  180. pass
  181. else:
  182. for path, data in self.read_data(hfive, paths):
  183. data = np.array(data)
  184. predata = predata + data
  185. ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
  186. if ret:
  187. sub_count += 1
  188. if sub_count > 1:
  189. self._draw_plot(ax, x, day_stamp, start_pos, end_pos, predata, interval, filer_text)
  190. ax.legend()
  191. ax.grid()
  192. ax.set_title('success ratio')
  193. ax.set(xlabel='time', ylabel='ratio')
  194. fig.autofmt_xdate()
  195. fig.subplots_adjust(left=0.05, right=0.999, top=0.95, bottom=0.1)
  196. buf = BytesIO()
  197. fig.savefig(buf, format="png")
  198. return buf
  199. except Exception as ex:
  200. print(ex)
  201. finally:
  202. hfive.close()
  203. def read_data(self, hfive, paths):
  204. for path in paths:
  205. yield path, hfive[path]
  206. def _read_dict_data(self, hfive, mchPaths):
  207. for mchid, paths in mchPaths.items():
  208. dim = len(self.pos_map)
  209. predata = np.zeros((dim, 86400))
  210. for path in paths:
  211. predata += hfive[path]
  212. yield mchid, predata
  213. def datasets(self, hfive, start_time, **kwargs):
  214. logger = logging.getLogger('app')
  215. day_stamp = self.day_stamp(start_time)
  216. sday = f'{day_stamp}'
  217. root = hfive.require_group('/')
  218. days = self._days(root)
  219. if sday not in days:
  220. return False
  221. group = hfive.require_group(sday)
  222. dsets = self.dir(group)
  223. mchid = quality = card_type = amount = None
  224. for key, val in kwargs.items():
  225. if val is None:
  226. continue
  227. if key == 'mchid':
  228. mchid = val
  229. elif key == 'quality':
  230. quality = f'{val}'
  231. elif key == 'card_type':
  232. card_type = f'{val}'
  233. elif key == 'amount':
  234. amount = f'{val}'
  235. else:
  236. continue
  237. return self._filter(dsets, mchid=mchid, quality=quality, card_type=card_type, amount=amount)
  238. def _filter(self, dsets, mchid=None, quality=None, card_type=None, amount=None):
  239. filer_text = ''
  240. if mchid is not None:
  241. filer_text = mchid
  242. if quality is not None:
  243. filer_text = filer_text + f"-qua:{quality}"
  244. if card_type is not None:
  245. filer_text = filer_text + f"-type:{card_type}"
  246. if amount is not None:
  247. filer_text = filer_text + f"-amount:{amount}"
  248. paths = []
  249. for text in dsets:
  250. items = re.split(r'/', text)
  251. if len(items) != 6:
  252. return False
  253. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  254. if (mchid is not None) and (_mchid != mchid):
  255. continue
  256. if (quality is not None) and (_quality != quality):
  257. continue
  258. if (card_type is not None) and (_card_type != card_type):
  259. continue
  260. if (amount is not None) and (_amount != amount):
  261. continue
  262. paths.append(text)
  263. return filer_text, paths
  264. def _draw_plot(self, ax, x, day_stamp, start_pos, end_pos, data, interval=300, path=''):
  265. # 'commit-succ': 0, 'notify-succ': 1, 'notify-fail': 2
  266. logging.getLogger('app').debug("path=%s", path)
  267. all = data[1] + data[2]
  268. all = all.reshape((-1, interval))
  269. all = np.sum(all, axis=1)
  270. ySucc = data[1]
  271. ySucc = ySucc.reshape((-1, interval))
  272. ySucc = np.sum(ySucc, axis=1)
  273. if end_pos == -1:
  274. pos = np.where(x >= start_pos)
  275. x = x[pos]
  276. ySucc = ySucc[pos]
  277. all = all[pos]
  278. else:
  279. pos = np.where(start_pos <= x)
  280. x = x[pos]
  281. ySucc = ySucc[pos]
  282. all = all[pos]
  283. pos = np.where(x < end_pos)
  284. x = x[pos]
  285. ySucc = ySucc[pos]
  286. all = all[pos]
  287. succ_count = int(np.sum(ySucc))
  288. all_count = int(np.sum(all))
  289. if all_count < 1:
  290. return False
  291. pos = np.where(ySucc > all)
  292. ySucc[pos] = all[pos]
  293. ySucc = ySucc / (all + 0.00000001)
  294. xs = np.array([stime.strftime('%H:%M', stime.localtime(d + day_stamp)) for d in x])
  295. ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=0))
  296. ax.plot(xs, ySucc, ls='--', marker='o', label=self._label(path, succ_count, all_count))
  297. return True
  298. def _label(self, path, count, all):
  299. ratio = 0.00
  300. if all > 0:
  301. ratio = round(count * 100 / all, 2)
  302. items = re.split(r'/', path)
  303. if len(items) == 6:
  304. (_, _sday, _chname, _quality, _card_type, _amount) = items
  305. card_type = ''
  306. if _card_type == '1':
  307. card_type = 'SY'
  308. elif _card_type == '2':
  309. card_type = 'SH'
  310. elif _card_type == '4':
  311. card_type = 'YD'
  312. elif _card_type == '5':
  313. card_type = 'LT'
  314. elif _card_type == '6':
  315. card_type = 'DX'
  316. elif _card_type == '7':
  317. card_type = 'TH'
  318. return f"{_chname}-{_quality}-{card_type}-{_amount}:{count}/{all} = {ratio}%"
  319. else:
  320. if path == '' or path is None:
  321. path = 'average'
  322. return f"{path}:{count}/{all} = {ratio}%"
  323. pass
    # Per-merchant success ratios over a series of look-back windows ending at the current time.
  325. def _merge_mobile_path(self,paths):
  326. result = {}
  327. for path in paths:
  328. items = re.split(r'/', path)
  329. if len(items) != 6:
  330. continue
  331. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  332. _card_type = int(_card_type)
  333. if _card_type not in [4, 5, 6]:
  334. continue
  335. _mchid = int(_mchid)
  336. if _mchid not in result:
  337. result[_mchid] = []
  338. result[_mchid].append(path)
  339. return result
  340. def _merge_data(self, hfive, paths):
  341. dim = len(self.pos_map)
  342. predata = np.zeros((dim, 86400))
  343. for path in paths:
  344. predata += hfive[path]
  345. return predata
  346. def _calc_mratio(self,data,startes,end):
  347. succ = data[1]
  348. fail = data[2]
  349. x = np.arange(0, 86400, 1)
  350. result = {}
  351. for start in startes:
  352. if end - start < 0:
  353. start_pos = 0
  354. else:
  355. start_pos = end - start
  356. pos = np.where(x >= start_pos)
  357. t = x[pos]
  358. _fail = fail[pos]
  359. _succ = succ[pos]
  360. pos = np.where(t < end)
  361. _fail = _fail[pos]
  362. _succ = _succ[pos]
  363. succs = int(np.sum(_succ))
  364. fails = int(np.sum(_fail))
  365. ratio = round((succs + 0.00001) / (succs + fails + 0.00001), 4)
  366. result[start] = [succs, fails, ratio]
  367. return result
  368. def mratios(self, time_stamp,presecs):
  369. paths = self.paths(time_stamp)
  370. mchid_paths = self._merge_mobile_path(paths)
  371. day_stamp = self.day_stamp(time_stamp)
  372. mratios = {}
  373. hfive = h5py.File(self._file_name, 'r')
  374. for mchid, paths in mchid_paths.items():
  375. mdata = self._merge_data(hfive,paths)
  376. result = self._calc_mratio(mdata,presecs,time_stamp - day_stamp)
  377. mratios[mchid] = result
  378. hfive.close()
  379. return mratios
  380. def calc_ratio(self):
  381. import json
  382. r = None
  383. try:
  384. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  385. r = redis.Redis(connection_pool=pool)
  386. except Exception as ex:
  387. print(ex)
  388. while True:
  389. try:
  390. time_sec = int(stime.time())
  391. presecs = [900, 1800, 3600, 7200, 86400]
  392. mratios = self.mratios(time_sec, presecs)
  393. if len(mratios) != 0:
  394. r.set(f"nc_merchant_ratios", json.dumps(mratios))
  395. r.publish('refill',json.dumps({'type':'mch_ratio','value':0}))
  396. print('push msg=',mratios)
  397. except Exception as ex:
  398. print(ex)
  399. finally:
  400. stime.sleep(2)
    # The code below computes success ratios broken down by card type.
  402. def _merge_mobile_type_path(self,paths,card_type=None):
  403. result = {}
  404. for path in paths:
  405. items = re.split(r'/', path)
  406. if len(items) != 6:
  407. continue
  408. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  409. _card_type = int(_card_type)
  410. if _card_type not in [4, 5, 6]:
  411. continue
  412. if card_type is not None and _card_type != card_type:
  413. continue
  414. _mchid = int(_mchid)
  415. if _mchid not in result:
  416. result[_mchid] = []
  417. result[_mchid].append(path)
  418. return result
  419. def mratio_types(self, time_stamp,presecs):
  420. paths = self.paths(time_stamp)
  421. mchid_paths = self._merge_mobile_type_path(paths)
  422. day_stamp = self.day_stamp(time_stamp)
  423. card_types = {None: 'ALL', 4: 'YD', 5: 'LT', 6: 'DX'}
  424. mratios = {}
  425. hfive = h5py.File(self._file_name, 'r')
  426. for mchid, paths in mchid_paths.items():
  427. mch_ratios = {}
  428. for card_type, name in card_types.items():
  429. print('card_type=', card_type, 'name=', name)
  430. if card_type is None:
  431. cur_paths = paths
  432. else:
  433. cur_paths = self._merge_mobile_type_path(paths, card_type)
  434. if len(cur_paths) == 0:
  435. continue
  436. cur_paths = cur_paths[mchid]
  437. mdata = self._merge_data(hfive,cur_paths)
  438. result = self._calc_mratio(mdata,presecs,time_stamp - day_stamp)
  439. mch_ratios[name] = result
  440. mratios[mchid] = mch_ratios
  441. hfive.close()
  442. return mratios
  443. def calc_ratios(self):
  444. import json
  445. r = None
  446. try:
  447. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  448. r = redis.Redis(connection_pool=pool)
  449. except Exception as ex:
  450. print(ex)
  451. while True:
  452. try:
  453. time_sec = int(stime.time())
  454. presecs = [900, 1800, 3600, 7200, 86400]
  455. mratios = self.mratio_types(time_sec, presecs)
  456. if len(mratios) != 0:
  457. r.set(f"nc_merchant_card_type_ratios", json.dumps(mratios))
  458. # r.publish('refill',json.dumps({'type':'mch_ratio','value':0}))
  459. print('push msg=', mratios)
  460. except Exception as ex:
  461. print(ex)
  462. finally:
  463. stime.sleep(2)
  464. ####################################################################################################################
  465. ####
  466. def _calc_mcount(self,data,startes,end):
  467. succ = data[1]
  468. fail = data[2]
  469. x = np.arange(0, 86400, 1)
  470. result = {}
  471. for start in startes:
  472. if end - start < 0:
  473. start_pos = 0
  474. else:
  475. start_pos = end - start
  476. pos = np.where(x >= start_pos)
  477. t = x[pos]
  478. _succ = succ[pos]
  479. _fail = fail[pos]
  480. pos = np.where(t < end)
  481. _succ = _succ[pos]
  482. _fail = _fail[pos]
  483. succs = int(np.sum(_succ))
  484. fails = int(np.sum(_fail))
  485. ratio = round((succs + 0.00001) / (succs + fails + 0.00001), 4)
  486. result[start] = [succs, fails, ratio]
  487. return result
  488. def merchant_rmobile_path(self, paths):
  489. result = {}
  490. for path in paths:
  491. items = re.split(r'/', path)
  492. if len(items) != 6:
  493. continue
  494. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  495. _card_type = int(_card_type)
  496. if _card_type not in [4, 5, 6]:
  497. continue
  498. _mchid = int(_mchid)
  499. if _mchid not in result:
  500. result[_mchid] = []
  501. result[_mchid].append(path)
  502. return result
  503. def rmobile_path(self, paths):
  504. result = []
  505. for path in paths:
  506. items = re.split(r'/', path)
  507. if len(items) != 6:
  508. continue
  509. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  510. _card_type = int(_card_type)
  511. if _card_type not in [4, 5, 6]:
  512. continue
  513. result.append(path)
  514. return result
  515. def mch_count(self, paths, presecs,time_stamp):
  516. hfive = h5py.File(self._file_name, 'r')
  517. mdata = self._merge_data(hfive,paths)
  518. day_stamp = self.day_stamp(time_stamp)
  519. result = self._calc_mcount(mdata,presecs,time_stamp - day_stamp)
  520. hfive.close()
  521. return result
  522. def mch_detail_count(self, paths, presecs,time_stamp):
  523. hfive = h5py.File(self._file_name, 'r')
  524. result = {}
  525. for path in paths:
  526. items = re.split(r'/', path)
  527. if len(items) != 6:
  528. continue
  529. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  530. key = f"{_mchid}-{_quality}-{_card_type}-{_amount}"
  531. mdata = hfive[path]
  532. day_stamp = self.day_stamp(time_stamp)
  533. result[key] = self._calc_mcount(mdata,presecs,time_stamp - day_stamp)
  534. hfive.close()
  535. return result
  536. def mch_counts(self):
  537. import json
  538. r = None
  539. try:
  540. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  541. r = redis.Redis(connection_pool=pool)
  542. except Exception as ex:
  543. print(ex)
  544. while True:
  545. try:
  546. time_stamp = int(stime.time() - 2 * 86400)
  547. presecs = [900, 1800, 3600, 7200, 86400]
  548. all_paths = self.paths(time_stamp)
  549. mchid_paths = self.merchant_rmobile_path(all_paths)
  550. all = {}
  551. for mchid, paths in mchid_paths.items():
  552. counts = self.mch_count(paths, presecs, time_stamp)
  553. all[mchid] = counts
  554. paths = self.rmobile_path(all_paths)
  555. detail = self.mch_detail_count(paths, presecs, time_stamp)
  556. result = {'all': all, 'detail': detail}
  557. if len(result) != 0:
  558. r.set(f"nc_merchant_refill_counts", json.dumps(result))
  559. r.publish('refill',json.dumps({'type':'mch_counts','value':0}))
  560. print('push msg=', result)
  561. except Exception as ex:
  562. print(ex)
  563. finally:
  564. stime.sleep(2)
  565. mchDataCenter = MchDataCenter()