# DataCenter.py
import json
import logging
import os
import re
import time as stime
import time as time
from datetime import timedelta
from io import BytesIO
from os import path

import h5py
import numpy as np
import redis
from matplotlib import ticker
from matplotlib.figure import Figure
  14. class DataCenter(object):
  15. pos_map = {
  16. 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4
  17. }
  18. def __init__(self):
  19. self._mquit = False
  20. self._mRHost = ''
  21. self._mRPort = 6379
  22. self._file_name = '/var/www/html/data/stdata/data.hdf5'
  23. def set_redis(self, rhost, rport):
  24. self._mRHost = rhost
  25. self._mRPort = rport
  26. def stop(self):
  27. self._mquit = True
  28. pass
  29. def prepare_data(self):
  30. while self._mquit == False:
  31. try:
  32. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  33. r = redis.Redis(connection_pool=pool)
  34. if path.exists(self._file_name):
  35. hfive = h5py.File(self._file_name, 'a')
  36. else:
  37. hfive = h5py.File(self._file_name, 'w')
  38. self.read_redis(hfive, r, 'nc_channel_monitor_commit', 'commit')
  39. self.read_redis(hfive, r, 'nc_channel_monitor_notify', 'notify')
  40. hfive.close()
  41. self.del_redis(r, 'nc_channel_monitor_commit')
  42. self.del_redis(r, 'nc_channel_monitor_notify')
  43. except Exception as ex:
  44. print(ex)
  45. finally:
  46. time_sec = int(time.time())
  47. cur_min = time_sec - time_sec % 60
  48. for i in range(66):
  49. time_sec = int(time.time())
  50. next_min = time_sec - time_sec % 60
  51. if next_min > cur_min or self._mquit:
  52. break
  53. else:
  54. stime.sleep(1)
  55. def del_redis(self, redis, name):
  56. latest_time = int(stime.time()) - 120
  57. for item in redis.hscan_iter(name):
  58. key = str(item[0], encoding="utf-8")
  59. items = re.split(r'-', key)
  60. fdel = True
  61. if len(items) == 6:
  62. (stype, chname, quality, card_type, amount, time) = items
  63. time = int(time)
  64. if latest_time <= time:
  65. fdel = False
  66. if fdel:
  67. redis.hdel(name, key)
  68. pass
  69. def read_redis(self, hfive, redis, name, prefix):
  70. i = 0
  71. for item in redis.hscan_iter(name):
  72. key = str(item[0], encoding="utf-8")
  73. val = str(item[1], encoding="utf-8")
  74. print(f'{prefix}:{i}')
  75. i += 1
  76. self.parase(hfive, key, val, prefix)
  77. def parase(self, hfive, text, val, prefix):
  78. items = re.split(r'-', text)
  79. if len(items) != 6:
  80. return False
  81. (stype, chname, quality, card_type, amount, time) = items
  82. if stype == 'succ':
  83. pos = self.pos_map[f'{prefix}-succ']
  84. elif stype == 'fail':
  85. pos = self.pos_map[f'{prefix}-fail']
  86. else:
  87. return False
  88. time = int(time)
  89. today = self.day_stamp(time)
  90. path = f'/{today}/{chname}/{quality}/{card_type}/{amount}'
  91. if path not in hfive:
  92. hfive[path] = np.zeros((5, 86400))
  93. diff = time - today
  94. if diff < 0:
  95. print(diff)
  96. hfive[path][pos, diff] = val
  97. print(path, pos, diff, val, hfive[path][pos, diff])
  98. pass
  99. def day_stamp(self, stamp):
  100. stamp = int(stamp)
  101. x = stime.gmtime(stamp + 8 * 3600)
  102. diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
  103. today = stamp - diff.total_seconds()
  104. return int(today)
  105. def _days(self, root):
  106. result = []
  107. try:
  108. for name, sub in root.items():
  109. if isinstance(sub, h5py.Group):
  110. result.append(name)
  111. except Exception as ex:
  112. print(ex)
  113. finally:
  114. return result
  115. def days(self):
  116. try:
  117. hfive = h5py.File(self._file_name, 'r')
  118. root = hfive.require_group('/')
  119. days = self._days(root)
  120. hfive.close()
  121. return days
  122. except Exception as ex:
  123. print(ex)
  124. return []
  125. def paths(self, time_stamp):
  126. try:
  127. day_stamp = self.day_stamp(time_stamp)
  128. hfive = h5py.File(self._file_name, 'r')
  129. group = hfive.require_group(f'/{day_stamp}')
  130. paths = self.dir(group)
  131. hfive.close()
  132. return paths
  133. except Exception as ex:
  134. print(ex)
  135. return []
  136. def dir(self, group):
  137. result = []
  138. for name, sub in group.items():
  139. if isinstance(sub, h5py.Group):
  140. result.extend(self.dir(sub))
  141. else:
  142. result.append(sub.name)
  143. return result
  144. def draw_plot(self, start_time, interval=300, **kwargs):
  145. logger = logging.getLogger('app')
  146. hfive = h5py.File(self._file_name, 'r')
  147. try:
  148. day_stamp = self.day_stamp(start_time)
  149. start_pos = start_time - day_stamp
  150. cur_day = self.day_stamp(stime.time())
  151. if day_stamp == cur_day:
  152. end_pos = int(stime.time()) - day_stamp
  153. else:
  154. end_pos = -1
  155. fig = Figure(figsize=(16, 8))
  156. ax = fig.subplots()
  157. x = np.arange(0, 86400, interval)
  158. filer_text, paths = self.datasets(hfive, start_time, **kwargs)
  159. sub_count = 0
  160. predata = np.zeros((5, 86400))
  161. for path, data in self.read_data(hfive, paths):
  162. data = np.array(data)
  163. predata = predata + data
  164. ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
  165. if ret:
  166. sub_count += 1
  167. if sub_count > 1:
  168. self._draw_plot(ax, x, day_stamp, start_pos, end_pos, predata, interval, filer_text)
  169. ax.legend()
  170. ax.grid()
  171. ax.set_title('success ratio')
  172. ax.set(xlabel='time', ylabel='ratio')
  173. fig.autofmt_xdate()
  174. fig.subplots_adjust(left=0.05, right=0.999, top=0.95, bottom=0.1)
  175. buf = BytesIO()
  176. fig.savefig(buf, format="png")
  177. return buf
  178. except Exception as ex:
  179. print(ex)
  180. finally:
  181. hfive.close()
  182. def read_data(self, hfive, paths):
  183. for path in paths:
  184. yield path, hfive[path]
  185. def datasets(self, hfive, start_time, **kwargs):
  186. logger = logging.getLogger('app')
  187. day_stamp = self.day_stamp(start_time)
  188. sday = f'{day_stamp}'
  189. root = hfive.require_group('/')
  190. days = self._days(root)
  191. if sday not in days:
  192. return '', []
  193. group = hfive.require_group(sday)
  194. dsets = self.dir(group)
  195. chname = quality = card_type = amount = None
  196. for key, val in kwargs.items():
  197. if val is None:
  198. continue
  199. elif key == 'chname':
  200. chname = val
  201. elif key == 'quality':
  202. quality = f'{val}'
  203. elif key == 'card_type':
  204. card_type = f'{val}'
  205. elif key == 'amount':
  206. amount = f'{val}'
  207. else:
  208. continue
  209. return self._filter(dsets, chname=chname, quality=quality, card_type=card_type, amount=amount)
  210. def _filter(self, dsets, chname=None, quality=None, card_type=None, amount=None):
  211. filer_text = ''
  212. if chname is not None:
  213. filer_text = chname
  214. if quality is not None:
  215. filer_text = filer_text + f"-qua:{quality}"
  216. if card_type is not None:
  217. filer_text = filer_text + f"-type:{card_type}"
  218. if amount is not None:
  219. filer_text = filer_text + f"-amount:{amount}"
  220. paths = []
  221. for text in dsets:
  222. items = re.split(r'/', text)
  223. if len(items) != 6:
  224. return False
  225. (_, _sday, _chname, _quality, _card_type, _amount) = items
  226. if (chname is not None) and (_chname != chname):
  227. continue
  228. if (quality is not None) and (_quality != quality):
  229. continue
  230. if (card_type is not None) and (_card_type != card_type):
  231. continue
  232. if (amount is not None) and (_amount != amount):
  233. continue
  234. paths.append(text)
  235. return filer_text, paths
  236. def _draw_plot(self, ax, x, day_stamp, start_pos, end_pos, data, interval=300, path=''):
  237. # 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4
  238. logging.getLogger('app').debug("path=%s", path)
  239. all = data[2] + data[3]
  240. all = all.reshape((-1, interval))
  241. all = np.sum(all, axis=1)
  242. ySucc = data[2]
  243. ySucc = ySucc.reshape((-1, interval))
  244. ySucc = np.sum(ySucc, axis=1)
  245. if end_pos == -1:
  246. pos = np.where(x >= start_pos)
  247. x = x[pos]
  248. ySucc = ySucc[pos]
  249. all = all[pos]
  250. else:
  251. pos = np.where(start_pos <= x)
  252. x = x[pos]
  253. ySucc = ySucc[pos]
  254. all = all[pos]
  255. pos = np.where(x < end_pos)
  256. x = x[pos]
  257. ySucc = ySucc[pos]
  258. all = all[pos]
  259. succ_count = int(np.sum(ySucc))
  260. all_count = int(np.sum(all))
  261. opened = np.where(ySucc > 0.1)
  262. if len(opened[0]) == 0:
  263. logging.getLogger('app').debug("path=%s,opened=False", path)
  264. return False
  265. ySucc = ySucc / all
  266. xs = np.array([stime.strftime('%H:%M', stime.localtime(d + day_stamp)) for d in x])
  267. ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=0))
  268. ax.plot(xs, ySucc, ls='--', marker='o', label=self._label(path, succ_count, all_count))
  269. return True
  270. def _label(self, path, count, all):
  271. ratio = 0.00
  272. if all > 0:
  273. ratio = round(count * 100 / all, 2)
  274. items = re.split(r'/', path)
  275. if len(items) == 6:
  276. (_, _sday, _chname, _quality, _card_type, _amount) = items
  277. card_type = ''
  278. if _card_type == '1':
  279. card_type = 'SY'
  280. elif _card_type == '2':
  281. card_type = 'SH'
  282. elif _card_type == '4':
  283. card_type = 'YD'
  284. elif _card_type == '5':
  285. card_type = 'LT'
  286. elif _card_type == '6':
  287. card_type = 'DX'
  288. elif _card_type == '7':
  289. card_type = 'TH'
  290. return f"{_chname}-{_quality}-{card_type}-{_amount}:{count}/{all} = {ratio}%"
  291. else:
  292. if path == '' or path is None:
  293. path = 'average'
  294. return f"{path}:{count}/{all} = {ratio}%"
  295. def _cur_min(self):
  296. time_sec = int(time.time())
  297. cur_min = time_sec - time_sec % 60
  298. return cur_min
  299. def _calc_tuple(self, data, start_pos, end_pos):
  300. # 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4
  301. logging.getLogger('app').debug("path=%s", path)
  302. cur_data = data[:, start_pos:end_pos]
  303. all = np.sum(cur_data, axis=1)
  304. all = all.tolist()
  305. result = [int(x) for x in all]
  306. return result[:4]
  307. def _repath(self, path):
  308. items = re.split(r'/', path)
  309. if len(items) == 6:
  310. (_, _sday, _chname, _quality, _card_type, _amount) = items
  311. return f"{_chname}-{_quality}-{_card_type}-{_amount}"
  312. else:
  313. return False
  314. def _calc_ratio(self, start_time, end_time):
  315. hfive = None
  316. try:
  317. hfive = h5py.File(self._file_name, 'r')
  318. day_stamp = self.day_stamp(start_time)
  319. start_pos = start_time - day_stamp
  320. end_pos = end_time - day_stamp
  321. result = {time: end_time}
  322. ratios = {}
  323. filer_text, paths = self.datasets(hfive, start_time)
  324. if len(paths) == 0:
  325. return None
  326. for path, data in self.read_data(hfive, paths):
  327. data = np.array(data)
  328. ratio = self._calc_tuple(data, start_pos, end_pos)
  329. key = self._repath(path)
  330. ratios[key] = ratio
  331. print(path, ratio)
  332. result['ratios'] = ratios
  333. return result
  334. except Exception as ex:
  335. print(ex)
  336. finally:
  337. hfive.close()
  338. pass
  339. def calc_ratio(self):
  340. import json
  341. r = None
  342. try:
  343. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  344. r = redis.Redis(connection_pool=pool)
  345. except Exception as ex:
  346. print(ex)
  347. cur_min = 0
  348. while True:
  349. try:
  350. time_sec = int(time.time())
  351. cur_min = time_sec - time_sec % 60
  352. start_time = cur_min - 300
  353. day_stamp = self.day_stamp(time_sec)
  354. if start_time >= day_stamp:
  355. self.log_time(day_stamp,time_sec,start_time,cur_min)
  356. ratios = self._calc_ratio(start_time=start_time, end_time=cur_min)
  357. if ratios != None:
  358. r.set(f"nc_channel_ratios", json.dumps(ratios))
  359. except Exception as ex:
  360. print(ex)
  361. finally:
  362. for i in range(66):
  363. time_sec = int(time.time())
  364. next_min = time_sec - time_sec % 60
  365. if (next_min > cur_min and time_sec % 60 == 2) or self._mquit:
  366. break
  367. else:
  368. stime.sleep(1)
  369. def log_time(self,day_stamp, time_sec, start_time, cur_min):
  370. d = time.asctime(time.localtime(day_stamp))
  371. a = time.asctime(time.localtime(time_sec))
  372. b = time.asctime(time.localtime(start_time))
  373. c = time.asctime(time.localtime(cur_min))
  374. print('day_stamp=', d, 'cur_time=', a, 'start_time=', b, 'end_time=', c)
  375. dataCenter = DataCenter()