# MchDataCenter.py
import json
import logging
import os
import re
import time as stime
import time as time
from datetime import timedelta
from io import BytesIO
from os import path

import h5py
import numpy as np
import redis
from matplotlib import ticker
from matplotlib.figure import Figure
  14. class MchDataCenter(object):
  15. pos_map = {
  16. 'commit': 0, 'notify': 1
  17. }
  18. def __init__(self):
  19. self._mquit = False
  20. self._mRHost = ''
  21. self._mRPort = 6379
  22. self._file_name = '/var/www/html/data/stdata/user.hdf5'
  23. def set_redis(self, rhost, rport):
  24. self._mRHost = rhost
  25. self._mRPort = rport
  26. def stop(self):
  27. self._mquit = True
  28. pass
  29. def prepare_data(self):
  30. while self._mquit == False:
  31. try:
  32. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  33. r = redis.Redis(connection_pool=pool)
  34. if path.exists(self._file_name):
  35. hfive = h5py.File(self._file_name, 'a')
  36. else:
  37. hfive = h5py.File(self._file_name, 'w')
  38. self.read_redis(hfive, r, 'nc_user_monitor_commit', 'commit')
  39. self.read_redis(hfive, r, 'nc_user_monitor_success', 'notify')
  40. hfive.close()
  41. self.del_redis(r, 'nc_user_monitor_commit')
  42. self.del_redis(r, 'nc_user_monitor_success')
  43. except Exception as ex:
  44. print(ex)
  45. finally:
  46. for i in range(60):
  47. if self._mquit:
  48. break
  49. else:
  50. stime.sleep(1)
  51. def del_redis(self, redis, name):
  52. latest_time = int(stime.time()) - 300
  53. for item in redis.hscan_iter(name):
  54. key = str(item[0], encoding="utf-8")
  55. items = re.split(r'-', key)
  56. fdel = True
  57. if len(items) == 5:
  58. (mchid, quality, card_type, amount, time) = items
  59. time = int(time)
  60. if latest_time <= time:
  61. fdel = False
  62. if fdel:
  63. redis.hdel(name, key)
  64. pass
  65. def read_redis(self, hfive, redis, name, prefix):
  66. i = 0
  67. for item in redis.hscan_iter(name):
  68. key = str(item[0], encoding="utf-8")
  69. val = str(item[1], encoding="utf-8")
  70. print(f'{prefix}:{i}')
  71. i += 1
  72. self.parase(hfive, key, val, prefix)
  73. def parase(self, hfive, text, val, prefix):
  74. items = re.split(r'-', text)
  75. if len(items) != 5:
  76. return False
  77. (mchid, quality, card_type, amount, time) = items
  78. pos = self.pos_map[f'{prefix}']
  79. time = int(time)
  80. today = self.day_stamp(time)
  81. path = f'/{today}/{mchid}/{quality}/{card_type}/{amount}'
  82. if path not in hfive:
  83. hfive[path] = np.zeros((2, 86400))
  84. diff = time - today
  85. if diff < 0:
  86. print(diff)
  87. hfive[path][pos, diff] = val
  88. print(path, pos, diff, val, hfive[path][pos, diff])
  89. pass
  90. def day_stamp(self, stamp):
  91. stamp = int(stamp)
  92. x = stime.gmtime(stamp + 8 * 3600)
  93. diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
  94. today = stamp - diff.total_seconds()
  95. return int(today)
  96. def _days(self, root):
  97. result = []
  98. try:
  99. for name, sub in root.items():
  100. if isinstance(sub, h5py.Group):
  101. result.append(name)
  102. except Exception as ex:
  103. print(ex)
  104. finally:
  105. return result
  106. def days(self):
  107. try:
  108. hfive = h5py.File(self._file_name, 'r')
  109. root = hfive.require_group('/')
  110. days = self._days(root)
  111. hfive.close()
  112. return days
  113. except Exception as ex:
  114. print(ex)
  115. return []
  116. def paths(self, time_stamp):
  117. try:
  118. day_stamp = self.day_stamp(time_stamp)
  119. hfive = h5py.File(self._file_name, 'r')
  120. group = hfive.require_group(f'/{day_stamp}')
  121. paths = self.dir(group)
  122. hfive.close()
  123. return paths
  124. except Exception as ex:
  125. print(ex)
  126. return []
  127. def dir(self, group):
  128. result = []
  129. for name, sub in group.items():
  130. if isinstance(sub, h5py.Group):
  131. result.extend(self.dir(sub))
  132. else:
  133. result.append(sub.name)
  134. return result
  135. def _all_none(self, **kwargs):
  136. for key, val in kwargs.items():
  137. if val is not None:
  138. return False
  139. return True
  140. def _merge_path(self,paths):
  141. result = {}
  142. for path in paths:
  143. items = re.split(r'/', path)
  144. if len(items) != 6:
  145. continue
  146. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  147. _mchid = int(_mchid)
  148. if _mchid not in result:
  149. result[_mchid] = []
  150. result[_mchid].append(path)
  151. return result
  152. def draw_plot(self, start_time, interval=300, **kwargs):
  153. logger = logging.getLogger('app')
  154. hfive = h5py.File(self._file_name, 'r')
  155. try:
  156. day_stamp = self.day_stamp(start_time)
  157. start_pos = start_time - day_stamp
  158. cur_day = self.day_stamp(stime.time())
  159. if day_stamp == cur_day:
  160. end_pos = int(stime.time()) - day_stamp
  161. else:
  162. end_pos = -1
  163. fig = Figure(figsize=(16, 8))
  164. ax = fig.subplots()
  165. predata = np.zeros((2, 86400))
  166. x = np.arange(0, 86400, interval)
  167. sub_count = 0
  168. filer_text, paths = self.datasets(hfive, start_time, **kwargs)
  169. if self._all_none(**kwargs):
  170. paths = self._merge_path(paths)
  171. for mchid, data in self._read_dict_data(hfive, paths):
  172. predata = predata + data
  173. path = f'{mchid}'
  174. ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
  175. if ret:
  176. sub_count += 1
  177. pass
  178. else:
  179. for path, data in self.read_data(hfive, paths):
  180. data = np.array(data)
  181. predata = predata + data
  182. ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
  183. if ret:
  184. sub_count += 1
  185. if sub_count > 1:
  186. self._draw_plot(ax, x, day_stamp, start_pos, end_pos, predata, interval, filer_text)
  187. ax.legend()
  188. ax.grid()
  189. ax.set_title('success ratio')
  190. ax.set(xlabel='time', ylabel='ratio')
  191. fig.autofmt_xdate()
  192. fig.subplots_adjust(left=0.05, right=0.999, top=0.95, bottom=0.1)
  193. buf = BytesIO()
  194. fig.savefig(buf, format="png")
  195. return buf
  196. except Exception as ex:
  197. print(ex)
  198. finally:
  199. hfive.close()
  200. def read_data(self, hfive, paths):
  201. for path in paths:
  202. yield path, hfive[path]
  203. def _read_dict_data(self, hfive, mchPaths):
  204. for mchid, paths in mchPaths.items():
  205. predata = np.zeros((2, 86400))
  206. for path in paths:
  207. predata += hfive[path]
  208. yield mchid, predata
  209. def datasets(self, hfive, start_time, **kwargs):
  210. logger = logging.getLogger('app')
  211. day_stamp = self.day_stamp(start_time)
  212. sday = f'{day_stamp}'
  213. root = hfive.require_group('/')
  214. days = self._days(root)
  215. if sday not in days:
  216. return False
  217. group = hfive.require_group(sday)
  218. dsets = self.dir(group)
  219. mchid = quality = card_type = amount = None
  220. for key, val in kwargs.items():
  221. if val is None:
  222. continue
  223. if key == 'mchid':
  224. mchid = val
  225. elif key == 'quality':
  226. quality = f'{val}'
  227. elif key == 'card_type':
  228. card_type = f'{val}'
  229. elif key == 'amount':
  230. amount = f'{val}'
  231. else:
  232. continue
  233. return self._filter(dsets, mchid=mchid, quality=quality, card_type=card_type, amount=amount)
  234. def _filter(self, dsets, mchid=None, quality=None, card_type=None, amount=None):
  235. filer_text = ''
  236. if mchid is not None:
  237. filer_text = mchid
  238. if quality is not None:
  239. filer_text = filer_text + f"-qua:{quality}"
  240. if card_type is not None:
  241. filer_text = filer_text + f"-type:{card_type}"
  242. if amount is not None:
  243. filer_text = filer_text + f"-amount:{amount}"
  244. paths = []
  245. for text in dsets:
  246. items = re.split(r'/', text)
  247. if len(items) != 6:
  248. return False
  249. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  250. if (mchid is not None) and (_mchid != mchid):
  251. continue
  252. if (quality is not None) and (_quality != quality):
  253. continue
  254. if (card_type is not None) and (_card_type != card_type):
  255. continue
  256. if (amount is not None) and (_amount != amount):
  257. continue
  258. paths.append(text)
  259. return filer_text, paths
  260. def _draw_plot(self, ax, x, day_stamp, start_pos, end_pos, data, interval=300, path=''):
  261. # 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4
  262. logging.getLogger('app').debug("path=%s", path)
  263. all = data[0]
  264. all = all.reshape((-1, interval))
  265. all = np.sum(all, axis=1)
  266. ySucc = data[1]
  267. ySucc = ySucc.reshape((-1, interval))
  268. ySucc = np.sum(ySucc, axis=1)
  269. if end_pos == -1:
  270. pos = np.where(x >= start_pos)
  271. x = x[pos]
  272. ySucc = ySucc[pos]
  273. all = all[pos]
  274. else:
  275. pos = np.where(start_pos <= x)
  276. x = x[pos]
  277. ySucc = ySucc[pos]
  278. all = all[pos]
  279. pos = np.where(x < end_pos)
  280. x = x[pos]
  281. ySucc = ySucc[pos]
  282. all = all[pos]
  283. succ_count = int(np.sum(ySucc))
  284. all_count = int(np.sum(all))
  285. if all_count < 1:
  286. return False
  287. pos = np.where(ySucc > all)
  288. ySucc[pos] = all[pos]
  289. ySucc = ySucc / (all + 0.00000001)
  290. xs = np.array([stime.strftime('%H:%M', stime.localtime(d + day_stamp)) for d in x])
  291. ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=0))
  292. ax.plot(xs, ySucc, ls='--', marker='o', label=self._label(path, succ_count, all_count))
  293. return True
  294. def _label(self, path, count, all):
  295. ratio = 0.00
  296. if all > 0:
  297. ratio = round(count * 100 / all, 2)
  298. items = re.split(r'/', path)
  299. if len(items) == 6:
  300. (_, _sday, _chname, _quality, _card_type, _amount) = items
  301. card_type = ''
  302. if _card_type == '1':
  303. card_type = 'SY'
  304. elif _card_type == '2':
  305. card_type = 'SH'
  306. elif _card_type == '4':
  307. card_type = 'YD'
  308. elif _card_type == '5':
  309. card_type = 'LT'
  310. elif _card_type == '6':
  311. card_type = 'DX'
  312. elif _card_type == '7':
  313. card_type = 'TH'
  314. return f"{_chname}-{_quality}-{card_type}-{_amount}:{count}/{all} = {ratio}%"
  315. else:
  316. if path == '' or path is None:
  317. path = 'average'
  318. return f"{path}:{count}/{all} = {ratio}%"
  319. pass
  320. #统计机构当前时间之前序列时间成功率
  321. def _merge_mobile_path(self,paths):
  322. result = {}
  323. for path in paths:
  324. items = re.split(r'/', path)
  325. if len(items) != 6:
  326. continue
  327. (_, _sday, _mchid, _quality, _card_type, _amount) = items
  328. _card_type = int(_card_type)
  329. if _card_type not in [4, 5, 6]:
  330. continue
  331. _mchid = int(_mchid)
  332. if _mchid not in result:
  333. result[_mchid] = []
  334. result[_mchid].append(path)
  335. return result
  336. def _merge_data(self, hfive, paths):
  337. predata = np.zeros((2, 86400))
  338. for path in paths:
  339. predata += hfive[path]
  340. return predata
  341. def _calc_mratio(self,data,startes,end):
  342. all = data[0]
  343. ySucc = data[1]
  344. x = np.arange(0, 86400, 1)
  345. result = {}
  346. for start in startes:
  347. if end - start < 0:
  348. start_pos = 0
  349. else:
  350. start_pos = end - start
  351. pos = np.where(x >= start_pos)
  352. t = x[pos]
  353. _all = all[pos]
  354. _succ = ySucc[pos]
  355. pos = np.where(t < end)
  356. _all = _all[pos]
  357. _succ = _succ[pos]
  358. succ_count = np.sum(_succ)
  359. all_count = np.sum(_all) + 0.00001
  360. ratio = round(succ_count / all_count, 2)
  361. if ratio > 1.0:
  362. ratio = 1.0
  363. result[start] = ratio
  364. return result
  365. def mratios(self, time_stamp,presecs):
  366. paths = self.paths(time_stamp)
  367. mchid_paths = self._merge_mobile_path(paths)
  368. day_stamp = self.day_stamp(time_stamp)
  369. mratios = {}
  370. hfive = h5py.File(self._file_name, 'a')
  371. for mchid, paths in mchid_paths.items():
  372. mdata = self._merge_data(hfive,paths)
  373. result = self._calc_mratio(mdata,presecs,time_stamp - day_stamp)
  374. mratios[mchid] = result
  375. hfive.close()
  376. return mratios
  377. def calc_ratio(self):
  378. import json
  379. r = None
  380. try:
  381. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  382. r = redis.Redis(connection_pool=pool)
  383. except Exception as ex:
  384. print(ex)
  385. while True:
  386. try:
  387. time_sec = int(time.time())
  388. presecs = [900, 1800, 3600, 7200, 86400]
  389. mratios = self.mratios(time_sec, presecs)
  390. if len(mratios) != 0:
  391. r.set(f"nc_merchant_ratios", json.dumps(mratios))
  392. r.publish('refill',json.dumps({'type':'mch_ratio','value':0}))
  393. print('push msg=',mratios)
  394. except Exception as ex:
  395. print(ex)
  396. finally:
  397. stime.sleep(2)
# Module-level instance, created when this module is imported.
mchDataCenter = MchDataCenter()