# DataCenter.py
import json
import logging
import os
import re
import time as stime
import time as time
from datetime import timedelta
from io import BytesIO
from os import path

import h5py
import numpy as np
import redis
from matplotlib import ticker
from matplotlib.figure import Figure
  14. class DataCenter(object):
  15. latest_delta = 10
  16. pos_map = {
  17. 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4
  18. }
  19. def __init__(self):
  20. self._mquit = False
  21. self._mRHost = ''
  22. self._mRPort = 6379
  23. self._file_name = '/var/www/html/data/stdata/data.hdf5'
  24. def set_redis(self, rhost, rport):
  25. self._mRHost = rhost
  26. self._mRPort = rport
  27. def stop(self):
  28. self._mquit = True
  29. pass
  30. def prepare_data(self):
  31. while self._mquit == False:
  32. try:
  33. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  34. r = redis.Redis(connection_pool=pool)
  35. if path.exists(self._file_name):
  36. hfive = h5py.File(self._file_name, 'a')
  37. else:
  38. hfive = h5py.File(self._file_name, 'w')
  39. latest_time = int(stime.time()) - self.latest_delta
  40. self.read_redis(hfive, r, 'nc_channel_monitor_commit', 'commit')
  41. self.read_redis(hfive, r, 'nc_channel_monitor_notify', 'notify')
  42. hfive.close()
  43. self.del_redis(r, 'nc_channel_monitor_commit', latest_time)
  44. self.del_redis(r, 'nc_channel_monitor_notify', latest_time)
  45. except Exception as ex:
  46. print(ex)
  47. finally:
  48. stime.sleep(1)
  49. def del_redis(self, redis, name,latest_time):
  50. for item in redis.hscan_iter(name):
  51. key = str(item[0], encoding="utf-8")
  52. items = re.split(r'-', key)
  53. fDel = True
  54. if len(items) == 6:
  55. (stype, chname, quality, card_type, amount, time) = items
  56. time = int(time)
  57. if latest_time <= time:
  58. fDel = False
  59. if fDel:
  60. redis.hdel(name, key)
  61. pass
  62. def read_redis(self, hfive, redis, name, prefix):
  63. i = 0
  64. for item in redis.hscan_iter(name):
  65. key = str(item[0], encoding="utf-8")
  66. val = str(item[1], encoding="utf-8")
  67. print(f'{prefix}:{i}')
  68. i += 1
  69. self.parase(hfive, key, val, prefix)
  70. def parase(self, hfive, text, val, prefix):
  71. items = re.split(r'-', text)
  72. if len(items) != 6:
  73. return False
  74. (stype, chname, quality, card_type, amount, time) = items
  75. if stype == 'succ':
  76. pos = self.pos_map[f'{prefix}-succ']
  77. elif stype == 'fail':
  78. pos = self.pos_map[f'{prefix}-fail']
  79. else:
  80. return False
  81. time = int(time)
  82. today = self.day_stamp(time)
  83. path = f'/{today}/{chname}/{quality}/{card_type}/{amount}'
  84. if path not in hfive:
  85. hfive[path] = np.zeros((5, 86400))
  86. diff = time - today
  87. hfive[path][pos, diff] = val
  88. pass
  89. def day_stamp(self, stamp):
  90. stamp = int(stamp)
  91. x = stime.gmtime(stamp + 8 * 3600)
  92. diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
  93. today = stamp - diff.total_seconds()
  94. return int(today)
  95. def _days(self, root):
  96. result = []
  97. try:
  98. for name, sub in root.items():
  99. if isinstance(sub, h5py.Group):
  100. result.append(name)
  101. except Exception as ex:
  102. print(ex)
  103. finally:
  104. return result
  105. def days(self):
  106. try:
  107. hfive = h5py.File(self._file_name, 'r')
  108. root = hfive.require_group('/')
  109. days = self._days(root)
  110. hfive.close()
  111. return days
  112. except Exception as ex:
  113. print(ex)
  114. return []
  115. def paths(self, time_stamp):
  116. try:
  117. day_stamp = self.day_stamp(time_stamp)
  118. hfive = h5py.File(self._file_name, 'r')
  119. group = hfive.require_group(f'/{day_stamp}')
  120. paths = self.dir(group)
  121. hfive.close()
  122. return paths
  123. except Exception as ex:
  124. print(ex)
  125. return []
  126. def dir(self, group):
  127. result = []
  128. for name, sub in group.items():
  129. if isinstance(sub, h5py.Group):
  130. result.extend(self.dir(sub))
  131. else:
  132. result.append(sub.name)
  133. return result
  134. def draw_plot(self, start_time, interval=300, **kwargs):
  135. logger = logging.getLogger('app')
  136. hfive = h5py.File(self._file_name, 'r')
  137. try:
  138. day_stamp = self.day_stamp(start_time)
  139. start_pos = start_time - day_stamp
  140. cur_day = self.day_stamp(stime.time())
  141. if day_stamp == cur_day:
  142. end_pos = int(stime.time()) - day_stamp
  143. else:
  144. end_pos = -1
  145. fig = Figure(figsize=(16, 8))
  146. ax = fig.subplots()
  147. x = np.arange(0, 86400, interval)
  148. filer_text, paths = self.datasets(hfive, start_time, **kwargs)
  149. sub_count = 0
  150. predata = np.zeros((5, 86400))
  151. for path, data in self.read_data(hfive, paths):
  152. data = np.array(data)
  153. predata = predata + data
  154. ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
  155. if ret:
  156. sub_count += 1
  157. if sub_count > 1:
  158. self._draw_plot(ax, x, day_stamp, start_pos, end_pos, predata, interval, filer_text)
  159. ax.legend()
  160. ax.grid()
  161. ax.set_title('success ratio')
  162. ax.set(xlabel='time', ylabel='ratio')
  163. fig.autofmt_xdate()
  164. fig.subplots_adjust(left=0.05, right=0.999, top=0.95, bottom=0.1)
  165. buf = BytesIO()
  166. fig.savefig(buf, format="png")
  167. return buf
  168. except Exception as ex:
  169. print(ex)
  170. finally:
  171. hfive.close()
  172. def read_data(self, hfive, paths):
  173. for path in paths:
  174. yield path, hfive[path]
  175. def datasets(self, hfive, start_time, **kwargs):
  176. logger = logging.getLogger('app')
  177. day_stamp = self.day_stamp(start_time)
  178. sday = f'{day_stamp}'
  179. root = hfive.require_group('/')
  180. days = self._days(root)
  181. if sday not in days:
  182. return '', []
  183. group = hfive.require_group(sday)
  184. dsets = self.dir(group)
  185. chname = quality = card_type = amount = None
  186. for key, val in kwargs.items():
  187. if val is None:
  188. continue
  189. elif key == 'chname':
  190. chname = val
  191. elif key == 'quality':
  192. quality = f'{val}'
  193. elif key == 'card_type':
  194. card_type = f'{val}'
  195. elif key == 'amount':
  196. amount = f'{val}'
  197. else:
  198. continue
  199. return self._filter(dsets, chname=chname, quality=quality, card_type=card_type, amount=amount)
  200. def _filter(self, dsets, chname=None, quality=None, card_type=None, amount=None):
  201. filer_text = ''
  202. if chname is not None:
  203. filer_text = chname
  204. if quality is not None:
  205. filer_text = filer_text + f"-qua:{quality}"
  206. if card_type is not None:
  207. filer_text = filer_text + f"-type:{card_type}"
  208. if amount is not None:
  209. filer_text = filer_text + f"-amount:{amount}"
  210. paths = []
  211. for text in dsets:
  212. items = re.split(r'/', text)
  213. if len(items) != 6:
  214. return False
  215. (_, _sday, _chname, _quality, _card_type, _amount) = items
  216. if (chname is not None) and (_chname != chname):
  217. continue
  218. if (quality is not None) and (_quality != quality):
  219. continue
  220. if (card_type is not None) and (_card_type != card_type):
  221. continue
  222. if (amount is not None) and (_amount != amount):
  223. continue
  224. paths.append(text)
  225. return filer_text, paths
  226. def _draw_plot(self, ax, x, day_stamp, start_pos, end_pos, data, interval=300, path=''):
  227. # 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4
  228. logging.getLogger('app').debug("path=%s", path)
  229. all = data[2] + data[3]
  230. all = all.reshape((-1, interval))
  231. all = np.sum(all, axis=1)
  232. ySucc = data[2]
  233. ySucc = ySucc.reshape((-1, interval))
  234. ySucc = np.sum(ySucc, axis=1)
  235. if end_pos == -1:
  236. pos = np.where(x >= start_pos)
  237. x = x[pos]
  238. ySucc = ySucc[pos]
  239. all = all[pos]
  240. else:
  241. pos = np.where(start_pos <= x)
  242. x = x[pos]
  243. ySucc = ySucc[pos]
  244. all = all[pos]
  245. pos = np.where(x < end_pos)
  246. x = x[pos]
  247. ySucc = ySucc[pos]
  248. all = all[pos]
  249. succ_count = int(np.sum(ySucc))
  250. all_count = int(np.sum(all))
  251. opened = np.where(ySucc > 0.1)
  252. if len(opened[0]) == 0:
  253. logging.getLogger('app').debug("path=%s,opened=False", path)
  254. return False
  255. ySucc = ySucc / all
  256. xs = np.array([stime.strftime('%H:%M', stime.localtime(d + day_stamp)) for d in x])
  257. ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=0))
  258. ax.plot(xs, ySucc, ls='--', marker='o', label=self._label(path, succ_count, all_count))
  259. return True
  260. def _label(self, path, count, all):
  261. ratio = 0.00
  262. if all > 0:
  263. ratio = round(count * 100 / all, 2)
  264. items = re.split(r'/', path)
  265. if len(items) == 6:
  266. (_, _sday, _chname, _quality, _card_type, _amount) = items
  267. card_type = ''
  268. if _card_type == '1':
  269. card_type = 'SY'
  270. elif _card_type == '2':
  271. card_type = 'SH'
  272. elif _card_type == '4':
  273. card_type = 'YD'
  274. elif _card_type == '5':
  275. card_type = 'LT'
  276. elif _card_type == '6':
  277. card_type = 'DX'
  278. elif _card_type == '7':
  279. card_type = 'TH'
  280. return f"{_chname}-{_quality}-{card_type}-{_amount}:{count}/{all} = {ratio}%"
  281. else:
  282. if path == '' or path is None:
  283. path = 'average'
  284. return f"{path}:{count}/{all} = {ratio}%"
  285. def _cur_min(self):
  286. time_sec = int(time.time())
  287. cur_min = time_sec - time_sec % 60
  288. return cur_min
  289. def _calc_tuple(self, data, start_pos, end_pos):
  290. # 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4
  291. logging.getLogger('app').debug("path=%s", path)
  292. cur_data = data[:, start_pos:end_pos]
  293. all = np.sum(cur_data, axis=1)
  294. all = all.tolist()
  295. result = [int(x) for x in all]
  296. return result[:4]
  297. def _repath(self, path):
  298. items = re.split(r'/', path)
  299. if len(items) == 6:
  300. (_, _sday, _chname, _quality, _card_type, _amount) = items
  301. return f"{_chname}-{_amount}-{_card_type}-{_quality}"
  302. else:
  303. return False
  304. def _calc_ratio(self, start_time, end_time):
  305. hfive = None
  306. try:
  307. hfive = h5py.File(self._file_name, 'r')
  308. day_stamp = self.day_stamp(start_time)
  309. start_pos = start_time - day_stamp
  310. end_pos = end_time - day_stamp
  311. result = {'time': end_time}
  312. ratios = {}
  313. filer_text, paths = self.datasets(hfive, start_time)
  314. if len(paths) == 0:
  315. return None
  316. for path, data in self.read_data(hfive, paths):
  317. data = np.array(data)
  318. ratio = self._calc_tuple(data, start_pos, end_pos)
  319. key = self._repath(path)
  320. ratios[key] = ratio
  321. print(path, ratio)
  322. result['ratios'] = ratios
  323. return result
  324. except Exception as ex:
  325. print(ex)
  326. finally:
  327. hfive.close()
  328. pass
  329. def calc_ratio(self):
  330. import json
  331. r = None
  332. try:
  333. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  334. r = redis.Redis(connection_pool=pool)
  335. except Exception as ex:
  336. print(ex)
  337. while True:
  338. try:
  339. time_sec = int(time.time())
  340. day_stamp = self.day_stamp(time_sec)
  341. start_time = 0
  342. if time_sec > day_stamp + 901:
  343. start_time = time_sec - 901
  344. elif time_sec > day_stamp + 301:
  345. start_time = day_stamp
  346. else:
  347. pass
  348. self.log_time(day_stamp,time_sec,start_time)
  349. if start_time >= day_stamp:
  350. ratios = self._calc_ratio(start_time=start_time, end_time=time_sec - 1)
  351. print('ratios=',ratios)
  352. if ratios != None:
  353. r.set(f"nc_channel_ratios", json.dumps(ratios))
  354. r.publish('refill',json.dumps({'type':'ratio','value':0}))
  355. except Exception as ex:
  356. print(ex)
  357. finally:
  358. stime.sleep(1)
  359. def log_time(self,day_stamp, time_sec, start_time):
  360. d = time.asctime(time.localtime(day_stamp))
  361. a = time.asctime(time.localtime(time_sec))
  362. b = time.asctime(time.localtime(start_time))
  363. print('day_stamp=', d, 'cur_time=', a, 'start_time=', b)
  364. def pub_ratio(self):
  365. import json
  366. r = None
  367. try:
  368. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  369. r = redis.Redis(connection_pool=pool)
  370. r.publish('refill',json.dumps({'type':'ratio','value':0}))
  371. except Exception as ex:
  372. print(ex)
# Module-level shared instance, created when this module is imported.
# DataCenter.__init__ only sets default attributes (Redis host/port, HDF5 path).
dataCenter = DataCenter()