MchDataCenter.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662
  1. import os
  2. import time as stime
  3. import redis
  4. import h5py
  5. from os import path
  6. import re
  7. from datetime import timedelta
  8. from datetime import datetime
  9. import numpy as np
  10. from matplotlib.figure import Figure
  11. from matplotlib import ticker
  12. from io import BytesIO
  13. import logging
  14. class MchDataCenter(object):
  15. latest_delta = 10
  16. pos_map = {
  17. 'commit': 0, 'success': 1, 'fail': 2
  18. }
  19. def __init__(self):
  20. self._mquit = False
  21. self._mRHost = ''
  22. self._mRPort = 6379
  23. self._file_name = '/var/www/html/data/stdata/user.hdf5'
  24. def set_redis(self, rhost, rport):
  25. self._mRHost = rhost
  26. self._mRPort = rport
  27. def stop(self):
  28. self._mquit = True
  29. pass
  30. def prepare_data(self):
  31. while self._mquit == False:
  32. try:
  33. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  34. r = redis.Redis(connection_pool=pool)
  35. if path.exists(self._file_name):
  36. hfive = h5py.File(self._file_name, 'a')
  37. else:
  38. hfive = h5py.File(self._file_name, 'w')
  39. latest_time = int(stime.time()) - self.latest_delta
  40. lt = stime.localtime(latest_time)
  41. now_str = stime.strftime('%Y-%m-%d %H:%M:%S', lt)
  42. print('start read',now_str)
  43. self.read_redis(hfive, r, 'nc_user_monitor_commit', 'commit')
  44. self.read_redis(hfive, r, 'nc_user_monitor_success', 'success')
  45. self.read_redis(hfive, r, 'nc_user_monitor_fail', 'fail')
  46. hfive.close()
  47. self.del_redis(r, 'nc_user_monitor_commit',latest_time)
  48. self.del_redis(r, 'nc_user_monitor_success',latest_time)
  49. self.del_redis(r, 'nc_user_monitor_fail',latest_time)
  50. except Exception as ex:
  51. print(ex)
  52. finally:
  53. stime.sleep(1)
  54. def del_redis(self, redis, name, latest_time):
  55. for item in redis.hscan_iter(name):
  56. key = str(item[0], encoding="utf-8")
  57. items = re.split(r'-', key)
  58. fdel = True
  59. if len(items) == 5:
  60. (mchid, quality, card_type, amount, time) = items
  61. time = int(time)
  62. if latest_time <= time:
  63. fdel = False
  64. if fdel:
  65. redis.hdel(name, key)
  66. pass
  67. def read_redis(self, hfive, redis, name, prefix):
  68. for item in redis.hscan_iter(name):
  69. key = str(item[0], encoding="utf-8")
  70. val = str(item[1], encoding="utf-8")
  71. self.parase(hfive, key, val, prefix)
  72. def parase(self, hfive, text, val, prefix):
  73. items = re.split(r'-', text)
  74. if len(items) != 5:
  75. return False
  76. (mchid, quality, card_type, amount, time) = items
  77. pos = self.pos_map[f'{prefix}']
  78. time = int(time)
  79. today = self.day_stamp(time)
  80. path = f'/{today}/{mchid}/{quality}/{card_type}/{amount}'
  81. if path not in hfive:
  82. dim = len(self.pos_map)
  83. hfive[path] = np.zeros((dim, 86400))
  84. diff = time - today
  85. if diff < 0:
  86. print(diff)
  87. hfive[path][pos, diff] = val
  88. print(path, pos, prefix, diff, time, val, hfive[path][pos, diff])
  89. pass
  90. def day_stamp(self, stamp):
  91. stamp = int(stamp)
  92. x = stime.gmtime(stamp + 8 * 3600)
  93. diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
  94. today = stamp - diff.total_seconds()
  95. return int(today)
  96. def _days(self, root):
  97. result = []
  98. try:
  99. for name, sub in root.items():
  100. if isinstance(sub, h5py.Group):
  101. result.append(name)
  102. except Exception as ex:
  103. print(ex)
  104. finally:
  105. return result
  106. def days(self):
  107. try:
  108. hfive = h5py.File(self._file_name, 'r')
  109. root = hfive.require_group('/')
  110. days = self._days(root)
  111. hfive.close()
  112. return days
  113. except Exception as ex:
  114. print(ex)
  115. return []
  116. def paths(self, time_stamp):
  117. try:
  118. day_stamp = self.day_stamp(time_stamp)
  119. hfive = h5py.File(self._file_name, 'r')
  120. group = hfive.require_group(f'/{day_stamp}')
  121. paths = self.dir(group)
  122. hfive.close()
  123. return paths
  124. except Exception as ex:
  125. print(ex)
  126. return []
  127. def dir(self, group):
  128. result = []
  129. for name, sub in group.items():
  130. if isinstance(sub, h5py.Group):
  131. result.extend(self.dir(sub))
  132. else:
  133. result.append(sub.name)
  134. return result
  135. def _all_none(self, **kwargs):
  136. for key, val in kwargs.items():
  137. if val is not None:
  138. return False
  139. return True
  140. def _merge_path(self,paths):
  141. result = {}
  142. for path in paths:
  143. items = re.split(r'/', path)
  144. if len(items) != 6:
  145. continue
  146. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  147. _mchid = int(_mchid)
  148. if _mchid not in result:
  149. result[_mchid] = []
  150. result[_mchid].append(path)
  151. return result
  152. def draw_plot(self, start_time, interval=300, **kwargs):
  153. logger = logging.getLogger('app')
  154. hfive = h5py.File(self._file_name, 'r')
  155. try:
  156. day_stamp = self.day_stamp(start_time)
  157. start_pos = start_time - day_stamp
  158. cur_day = self.day_stamp(stime.time())
  159. if day_stamp == cur_day:
  160. end_pos = int(stime.time()) - day_stamp
  161. else:
  162. end_pos = -1
  163. fig = Figure(figsize=(16, 8))
  164. ax = fig.subplots()
  165. dim = len(self.pos_map)
  166. predata = np.zeros((dim, 86400))
  167. x = np.arange(0, 86400, interval)
  168. sub_count = 0
  169. filer_text, paths = self.datasets(hfive, start_time, **kwargs)
  170. if self._all_none(**kwargs):
  171. paths = self._merge_path(paths)
  172. for mchid, data in self._read_dict_data(hfive, paths):
  173. predata = predata + data
  174. path = f'{mchid}'
  175. ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
  176. if ret:
  177. sub_count += 1
  178. pass
  179. else:
  180. for path, data in self.read_data(hfive, paths):
  181. data = np.array(data)
  182. predata = predata + data
  183. ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
  184. if ret:
  185. sub_count += 1
  186. if sub_count > 1:
  187. self._draw_plot(ax, x, day_stamp, start_pos, end_pos, predata, interval, filer_text)
  188. ax.legend()
  189. ax.grid()
  190. ax.set_title('success ratio')
  191. ax.set(xlabel='time', ylabel='ratio')
  192. fig.autofmt_xdate()
  193. fig.subplots_adjust(left=0.05, right=0.999, top=0.95, bottom=0.1)
  194. buf = BytesIO()
  195. fig.savefig(buf, format="png")
  196. return buf
  197. except Exception as ex:
  198. print(ex)
  199. finally:
  200. hfive.close()
  201. def read_data(self, hfive, paths):
  202. for path in paths:
  203. yield path, hfive[path]
  204. def _read_dict_data(self, hfive, mchPaths):
  205. for mchid, paths in mchPaths.items():
  206. dim = len(self.pos_map)
  207. predata = np.zeros((dim, 86400))
  208. for path in paths:
  209. predata += hfive[path]
  210. yield mchid, predata
  211. def datasets(self, hfive, start_time, **kwargs):
  212. logger = logging.getLogger('app')
  213. day_stamp = self.day_stamp(start_time)
  214. sday = f'{day_stamp}'
  215. root = hfive.require_group('/')
  216. days = self._days(root)
  217. if sday not in days:
  218. return False
  219. group = hfive.require_group(sday)
  220. dsets = self.dir(group)
  221. mchid = quality = card_type = amount = None
  222. for key, val in kwargs.items():
  223. if val is None:
  224. continue
  225. if key == 'mchid':
  226. mchid = val
  227. elif key == 'quality':
  228. quality = f'{val}'
  229. elif key == 'card_type':
  230. card_type = f'{val}'
  231. elif key == 'amount':
  232. amount = f'{val}'
  233. else:
  234. continue
  235. return self._filter(dsets, mchid=mchid, quality=quality, card_type=card_type, amount=amount)
  236. def _filter(self, dsets, mchid=None, quality=None, card_type=None, amount=None):
  237. filer_text = ''
  238. if mchid is not None:
  239. filer_text = mchid
  240. if quality is not None:
  241. filer_text = filer_text + f"-qua:{quality}"
  242. if card_type is not None:
  243. filer_text = filer_text + f"-type:{card_type}"
  244. if amount is not None:
  245. filer_text = filer_text + f"-amount:{amount}"
  246. paths = []
  247. for text in dsets:
  248. items = re.split(r'/', text)
  249. if len(items) != 6:
  250. return False
  251. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  252. if (mchid is not None) and (_mchid != mchid):
  253. continue
  254. if (quality is not None) and (_quality != quality):
  255. continue
  256. if (card_type is not None) and (_card_type != card_type):
  257. continue
  258. if (amount is not None) and (_amount != amount):
  259. continue
  260. paths.append(text)
  261. return filer_text, paths
  262. def _draw_plot(self, ax, x, day_stamp, start_pos, end_pos, data, interval=300, path=''):
  263. # 'commit-succ': 0, 'notify-succ': 1, 'notify-fail': 2
  264. logging.getLogger('app').debug("path=%s", path)
  265. all = data[1] + data[2]
  266. all = all.reshape((-1, interval))
  267. all = np.sum(all, axis=1)
  268. ySucc = data[1]
  269. ySucc = ySucc.reshape((-1, interval))
  270. ySucc = np.sum(ySucc, axis=1)
  271. if end_pos == -1:
  272. pos = np.where(x >= start_pos)
  273. x = x[pos]
  274. ySucc = ySucc[pos]
  275. all = all[pos]
  276. else:
  277. pos = np.where(start_pos <= x)
  278. x = x[pos]
  279. ySucc = ySucc[pos]
  280. all = all[pos]
  281. pos = np.where(x < end_pos)
  282. x = x[pos]
  283. ySucc = ySucc[pos]
  284. all = all[pos]
  285. succ_count = int(np.sum(ySucc))
  286. all_count = int(np.sum(all))
  287. if all_count < 1:
  288. return False
  289. pos = np.where(ySucc > all)
  290. ySucc[pos] = all[pos]
  291. ySucc = ySucc / (all + 0.00000001)
  292. xs = np.array([stime.strftime('%H:%M', stime.localtime(d + day_stamp)) for d in x])
  293. ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=0))
  294. ax.plot(xs, ySucc, ls='--', marker='o', label=self._label(path, succ_count, all_count))
  295. return True
  296. def _label(self, path, count, all):
  297. ratio = 0.00
  298. if all > 0:
  299. ratio = round(count * 100 / all, 2)
  300. items = re.split(r'/', path)
  301. if len(items) == 6:
  302. (_, _sday, _chname, _quality, _card_type, _amount) = items
  303. card_type = ''
  304. if _card_type == '1':
  305. card_type = 'SY'
  306. elif _card_type == '2':
  307. card_type = 'SH'
  308. elif _card_type == '4':
  309. card_type = 'YD'
  310. elif _card_type == '5':
  311. card_type = 'LT'
  312. elif _card_type == '6':
  313. card_type = 'DX'
  314. elif _card_type == '7':
  315. card_type = 'TH'
  316. return f"{_chname}-{_quality}-{card_type}-{_amount}:{count}/{all} = {ratio}%"
  317. else:
  318. if path == '' or path is None:
  319. path = 'average'
  320. return f"{path}:{count}/{all} = {ratio}%"
  321. pass
  322. #统计机构当前时间之前序列时间成功率
  323. def _merge_mobile_path(self,paths):
  324. result = {}
  325. for path in paths:
  326. items = re.split(r'/', path)
  327. if len(items) != 6:
  328. continue
  329. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  330. _card_type = int(_card_type)
  331. if _card_type not in [4, 5, 6]:
  332. continue
  333. _mchid = int(_mchid)
  334. if _mchid not in result:
  335. result[_mchid] = []
  336. result[_mchid].append(path)
  337. return result
  338. def _merge_data(self, hfive, paths):
  339. dim = len(self.pos_map)
  340. predata = np.zeros((dim, 86400))
  341. for path in paths:
  342. predata += hfive[path]
  343. return predata
  344. def _calc_mratio(self,data,startes,end):
  345. succ = data[1]
  346. fail = data[2]
  347. x = np.arange(0, 86400, 1)
  348. result = {}
  349. for start in startes:
  350. if end - start < 0:
  351. start_pos = 0
  352. else:
  353. start_pos = end - start
  354. pos = np.where(x >= start_pos)
  355. t = x[pos]
  356. _fail = fail[pos]
  357. _succ = succ[pos]
  358. pos = np.where(t < end)
  359. _fail = _fail[pos]
  360. _succ = _succ[pos]
  361. succs = int(np.sum(_succ))
  362. fails = int(np.sum(_fail))
  363. ratio = round((succs + 0.00001) / (succs + fails + 0.00001), 4)
  364. result[start] = [succs, fails, ratio]
  365. return result
  366. def mratios(self, time_stamp,presecs):
  367. paths = self.paths(time_stamp)
  368. mchid_paths = self._merge_mobile_path(paths)
  369. day_stamp = self.day_stamp(time_stamp)
  370. mratios = {}
  371. hfive = h5py.File(self._file_name, 'r')
  372. for mchid, paths in mchid_paths.items():
  373. mdata = self._merge_data(hfive,paths)
  374. result = self._calc_mratio(mdata,presecs,time_stamp - day_stamp)
  375. mratios[mchid] = result
  376. hfive.close()
  377. return mratios
  378. def calc_ratio(self):
  379. import json
  380. r = None
  381. try:
  382. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  383. r = redis.Redis(connection_pool=pool)
  384. except Exception as ex:
  385. print(ex)
  386. while True:
  387. try:
  388. time_sec = int(stime.time())
  389. presecs = [900, 1800, 3600, 7200, 86400]
  390. mratios = self.mratios(time_sec, presecs)
  391. if len(mratios) != 0:
  392. r.set(f"nc_merchant_ratios", json.dumps(mratios))
  393. r.publish('refill',json.dumps({'type':'mch_ratio','value':0}))
  394. print('push msg=',mratios)
  395. except Exception as ex:
  396. print(ex)
  397. finally:
  398. stime.sleep(2)
  399. #以下为按照卡类型计算成功率代码
  400. def _merge_mobile_type_path(self,paths,card_type=None):
  401. result = {}
  402. for path in paths:
  403. items = re.split(r'/', path)
  404. if len(items) != 6:
  405. continue
  406. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  407. _card_type = int(_card_type)
  408. if _card_type not in [4, 5, 6]:
  409. continue
  410. if card_type is not None and _card_type != card_type:
  411. continue
  412. _mchid = int(_mchid)
  413. if _mchid not in result:
  414. result[_mchid] = []
  415. result[_mchid].append(path)
  416. return result
  417. def mratio_types(self, time_stamp,presecs):
  418. paths = self.paths(time_stamp)
  419. mchid_paths = self._merge_mobile_type_path(paths)
  420. day_stamp = self.day_stamp(time_stamp)
  421. card_types = {None: 'ALL', 4: 'YD', 5: 'LT', 6: 'DX'}
  422. mratios = {}
  423. hfive = h5py.File(self._file_name, 'r')
  424. for mchid, paths in mchid_paths.items():
  425. mch_ratios = {}
  426. for card_type, name in card_types.items():
  427. print('card_type=', card_type, 'name=', name)
  428. if card_type is None:
  429. cur_paths = paths
  430. else:
  431. cur_paths = self._merge_mobile_type_path(paths, card_type)
  432. if len(cur_paths) == 0:
  433. continue
  434. cur_paths = cur_paths[mchid]
  435. mdata = self._merge_data(hfive,cur_paths)
  436. result = self._calc_mratio(mdata,presecs,time_stamp - day_stamp)
  437. mch_ratios[name] = result
  438. mratios[mchid] = mch_ratios
  439. hfive.close()
  440. return mratios
  441. def calc_ratios(self):
  442. import json
  443. r = None
  444. try:
  445. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  446. r = redis.Redis(connection_pool=pool)
  447. except Exception as ex:
  448. print(ex)
  449. while True:
  450. try:
  451. time_sec = int(stime.time())
  452. presecs = [900, 1800, 3600, 7200, 86400]
  453. mratios = self.mratio_types(time_sec, presecs)
  454. if len(mratios) != 0:
  455. r.set(f"nc_merchant_card_type_ratios", json.dumps(mratios))
  456. # r.publish('refill',json.dumps({'type':'mch_ratio','value':0}))
  457. print('push msg=', mratios)
  458. except Exception as ex:
  459. print(ex)
  460. finally:
  461. stime.sleep(2)
  462. ####################################################################################################################
  463. ####
  464. def _calc_mcount(self,data,startes,end):
  465. succ = data[1]
  466. fail = data[2]
  467. x = np.arange(0, 86400, 1)
  468. result = {}
  469. for start in startes:
  470. if end - start < 0:
  471. start_pos = 0
  472. else:
  473. start_pos = end - start
  474. pos = np.where(x >= start_pos)
  475. t = x[pos]
  476. _succ = succ[pos]
  477. _fail = fail[pos]
  478. pos = np.where(t < end)
  479. _succ = _succ[pos]
  480. _fail = _fail[pos]
  481. succs = int(np.sum(_succ))
  482. fails = int(np.sum(_fail))
  483. ratio = round((succs + 0.00001) / (succs + fails + 0.00001), 4)
  484. result[start] = [succs, fails, ratio]
  485. return result
  486. def merchant_rmobile_path(self, paths):
  487. result = {}
  488. for path in paths:
  489. items = re.split(r'/', path)
  490. if len(items) != 6:
  491. continue
  492. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  493. _card_type = int(_card_type)
  494. if _card_type not in [4, 5, 6]:
  495. continue
  496. _mchid = int(_mchid)
  497. if _mchid not in result:
  498. result[_mchid] = []
  499. result[_mchid].append(path)
  500. return result
  501. def rmobile_path(self, paths):
  502. result = []
  503. for path in paths:
  504. items = re.split(r'/', path)
  505. if len(items) != 6:
  506. continue
  507. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  508. _card_type = int(_card_type)
  509. if _card_type not in [4, 5, 6]:
  510. continue
  511. result.append(path)
  512. return result
  513. def mch_count(self, paths, presecs,time_stamp):
  514. hfive = h5py.File(self._file_name, 'r')
  515. mdata = self._merge_data(hfive,paths)
  516. day_stamp = self.day_stamp(time_stamp)
  517. result = self._calc_mcount(mdata,presecs,time_stamp - day_stamp)
  518. hfive.close()
  519. return result
  520. def mch_detail_count(self, paths, presecs,time_stamp):
  521. hfive = h5py.File(self._file_name, 'r')
  522. result = {}
  523. for path in paths:
  524. items = re.split(r'/', path)
  525. if len(items) != 6:
  526. continue
  527. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  528. key = f"{_mchid}-{_quality}-{_card_type}-{_amount}"
  529. mdata = hfive[path]
  530. day_stamp = self.day_stamp(time_stamp)
  531. result[key] = self._calc_mcount(mdata,presecs,time_stamp - day_stamp)
  532. hfive.close()
  533. return result
  534. def mch_counts(self):
  535. import json
  536. r = None
  537. try:
  538. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  539. r = redis.Redis(connection_pool=pool)
  540. except Exception as ex:
  541. print(ex)
  542. while True:
  543. try:
  544. time_stamp = int(stime.time())
  545. presecs = [900, 1800, 3600, 7200, 86400]
  546. all_paths = self.paths(time_stamp)
  547. mchid_paths = self.merchant_rmobile_path(all_paths)
  548. gross = {}
  549. for mchid, paths in mchid_paths.items():
  550. counts = self.mch_count(paths, presecs, time_stamp)
  551. gross[mchid] = counts
  552. paths = self.rmobile_path(all_paths)
  553. detail = self.mch_detail_count(paths, presecs, time_stamp)
  554. result = {'gross': gross, 'detail': detail}
  555. if len(result) != 0:
  556. r.set(f"nc_merchant_refill_counts", json.dumps(result))
  557. r.publish('refill',json.dumps({'type':'mch_counts','value':0}))
  558. print('push msg=', result)
  559. except Exception as ex:
  560. print(ex)
  561. finally:
  562. stime.sleep(2)
  563. mchDataCenter = MchDataCenter()