# DataCenter.py (14 KB)
import json
import logging
import os
import re
import time
import time as stime
from datetime import timedelta
from io import BytesIO
from os import path

import h5py
import numpy as np
import redis
from matplotlib import ticker
from matplotlib.figure import Figure
  14. class DataCenter(object):
  15. pos_map = {
  16. 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4
  17. }
  18. def __init__(self):
  19. self._mquit = False
  20. self._mRHost = ''
  21. self._mRPort = 6379
  22. self._file_name = '/var/www/html/data/stdata/data.hdf5'
  23. def set_redis(self, rhost, rport):
  24. self._mRHost = rhost
  25. self._mRPort = rport
  26. def stop(self):
  27. self._mquit = True
  28. pass
  29. def prepare_data(self):
  30. while True:
  31. try:
  32. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  33. r = redis.Redis(connection_pool=pool)
  34. if path.exists(self._file_name):
  35. hfive = h5py.File(self._file_name, 'a')
  36. else:
  37. hfive = h5py.File(self._file_name, 'w')
  38. self.read_redis(hfive, r, 'nc_channel_monitor_commit', 'commit')
  39. self.read_redis(hfive, r, 'nc_channel_monitor_notify', 'notify')
  40. hfive.close()
  41. self.del_redis(r, 'nc_channel_monitor_commit')
  42. self.del_redis(r, 'nc_channel_monitor_notify')
  43. except Exception as ex:
  44. print(ex)
  45. finally:
  46. for i in range(60):
  47. if self._mquit == True:
  48. break
  49. else:
  50. stime.sleep(1)
  51. def del_redis(self, redis, name):
  52. latest_time = int(stime.time()) - 120
  53. for item in redis.hscan_iter(name):
  54. key = str(item[0], encoding="utf-8")
  55. items = re.split(r'-', key)
  56. fdel = True
  57. if len(items) == 6:
  58. (stype, chname, quality, card_type, amount, time) = items
  59. time = int(time)
  60. if latest_time <= time:
  61. fdel = False
  62. if fdel:
  63. redis.hdel(name, key)
  64. pass
  65. def read_redis(self, hfive, redis, name, prefix):
  66. i = 0
  67. for item in redis.hscan_iter(name):
  68. key = str(item[0], encoding="utf-8")
  69. val = str(item[1], encoding="utf-8")
  70. print(f'{prefix}:{i}')
  71. i += 1
  72. self.parase(hfive, key, val, prefix)
  73. def parase(self, hfive, text, val, prefix):
  74. items = re.split(r'-', text)
  75. if len(items) != 6:
  76. return False
  77. (stype, chname, quality, card_type, amount, time) = items
  78. if stype == 'succ':
  79. pos = self.pos_map[f'{prefix}-succ']
  80. elif stype == 'fail':
  81. pos = self.pos_map[f'{prefix}-fail']
  82. else:
  83. return False
  84. time = int(time)
  85. today = self.day_stamp(time)
  86. path = f'/{today}/{chname}/{quality}/{card_type}/{amount}'
  87. if path not in hfive:
  88. hfive[path] = np.zeros((5, 86400))
  89. diff = time - today
  90. if diff < 0:
  91. print(diff)
  92. hfive[path][pos, diff] = val
  93. print(path, pos, diff, val, hfive[path][pos, diff])
  94. pass
  95. def day_stamp(self, stamp):
  96. stamp = int(stamp)
  97. x = stime.gmtime(stamp + 8 * 3600)
  98. diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
  99. today = stamp - diff.total_seconds()
  100. return int(today)
  101. def _days(self, root):
  102. result = []
  103. try:
  104. for name, sub in root.items():
  105. if isinstance(sub, h5py.Group):
  106. result.append(name)
  107. except Exception as ex:
  108. print(ex)
  109. finally:
  110. return result
  111. def days(self):
  112. try:
  113. hfive = h5py.File(self._file_name, 'r')
  114. root = hfive.require_group('/')
  115. days = self._days(root)
  116. hfive.close()
  117. return days
  118. except Exception as ex:
  119. print(ex)
  120. return []
  121. def paths(self, time_stamp):
  122. try:
  123. day_stamp = self.day_stamp(time_stamp)
  124. hfive = h5py.File(self._file_name, 'r')
  125. group = hfive.require_group(f'/{day_stamp}')
  126. paths = self.dir(group)
  127. hfive.close()
  128. return paths
  129. except Exception as ex:
  130. print(ex)
  131. return []
  132. def dir(self, group):
  133. result = []
  134. for name, sub in group.items():
  135. if isinstance(sub, h5py.Group):
  136. result.extend(self.dir(sub))
  137. else:
  138. result.append(sub.name)
  139. return result
  140. def draw_plot(self, start_time, interval=300, **kwargs):
  141. logger = logging.getLogger('app')
  142. hfive = h5py.File(self._file_name, 'r')
  143. try:
  144. day_stamp = self.day_stamp(start_time)
  145. start_pos = start_time - day_stamp
  146. cur_day = self.day_stamp(stime.time())
  147. if day_stamp == cur_day:
  148. end_pos = int(stime.time()) - day_stamp
  149. else:
  150. end_pos = -1
  151. fig = Figure(figsize=(16, 8))
  152. ax = fig.subplots()
  153. x = np.arange(0, 86400, interval)
  154. filer_text, paths = self.datasets(hfive, start_time, **kwargs)
  155. sub_count = 0
  156. predata = np.zeros((5, 86400))
  157. for path, data in self.read_data(hfive, paths):
  158. data = np.array(data)
  159. predata = predata + data
  160. ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
  161. if ret:
  162. sub_count += 1
  163. if sub_count > 1:
  164. self._draw_plot(ax, x, day_stamp, start_pos, end_pos, predata, interval, filer_text)
  165. ax.legend()
  166. ax.grid()
  167. ax.set_title('success ratio')
  168. ax.set(xlabel='time', ylabel='ratio')
  169. fig.autofmt_xdate()
  170. fig.subplots_adjust(left=0.05, right=0.999, top=0.95, bottom=0.1)
  171. buf = BytesIO()
  172. fig.savefig(buf, format="png")
  173. return buf
  174. except Exception as ex:
  175. print(ex)
  176. finally:
  177. hfive.close()
  178. def read_data(self, hfive, paths):
  179. for path in paths:
  180. yield path, hfive[path]
  181. def datasets(self, hfive, start_time, **kwargs):
  182. logger = logging.getLogger('app')
  183. day_stamp = self.day_stamp(start_time)
  184. sday = f'{day_stamp}'
  185. root = hfive.require_group('/')
  186. days = self._days(root)
  187. if sday not in days:
  188. return '', []
  189. group = hfive.require_group(sday)
  190. dsets = self.dir(group)
  191. chname = quality = card_type = amount = None
  192. for key, val in kwargs.items():
  193. if val is None:
  194. continue
  195. elif key == 'chname':
  196. chname = val
  197. elif key == 'quality':
  198. quality = f'{val}'
  199. elif key == 'card_type':
  200. card_type = f'{val}'
  201. elif key == 'amount':
  202. amount = f'{val}'
  203. else:
  204. continue
  205. return self._filter(dsets, chname=chname, quality=quality, card_type=card_type, amount=amount)
  206. def _filter(self, dsets, chname=None, quality=None, card_type=None, amount=None):
  207. filer_text = ''
  208. if chname is not None:
  209. filer_text = chname
  210. if quality is not None:
  211. filer_text = filer_text + f"-qua:{quality}"
  212. if card_type is not None:
  213. filer_text = filer_text + f"-type:{card_type}"
  214. if amount is not None:
  215. filer_text = filer_text + f"-amount:{amount}"
  216. paths = []
  217. for text in dsets:
  218. items = re.split(r'/', text)
  219. if len(items) != 6:
  220. return False
  221. (_, _sday, _chname, _quality, _card_type, _amount) = items
  222. if (chname is not None) and (_chname != chname):
  223. continue
  224. if (quality is not None) and (_quality != quality):
  225. continue
  226. if (card_type is not None) and (_card_type != card_type):
  227. continue
  228. if (amount is not None) and (_amount != amount):
  229. continue
  230. paths.append(text)
  231. return filer_text, paths
  232. def _draw_plot(self, ax, x, day_stamp, start_pos, end_pos, data, interval=300, path=''):
  233. # 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4
  234. logging.getLogger('app').debug("path=%s", path)
  235. all = data[2] + data[3]
  236. all = all.reshape((-1, interval))
  237. all = np.sum(all, axis=1)
  238. ySucc = data[2]
  239. ySucc = ySucc.reshape((-1, interval))
  240. ySucc = np.sum(ySucc, axis=1)
  241. if end_pos == -1:
  242. pos = np.where(x >= start_pos)
  243. x = x[pos]
  244. ySucc = ySucc[pos]
  245. all = all[pos]
  246. else:
  247. pos = np.where(start_pos <= x)
  248. x = x[pos]
  249. ySucc = ySucc[pos]
  250. all = all[pos]
  251. pos = np.where(x < end_pos)
  252. x = x[pos]
  253. ySucc = ySucc[pos]
  254. all = all[pos]
  255. succ_count = int(np.sum(ySucc))
  256. all_count = int(np.sum(all))
  257. opened = np.where(ySucc > 0.1)
  258. if len(opened[0]) == 0:
  259. logging.getLogger('app').debug("path=%s,opened=False", path)
  260. return False
  261. ySucc = ySucc / all
  262. xs = np.array([stime.strftime('%H:%M', stime.localtime(d + day_stamp)) for d in x])
  263. ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=0))
  264. ax.plot(xs, ySucc, ls='--', marker='o', label=self._label(path, succ_count, all_count))
  265. return True
  266. def _label(self, path, count, all):
  267. ratio = 0.00
  268. if all > 0:
  269. ratio = round(count * 100 / all, 2)
  270. items = re.split(r'/', path)
  271. if len(items) == 6:
  272. (_, _sday, _chname, _quality, _card_type, _amount) = items
  273. card_type = ''
  274. if _card_type == '1':
  275. card_type = 'SY'
  276. elif _card_type == '2':
  277. card_type = 'SH'
  278. elif _card_type == '4':
  279. card_type = 'YD'
  280. elif _card_type == '5':
  281. card_type = 'LT'
  282. elif _card_type == '6':
  283. card_type = 'DX'
  284. elif _card_type == '7':
  285. card_type = 'TH'
  286. return f"{_chname}-{_quality}-{card_type}-{_amount}:{count}/{all} = {ratio}%"
  287. else:
  288. if path == '' or path is None:
  289. path = 'average'
  290. return f"{path}:{count}/{all} = {ratio}%"
  291. def _cur_min(self):
  292. time_sec = int(time.time())
  293. cur_min = time_sec - time_sec % 60
  294. return cur_min
  295. def _calc_tuple(self, data, start_pos, end_pos):
  296. # 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4
  297. logging.getLogger('app').debug("path=%s", path)
  298. cur_data = data[:, start_pos:end_pos]
  299. all = np.sum(cur_data, axis=1)
  300. all = all.tolist()
  301. result = [int(x) for x in all]
  302. return result[:4]
  303. def _repath(self, path):
  304. items = re.split(r'/', path)
  305. if len(items) == 6:
  306. (_, _sday, _chname, _quality, _card_type, _amount) = items
  307. return f"{_chname}-{_quality}-{_card_type}-{_amount}"
  308. else:
  309. return False
  310. def _calc_ratio(self, start_time, end_time):
  311. hfive = None
  312. try:
  313. hfive = h5py.File(self._file_name, 'r')
  314. day_stamp = self.day_stamp(start_time)
  315. start_pos = start_time - day_stamp
  316. end_pos = end_time - day_stamp
  317. result = {time: end_time}
  318. ratios = {}
  319. filer_text, paths = self.datasets(hfive, start_time)
  320. if len(paths) == 0:
  321. return None
  322. for path, data in self.read_data(hfive, paths):
  323. data = np.array(data)
  324. ratio = self._calc_tuple(data, start_pos, end_pos)
  325. key = self._repath(path)
  326. ratios[key] = ratio
  327. print(path, ratio)
  328. result['ratios'] = ratios
  329. return result
  330. except Exception as ex:
  331. print(ex)
  332. finally:
  333. hfive.close()
  334. pass
  335. def calc_ratio(self):
  336. import json
  337. r = None
  338. try:
  339. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  340. r = redis.Redis(connection_pool=pool)
  341. except Exception as ex:
  342. print(ex)
  343. cur_min = 0
  344. while True:
  345. try:
  346. time_sec = int(time.time())
  347. cur_min = time_sec - time_sec % 60
  348. start_time = cur_min - 300
  349. day_stamp = self.day_stamp(time_sec)
  350. if start_time >= day_stamp:
  351. self.log_time(day_stamp,time_sec,start_time,cur_min)
  352. ratios = self._calc_ratio(start_time=start_time, end_time=cur_min)
  353. if ratios != None:
  354. r.set('nc_channel_ratios', json.dumps(ratios))
  355. except Exception as ex:
  356. print(ex)
  357. finally:
  358. for i in range(60):
  359. time_sec = int(time.time())
  360. next_min = time_sec - time_sec % 60
  361. if next_min > cur_min:
  362. break
  363. else:
  364. stime.sleep(1)
  365. def log_time(self,day_stamp, time_sec, start_time, cur_min):
  366. d = time.asctime(time.localtime(day_stamp))
  367. a = time.asctime(time.localtime(time_sec))
  368. b = time.asctime(time.localtime(start_time))
  369. c = time.asctime(time.localtime(cur_min))
  370. print('day_stamp=', d, 'cur_time=', a, 'start_time=', b, 'end_time=', c)
  371. dataCenter = DataCenter()