MchDataCenter.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657
  1. import os
  2. import time as stime
  3. import redis
  4. import h5py
  5. from os import path
  6. import re
  7. from datetime import timedelta
  8. import numpy as np
  9. from matplotlib.figure import Figure
  10. from matplotlib import ticker
  11. from io import BytesIO
  12. import logging
  13. class MchDataCenter(object):
  14. latest_delta = 2
  15. pos_map = {
  16. 'commit': 0, 'success': 1, 'fail': 2
  17. }
  18. def __init__(self):
  19. self._mquit = False
  20. self._mRHost = ''
  21. self._mRPort = 6379
  22. self._file_name = '/var/www/html/data/stdata/user.hdf5'
  23. def set_redis(self, rhost, rport):
  24. self._mRHost = rhost
  25. self._mRPort = rport
  26. def stop(self):
  27. self._mquit = True
  28. pass
  29. def prepare_data(self):
  30. while self._mquit == False:
  31. try:
  32. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  33. r = redis.Redis(connection_pool=pool)
  34. if path.exists(self._file_name):
  35. hfive = h5py.File(self._file_name, 'a')
  36. else:
  37. hfive = h5py.File(self._file_name, 'w')
  38. latest_time = int(stime.time()) - self.latest_delta
  39. self.read_redis(hfive, r, 'nc_user_monitor_commit', 'commit')
  40. self.read_redis(hfive, r, 'nc_user_monitor_success', 'success')
  41. self.read_redis(hfive, r, 'nc_user_monitor_fail', 'fail')
  42. hfive.close()
  43. self.del_redis(r, 'nc_user_monitor_commit',latest_time)
  44. self.del_redis(r, 'nc_user_monitor_success',latest_time)
  45. self.del_redis(r, 'nc_user_monitor_fail',latest_time)
  46. except Exception as ex:
  47. print(ex)
  48. finally:
  49. stime.sleep(0.1)
  50. def del_redis(self, redis, name,latest_time):
  51. for item in redis.hscan_iter(name):
  52. key = str(item[0], encoding="utf-8")
  53. items = re.split(r'-', key)
  54. fdel = True
  55. if len(items) == 5:
  56. (mchid, quality, card_type, amount, time) = items
  57. time = int(time)
  58. if latest_time <= time:
  59. fdel = False
  60. if fdel:
  61. redis.hdel(name, key)
  62. pass
  63. def read_redis(self, hfive, redis, name, prefix):
  64. for item in redis.hscan_iter(name):
  65. key = str(item[0], encoding="utf-8")
  66. val = str(item[1], encoding="utf-8")
  67. self.parase(hfive, key, val, prefix)
  68. def parase(self, hfive, text, val, prefix):
  69. items = re.split(r'-', text)
  70. if len(items) != 5:
  71. return False
  72. (mchid, quality, card_type, amount, time) = items
  73. pos = self.pos_map[f'{prefix}']
  74. time = int(time)
  75. today = self.day_stamp(time)
  76. path = f'/{today}/{mchid}/{quality}/{card_type}/{amount}'
  77. if path not in hfive:
  78. dim = len(self.pos_map)
  79. hfive[path] = np.zeros((dim, 86400))
  80. diff = time - today
  81. if diff < 0:
  82. print(diff)
  83. hfive[path][pos, diff] = val
  84. print(path, pos, diff, val, hfive[path][pos, diff])
  85. pass
  86. def day_stamp(self, stamp):
  87. stamp = int(stamp)
  88. x = stime.gmtime(stamp + 8 * 3600)
  89. diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
  90. today = stamp - diff.total_seconds()
  91. return int(today)
  92. def _days(self, root):
  93. result = []
  94. try:
  95. for name, sub in root.items():
  96. if isinstance(sub, h5py.Group):
  97. result.append(name)
  98. except Exception as ex:
  99. print(ex)
  100. finally:
  101. return result
  102. def days(self):
  103. try:
  104. hfive = h5py.File(self._file_name, 'r')
  105. root = hfive.require_group('/')
  106. days = self._days(root)
  107. hfive.close()
  108. return days
  109. except Exception as ex:
  110. print(ex)
  111. return []
  112. def paths(self, time_stamp):
  113. try:
  114. day_stamp = self.day_stamp(time_stamp)
  115. hfive = h5py.File(self._file_name, 'r')
  116. group = hfive.require_group(f'/{day_stamp}')
  117. paths = self.dir(group)
  118. hfive.close()
  119. return paths
  120. except Exception as ex:
  121. print(ex)
  122. return []
  123. def dir(self, group):
  124. result = []
  125. for name, sub in group.items():
  126. if isinstance(sub, h5py.Group):
  127. result.extend(self.dir(sub))
  128. else:
  129. result.append(sub.name)
  130. return result
  131. def _all_none(self, **kwargs):
  132. for key, val in kwargs.items():
  133. if val is not None:
  134. return False
  135. return True
  136. def _merge_path(self,paths):
  137. result = {}
  138. for path in paths:
  139. items = re.split(r'/', path)
  140. if len(items) != 6:
  141. continue
  142. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  143. _mchid = int(_mchid)
  144. if _mchid not in result:
  145. result[_mchid] = []
  146. result[_mchid].append(path)
  147. return result
  148. def draw_plot(self, start_time, interval=300, **kwargs):
  149. logger = logging.getLogger('app')
  150. hfive = h5py.File(self._file_name, 'r')
  151. try:
  152. day_stamp = self.day_stamp(start_time)
  153. start_pos = start_time - day_stamp
  154. cur_day = self.day_stamp(stime.time())
  155. if day_stamp == cur_day:
  156. end_pos = int(stime.time()) - day_stamp
  157. else:
  158. end_pos = -1
  159. fig = Figure(figsize=(16, 8))
  160. ax = fig.subplots()
  161. dim = len(self.pos_map)
  162. predata = np.zeros((dim, 86400))
  163. x = np.arange(0, 86400, interval)
  164. sub_count = 0
  165. filer_text, paths = self.datasets(hfive, start_time, **kwargs)
  166. if self._all_none(**kwargs):
  167. paths = self._merge_path(paths)
  168. for mchid, data in self._read_dict_data(hfive, paths):
  169. predata = predata + data
  170. path = f'{mchid}'
  171. ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
  172. if ret:
  173. sub_count += 1
  174. pass
  175. else:
  176. for path, data in self.read_data(hfive, paths):
  177. data = np.array(data)
  178. predata = predata + data
  179. ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
  180. if ret:
  181. sub_count += 1
  182. if sub_count > 1:
  183. self._draw_plot(ax, x, day_stamp, start_pos, end_pos, predata, interval, filer_text)
  184. ax.legend()
  185. ax.grid()
  186. ax.set_title('success ratio')
  187. ax.set(xlabel='time', ylabel='ratio')
  188. fig.autofmt_xdate()
  189. fig.subplots_adjust(left=0.05, right=0.999, top=0.95, bottom=0.1)
  190. buf = BytesIO()
  191. fig.savefig(buf, format="png")
  192. return buf
  193. except Exception as ex:
  194. print(ex)
  195. finally:
  196. hfive.close()
  197. def read_data(self, hfive, paths):
  198. for path in paths:
  199. yield path, hfive[path]
  200. def _read_dict_data(self, hfive, mchPaths):
  201. for mchid, paths in mchPaths.items():
  202. dim = len(self.pos_map)
  203. predata = np.zeros((dim, 86400))
  204. for path in paths:
  205. predata += hfive[path]
  206. yield mchid, predata
  207. def datasets(self, hfive, start_time, **kwargs):
  208. logger = logging.getLogger('app')
  209. day_stamp = self.day_stamp(start_time)
  210. sday = f'{day_stamp}'
  211. root = hfive.require_group('/')
  212. days = self._days(root)
  213. if sday not in days:
  214. return False
  215. group = hfive.require_group(sday)
  216. dsets = self.dir(group)
  217. mchid = quality = card_type = amount = None
  218. for key, val in kwargs.items():
  219. if val is None:
  220. continue
  221. if key == 'mchid':
  222. mchid = val
  223. elif key == 'quality':
  224. quality = f'{val}'
  225. elif key == 'card_type':
  226. card_type = f'{val}'
  227. elif key == 'amount':
  228. amount = f'{val}'
  229. else:
  230. continue
  231. return self._filter(dsets, mchid=mchid, quality=quality, card_type=card_type, amount=amount)
  232. def _filter(self, dsets, mchid=None, quality=None, card_type=None, amount=None):
  233. filer_text = ''
  234. if mchid is not None:
  235. filer_text = mchid
  236. if quality is not None:
  237. filer_text = filer_text + f"-qua:{quality}"
  238. if card_type is not None:
  239. filer_text = filer_text + f"-type:{card_type}"
  240. if amount is not None:
  241. filer_text = filer_text + f"-amount:{amount}"
  242. paths = []
  243. for text in dsets:
  244. items = re.split(r'/', text)
  245. if len(items) != 6:
  246. return False
  247. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  248. if (mchid is not None) and (_mchid != mchid):
  249. continue
  250. if (quality is not None) and (_quality != quality):
  251. continue
  252. if (card_type is not None) and (_card_type != card_type):
  253. continue
  254. if (amount is not None) and (_amount != amount):
  255. continue
  256. paths.append(text)
  257. return filer_text, paths
  258. def _draw_plot(self, ax, x, day_stamp, start_pos, end_pos, data, interval=300, path=''):
  259. # 'commit-succ': 0, 'notify-succ': 1, 'notify-fail': 2
  260. logging.getLogger('app').debug("path=%s", path)
  261. all = data[1] + data[2]
  262. all = all.reshape((-1, interval))
  263. all = np.sum(all, axis=1)
  264. ySucc = data[1]
  265. ySucc = ySucc.reshape((-1, interval))
  266. ySucc = np.sum(ySucc, axis=1)
  267. if end_pos == -1:
  268. pos = np.where(x >= start_pos)
  269. x = x[pos]
  270. ySucc = ySucc[pos]
  271. all = all[pos]
  272. else:
  273. pos = np.where(start_pos <= x)
  274. x = x[pos]
  275. ySucc = ySucc[pos]
  276. all = all[pos]
  277. pos = np.where(x < end_pos)
  278. x = x[pos]
  279. ySucc = ySucc[pos]
  280. all = all[pos]
  281. succ_count = int(np.sum(ySucc))
  282. all_count = int(np.sum(all))
  283. if all_count < 1:
  284. return False
  285. pos = np.where(ySucc > all)
  286. ySucc[pos] = all[pos]
  287. ySucc = ySucc / (all + 0.00000001)
  288. xs = np.array([stime.strftime('%H:%M', stime.localtime(d + day_stamp)) for d in x])
  289. ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=0))
  290. ax.plot(xs, ySucc, ls='--', marker='o', label=self._label(path, succ_count, all_count))
  291. return True
  292. def _label(self, path, count, all):
  293. ratio = 0.00
  294. if all > 0:
  295. ratio = round(count * 100 / all, 2)
  296. items = re.split(r'/', path)
  297. if len(items) == 6:
  298. (_, _sday, _chname, _quality, _card_type, _amount) = items
  299. card_type = ''
  300. if _card_type == '1':
  301. card_type = 'SY'
  302. elif _card_type == '2':
  303. card_type = 'SH'
  304. elif _card_type == '4':
  305. card_type = 'YD'
  306. elif _card_type == '5':
  307. card_type = 'LT'
  308. elif _card_type == '6':
  309. card_type = 'DX'
  310. elif _card_type == '7':
  311. card_type = 'TH'
  312. return f"{_chname}-{_quality}-{card_type}-{_amount}:{count}/{all} = {ratio}%"
  313. else:
  314. if path == '' or path is None:
  315. path = 'average'
  316. return f"{path}:{count}/{all} = {ratio}%"
  317. pass
  318. #统计机构当前时间之前序列时间成功率
  319. def _merge_mobile_path(self,paths):
  320. result = {}
  321. for path in paths:
  322. items = re.split(r'/', path)
  323. if len(items) != 6:
  324. continue
  325. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  326. _card_type = int(_card_type)
  327. if _card_type not in [4, 5, 6]:
  328. continue
  329. _mchid = int(_mchid)
  330. if _mchid not in result:
  331. result[_mchid] = []
  332. result[_mchid].append(path)
  333. return result
  334. def _merge_data(self, hfive, paths):
  335. dim = len(self.pos_map)
  336. predata = np.zeros((dim, 86400))
  337. for path in paths:
  338. predata += hfive[path]
  339. return predata
  340. def _calc_mratio(self,data,startes,end):
  341. succ = data[1]
  342. fail = data[2]
  343. x = np.arange(0, 86400, 1)
  344. result = {}
  345. for start in startes:
  346. if end - start < 0:
  347. start_pos = 0
  348. else:
  349. start_pos = end - start
  350. pos = np.where(x >= start_pos)
  351. t = x[pos]
  352. _fail = fail[pos]
  353. _succ = succ[pos]
  354. pos = np.where(t < end)
  355. _fail = _fail[pos]
  356. _succ = _succ[pos]
  357. succs = int(np.sum(_succ))
  358. fails = int(np.sum(_fail))
  359. ratio = round((succs + 0.00001) / (succs + fails + 0.00001), 4)
  360. result[start] = [succs, fails, ratio]
  361. return result
  362. def mratios(self, time_stamp,presecs):
  363. paths = self.paths(time_stamp)
  364. mchid_paths = self._merge_mobile_path(paths)
  365. day_stamp = self.day_stamp(time_stamp)
  366. mratios = {}
  367. hfive = h5py.File(self._file_name, 'r')
  368. for mchid, paths in mchid_paths.items():
  369. mdata = self._merge_data(hfive,paths)
  370. result = self._calc_mratio(mdata,presecs,time_stamp - day_stamp)
  371. mratios[mchid] = result
  372. hfive.close()
  373. return mratios
  374. def calc_ratio(self):
  375. import json
  376. r = None
  377. try:
  378. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  379. r = redis.Redis(connection_pool=pool)
  380. except Exception as ex:
  381. print(ex)
  382. while True:
  383. try:
  384. time_sec = int(stime.time())
  385. presecs = [900, 1800, 3600, 7200, 86400]
  386. mratios = self.mratios(time_sec, presecs)
  387. if len(mratios) != 0:
  388. r.set(f"nc_merchant_ratios", json.dumps(mratios))
  389. r.publish('refill',json.dumps({'type':'mch_ratio','value':0}))
  390. print('push msg=',mratios)
  391. except Exception as ex:
  392. print(ex)
  393. finally:
  394. stime.sleep(2)
  395. #以下为按照卡类型计算成功率代码
  396. def _merge_mobile_type_path(self,paths,card_type=None):
  397. result = {}
  398. for path in paths:
  399. items = re.split(r'/', path)
  400. if len(items) != 6:
  401. continue
  402. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  403. _card_type = int(_card_type)
  404. if _card_type not in [4, 5, 6]:
  405. continue
  406. if card_type is not None and _card_type != card_type:
  407. continue
  408. _mchid = int(_mchid)
  409. if _mchid not in result:
  410. result[_mchid] = []
  411. result[_mchid].append(path)
  412. return result
  413. def mratio_types(self, time_stamp,presecs):
  414. paths = self.paths(time_stamp)
  415. mchid_paths = self._merge_mobile_type_path(paths)
  416. day_stamp = self.day_stamp(time_stamp)
  417. card_types = {None: 'ALL', 4: 'YD', 5: 'LT', 6: 'DX'}
  418. mratios = {}
  419. hfive = h5py.File(self._file_name, 'r')
  420. for mchid, paths in mchid_paths.items():
  421. mch_ratios = {}
  422. for card_type, name in card_types.items():
  423. print('card_type=', card_type, 'name=', name)
  424. if card_type is None:
  425. cur_paths = paths
  426. else:
  427. cur_paths = self._merge_mobile_type_path(paths, card_type)
  428. if len(cur_paths) == 0:
  429. continue
  430. cur_paths = cur_paths[mchid]
  431. mdata = self._merge_data(hfive,cur_paths)
  432. result = self._calc_mratio(mdata,presecs,time_stamp - day_stamp)
  433. mch_ratios[name] = result
  434. mratios[mchid] = mch_ratios
  435. hfive.close()
  436. return mratios
  437. def calc_ratios(self):
  438. import json
  439. r = None
  440. try:
  441. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  442. r = redis.Redis(connection_pool=pool)
  443. except Exception as ex:
  444. print(ex)
  445. while True:
  446. try:
  447. time_sec = int(stime.time())
  448. presecs = [900, 1800, 3600, 7200, 86400]
  449. mratios = self.mratio_types(time_sec, presecs)
  450. if len(mratios) != 0:
  451. r.set(f"nc_merchant_card_type_ratios", json.dumps(mratios))
  452. # r.publish('refill',json.dumps({'type':'mch_ratio','value':0}))
  453. print('push msg=', mratios)
  454. except Exception as ex:
  455. print(ex)
  456. finally:
  457. stime.sleep(2)
  458. ####################################################################################################################
  459. ####
  460. def _calc_mcount(self,data,startes,end):
  461. succ = data[1]
  462. fail = data[2]
  463. x = np.arange(0, 86400, 1)
  464. result = {}
  465. for start in startes:
  466. if end - start < 0:
  467. start_pos = 0
  468. else:
  469. start_pos = end - start
  470. pos = np.where(x >= start_pos)
  471. t = x[pos]
  472. _succ = succ[pos]
  473. _fail = fail[pos]
  474. pos = np.where(t < end)
  475. _succ = _succ[pos]
  476. _fail = _fail[pos]
  477. succs = int(np.sum(_succ))
  478. fails = int(np.sum(_fail))
  479. ratio = round((succs + 0.00001) / (succs + fails + 0.00001), 4)
  480. result[start] = [succs, fails, ratio]
  481. return result
  482. def merchant_rmobile_path(self, paths):
  483. result = {}
  484. for path in paths:
  485. items = re.split(r'/', path)
  486. if len(items) != 6:
  487. continue
  488. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  489. _card_type = int(_card_type)
  490. if _card_type not in [4, 5, 6]:
  491. continue
  492. _mchid = int(_mchid)
  493. if _mchid not in result:
  494. result[_mchid] = []
  495. result[_mchid].append(path)
  496. return result
  497. def rmobile_path(self, paths):
  498. result = []
  499. for path in paths:
  500. items = re.split(r'/', path)
  501. if len(items) != 6:
  502. continue
  503. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  504. _card_type = int(_card_type)
  505. if _card_type not in [4, 5, 6]:
  506. continue
  507. result.append(path)
  508. return result
  509. def mch_count(self, paths, presecs,time_stamp):
  510. hfive = h5py.File(self._file_name, 'r')
  511. mdata = self._merge_data(hfive,paths)
  512. day_stamp = self.day_stamp(time_stamp)
  513. result = self._calc_mcount(mdata,presecs,time_stamp - day_stamp)
  514. hfive.close()
  515. return result
  516. def mch_detail_count(self, paths, presecs,time_stamp):
  517. hfive = h5py.File(self._file_name, 'r')
  518. result = {}
  519. for path in paths:
  520. items = re.split(r'/', path)
  521. if len(items) != 6:
  522. continue
  523. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  524. key = f"{_mchid}-{_quality}-{_card_type}-{_amount}"
  525. mdata = hfive[path]
  526. day_stamp = self.day_stamp(time_stamp)
  527. result[key] = self._calc_mcount(mdata,presecs,time_stamp - day_stamp)
  528. hfive.close()
  529. return result
  530. def mch_counts(self):
  531. import json
  532. r = None
  533. try:
  534. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  535. r = redis.Redis(connection_pool=pool)
  536. except Exception as ex:
  537. print(ex)
  538. while True:
  539. try:
  540. time_stamp = int(stime.time())
  541. presecs = [900, 1800, 3600, 7200, 86400]
  542. all_paths = self.paths(time_stamp)
  543. mchid_paths = self.merchant_rmobile_path(all_paths)
  544. gross = {}
  545. for mchid, paths in mchid_paths.items():
  546. counts = self.mch_count(paths, presecs, time_stamp)
  547. gross[mchid] = counts
  548. paths = self.rmobile_path(all_paths)
  549. detail = self.mch_detail_count(paths, presecs, time_stamp)
  550. result = {'gross': gross, 'detail': detail}
  551. if len(result) != 0:
  552. r.set(f"nc_merchant_refill_counts", json.dumps(result))
  553. r.publish('refill',json.dumps({'type':'mch_counts','value':0}))
  554. print('push msg=', result)
  555. except Exception as ex:
  556. print(ex)
  557. finally:
  558. stime.sleep(2)
  559. mchDataCenter = MchDataCenter()