QueueListener.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675
  1. import json
  2. import os
  3. import time as stime
  4. import redis
  5. import h5py
  6. from os import path
  7. import re
  8. from datetime import timedelta
  9. from datetime import datetime
  10. import numpy as np
  11. from matplotlib.figure import Figure
  12. from matplotlib import ticker
  13. from io import BytesIO
  14. import logging
  15. from refill import ProfitHandler
  16. class QueueListener(object):
  17. queue_name = 'REFILL_MONITOR_QUEUE'
  18. latest_delta = 10
  19. pos_map = {
  20. 'commit': 0, 'success': 1, 'fail': 2
  21. }
  22. def __init__(self):
  23. self._mquit = False
  24. self._mRHost = ''
  25. self._mRPort = 6379
  26. self._file_names = {
  27. 'profit': {
  28. 'name': '/var/www/html/data/stdata/profit.hdf5',
  29. 'handler': None
  30. }
  31. }
  32. def set_redis(self, rhost, rport):
  33. self._mRHost = rhost
  34. self._mRPort = rport
  35. def stop(self):
  36. self._mquit = True
  37. pass
  38. def prepare_data(self):
  39. def open(file):
  40. if path.exists(file):
  41. hfive = h5py.File(file, 'a')
  42. else:
  43. hfive = h5py.File(file, 'w')
  44. return hfive
  45. while self._mquit == False:
  46. try:
  47. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  48. r = redis.Redis(connection_pool=pool)
  49. for _key,val in self._file_names.items():
  50. hfive = open(val['name'])
  51. if _key == 'profit':
  52. val['handler'] = ProfitHandler(hfive)
  53. self.read(self.queue_name,r)
  54. except Exception as ex:
  55. print(ex)
  56. finally:
  57. stime.sleep(1)
  58. def read(self,queue,redis):
  59. while self._mquit == False:
  60. item = redis.brpop(queue, 1)
  61. if item is None:
  62. continue
  63. else:
  64. val = json.loads(item[1])
  65. self._parse(val)
  66. def _parse(self,val):
  67. method = val['method']
  68. params = val['params']
  69. def parase(self, hfive, text, val, prefix):
  70. items = re.split(r'-', text)
  71. if len(items) != 5:
  72. return False
  73. (mchid, quality, card_type, amount, time) = items
  74. pos = self.pos_map[f'{prefix}']
  75. time = int(time)
  76. today = self.day_stamp(time)
  77. path = f'/{today}/{mchid}/{quality}/{card_type}/{amount}'
  78. if path not in hfive:
  79. dim = len(self.pos_map)
  80. hfive[path] = np.zeros((dim, 86400))
  81. diff = time - today
  82. if diff < 0:
  83. print(diff)
  84. hfive[path][pos, diff] = val
  85. print(path, pos, prefix, diff, time, val, hfive[path][pos, diff])
  86. pass
  87. def day_stamp(self, stamp):
  88. stamp = int(stamp)
  89. x = stime.gmtime(stamp + 8 * 3600)
  90. diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
  91. today = stamp - diff.total_seconds()
  92. return int(today)
  93. def _days(self, root):
  94. result = []
  95. try:
  96. for name, sub in root.items():
  97. if isinstance(sub, h5py.Group):
  98. result.append(name)
  99. except Exception as ex:
  100. print(ex)
  101. finally:
  102. return result
  103. def days(self):
  104. try:
  105. hfive = h5py.File(self._file_name, 'r')
  106. root = hfive.require_group('/')
  107. days = self._days(root)
  108. hfive.close()
  109. return days
  110. except Exception as ex:
  111. print(ex)
  112. return []
  113. def paths(self, time_stamp):
  114. try:
  115. day_stamp = self.day_stamp(time_stamp)
  116. hfive = h5py.File(self._file_name, 'r')
  117. group = hfive.require_group(f'/{day_stamp}')
  118. paths = self.dir(group)
  119. hfive.close()
  120. return paths
  121. except Exception as ex:
  122. print(ex)
  123. return []
  124. def dir(self, group):
  125. result = []
  126. for name, sub in group.items():
  127. if isinstance(sub, h5py.Group):
  128. result.extend(self.dir(sub))
  129. else:
  130. result.append(sub.name)
  131. return result
  132. def _all_none(self, **kwargs):
  133. for key, val in kwargs.items():
  134. if val is not None:
  135. return False
  136. return True
  137. def _merge_path(self,paths):
  138. result = {}
  139. for path in paths:
  140. items = re.split(r'/', path)
  141. if len(items) != 6:
  142. continue
  143. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  144. _mchid = int(_mchid)
  145. if _mchid not in result:
  146. result[_mchid] = []
  147. result[_mchid].append(path)
  148. return result
  149. def draw_plot(self, start_time, interval=300, **kwargs):
  150. logger = logging.getLogger('app')
  151. hfive = h5py.File(self._file_name, 'r')
  152. try:
  153. day_stamp = self.day_stamp(start_time)
  154. start_pos = start_time - day_stamp
  155. cur_day = self.day_stamp(stime.time())
  156. if day_stamp == cur_day:
  157. end_pos = int(stime.time()) - day_stamp
  158. else:
  159. end_pos = -1
  160. fig = Figure(figsize=(16, 8))
  161. ax = fig.subplots()
  162. dim = len(self.pos_map)
  163. predata = np.zeros((dim, 86400))
  164. x = np.arange(0, 86400, interval)
  165. sub_count = 0
  166. filer_text, paths = self.datasets(hfive, start_time, **kwargs)
  167. if self._all_none(**kwargs):
  168. paths = self._merge_path(paths)
  169. for mchid, data in self._read_dict_data(hfive, paths):
  170. predata = predata + data
  171. path = f'{mchid}'
  172. ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
  173. if ret:
  174. sub_count += 1
  175. pass
  176. else:
  177. for path, data in self.read_data(hfive, paths):
  178. data = np.array(data)
  179. predata = predata + data
  180. ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
  181. if ret:
  182. sub_count += 1
  183. if sub_count > 1:
  184. self._draw_plot(ax, x, day_stamp, start_pos, end_pos, predata, interval, filer_text)
  185. ax.legend()
  186. ax.grid()
  187. ax.set_title('success ratio')
  188. ax.set(xlabel='time', ylabel='ratio')
  189. fig.autofmt_xdate()
  190. fig.subplots_adjust(left=0.05, right=0.999, top=0.95, bottom=0.1)
  191. buf = BytesIO()
  192. fig.savefig(buf, format="png")
  193. return buf
  194. except Exception as ex:
  195. print(ex)
  196. finally:
  197. hfive.close()
  198. def read_data(self, hfive, paths):
  199. for path in paths:
  200. yield path, hfive[path]
  201. def _read_dict_data(self, hfive, mchPaths):
  202. for mchid, paths in mchPaths.items():
  203. dim = len(self.pos_map)
  204. predata = np.zeros((dim, 86400))
  205. for path in paths:
  206. predata += hfive[path]
  207. yield mchid, predata
  208. def datasets(self, hfive, start_time, **kwargs):
  209. logger = logging.getLogger('app')
  210. day_stamp = self.day_stamp(start_time)
  211. sday = f'{day_stamp}'
  212. root = hfive.require_group('/')
  213. days = self._days(root)
  214. if sday not in days:
  215. return False
  216. group = hfive.require_group(sday)
  217. dsets = self.dir(group)
  218. mchid = quality = card_type = amount = None
  219. for key, val in kwargs.items():
  220. if val is None:
  221. continue
  222. if key == 'mchid':
  223. mchid = val
  224. elif key == 'quality':
  225. quality = f'{val}'
  226. elif key == 'card_type':
  227. card_type = f'{val}'
  228. elif key == 'amount':
  229. amount = f'{val}'
  230. else:
  231. continue
  232. return self._filter(dsets, mchid=mchid, quality=quality, card_type=card_type, amount=amount)
  233. def _filter(self, dsets, mchid=None, quality=None, card_type=None, amount=None):
  234. filer_text = ''
  235. if mchid is not None:
  236. filer_text = mchid
  237. if quality is not None:
  238. filer_text = filer_text + f"-qua:{quality}"
  239. if card_type is not None:
  240. filer_text = filer_text + f"-type:{card_type}"
  241. if amount is not None:
  242. filer_text = filer_text + f"-amount:{amount}"
  243. paths = []
  244. for text in dsets:
  245. items = re.split(r'/', text)
  246. if len(items) != 6:
  247. return False
  248. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  249. if (mchid is not None) and (_mchid != mchid):
  250. continue
  251. if (quality is not None) and (_quality != quality):
  252. continue
  253. if (card_type is not None) and (_card_type != card_type):
  254. continue
  255. if (amount is not None) and (_amount != amount):
  256. continue
  257. paths.append(text)
  258. return filer_text, paths
  259. def _draw_plot(self, ax, x, day_stamp, start_pos, end_pos, data, interval=300, path=''):
  260. # 'commit': 0, 'succ': 1, 'fail': 2
  261. logging.getLogger('app').debug("path=%s", path)
  262. all = data[1] + data[2]
  263. all = all.reshape((-1, interval))
  264. all = np.sum(all, axis=1)
  265. commit = data[0]
  266. commit = commit.reshape((-1, interval))
  267. commit = np.sum(commit, axis=1)
  268. ySucc = data[1]
  269. ySucc = ySucc.reshape((-1, interval))
  270. ySucc = np.sum(ySucc, axis=1)
  271. yFail = data[2]
  272. yFail = yFail.reshape((-1, interval))
  273. yFail = np.sum(yFail, axis=1)
  274. if end_pos == -1:
  275. pos = np.where(x >= start_pos)
  276. x = x[pos]
  277. ySucc = ySucc[pos]
  278. all = all[pos]
  279. commit = commit[pos]
  280. yFail = yFail[pos]
  281. else:
  282. pos = np.where(start_pos <= x)
  283. x = x[pos]
  284. ySucc = ySucc[pos]
  285. all = all[pos]
  286. commit = commit[pos]
  287. yFail = yFail[pos]
  288. pos = np.where(x < end_pos)
  289. x = x[pos]
  290. ySucc = ySucc[pos]
  291. all = all[pos]
  292. commit = commit[pos]
  293. yFail = yFail[pos]
  294. succ_count = int(np.sum(ySucc))
  295. all_count = int(np.sum(all))
  296. commit_count = int(np.sum(commit))
  297. fail_count = int(np.sum(yFail))
  298. if all_count < 1:
  299. return False
  300. pos = np.where(ySucc > all)
  301. ySucc[pos] = all[pos]
  302. ySucc = ySucc / (all + 0.00000001)
  303. xs = np.array([stime.strftime('%H:%M', stime.localtime(d + day_stamp)) for d in x])
  304. ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=0))
  305. ax.plot(xs, ySucc, ls='--', marker='o', label=self._label(path, succ_count, all_count,commit_count,fail_count))
  306. return True
  307. def _label(self, path, succ_count, all, commit_count, fail_count):
  308. ratio = 0.00
  309. if all > 0:
  310. ratio = round(succ_count * 100 / all, 2)
  311. items = re.split(r'/', path)
  312. if len(items) == 6:
  313. (_, _sday, _chname, _quality, _card_type, _amount) = items
  314. card_type = ''
  315. if _card_type == '1':
  316. card_type = 'SY'
  317. elif _card_type == '2':
  318. card_type = 'SH'
  319. elif _card_type == '4':
  320. card_type = 'YD'
  321. elif _card_type == '5':
  322. card_type = 'LT'
  323. elif _card_type == '6':
  324. card_type = 'DX'
  325. elif _card_type == '7':
  326. card_type = 'TH'
  327. return f"{_chname}-{_quality}-{card_type}-{_amount}:{succ_count}/{all} = {ratio}% {commit_count}:{fail_count}"
  328. else:
  329. if path == '' or path is None:
  330. path = 'average'
  331. return f"{path}:{succ_count}/{all} = {ratio}% {commit_count}:{fail_count}"
  332. pass
  333. #统计机构当前时间之前序列时间成功率
  334. def _merge_mobile_path(self,paths):
  335. result = {}
  336. for path in paths:
  337. items = re.split(r'/', path)
  338. if len(items) != 6:
  339. continue
  340. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  341. _card_type = int(_card_type)
  342. if _card_type not in [4, 5, 6]:
  343. continue
  344. _mchid = int(_mchid)
  345. if _mchid not in result:
  346. result[_mchid] = []
  347. result[_mchid].append(path)
  348. return result
  349. def _merge_data(self, hfive, paths):
  350. dim = len(self.pos_map)
  351. predata = np.zeros((dim, 86400))
  352. for path in paths:
  353. predata += hfive[path]
  354. return predata
  355. def _calc_mratio(self,data,startes,end):
  356. succ = data[1]
  357. fail = data[2]
  358. x = np.arange(0, 86400, 1)
  359. result = {}
  360. for start in startes:
  361. if end - start < 0:
  362. start_pos = 0
  363. else:
  364. start_pos = end - start
  365. pos = np.where(x >= start_pos)
  366. t = x[pos]
  367. _fail = fail[pos]
  368. _succ = succ[pos]
  369. pos = np.where(t < end)
  370. _fail = _fail[pos]
  371. _succ = _succ[pos]
  372. succs = int(np.sum(_succ))
  373. fails = int(np.sum(_fail))
  374. ratio = round((succs + 0.00001) / (succs + fails + 0.00001), 4)
  375. result[start] = [succs, fails, ratio]
  376. return result
  377. def mratios(self, time_stamp,presecs):
  378. paths = self.paths(time_stamp)
  379. mchid_paths = self._merge_mobile_path(paths)
  380. day_stamp = self.day_stamp(time_stamp)
  381. mratios = {}
  382. hfive = h5py.File(self._file_name, 'r')
  383. for mchid, paths in mchid_paths.items():
  384. mdata = self._merge_data(hfive,paths)
  385. result = self._calc_mratio(mdata,presecs,time_stamp - day_stamp)
  386. mratios[mchid] = result
  387. hfive.close()
  388. return mratios
  389. def calc_ratio(self):
  390. import json
  391. r = None
  392. try:
  393. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  394. r = redis.Redis(connection_pool=pool)
  395. except Exception as ex:
  396. print(ex)
  397. while True:
  398. try:
  399. time_sec = int(stime.time())
  400. presecs = [900, 1800, 3600, 7200, 86400]
  401. mratios = self.mratios(time_sec, presecs)
  402. if len(mratios) != 0:
  403. r.set(f"nc_merchant_ratios", json.dumps(mratios))
  404. r.publish('refill',json.dumps({'type':'mch_ratio','value':0}))
  405. print('push msg=',mratios)
  406. except Exception as ex:
  407. print(ex)
  408. finally:
  409. stime.sleep(2)
  410. #以下为按照卡类型计算成功率代码
  411. def _merge_mobile_type_path(self,paths,card_type=None):
  412. result = {}
  413. for path in paths:
  414. items = re.split(r'/', path)
  415. if len(items) != 6:
  416. continue
  417. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  418. _card_type = int(_card_type)
  419. if _card_type not in [4, 5, 6]:
  420. continue
  421. if card_type is not None and _card_type != card_type:
  422. continue
  423. _mchid = int(_mchid)
  424. if _mchid not in result:
  425. result[_mchid] = []
  426. result[_mchid].append(path)
  427. return result
  428. def mratio_types(self, time_stamp,presecs):
  429. paths = self.paths(time_stamp)
  430. mchid_paths = self._merge_mobile_type_path(paths)
  431. day_stamp = self.day_stamp(time_stamp)
  432. card_types = {None: 'ALL', 4: 'YD', 5: 'LT', 6: 'DX'}
  433. mratios = {}
  434. hfive = h5py.File(self._file_name, 'r')
  435. for mchid, paths in mchid_paths.items():
  436. mch_ratios = {}
  437. for card_type, name in card_types.items():
  438. print('card_type=', card_type, 'name=', name)
  439. if card_type is None:
  440. cur_paths = paths
  441. else:
  442. cur_paths = self._merge_mobile_type_path(paths, card_type)
  443. if len(cur_paths) == 0:
  444. continue
  445. cur_paths = cur_paths[mchid]
  446. mdata = self._merge_data(hfive,cur_paths)
  447. result = self._calc_mratio(mdata,presecs,time_stamp - day_stamp)
  448. mch_ratios[name] = result
  449. mratios[mchid] = mch_ratios
  450. hfive.close()
  451. return mratios
  452. def calc_ratios(self):
  453. import json
  454. r = None
  455. try:
  456. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  457. r = redis.Redis(connection_pool=pool)
  458. except Exception as ex:
  459. print(ex)
  460. while True:
  461. try:
  462. time_sec = int(stime.time())
  463. presecs = [900, 1800, 3600, 7200, 86400]
  464. mratios = self.mratio_types(time_sec, presecs)
  465. if len(mratios) != 0:
  466. r.set(f"nc_merchant_card_type_ratios", json.dumps(mratios))
  467. # r.publish('refill',json.dumps({'type':'mch_ratio','value':0}))
  468. print('push msg=', mratios)
  469. except Exception as ex:
  470. print(ex)
  471. finally:
  472. stime.sleep(2)
  473. ####################################################################################################################
  474. ####
  475. def _calc_mcount(self,data,startes,end):
  476. succ = data[1]
  477. fail = data[2]
  478. x = np.arange(0, 86400, 1)
  479. result = {}
  480. for start in startes:
  481. if end - start < 0:
  482. start_pos = 0
  483. else:
  484. start_pos = end - start
  485. pos = np.where(x >= start_pos)
  486. t = x[pos]
  487. _succ = succ[pos]
  488. _fail = fail[pos]
  489. pos = np.where(t < end)
  490. _succ = _succ[pos]
  491. _fail = _fail[pos]
  492. succs = int(np.sum(_succ))
  493. fails = int(np.sum(_fail))
  494. ratio = round((succs + 0.00001) / (succs + fails + 0.00001), 4)
  495. result[start] = [succs, fails, ratio]
  496. return result
  497. def merchant_rmobile_path(self, paths):
  498. result = {}
  499. for path in paths:
  500. items = re.split(r'/', path)
  501. if len(items) != 6:
  502. continue
  503. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  504. _card_type = int(_card_type)
  505. if _card_type not in [4, 5, 6]:
  506. continue
  507. _mchid = int(_mchid)
  508. if _mchid not in result:
  509. result[_mchid] = []
  510. result[_mchid].append(path)
  511. return result
  512. def rmobile_path(self, paths):
  513. result = []
  514. for path in paths:
  515. items = re.split(r'/', path)
  516. if len(items) != 6:
  517. continue
  518. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  519. _card_type = int(_card_type)
  520. if _card_type not in [4, 5, 6]:
  521. continue
  522. result.append(path)
  523. return result
  524. def mch_count(self, paths, presecs,time_stamp):
  525. hfive = h5py.File(self._file_name, 'r')
  526. mdata = self._merge_data(hfive,paths)
  527. day_stamp = self.day_stamp(time_stamp)
  528. result = self._calc_mcount(mdata,presecs,time_stamp - day_stamp)
  529. hfive.close()
  530. return result
  531. def mch_detail_count(self, paths, presecs,time_stamp):
  532. hfive = h5py.File(self._file_name, 'r')
  533. result = {}
  534. for path in paths:
  535. items = re.split(r'/', path)
  536. if len(items) != 6:
  537. continue
  538. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  539. key = f"{_mchid}-{_quality}-{_card_type}-{_amount}"
  540. mdata = hfive[path]
  541. day_stamp = self.day_stamp(time_stamp)
  542. result[key] = self._calc_mcount(mdata,presecs,time_stamp - day_stamp)
  543. hfive.close()
  544. return result
  545. def mch_counts(self):
  546. import json
  547. r = None
  548. try:
  549. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  550. r = redis.Redis(connection_pool=pool)
  551. except Exception as ex:
  552. print(ex)
  553. while True:
  554. try:
  555. time_stamp = int(stime.time())
  556. presecs = [900, 1800, 3600, 7200, 86400]
  557. all_paths = self.paths(time_stamp)
  558. mchid_paths = self.merchant_rmobile_path(all_paths)
  559. gross = {}
  560. for mchid, paths in mchid_paths.items():
  561. counts = self.mch_count(paths, presecs, time_stamp)
  562. gross[mchid] = counts
  563. paths = self.rmobile_path(all_paths)
  564. detail = self.mch_detail_count(paths, presecs, time_stamp)
  565. result = {'gross': gross, 'detail': detail}
  566. if len(result) != 0:
  567. r.set(f"nc_merchant_refill_counts", json.dumps(result))
  568. r.publish('refill',json.dumps({'type':'mch_counts','value':0}))
  569. print('push msg=', result)
  570. except Exception as ex:
  571. print(ex)
  572. finally:
  573. stime.sleep(2)
  574. queueListener = QueueListener()