# MchDataCenter.py
import json
import logging
import os
import re
import time as stime
from datetime import datetime
from datetime import timedelta
from io import BytesIO
from os import path

import h5py
import numpy as np
import redis
from matplotlib import ticker
from matplotlib.figure import Figure
  14. class MchDataCenter(object):
  15. latest_delta = 10
  16. pos_map = {
  17. 'commit': 0, 'success': 1, 'fail': 2
  18. }
  19. def __init__(self):
  20. self._mquit = False
  21. self._mRHost = ''
  22. self._mRPort = 6379
  23. self._file_name = '/var/www/html/data/stdata/user.hdf5'
  24. def set_redis(self, rhost, rport):
  25. self._mRHost = rhost
  26. self._mRPort = rport
  27. def stop(self):
  28. self._mquit = True
  29. pass
  30. def prepare_data(self):
  31. while self._mquit == False:
  32. try:
  33. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  34. r = redis.Redis(connection_pool=pool)
  35. if path.exists(self._file_name):
  36. hfive = h5py.File(self._file_name, 'a')
  37. else:
  38. hfive = h5py.File(self._file_name, 'w')
  39. latest_time = int(stime.time()) - self.latest_delta
  40. lt = stime.localtime(latest_time)
  41. now_str = stime.strftime('%Y-%m-%d %H:%M:%S', lt)
  42. print('start read',now_str)
  43. self.read_redis(hfive, r, 'nc_user_monitor_commit', 'commit')
  44. self.read_redis(hfive, r, 'nc_user_monitor_success', 'success')
  45. self.read_redis(hfive, r, 'nc_user_monitor_fail', 'fail')
  46. hfive.close()
  47. self.del_redis(r, 'nc_user_monitor_commit',latest_time)
  48. self.del_redis(r, 'nc_user_monitor_success',latest_time)
  49. self.del_redis(r, 'nc_user_monitor_fail',latest_time)
  50. except Exception as ex:
  51. print(ex)
  52. finally:
  53. stime.sleep(1)
  54. def del_redis(self, redis, name, latest_time):
  55. for item in redis.hscan_iter(name):
  56. key = str(item[0], encoding="utf-8")
  57. items = re.split(r'-', key)
  58. flag_del = True
  59. if len(items) == 5:
  60. (mchid, quality, card_type, amount, time) = items
  61. time = int(time)
  62. if latest_time <= time:
  63. flag_del = False
  64. else:
  65. print('delete one error key:', key)
  66. if flag_del:
  67. redis.hdel(name, key)
  68. pass
  69. def read_redis(self, hfive, redis, name, prefix):
  70. for item in redis.hscan_iter(name):
  71. key = str(item[0], encoding="utf-8")
  72. val = str(item[1], encoding="utf-8")
  73. self.parase(hfive, key, val, prefix)
  74. def parase(self, hfive, text, val, prefix):
  75. items = re.split(r'-', text)
  76. if len(items) != 5:
  77. return False
  78. (mchid, quality, card_type, amount, time) = items
  79. pos = self.pos_map[f'{prefix}']
  80. time = int(time)
  81. today = self.day_stamp(time)
  82. path = f'/{today}/{mchid}/{quality}/{card_type}/{amount}'
  83. if path not in hfive:
  84. dim = len(self.pos_map)
  85. hfive[path] = np.zeros((dim, 86400))
  86. diff = time - today
  87. if diff < 0:
  88. print(diff)
  89. hfive[path][pos, diff] = val
  90. print(path, pos, prefix, diff, time, val, hfive[path][pos, diff])
  91. pass
  92. def day_stamp(self, stamp):
  93. stamp = int(stamp)
  94. x = stime.gmtime(stamp + 8 * 3600)
  95. diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
  96. today = stamp - diff.total_seconds()
  97. return int(today)
  98. def _days(self, root):
  99. result = []
  100. try:
  101. for name, sub in root.items():
  102. if isinstance(sub, h5py.Group):
  103. result.append(name)
  104. except Exception as ex:
  105. print(ex)
  106. finally:
  107. return result
  108. def days(self):
  109. try:
  110. hfive = h5py.File(self._file_name, 'r')
  111. root = hfive.require_group('/')
  112. days = self._days(root)
  113. hfive.close()
  114. return days
  115. except Exception as ex:
  116. print(ex)
  117. return []
  118. def paths(self, time_stamp):
  119. try:
  120. day_stamp = self.day_stamp(time_stamp)
  121. hfive = h5py.File(self._file_name, 'r')
  122. group = hfive.require_group(f'/{day_stamp}')
  123. paths = self.dir(group)
  124. hfive.close()
  125. return paths
  126. except Exception as ex:
  127. print(ex)
  128. return []
  129. def dir(self, group):
  130. result = []
  131. for name, sub in group.items():
  132. if isinstance(sub, h5py.Group):
  133. result.extend(self.dir(sub))
  134. else:
  135. result.append(sub.name)
  136. return result
  137. def _all_none(self, **kwargs):
  138. for key, val in kwargs.items():
  139. if val is not None:
  140. return False
  141. return True
  142. def _merge_path(self,paths):
  143. result = {}
  144. for path in paths:
  145. items = re.split(r'/', path)
  146. if len(items) != 6:
  147. continue
  148. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  149. _mchid = int(_mchid)
  150. if _mchid not in result:
  151. result[_mchid] = []
  152. result[_mchid].append(path)
  153. return result
  154. def draw_plot(self, start_time, interval=300, **kwargs):
  155. logger = logging.getLogger('app')
  156. hfive = h5py.File(self._file_name, 'r')
  157. try:
  158. day_stamp = self.day_stamp(start_time)
  159. start_pos = start_time - day_stamp
  160. cur_day = self.day_stamp(stime.time())
  161. if day_stamp == cur_day:
  162. end_pos = int(stime.time()) - day_stamp
  163. else:
  164. end_pos = -1
  165. fig = Figure(figsize=(16, 8))
  166. ax = fig.subplots()
  167. dim = len(self.pos_map)
  168. predata = np.zeros((dim, 86400))
  169. x = np.arange(0, 86400, interval)
  170. sub_count = 0
  171. filer_text, paths = self.datasets(hfive, start_time, **kwargs)
  172. if self._all_none(**kwargs):
  173. paths = self._merge_path(paths)
  174. for mchid, data in self._read_dict_data(hfive, paths):
  175. predata = predata + data
  176. path = f'{mchid}'
  177. ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
  178. if ret:
  179. sub_count += 1
  180. pass
  181. else:
  182. for path, data in self.read_data(hfive, paths):
  183. data = np.array(data)
  184. predata = predata + data
  185. ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
  186. if ret:
  187. sub_count += 1
  188. if sub_count > 1:
  189. self._draw_plot(ax, x, day_stamp, start_pos, end_pos, predata, interval, filer_text)
  190. ax.legend()
  191. ax.grid()
  192. ax.set_title('success ratio')
  193. ax.set(xlabel='time', ylabel='ratio')
  194. fig.autofmt_xdate()
  195. fig.subplots_adjust(left=0.05, right=0.999, top=0.95, bottom=0.1)
  196. buf = BytesIO()
  197. fig.savefig(buf, format="png")
  198. return buf
  199. except Exception as ex:
  200. print(ex)
  201. finally:
  202. hfive.close()
  203. def read_data(self, hfive, paths):
  204. for path in paths:
  205. yield path, hfive[path]
  206. def _read_dict_data(self, hfive, mchPaths):
  207. for mchid, paths in mchPaths.items():
  208. dim = len(self.pos_map)
  209. predata = np.zeros((dim, 86400))
  210. for path in paths:
  211. predata += hfive[path]
  212. yield mchid, predata
  213. def datasets(self, hfive, start_time, **kwargs):
  214. logger = logging.getLogger('app')
  215. day_stamp = self.day_stamp(start_time)
  216. sday = f'{day_stamp}'
  217. root = hfive.require_group('/')
  218. days = self._days(root)
  219. if sday not in days:
  220. return False
  221. group = hfive.require_group(sday)
  222. dsets = self.dir(group)
  223. mchid = quality = card_type = amount = None
  224. for key, val in kwargs.items():
  225. if val is None:
  226. continue
  227. if key == 'mchid':
  228. mchid = val
  229. elif key == 'quality':
  230. quality = f'{val}'
  231. elif key == 'card_type':
  232. card_type = f'{val}'
  233. elif key == 'amount':
  234. amount = f'{val}'
  235. else:
  236. continue
  237. return self._filter(dsets, mchid=mchid, quality=quality, card_type=card_type, amount=amount)
  238. def _filter(self, dsets, mchid=None, quality=None, card_type=None, amount=None):
  239. filer_text = ''
  240. if mchid is not None:
  241. filer_text = mchid
  242. if quality is not None:
  243. filer_text = filer_text + f"-qua:{quality}"
  244. if card_type is not None:
  245. filer_text = filer_text + f"-type:{card_type}"
  246. if amount is not None:
  247. filer_text = filer_text + f"-amount:{amount}"
  248. paths = []
  249. for text in dsets:
  250. items = re.split(r'/', text)
  251. if len(items) != 6:
  252. return False
  253. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  254. if (mchid is not None) and (_mchid != mchid):
  255. continue
  256. if (quality is not None) and (_quality != quality):
  257. continue
  258. if (card_type is not None) and (_card_type != card_type):
  259. continue
  260. if (amount is not None) and (_amount != amount):
  261. continue
  262. paths.append(text)
  263. return filer_text, paths
  264. def _draw_plot(self, ax, x, day_stamp, start_pos, end_pos, data, interval=300, path=''):
  265. # 'commit': 0, 'succ': 1, 'fail': 2
  266. logging.getLogger('app').debug("path=%s", path)
  267. all = data[1] + data[2]
  268. all = all.reshape((-1, interval))
  269. all = np.sum(all, axis=1)
  270. commit = data[0]
  271. commit = commit.reshape((-1, interval))
  272. commit = np.sum(commit, axis=1)
  273. ySucc = data[1]
  274. ySucc = ySucc.reshape((-1, interval))
  275. ySucc = np.sum(ySucc, axis=1)
  276. yFail = data[2]
  277. yFail = yFail.reshape((-1, interval))
  278. yFail = np.sum(yFail, axis=1)
  279. if end_pos == -1:
  280. pos = np.where(x >= start_pos)
  281. x = x[pos]
  282. ySucc = ySucc[pos]
  283. all = all[pos]
  284. commit = commit[pos]
  285. yFail = yFail[pos]
  286. else:
  287. pos = np.where(start_pos <= x)
  288. x = x[pos]
  289. ySucc = ySucc[pos]
  290. all = all[pos]
  291. commit = commit[pos]
  292. yFail = yFail[pos]
  293. pos = np.where(x < end_pos)
  294. x = x[pos]
  295. ySucc = ySucc[pos]
  296. all = all[pos]
  297. commit = commit[pos]
  298. yFail = yFail[pos]
  299. succ_count = int(np.sum(ySucc))
  300. all_count = int(np.sum(all))
  301. commit_count = int(np.sum(commit))
  302. fail_count = int(np.sum(yFail))
  303. if all_count < 1:
  304. return False
  305. pos = np.where(ySucc > all)
  306. ySucc[pos] = all[pos]
  307. ySucc = ySucc / (all + 0.00000001)
  308. xs = np.array([stime.strftime('%H:%M', stime.localtime(d + day_stamp)) for d in x])
  309. ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=0))
  310. ax.plot(xs, ySucc, ls='--', marker='o', label=self._label(path, succ_count, all_count,commit_count,fail_count))
  311. return True
  312. def _label(self, path, succ_count, all, commit_count, fail_count):
  313. ratio = 0.00
  314. if all > 0:
  315. ratio = round(succ_count * 100 / all, 2)
  316. items = re.split(r'/', path)
  317. if len(items) == 6:
  318. (_, _sday, _chname, _quality, _card_type, _amount) = items
  319. card_type = ''
  320. if _card_type == '1':
  321. card_type = 'SY'
  322. elif _card_type == '2':
  323. card_type = 'SH'
  324. elif _card_type == '4':
  325. card_type = 'YD'
  326. elif _card_type == '5':
  327. card_type = 'LT'
  328. elif _card_type == '6':
  329. card_type = 'DX'
  330. elif _card_type == '7':
  331. card_type = 'TH'
  332. return f"{_chname}-{_quality}-{card_type}-{_amount}:{succ_count}/{all} = {ratio}% {commit_count}:{fail_count}"
  333. else:
  334. if path == '' or path is None:
  335. path = 'average'
  336. return f"{path}:{succ_count}/{all} = {ratio}% {commit_count}:{fail_count}"
  337. pass
  338. #统计机构当前时间之前序列时间成功率
  339. def _merge_mobile_path(self,paths):
  340. result = {}
  341. for path in paths:
  342. items = re.split(r'/', path)
  343. if len(items) != 6:
  344. continue
  345. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  346. _card_type = int(_card_type)
  347. if _card_type not in [4, 5, 6]:
  348. continue
  349. _mchid = int(_mchid)
  350. if _mchid not in result:
  351. result[_mchid] = []
  352. result[_mchid].append(path)
  353. return result
  354. def _merge_data(self, hfive, paths):
  355. dim = len(self.pos_map)
  356. predata = np.zeros((dim, 86400))
  357. for path in paths:
  358. predata += hfive[path]
  359. return predata
  360. def _calc_mratio(self,data,startes,end):
  361. succ = data[1]
  362. fail = data[2]
  363. x = np.arange(0, 86400, 1)
  364. result = {}
  365. for start in startes:
  366. if end - start < 0:
  367. start_pos = 0
  368. else:
  369. start_pos = end - start
  370. pos = np.where(x >= start_pos)
  371. t = x[pos]
  372. _fail = fail[pos]
  373. _succ = succ[pos]
  374. pos = np.where(t < end)
  375. _fail = _fail[pos]
  376. _succ = _succ[pos]
  377. succs = int(np.sum(_succ))
  378. fails = int(np.sum(_fail))
  379. ratio = round((succs + 0.00001) / (succs + fails + 0.00001), 4)
  380. result[start] = [succs, fails, ratio]
  381. return result
  382. def mratios(self, time_stamp,presecs):
  383. paths = self.paths(time_stamp)
  384. mchid_paths = self._merge_mobile_path(paths)
  385. day_stamp = self.day_stamp(time_stamp)
  386. mratios = {}
  387. hfive = h5py.File(self._file_name, 'r')
  388. for mchid, paths in mchid_paths.items():
  389. mdata = self._merge_data(hfive,paths)
  390. result = self._calc_mratio(mdata,presecs,time_stamp - day_stamp)
  391. mratios[mchid] = result
  392. hfive.close()
  393. return mratios
  394. def calc_ratio(self):
  395. import json
  396. r = None
  397. try:
  398. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  399. r = redis.Redis(connection_pool=pool)
  400. except Exception as ex:
  401. print(ex)
  402. while True:
  403. try:
  404. time_sec = int(stime.time())
  405. presecs = [900, 1800, 3600, 7200, 86400]
  406. mratios = self.mratios(time_sec, presecs)
  407. if len(mratios) != 0:
  408. r.set(f"nc_merchant_ratios", json.dumps(mratios))
  409. r.publish('refill',json.dumps({'type':'mch_ratio','value':0}))
  410. print('push msg=',mratios)
  411. except Exception as ex:
  412. print(ex)
  413. finally:
  414. stime.sleep(2)
  415. #以下为按照卡类型计算成功率代码
  416. def _merge_mobile_type_path(self,paths,card_type=None):
  417. result = {}
  418. for path in paths:
  419. items = re.split(r'/', path)
  420. if len(items) != 6:
  421. continue
  422. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  423. _card_type = int(_card_type)
  424. if _card_type not in [4, 5, 6]:
  425. continue
  426. if card_type is not None and _card_type != card_type:
  427. continue
  428. _mchid = int(_mchid)
  429. if _mchid not in result:
  430. result[_mchid] = []
  431. result[_mchid].append(path)
  432. return result
  433. def mratio_types(self, time_stamp,presecs):
  434. paths = self.paths(time_stamp)
  435. mchid_paths = self._merge_mobile_type_path(paths)
  436. day_stamp = self.day_stamp(time_stamp)
  437. card_types = {None: 'ALL', 4: 'YD', 5: 'LT', 6: 'DX'}
  438. mratios = {}
  439. hfive = h5py.File(self._file_name, 'r')
  440. for mchid, paths in mchid_paths.items():
  441. mch_ratios = {}
  442. for card_type, name in card_types.items():
  443. print('card_type=', card_type, 'name=', name)
  444. if card_type is None:
  445. cur_paths = paths
  446. else:
  447. cur_paths = self._merge_mobile_type_path(paths, card_type)
  448. if len(cur_paths) == 0:
  449. continue
  450. cur_paths = cur_paths[mchid]
  451. mdata = self._merge_data(hfive,cur_paths)
  452. result = self._calc_mratio(mdata,presecs,time_stamp - day_stamp)
  453. mch_ratios[name] = result
  454. mratios[mchid] = mch_ratios
  455. hfive.close()
  456. return mratios
  457. def calc_ratios(self):
  458. import json
  459. r = None
  460. try:
  461. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  462. r = redis.Redis(connection_pool=pool)
  463. except Exception as ex:
  464. print(ex)
  465. while True:
  466. try:
  467. time_sec = int(stime.time())
  468. presecs = [900, 1800, 3600, 7200, 86400]
  469. mratios = self.mratio_types(time_sec, presecs)
  470. if len(mratios) != 0:
  471. r.set(f"nc_merchant_card_type_ratios", json.dumps(mratios))
  472. # r.publish('refill',json.dumps({'type':'mch_ratio','value':0}))
  473. print('push msg=', mratios)
  474. except Exception as ex:
  475. print(ex)
  476. finally:
  477. stime.sleep(2)
  478. ####################################################################################################################
  479. ####
  480. def _calc_mcount(self,data,startes,end):
  481. succ = data[1]
  482. fail = data[2]
  483. x = np.arange(0, 86400, 1)
  484. result = {}
  485. for start in startes:
  486. if end - start < 0:
  487. start_pos = 0
  488. else:
  489. start_pos = end - start
  490. pos = np.where(x >= start_pos)
  491. t = x[pos]
  492. _succ = succ[pos]
  493. _fail = fail[pos]
  494. pos = np.where(t < end)
  495. _succ = _succ[pos]
  496. _fail = _fail[pos]
  497. succs = int(np.sum(_succ))
  498. fails = int(np.sum(_fail))
  499. ratio = round((succs + 0.00001) / (succs + fails + 0.00001), 4)
  500. result[start] = [succs, fails, ratio]
  501. return result
  502. def merchant_rmobile_path(self, paths):
  503. result = {}
  504. for path in paths:
  505. items = re.split(r'/', path)
  506. if len(items) != 6:
  507. continue
  508. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  509. _card_type = int(_card_type)
  510. if _card_type not in [4, 5, 6]:
  511. continue
  512. _mchid = int(_mchid)
  513. if _mchid not in result:
  514. result[_mchid] = []
  515. result[_mchid].append(path)
  516. return result
  517. def rmobile_path(self, paths):
  518. result = []
  519. for path in paths:
  520. items = re.split(r'/', path)
  521. if len(items) != 6:
  522. continue
  523. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  524. _card_type = int(_card_type)
  525. if _card_type not in [4, 5, 6]:
  526. continue
  527. result.append(path)
  528. return result
  529. def mch_count(self, paths, presecs,time_stamp):
  530. hfive = h5py.File(self._file_name, 'r')
  531. mdata = self._merge_data(hfive,paths)
  532. day_stamp = self.day_stamp(time_stamp)
  533. result = self._calc_mcount(mdata,presecs,time_stamp - day_stamp)
  534. hfive.close()
  535. return result
  536. def mch_detail_count(self, paths, presecs,time_stamp):
  537. hfive = h5py.File(self._file_name, 'r')
  538. result = {}
  539. for path in paths:
  540. items = re.split(r'/', path)
  541. if len(items) != 6:
  542. continue
  543. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  544. key = f"{_mchid}-{_quality}-{_card_type}-{_amount}"
  545. mdata = hfive[path]
  546. day_stamp = self.day_stamp(time_stamp)
  547. result[key] = self._calc_mcount(mdata,presecs,time_stamp - day_stamp)
  548. hfive.close()
  549. return result
  550. def mch_counts(self):
  551. import json
  552. r = None
  553. try:
  554. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  555. r = redis.Redis(connection_pool=pool)
  556. except Exception as ex:
  557. print(ex)
  558. while True:
  559. try:
  560. time_stamp = int(stime.time())
  561. presecs = [900, 1800, 3600, 7200, 86400]
  562. all_paths = self.paths(time_stamp)
  563. mchid_paths = self.merchant_rmobile_path(all_paths)
  564. gross = {}
  565. for mchid, paths in mchid_paths.items():
  566. counts = self.mch_count(paths, presecs, time_stamp)
  567. gross[mchid] = counts
  568. paths = self.rmobile_path(all_paths)
  569. detail = self.mch_detail_count(paths, presecs, time_stamp)
  570. result = {'gross': gross, 'detail': detail}
  571. if len(result) != 0:
  572. r.set(f"nc_merchant_refill_counts", json.dumps(result))
  573. r.publish('refill',json.dumps({'type':'mch_counts','value':0}))
  574. print('push msg=', result)
  575. except Exception as ex:
  576. print(ex)
  577. finally:
  578. stime.sleep(2)
  579. mchDataCenter = MchDataCenter()