# DataCenter.py
  1. import os
  2. import time as stime
  3. import redis
  4. import h5py
  5. from os import path
  6. import re
  7. from datetime import timedelta
  8. import numpy as np
  9. from matplotlib.figure import Figure
  10. from matplotlib import ticker
  11. from io import BytesIO
  12. import logging
  13. import time as time
  14. class DataCenter(object):
  15. pos_map = {
  16. 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4
  17. }
  18. def __init__(self):
  19. self._mquit = False
  20. self._mRHost = ''
  21. self._mRPort = 6379
  22. self._file_name = '/var/www/html/data/stdata/data.hdf5'
  23. def set_redis(self, rhost, rport):
  24. self._mRHost = rhost
  25. self._mRPort = rport
  26. def stop(self):
  27. self._mquit = True
  28. pass
  29. def prepare_data(self):
  30. while self._mquit == False:
  31. try:
  32. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  33. r = redis.Redis(connection_pool=pool)
  34. if path.exists(self._file_name):
  35. hfive = h5py.File(self._file_name, 'a')
  36. else:
  37. hfive = h5py.File(self._file_name, 'w')
  38. self.read_redis(hfive, r, 'nc_channel_monitor_commit', 'commit')
  39. self.read_redis(hfive, r, 'nc_channel_monitor_notify', 'notify')
  40. hfive.close()
  41. self.del_redis(r, 'nc_channel_monitor_commit')
  42. self.del_redis(r, 'nc_channel_monitor_notify')
  43. except Exception as ex:
  44. print(ex)
  45. finally:
  46. stime.sleep(10)
  47. def del_redis(self, redis, name):
  48. latest_time = int(stime.time()) - 120
  49. for item in redis.hscan_iter(name):
  50. key = str(item[0], encoding="utf-8")
  51. items = re.split(r'-', key)
  52. fdel = True
  53. if len(items) == 6:
  54. (stype, chname, quality, card_type, amount, time) = items
  55. time = int(time)
  56. if latest_time <= time:
  57. fdel = False
  58. if fdel:
  59. redis.hdel(name, key)
  60. pass
  61. def read_redis(self, hfive, redis, name, prefix):
  62. i = 0
  63. for item in redis.hscan_iter(name):
  64. key = str(item[0], encoding="utf-8")
  65. val = str(item[1], encoding="utf-8")
  66. print(f'{prefix}:{i}')
  67. i += 1
  68. self.parase(hfive, key, val, prefix)
  69. def parase(self, hfive, text, val, prefix):
  70. items = re.split(r'-', text)
  71. if len(items) != 6:
  72. return False
  73. (stype, chname, quality, card_type, amount, time) = items
  74. if stype == 'succ':
  75. pos = self.pos_map[f'{prefix}-succ']
  76. elif stype == 'fail':
  77. pos = self.pos_map[f'{prefix}-fail']
  78. else:
  79. return False
  80. time = int(time)
  81. today = self.day_stamp(time)
  82. path = f'/{today}/{chname}/{quality}/{card_type}/{amount}'
  83. if path not in hfive:
  84. hfive[path] = np.zeros((5, 86400))
  85. diff = time - today
  86. if diff < 0:
  87. print(diff)
  88. hfive[path][pos, diff] = val
  89. print(path, pos, diff, val, hfive[path][pos, diff])
  90. pass
  91. def day_stamp(self, stamp):
  92. stamp = int(stamp)
  93. x = stime.gmtime(stamp + 8 * 3600)
  94. diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
  95. today = stamp - diff.total_seconds()
  96. return int(today)
  97. def _days(self, root):
  98. result = []
  99. try:
  100. for name, sub in root.items():
  101. if isinstance(sub, h5py.Group):
  102. result.append(name)
  103. except Exception as ex:
  104. print(ex)
  105. finally:
  106. return result
  107. def days(self):
  108. try:
  109. hfive = h5py.File(self._file_name, 'r')
  110. root = hfive.require_group('/')
  111. days = self._days(root)
  112. hfive.close()
  113. return days
  114. except Exception as ex:
  115. print(ex)
  116. return []
  117. def paths(self, time_stamp):
  118. try:
  119. day_stamp = self.day_stamp(time_stamp)
  120. hfive = h5py.File(self._file_name, 'r')
  121. group = hfive.require_group(f'/{day_stamp}')
  122. paths = self.dir(group)
  123. hfive.close()
  124. return paths
  125. except Exception as ex:
  126. print(ex)
  127. return []
  128. def dir(self, group):
  129. result = []
  130. for name, sub in group.items():
  131. if isinstance(sub, h5py.Group):
  132. result.extend(self.dir(sub))
  133. else:
  134. result.append(sub.name)
  135. return result
  136. def draw_plot(self, start_time, interval=300, **kwargs):
  137. logger = logging.getLogger('app')
  138. hfive = h5py.File(self._file_name, 'r')
  139. try:
  140. day_stamp = self.day_stamp(start_time)
  141. start_pos = start_time - day_stamp
  142. cur_day = self.day_stamp(stime.time())
  143. if day_stamp == cur_day:
  144. end_pos = int(stime.time()) - day_stamp
  145. else:
  146. end_pos = -1
  147. fig = Figure(figsize=(16, 8))
  148. ax = fig.subplots()
  149. x = np.arange(0, 86400, interval)
  150. filer_text, paths = self.datasets(hfive, start_time, **kwargs)
  151. sub_count = 0
  152. predata = np.zeros((5, 86400))
  153. for path, data in self.read_data(hfive, paths):
  154. data = np.array(data)
  155. predata = predata + data
  156. ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
  157. if ret:
  158. sub_count += 1
  159. if sub_count > 1:
  160. self._draw_plot(ax, x, day_stamp, start_pos, end_pos, predata, interval, filer_text)
  161. ax.legend()
  162. ax.grid()
  163. ax.set_title('success ratio')
  164. ax.set(xlabel='time', ylabel='ratio')
  165. fig.autofmt_xdate()
  166. fig.subplots_adjust(left=0.05, right=0.999, top=0.95, bottom=0.1)
  167. buf = BytesIO()
  168. fig.savefig(buf, format="png")
  169. return buf
  170. except Exception as ex:
  171. print(ex)
  172. finally:
  173. hfive.close()
  174. def read_data(self, hfive, paths):
  175. for path in paths:
  176. yield path, hfive[path]
  177. def datasets(self, hfive, start_time, **kwargs):
  178. logger = logging.getLogger('app')
  179. day_stamp = self.day_stamp(start_time)
  180. sday = f'{day_stamp}'
  181. root = hfive.require_group('/')
  182. days = self._days(root)
  183. if sday not in days:
  184. return '', []
  185. group = hfive.require_group(sday)
  186. dsets = self.dir(group)
  187. chname = quality = card_type = amount = None
  188. for key, val in kwargs.items():
  189. if val is None:
  190. continue
  191. elif key == 'chname':
  192. chname = val
  193. elif key == 'quality':
  194. quality = f'{val}'
  195. elif key == 'card_type':
  196. card_type = f'{val}'
  197. elif key == 'amount':
  198. amount = f'{val}'
  199. else:
  200. continue
  201. return self._filter(dsets, chname=chname, quality=quality, card_type=card_type, amount=amount)
  202. def _filter(self, dsets, chname=None, quality=None, card_type=None, amount=None):
  203. filer_text = ''
  204. if chname is not None:
  205. filer_text = chname
  206. if quality is not None:
  207. filer_text = filer_text + f"-qua:{quality}"
  208. if card_type is not None:
  209. filer_text = filer_text + f"-type:{card_type}"
  210. if amount is not None:
  211. filer_text = filer_text + f"-amount:{amount}"
  212. paths = []
  213. for text in dsets:
  214. items = re.split(r'/', text)
  215. if len(items) != 6:
  216. return False
  217. (_, _sday, _chname, _quality, _card_type, _amount) = items
  218. if (chname is not None) and (_chname != chname):
  219. continue
  220. if (quality is not None) and (_quality != quality):
  221. continue
  222. if (card_type is not None) and (_card_type != card_type):
  223. continue
  224. if (amount is not None) and (_amount != amount):
  225. continue
  226. paths.append(text)
  227. return filer_text, paths
  228. def _draw_plot(self, ax, x, day_stamp, start_pos, end_pos, data, interval=300, path=''):
  229. # 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4
  230. logging.getLogger('app').debug("path=%s", path)
  231. all = data[2] + data[3]
  232. all = all.reshape((-1, interval))
  233. all = np.sum(all, axis=1)
  234. ySucc = data[2]
  235. ySucc = ySucc.reshape((-1, interval))
  236. ySucc = np.sum(ySucc, axis=1)
  237. if end_pos == -1:
  238. pos = np.where(x >= start_pos)
  239. x = x[pos]
  240. ySucc = ySucc[pos]
  241. all = all[pos]
  242. else:
  243. pos = np.where(start_pos <= x)
  244. x = x[pos]
  245. ySucc = ySucc[pos]
  246. all = all[pos]
  247. pos = np.where(x < end_pos)
  248. x = x[pos]
  249. ySucc = ySucc[pos]
  250. all = all[pos]
  251. succ_count = int(np.sum(ySucc))
  252. all_count = int(np.sum(all))
  253. opened = np.where(ySucc > 0.1)
  254. if len(opened[0]) == 0:
  255. logging.getLogger('app').debug("path=%s,opened=False", path)
  256. return False
  257. ySucc = ySucc / all
  258. xs = np.array([stime.strftime('%H:%M', stime.localtime(d + day_stamp)) for d in x])
  259. ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=0))
  260. ax.plot(xs, ySucc, ls='--', marker='o', label=self._label(path, succ_count, all_count))
  261. return True
  262. def _label(self, path, count, all):
  263. ratio = 0.00
  264. if all > 0:
  265. ratio = round(count * 100 / all, 2)
  266. items = re.split(r'/', path)
  267. if len(items) == 6:
  268. (_, _sday, _chname, _quality, _card_type, _amount) = items
  269. card_type = ''
  270. if _card_type == '1':
  271. card_type = 'SY'
  272. elif _card_type == '2':
  273. card_type = 'SH'
  274. elif _card_type == '4':
  275. card_type = 'YD'
  276. elif _card_type == '5':
  277. card_type = 'LT'
  278. elif _card_type == '6':
  279. card_type = 'DX'
  280. elif _card_type == '7':
  281. card_type = 'TH'
  282. return f"{_chname}-{_quality}-{card_type}-{_amount}:{count}/{all} = {ratio}%"
  283. else:
  284. if path == '' or path is None:
  285. path = 'average'
  286. return f"{path}:{count}/{all} = {ratio}%"
  287. def _cur_min(self):
  288. time_sec = int(time.time())
  289. cur_min = time_sec - time_sec % 60
  290. return cur_min
  291. def _calc_tuple(self, data, start_pos, end_pos):
  292. # 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4
  293. logging.getLogger('app').debug("path=%s", path)
  294. cur_data = data[:, start_pos:end_pos]
  295. all = np.sum(cur_data, axis=1)
  296. all = all.tolist()
  297. result = [int(x) for x in all]
  298. return result[:4]
  299. def _repath(self, path):
  300. items = re.split(r'/', path)
  301. if len(items) == 6:
  302. (_, _sday, _chname, _quality, _card_type, _amount) = items
  303. return f"{_chname}-{_amount}-{_card_type}-{_quality}"
  304. else:
  305. return False
  306. def _calc_ratio(self, start_time, end_time):
  307. hfive = None
  308. try:
  309. hfive = h5py.File(self._file_name, 'r')
  310. day_stamp = self.day_stamp(start_time)
  311. start_pos = start_time - day_stamp
  312. end_pos = end_time - day_stamp
  313. result = {'time': end_time}
  314. ratios = {}
  315. filer_text, paths = self.datasets(hfive, start_time)
  316. if len(paths) == 0:
  317. return None
  318. for path, data in self.read_data(hfive, paths):
  319. data = np.array(data)
  320. ratio = self._calc_tuple(data, start_pos, end_pos)
  321. key = self._repath(path)
  322. ratios[key] = ratio
  323. print(path, ratio)
  324. result['ratios'] = ratios
  325. return result
  326. except Exception as ex:
  327. print(ex)
  328. finally:
  329. hfive.close()
  330. pass
  331. def calc_ratio(self):
  332. import json
  333. r = None
  334. try:
  335. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  336. r = redis.Redis(connection_pool=pool)
  337. except Exception as ex:
  338. print(ex)
  339. while True:
  340. try:
  341. time_sec = int(time.time())
  342. day_stamp = self.day_stamp(time_sec)
  343. start_time = 0
  344. if time_sec > day_stamp + 901:
  345. start_time = time_sec - 901
  346. elif time_sec > day_stamp + 301:
  347. start_time = day_stamp
  348. else:
  349. pass
  350. self.log_time(day_stamp,time_sec,start_time)
  351. if start_time >= day_stamp:
  352. ratios = self._calc_ratio(start_time=start_time, end_time=time_sec - 1)
  353. print('ratios=',ratios)
  354. if ratios != None:
  355. r.set(f"nc_channel_ratios", json.dumps(ratios))
  356. r.publish('refill',json.dumps({'type':'ratio','value':0}))
  357. except Exception as ex:
  358. print(ex)
  359. finally:
  360. stime.sleep(10)
  361. def log_time(self,day_stamp, time_sec, start_time):
  362. d = time.asctime(time.localtime(day_stamp))
  363. a = time.asctime(time.localtime(time_sec))
  364. b = time.asctime(time.localtime(start_time))
  365. print('day_stamp=', d, 'cur_time=', a, 'start_time=', b)
  366. def pub_ratio(self):
  367. import json
  368. r = None
  369. try:
  370. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  371. r = redis.Redis(connection_pool=pool)
  372. r.publish('refill',json.dumps({'type':'ratio','value':0}))
  373. except Exception as ex:
  374. print(ex)
  375. dataCenter = DataCenter()