DataCenter.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438
  1. import os
  2. import time as stime
  3. import redis
  4. import h5py
  5. from os import path
  6. import re
  7. from datetime import timedelta
  8. import numpy as np
  9. from matplotlib.figure import Figure
  10. from matplotlib import ticker
  11. from io import BytesIO
  12. import logging
  13. import time as time
  14. class DataCenter(object):
  15. pos_map = {
  16. 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4
  17. }
  18. def __init__(self):
  19. self._mquit = False
  20. self._mRHost = ''
  21. self._mRPort = 6379
  22. self._file_name = '/var/www/html/data/stdata/data.hdf5'
  23. def set_redis(self, rhost, rport):
  24. self._mRHost = rhost
  25. self._mRPort = rport
  26. def stop(self):
  27. self._mquit = True
  28. pass
  29. def prepare_data(self):
  30. while self._mquit == False:
  31. try:
  32. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  33. r = redis.Redis(connection_pool=pool)
  34. if path.exists(self._file_name):
  35. hfive = h5py.File(self._file_name, 'a')
  36. else:
  37. hfive = h5py.File(self._file_name, 'w')
  38. self.read_redis(hfive, r, 'nc_channel_monitor_commit', 'commit')
  39. self.read_redis(hfive, r, 'nc_channel_monitor_notify', 'notify')
  40. hfive.close()
  41. self.del_redis(r, 'nc_channel_monitor_commit')
  42. self.del_redis(r, 'nc_channel_monitor_notify')
  43. except Exception as ex:
  44. print(ex)
  45. finally:
  46. stime.sleep(1)
  47. def del_redis(self, redis, name):
  48. latest_time = int(stime.time()) - 2
  49. for item in redis.hscan_iter(name):
  50. key = str(item[0], encoding="utf-8")
  51. items = re.split(r'-', key)
  52. fDel = True
  53. if len(items) == 6:
  54. (stype, chname, quality, card_type, amount, time) = items
  55. time = int(time)
  56. if latest_time <= time:
  57. fDel = False
  58. if fDel:
  59. redis.hdel(name, key)
  60. pass
  61. def read_redis(self, hfive, redis, name, prefix):
  62. i = 0
  63. for item in redis.hscan_iter(name):
  64. key = str(item[0], encoding="utf-8")
  65. val = str(item[1], encoding="utf-8")
  66. print(f'{prefix}:{i}')
  67. i += 1
  68. self.parase(hfive, key, val, prefix)
  69. def parase(self, hfive, text, val, prefix):
  70. items = re.split(r'-', text)
  71. if len(items) != 6:
  72. return False
  73. (stype, chname, quality, card_type, amount, time) = items
  74. if stype == 'succ':
  75. pos = self.pos_map[f'{prefix}-succ']
  76. elif stype == 'fail':
  77. pos = self.pos_map[f'{prefix}-fail']
  78. else:
  79. return False
  80. time = int(time)
  81. today = self.day_stamp(time)
  82. path = f'/{today}/{chname}/{quality}/{card_type}/{amount}'
  83. if path not in hfive:
  84. hfive[path] = np.zeros((5, 86400))
  85. diff = time - today
  86. hfive[path][pos, diff] = val
  87. pass
  88. def day_stamp(self, stamp):
  89. stamp = int(stamp)
  90. x = stime.gmtime(stamp + 8 * 3600)
  91. diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
  92. today = stamp - diff.total_seconds()
  93. return int(today)
  94. def _days(self, root):
  95. result = []
  96. try:
  97. for name, sub in root.items():
  98. if isinstance(sub, h5py.Group):
  99. result.append(name)
  100. except Exception as ex:
  101. print(ex)
  102. finally:
  103. return result
  104. def days(self):
  105. try:
  106. hfive = h5py.File(self._file_name, 'r')
  107. root = hfive.require_group('/')
  108. days = self._days(root)
  109. hfive.close()
  110. return days
  111. except Exception as ex:
  112. print(ex)
  113. return []
  114. def paths(self, time_stamp):
  115. try:
  116. day_stamp = self.day_stamp(time_stamp)
  117. hfive = h5py.File(self._file_name, 'r')
  118. group = hfive.require_group(f'/{day_stamp}')
  119. paths = self.dir(group)
  120. hfive.close()
  121. return paths
  122. except Exception as ex:
  123. print(ex)
  124. return []
  125. def dir(self, group):
  126. result = []
  127. for name, sub in group.items():
  128. if isinstance(sub, h5py.Group):
  129. result.extend(self.dir(sub))
  130. else:
  131. result.append(sub.name)
  132. return result
  133. def draw_plot(self, start_time, interval=300, **kwargs):
  134. logger = logging.getLogger('app')
  135. hfive = h5py.File(self._file_name, 'r')
  136. try:
  137. day_stamp = self.day_stamp(start_time)
  138. start_pos = start_time - day_stamp
  139. cur_day = self.day_stamp(stime.time())
  140. if day_stamp == cur_day:
  141. end_pos = int(stime.time()) - day_stamp
  142. else:
  143. end_pos = -1
  144. fig = Figure(figsize=(16, 8))
  145. ax = fig.subplots()
  146. x = np.arange(0, 86400, interval)
  147. filer_text, paths = self.datasets(hfive, start_time, **kwargs)
  148. sub_count = 0
  149. predata = np.zeros((5, 86400))
  150. for path, data in self.read_data(hfive, paths):
  151. data = np.array(data)
  152. predata = predata + data
  153. ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
  154. if ret:
  155. sub_count += 1
  156. if sub_count > 1:
  157. self._draw_plot(ax, x, day_stamp, start_pos, end_pos, predata, interval, filer_text)
  158. ax.legend()
  159. ax.grid()
  160. ax.set_title('success ratio')
  161. ax.set(xlabel='time', ylabel='ratio')
  162. fig.autofmt_xdate()
  163. fig.subplots_adjust(left=0.05, right=0.999, top=0.95, bottom=0.1)
  164. buf = BytesIO()
  165. fig.savefig(buf, format="png")
  166. return buf
  167. except Exception as ex:
  168. print(ex)
  169. finally:
  170. hfive.close()
  171. def read_data(self, hfive, paths):
  172. for path in paths:
  173. yield path, hfive[path]
  174. def datasets(self, hfive, start_time, **kwargs):
  175. logger = logging.getLogger('app')
  176. day_stamp = self.day_stamp(start_time)
  177. sday = f'{day_stamp}'
  178. root = hfive.require_group('/')
  179. days = self._days(root)
  180. if sday not in days:
  181. return '', []
  182. group = hfive.require_group(sday)
  183. dsets = self.dir(group)
  184. chname = quality = card_type = amount = None
  185. for key, val in kwargs.items():
  186. if val is None:
  187. continue
  188. elif key == 'chname':
  189. chname = val
  190. elif key == 'quality':
  191. quality = f'{val}'
  192. elif key == 'card_type':
  193. card_type = f'{val}'
  194. elif key == 'amount':
  195. amount = f'{val}'
  196. else:
  197. continue
  198. return self._filter(dsets, chname=chname, quality=quality, card_type=card_type, amount=amount)
  199. def _filter(self, dsets, chname=None, quality=None, card_type=None, amount=None):
  200. filer_text = ''
  201. if chname is not None:
  202. filer_text = chname
  203. if quality is not None:
  204. filer_text = filer_text + f"-qua:{quality}"
  205. if card_type is not None:
  206. filer_text = filer_text + f"-type:{card_type}"
  207. if amount is not None:
  208. filer_text = filer_text + f"-amount:{amount}"
  209. paths = []
  210. for text in dsets:
  211. items = re.split(r'/', text)
  212. if len(items) != 6:
  213. return False
  214. (_, _sday, _chname, _quality, _card_type, _amount) = items
  215. if (chname is not None) and (_chname != chname):
  216. continue
  217. if (quality is not None) and (_quality != quality):
  218. continue
  219. if (card_type is not None) and (_card_type != card_type):
  220. continue
  221. if (amount is not None) and (_amount != amount):
  222. continue
  223. paths.append(text)
  224. return filer_text, paths
  225. def _draw_plot(self, ax, x, day_stamp, start_pos, end_pos, data, interval=300, path=''):
  226. # 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4
  227. logging.getLogger('app').debug("path=%s", path)
  228. all = data[2] + data[3]
  229. all = all.reshape((-1, interval))
  230. all = np.sum(all, axis=1)
  231. ySucc = data[2]
  232. ySucc = ySucc.reshape((-1, interval))
  233. ySucc = np.sum(ySucc, axis=1)
  234. if end_pos == -1:
  235. pos = np.where(x >= start_pos)
  236. x = x[pos]
  237. ySucc = ySucc[pos]
  238. all = all[pos]
  239. else:
  240. pos = np.where(start_pos <= x)
  241. x = x[pos]
  242. ySucc = ySucc[pos]
  243. all = all[pos]
  244. pos = np.where(x < end_pos)
  245. x = x[pos]
  246. ySucc = ySucc[pos]
  247. all = all[pos]
  248. succ_count = int(np.sum(ySucc))
  249. all_count = int(np.sum(all))
  250. opened = np.where(ySucc > 0.1)
  251. if len(opened[0]) == 0:
  252. logging.getLogger('app').debug("path=%s,opened=False", path)
  253. return False
  254. ySucc = ySucc / all
  255. xs = np.array([stime.strftime('%H:%M', stime.localtime(d + day_stamp)) for d in x])
  256. ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=0))
  257. ax.plot(xs, ySucc, ls='--', marker='o', label=self._label(path, succ_count, all_count))
  258. return True
  259. def _label(self, path, count, all):
  260. ratio = 0.00
  261. if all > 0:
  262. ratio = round(count * 100 / all, 2)
  263. items = re.split(r'/', path)
  264. if len(items) == 6:
  265. (_, _sday, _chname, _quality, _card_type, _amount) = items
  266. card_type = ''
  267. if _card_type == '1':
  268. card_type = 'SY'
  269. elif _card_type == '2':
  270. card_type = 'SH'
  271. elif _card_type == '4':
  272. card_type = 'YD'
  273. elif _card_type == '5':
  274. card_type = 'LT'
  275. elif _card_type == '6':
  276. card_type = 'DX'
  277. elif _card_type == '7':
  278. card_type = 'TH'
  279. return f"{_chname}-{_quality}-{card_type}-{_amount}:{count}/{all} = {ratio}%"
  280. else:
  281. if path == '' or path is None:
  282. path = 'average'
  283. return f"{path}:{count}/{all} = {ratio}%"
  284. def _cur_min(self):
  285. time_sec = int(time.time())
  286. cur_min = time_sec - time_sec % 60
  287. return cur_min
  288. def _calc_tuple(self, data, start_pos, end_pos):
  289. # 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4
  290. logging.getLogger('app').debug("path=%s", path)
  291. cur_data = data[:, start_pos:end_pos]
  292. all = np.sum(cur_data, axis=1)
  293. all = all.tolist()
  294. result = [int(x) for x in all]
  295. return result[:4]
  296. def _repath(self, path):
  297. items = re.split(r'/', path)
  298. if len(items) == 6:
  299. (_, _sday, _chname, _quality, _card_type, _amount) = items
  300. return f"{_chname}-{_amount}-{_card_type}-{_quality}"
  301. else:
  302. return False
  303. def _calc_ratio(self, start_time, end_time):
  304. hfive = None
  305. try:
  306. hfive = h5py.File(self._file_name, 'r')
  307. day_stamp = self.day_stamp(start_time)
  308. start_pos = start_time - day_stamp
  309. end_pos = end_time - day_stamp
  310. result = {'time': end_time}
  311. ratios = {}
  312. filer_text, paths = self.datasets(hfive, start_time)
  313. if len(paths) == 0:
  314. return None
  315. for path, data in self.read_data(hfive, paths):
  316. data = np.array(data)
  317. ratio = self._calc_tuple(data, start_pos, end_pos)
  318. key = self._repath(path)
  319. ratios[key] = ratio
  320. print(path, ratio)
  321. result['ratios'] = ratios
  322. return result
  323. except Exception as ex:
  324. print(ex)
  325. finally:
  326. hfive.close()
  327. pass
  328. def calc_ratio(self):
  329. import json
  330. r = None
  331. try:
  332. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  333. r = redis.Redis(connection_pool=pool)
  334. except Exception as ex:
  335. print(ex)
  336. while True:
  337. try:
  338. time_sec = int(time.time())
  339. day_stamp = self.day_stamp(time_sec)
  340. start_time = 0
  341. if time_sec > day_stamp + 901:
  342. start_time = time_sec - 901
  343. elif time_sec > day_stamp + 301:
  344. start_time = day_stamp
  345. else:
  346. pass
  347. self.log_time(day_stamp,time_sec,start_time)
  348. if start_time >= day_stamp:
  349. ratios = self._calc_ratio(start_time=start_time, end_time=time_sec - 1)
  350. print('ratios=',ratios)
  351. if ratios != None:
  352. r.set(f"nc_channel_ratios", json.dumps(ratios))
  353. r.publish('refill',json.dumps({'type':'ratio','value':0}))
  354. except Exception as ex:
  355. print(ex)
  356. finally:
  357. stime.sleep(1)
  358. def log_time(self,day_stamp, time_sec, start_time):
  359. d = time.asctime(time.localtime(day_stamp))
  360. a = time.asctime(time.localtime(time_sec))
  361. b = time.asctime(time.localtime(start_time))
  362. print('day_stamp=', d, 'cur_time=', a, 'start_time=', b)
  363. def pub_ratio(self):
  364. import json
  365. r = None
  366. try:
  367. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  368. r = redis.Redis(connection_pool=pool)
  369. r.publish('refill',json.dumps({'type':'ratio','value':0}))
  370. except Exception as ex:
  371. print(ex)
  372. dataCenter = DataCenter()