MchDataCenter.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661
  1. import os
  2. import time as stime
  3. import redis
  4. import h5py
  5. from os import path
  6. import re
  7. from datetime import timedelta
  8. import numpy as np
  9. from matplotlib.figure import Figure
  10. from matplotlib import ticker
  11. from io import BytesIO
  12. import logging
  13. class MchDataCenter(object):
  14. pos_map = {
  15. 'commit': 0, 'success': 1, 'fail': 2
  16. }
  17. def __init__(self):
  18. self._mquit = False
  19. self._mRHost = ''
  20. self._mRPort = 6379
  21. self._file_name = '/var/www/html/data/stdata/user.hdf5'
  22. def set_redis(self, rhost, rport):
  23. self._mRHost = rhost
  24. self._mRPort = rport
  25. def stop(self):
  26. self._mquit = True
  27. pass
  28. def prepare_data(self):
  29. while self._mquit == False:
  30. try:
  31. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  32. r = redis.Redis(connection_pool=pool)
  33. if path.exists(self._file_name):
  34. hfive = h5py.File(self._file_name, 'a')
  35. else:
  36. hfive = h5py.File(self._file_name, 'w')
  37. self.read_redis(hfive, r, 'nc_user_monitor_commit', 'commit')
  38. self.read_redis(hfive, r, 'nc_user_monitor_success', 'success')
  39. self.read_redis(hfive, r, 'nc_user_monitor_fail', 'fail')
  40. hfive.close()
  41. self.del_redis(r, 'nc_user_monitor_commit')
  42. self.del_redis(r, 'nc_user_monitor_success')
  43. self.del_redis(r, 'nc_user_monitor_fail')
  44. except Exception as ex:
  45. print(ex)
  46. finally:
  47. for i in range(60):
  48. if self._mquit:
  49. break
  50. else:
  51. stime.sleep(1)
  52. def del_redis(self, redis, name):
  53. latest_time = int(stime.time()) - 2
  54. for item in redis.hscan_iter(name):
  55. key = str(item[0], encoding="utf-8")
  56. items = re.split(r'-', key)
  57. fdel = True
  58. if len(items) == 5:
  59. (mchid, quality, card_type, amount, time) = items
  60. time = int(time)
  61. if latest_time <= time:
  62. fdel = False
  63. if fdel:
  64. redis.hdel(name, key)
  65. pass
  66. def read_redis(self, hfive, redis, name, prefix):
  67. for item in redis.hscan_iter(name):
  68. key = str(item[0], encoding="utf-8")
  69. val = str(item[1], encoding="utf-8")
  70. self.parase(hfive, key, val, prefix)
  71. def parase(self, hfive, text, val, prefix):
  72. items = re.split(r'-', text)
  73. if len(items) != 5:
  74. return False
  75. (mchid, quality, card_type, amount, time) = items
  76. pos = self.pos_map[f'{prefix}']
  77. time = int(time)
  78. today = self.day_stamp(time)
  79. path = f'/{today}/{mchid}/{quality}/{card_type}/{amount}'
  80. if path not in hfive:
  81. dim = len(self.pos_map)
  82. hfive[path] = np.zeros((dim, 86400))
  83. diff = time - today
  84. if diff < 0:
  85. print(diff)
  86. hfive[path][pos, diff] = val
  87. print(path, pos, diff, val, hfive[path][pos, diff])
  88. pass
  89. def day_stamp(self, stamp):
  90. stamp = int(stamp)
  91. x = stime.gmtime(stamp + 8 * 3600)
  92. diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
  93. today = stamp - diff.total_seconds()
  94. return int(today)
  95. def _days(self, root):
  96. result = []
  97. try:
  98. for name, sub in root.items():
  99. if isinstance(sub, h5py.Group):
  100. result.append(name)
  101. except Exception as ex:
  102. print(ex)
  103. finally:
  104. return result
  105. def days(self):
  106. try:
  107. hfive = h5py.File(self._file_name, 'r')
  108. root = hfive.require_group('/')
  109. days = self._days(root)
  110. hfive.close()
  111. return days
  112. except Exception as ex:
  113. print(ex)
  114. return []
  115. def paths(self, time_stamp):
  116. try:
  117. day_stamp = self.day_stamp(time_stamp)
  118. hfive = h5py.File(self._file_name, 'r')
  119. group = hfive.require_group(f'/{day_stamp}')
  120. paths = self.dir(group)
  121. hfive.close()
  122. return paths
  123. except Exception as ex:
  124. print(ex)
  125. return []
  126. def dir(self, group):
  127. result = []
  128. for name, sub in group.items():
  129. if isinstance(sub, h5py.Group):
  130. result.extend(self.dir(sub))
  131. else:
  132. result.append(sub.name)
  133. return result
  134. def _all_none(self, **kwargs):
  135. for key, val in kwargs.items():
  136. if val is not None:
  137. return False
  138. return True
  139. def _merge_path(self,paths):
  140. result = {}
  141. for path in paths:
  142. items = re.split(r'/', path)
  143. if len(items) != 6:
  144. continue
  145. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  146. _mchid = int(_mchid)
  147. if _mchid not in result:
  148. result[_mchid] = []
  149. result[_mchid].append(path)
  150. return result
  151. def draw_plot(self, start_time, interval=300, **kwargs):
  152. logger = logging.getLogger('app')
  153. hfive = h5py.File(self._file_name, 'r')
  154. try:
  155. day_stamp = self.day_stamp(start_time)
  156. start_pos = start_time - day_stamp
  157. cur_day = self.day_stamp(stime.time())
  158. if day_stamp == cur_day:
  159. end_pos = int(stime.time()) - day_stamp
  160. else:
  161. end_pos = -1
  162. fig = Figure(figsize=(16, 8))
  163. ax = fig.subplots()
  164. dim = len(self.pos_map)
  165. predata = np.zeros((dim, 86400))
  166. x = np.arange(0, 86400, interval)
  167. sub_count = 0
  168. filer_text, paths = self.datasets(hfive, start_time, **kwargs)
  169. if self._all_none(**kwargs):
  170. paths = self._merge_path(paths)
  171. for mchid, data in self._read_dict_data(hfive, paths):
  172. predata = predata + data
  173. path = f'{mchid}'
  174. ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
  175. if ret:
  176. sub_count += 1
  177. pass
  178. else:
  179. for path, data in self.read_data(hfive, paths):
  180. data = np.array(data)
  181. predata = predata + data
  182. ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
  183. if ret:
  184. sub_count += 1
  185. if sub_count > 1:
  186. self._draw_plot(ax, x, day_stamp, start_pos, end_pos, predata, interval, filer_text)
  187. ax.legend()
  188. ax.grid()
  189. ax.set_title('success ratio')
  190. ax.set(xlabel='time', ylabel='ratio')
  191. fig.autofmt_xdate()
  192. fig.subplots_adjust(left=0.05, right=0.999, top=0.95, bottom=0.1)
  193. buf = BytesIO()
  194. fig.savefig(buf, format="png")
  195. return buf
  196. except Exception as ex:
  197. print(ex)
  198. finally:
  199. hfive.close()
  200. def read_data(self, hfive, paths):
  201. for path in paths:
  202. yield path, hfive[path]
  203. def _read_dict_data(self, hfive, mchPaths):
  204. for mchid, paths in mchPaths.items():
  205. dim = len(self.pos_map)
  206. predata = np.zeros((dim, 86400))
  207. for path in paths:
  208. predata += hfive[path]
  209. yield mchid, predata
  210. def datasets(self, hfive, start_time, **kwargs):
  211. logger = logging.getLogger('app')
  212. day_stamp = self.day_stamp(start_time)
  213. sday = f'{day_stamp}'
  214. root = hfive.require_group('/')
  215. days = self._days(root)
  216. if sday not in days:
  217. return False
  218. group = hfive.require_group(sday)
  219. dsets = self.dir(group)
  220. mchid = quality = card_type = amount = None
  221. for key, val in kwargs.items():
  222. if val is None:
  223. continue
  224. if key == 'mchid':
  225. mchid = val
  226. elif key == 'quality':
  227. quality = f'{val}'
  228. elif key == 'card_type':
  229. card_type = f'{val}'
  230. elif key == 'amount':
  231. amount = f'{val}'
  232. else:
  233. continue
  234. return self._filter(dsets, mchid=mchid, quality=quality, card_type=card_type, amount=amount)
  235. def _filter(self, dsets, mchid=None, quality=None, card_type=None, amount=None):
  236. filer_text = ''
  237. if mchid is not None:
  238. filer_text = mchid
  239. if quality is not None:
  240. filer_text = filer_text + f"-qua:{quality}"
  241. if card_type is not None:
  242. filer_text = filer_text + f"-type:{card_type}"
  243. if amount is not None:
  244. filer_text = filer_text + f"-amount:{amount}"
  245. paths = []
  246. for text in dsets:
  247. items = re.split(r'/', text)
  248. if len(items) != 6:
  249. return False
  250. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  251. if (mchid is not None) and (_mchid != mchid):
  252. continue
  253. if (quality is not None) and (_quality != quality):
  254. continue
  255. if (card_type is not None) and (_card_type != card_type):
  256. continue
  257. if (amount is not None) and (_amount != amount):
  258. continue
  259. paths.append(text)
  260. return filer_text, paths
  261. def _draw_plot(self, ax, x, day_stamp, start_pos, end_pos, data, interval=300, path=''):
  262. # 'commit-succ': 0, 'notify-succ': 1, 'notify-fail': 2
  263. logging.getLogger('app').debug("path=%s", path)
  264. all = data[1] + data[2]
  265. all = all.reshape((-1, interval))
  266. all = np.sum(all, axis=1)
  267. ySucc = data[1]
  268. ySucc = ySucc.reshape((-1, interval))
  269. ySucc = np.sum(ySucc, axis=1)
  270. if end_pos == -1:
  271. pos = np.where(x >= start_pos)
  272. x = x[pos]
  273. ySucc = ySucc[pos]
  274. all = all[pos]
  275. else:
  276. pos = np.where(start_pos <= x)
  277. x = x[pos]
  278. ySucc = ySucc[pos]
  279. all = all[pos]
  280. pos = np.where(x < end_pos)
  281. x = x[pos]
  282. ySucc = ySucc[pos]
  283. all = all[pos]
  284. succ_count = int(np.sum(ySucc))
  285. all_count = int(np.sum(all))
  286. if all_count < 1:
  287. return False
  288. pos = np.where(ySucc > all)
  289. ySucc[pos] = all[pos]
  290. ySucc = ySucc / (all + 0.00000001)
  291. xs = np.array([stime.strftime('%H:%M', stime.localtime(d + day_stamp)) for d in x])
  292. ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=0))
  293. ax.plot(xs, ySucc, ls='--', marker='o', label=self._label(path, succ_count, all_count))
  294. return True
  295. def _label(self, path, count, all):
  296. ratio = 0.00
  297. if all > 0:
  298. ratio = round(count * 100 / all, 2)
  299. items = re.split(r'/', path)
  300. if len(items) == 6:
  301. (_, _sday, _chname, _quality, _card_type, _amount) = items
  302. card_type = ''
  303. if _card_type == '1':
  304. card_type = 'SY'
  305. elif _card_type == '2':
  306. card_type = 'SH'
  307. elif _card_type == '4':
  308. card_type = 'YD'
  309. elif _card_type == '5':
  310. card_type = 'LT'
  311. elif _card_type == '6':
  312. card_type = 'DX'
  313. elif _card_type == '7':
  314. card_type = 'TH'
  315. return f"{_chname}-{_quality}-{card_type}-{_amount}:{count}/{all} = {ratio}%"
  316. else:
  317. if path == '' or path is None:
  318. path = 'average'
  319. return f"{path}:{count}/{all} = {ratio}%"
  320. pass
  321. #统计机构当前时间之前序列时间成功率
  322. def _merge_mobile_path(self,paths):
  323. result = {}
  324. for path in paths:
  325. items = re.split(r'/', path)
  326. if len(items) != 6:
  327. continue
  328. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  329. _card_type = int(_card_type)
  330. if _card_type not in [4, 5, 6]:
  331. continue
  332. _mchid = int(_mchid)
  333. if _mchid not in result:
  334. result[_mchid] = []
  335. result[_mchid].append(path)
  336. return result
  337. def _merge_data(self, hfive, paths):
  338. dim = len(self.pos_map)
  339. predata = np.zeros((dim, 86400))
  340. for path in paths:
  341. predata += hfive[path]
  342. return predata
  343. def _calc_mratio(self,data,startes,end):
  344. succ = data[1]
  345. fail = data[2]
  346. x = np.arange(0, 86400, 1)
  347. result = {}
  348. for start in startes:
  349. if end - start < 0:
  350. start_pos = 0
  351. else:
  352. start_pos = end - start
  353. pos = np.where(x >= start_pos)
  354. t = x[pos]
  355. _fail = fail[pos]
  356. _succ = succ[pos]
  357. pos = np.where(t < end)
  358. _fail = _fail[pos]
  359. _succ = _succ[pos]
  360. succs = int(np.sum(_succ))
  361. fails = int(np.sum(_fail))
  362. ratio = round((succs + 0.00001) / (succs + fails + 0.00001), 4)
  363. result[start] = [succs, fails, ratio]
  364. return result
  365. def mratios(self, time_stamp,presecs):
  366. paths = self.paths(time_stamp)
  367. mchid_paths = self._merge_mobile_path(paths)
  368. day_stamp = self.day_stamp(time_stamp)
  369. mratios = {}
  370. hfive = h5py.File(self._file_name, 'r')
  371. for mchid, paths in mchid_paths.items():
  372. mdata = self._merge_data(hfive,paths)
  373. result = self._calc_mratio(mdata,presecs,time_stamp - day_stamp)
  374. mratios[mchid] = result
  375. hfive.close()
  376. return mratios
  377. def calc_ratio(self):
  378. import json
  379. r = None
  380. try:
  381. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  382. r = redis.Redis(connection_pool=pool)
  383. except Exception as ex:
  384. print(ex)
  385. while True:
  386. try:
  387. time_sec = int(stime.time())
  388. presecs = [900, 1800, 3600, 7200, 86400]
  389. mratios = self.mratios(time_sec, presecs)
  390. if len(mratios) != 0:
  391. r.set(f"nc_merchant_ratios", json.dumps(mratios))
  392. r.publish('refill',json.dumps({'type':'mch_ratio','value':0}))
  393. print('push msg=',mratios)
  394. except Exception as ex:
  395. print(ex)
  396. finally:
  397. stime.sleep(2)
  398. #以下为按照卡类型计算成功率代码
  399. def _merge_mobile_type_path(self,paths,card_type=None):
  400. result = {}
  401. for path in paths:
  402. items = re.split(r'/', path)
  403. if len(items) != 6:
  404. continue
  405. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  406. _card_type = int(_card_type)
  407. if _card_type not in [4, 5, 6]:
  408. continue
  409. if card_type is not None and _card_type != card_type:
  410. continue
  411. _mchid = int(_mchid)
  412. if _mchid not in result:
  413. result[_mchid] = []
  414. result[_mchid].append(path)
  415. return result
  416. def mratio_types(self, time_stamp,presecs):
  417. paths = self.paths(time_stamp)
  418. mchid_paths = self._merge_mobile_type_path(paths)
  419. day_stamp = self.day_stamp(time_stamp)
  420. card_types = {None: 'ALL', 4: 'YD', 5: 'LT', 6: 'DX'}
  421. mratios = {}
  422. hfive = h5py.File(self._file_name, 'r')
  423. for mchid, paths in mchid_paths.items():
  424. mch_ratios = {}
  425. for card_type, name in card_types.items():
  426. print('card_type=', card_type, 'name=', name)
  427. if card_type is None:
  428. cur_paths = paths
  429. else:
  430. cur_paths = self._merge_mobile_type_path(paths, card_type)
  431. if len(cur_paths) == 0:
  432. continue
  433. cur_paths = cur_paths[mchid]
  434. mdata = self._merge_data(hfive,cur_paths)
  435. result = self._calc_mratio(mdata,presecs,time_stamp - day_stamp)
  436. mch_ratios[name] = result
  437. mratios[mchid] = mch_ratios
  438. hfive.close()
  439. return mratios
  440. def calc_ratios(self):
  441. import json
  442. r = None
  443. try:
  444. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  445. r = redis.Redis(connection_pool=pool)
  446. except Exception as ex:
  447. print(ex)
  448. while True:
  449. try:
  450. time_sec = int(stime.time())
  451. presecs = [900, 1800, 3600, 7200, 86400]
  452. mratios = self.mratio_types(time_sec, presecs)
  453. if len(mratios) != 0:
  454. r.set(f"nc_merchant_card_type_ratios", json.dumps(mratios))
  455. # r.publish('refill',json.dumps({'type':'mch_ratio','value':0}))
  456. print('push msg=', mratios)
  457. except Exception as ex:
  458. print(ex)
  459. finally:
  460. stime.sleep(2)
  461. ####################################################################################################################
  462. ####
  463. def _calc_mcount(self,data,startes,end):
  464. succ = data[1]
  465. fail = data[2]
  466. x = np.arange(0, 86400, 1)
  467. result = {}
  468. for start in startes:
  469. if end - start < 0:
  470. start_pos = 0
  471. else:
  472. start_pos = end - start
  473. pos = np.where(x >= start_pos)
  474. t = x[pos]
  475. _succ = succ[pos]
  476. _fail = fail[pos]
  477. pos = np.where(t < end)
  478. _succ = _succ[pos]
  479. _fail = _fail[pos]
  480. succs = int(np.sum(_succ))
  481. fails = int(np.sum(_fail))
  482. ratio = round((succs + 0.00001) / (succs + fails + 0.00001), 4)
  483. result[start] = [succs, fails, ratio]
  484. return result
  485. def merchant_rmobile_path(self, paths):
  486. result = {}
  487. for path in paths:
  488. items = re.split(r'/', path)
  489. if len(items) != 6:
  490. continue
  491. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  492. _card_type = int(_card_type)
  493. if _card_type not in [4, 5, 6]:
  494. continue
  495. _mchid = int(_mchid)
  496. if _mchid not in result:
  497. result[_mchid] = []
  498. result[_mchid].append(path)
  499. return result
  500. def rmobile_path(self, paths):
  501. result = []
  502. for path in paths:
  503. items = re.split(r'/', path)
  504. if len(items) != 6:
  505. continue
  506. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  507. _card_type = int(_card_type)
  508. if _card_type not in [4, 5, 6]:
  509. continue
  510. result.append(path)
  511. return result
  512. def mch_count(self, paths, presecs,time_stamp):
  513. hfive = h5py.File(self._file_name, 'r')
  514. mdata = self._merge_data(hfive,paths)
  515. day_stamp = self.day_stamp(time_stamp)
  516. result = self._calc_mcount(mdata,presecs,time_stamp - day_stamp)
  517. hfive.close()
  518. return result
  519. def mch_detail_count(self, paths, presecs,time_stamp):
  520. hfive = h5py.File(self._file_name, 'r')
  521. result = {}
  522. for path in paths:
  523. items = re.split(r'/', path)
  524. if len(items) != 6:
  525. continue
  526. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  527. key = f"{_mchid}-{_quality}-{_card_type}-{_amount}"
  528. mdata = hfive[path]
  529. day_stamp = self.day_stamp(time_stamp)
  530. result[key] = self._calc_mcount(mdata,presecs,time_stamp - day_stamp)
  531. hfive.close()
  532. return result
  533. def mch_counts(self):
  534. import json
  535. r = None
  536. try:
  537. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  538. r = redis.Redis(connection_pool=pool)
  539. except Exception as ex:
  540. print(ex)
  541. while True:
  542. try:
  543. time_stamp = int(stime.time())
  544. presecs = [900, 1800, 3600, 7200, 86400]
  545. all_paths = self.paths(time_stamp)
  546. mchid_paths = self.merchant_rmobile_path(all_paths)
  547. gross = {}
  548. for mchid, paths in mchid_paths.items():
  549. counts = self.mch_count(paths, presecs, time_stamp)
  550. gross[mchid] = counts
  551. paths = self.rmobile_path(all_paths)
  552. detail = self.mch_detail_count(paths, presecs, time_stamp)
  553. result = {'gross': gross, 'detail': detail}
  554. if len(result) != 0:
  555. r.set(f"nc_merchant_refill_counts", json.dumps(result))
  556. r.publish('refill',json.dumps({'type':'mch_counts','value':0}))
  557. print('push msg=', result)
  558. except Exception as ex:
  559. print(ex)
  560. finally:
  561. stime.sleep(2)
  562. mchDataCenter = MchDataCenter()