DataCenter.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. from threading import Thread
  2. import os
  3. import time as stime
  4. from Singleton import Singleton
  5. import redis
  6. import h5py
  7. from os import path
  8. import re
  9. from datetime import timedelta
  10. import numpy as np
  11. from matplotlib.figure import Figure
  12. from matplotlib import ticker
  13. from io import BytesIO
  14. import logging
  15. class DataCenter(object):
  16. pos_map = {
  17. 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4
  18. }
  19. def __init__(self):
  20. self._mquit = False
  21. self._mRHost = ''
  22. self._mRPort = 6379
  23. self._file_name = '/var/www/html/data/stdata/data.hdf5'
  24. def set_redis(self, rhost, rport):
  25. self._mRHost = rhost
  26. self._mRPort = rport
  27. def stop(self):
  28. self._mquit = True
  29. pass
  30. def prepare_data(self):
  31. while True:
  32. try:
  33. # pool = redis.ConnectionPool(host='121.89.223.81', port=57649, db=0)
  34. pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
  35. r = redis.Redis(connection_pool=pool)
  36. if path.exists(self._file_name):
  37. hfive = h5py.File(self._file_name, 'a')
  38. else:
  39. hfive = h5py.File(self._file_name, 'w')
  40. self.read_redis(hfive, r, 'nc_channel_monitor_commit', 'commit')
  41. self.read_redis(hfive, r, 'nc_channel_monitor_notify', 'notify')
  42. hfive.close()
  43. self.del_redis(r, 'nc_channel_monitor_commit')
  44. self.del_redis(r, 'nc_channel_monitor_notify')
  45. except Exception as ex:
  46. print(ex)
  47. finally:
  48. for i in range(60):
  49. if self._mquit == True:
  50. break
  51. else:
  52. stime.sleep(1)
  53. def del_redis(self, redis, name):
  54. latest_time = int(stime.time()) - 300
  55. for item in redis.hscan_iter(name):
  56. key = str(item[0], encoding="utf-8")
  57. items = re.split(r'-', key)
  58. fdel = True
  59. if len(items) == 6:
  60. (stype, chname, quality, card_type, amount, time) = items
  61. time = int(time)
  62. if latest_time <= time:
  63. fdel = False
  64. if fdel:
  65. redis.hdel(name, key)
  66. pass
  67. def read_redis(self, hfive, redis, name, prefix):
  68. i = 0
  69. for item in redis.hscan_iter(name):
  70. key = str(item[0], encoding="utf-8")
  71. val = str(item[1], encoding="utf-8")
  72. print(f'{prefix}:{i}')
  73. i += 1
  74. self.parase(hfive, key, val, prefix)
  75. def parase(self, hfive, text, val, prefix):
  76. items = re.split(r'-', text)
  77. if len(items) != 6:
  78. return False
  79. (stype, chname, quality, card_type, amount, time) = items
  80. if stype == 'succ':
  81. pos = self.pos_map[f'{prefix}-succ']
  82. elif stype == 'fail':
  83. pos = self.pos_map[f'{prefix}-fail']
  84. else:
  85. return False
  86. time = int(time)
  87. today = self.day_stamp(time)
  88. path = f'/{today}/{chname}/{quality}/{card_type}/{amount}'
  89. if path not in hfive:
  90. hfive[path] = np.zeros((5, 86400))
  91. diff = time - today
  92. if diff < 0:
  93. print(diff)
  94. hfive[path][pos, diff] = val
  95. print(path, pos, diff, val, hfive[path][pos, diff])
  96. pass
  97. def day_stamp(self, stamp):
  98. stamp = int(stamp)
  99. x = stime.gmtime(stamp + 8 * 3600)
  100. diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
  101. today = stamp - diff.total_seconds()
  102. return int(today)
  103. def _days(self, root):
  104. result = []
  105. try:
  106. for name, sub in root.items():
  107. if isinstance(sub, h5py.Group):
  108. result.append(name)
  109. except Exception as ex:
  110. print(ex)
  111. finally:
  112. return result
  113. def days(self):
  114. try:
  115. hfive = h5py.File(self._file_name, 'r')
  116. root = hfive.require_group('/')
  117. days = self._days(root)
  118. hfive.close()
  119. return days
  120. except Exception as ex:
  121. print(ex)
  122. return []
  123. def paths(self,time_stamp):
  124. try:
  125. day_stamp = self.day_stamp(time_stamp)
  126. hfive = h5py.File(self._file_name, 'r')
  127. group = hfive.require_group(f'/{day_stamp}')
  128. paths = self.dir(group)
  129. hfive.close()
  130. return paths
  131. except Exception as ex:
  132. print(ex)
  133. return []
  134. def dir(self, group):
  135. result = []
  136. for name, sub in group.items():
  137. if isinstance(sub, h5py.Group):
  138. result.extend(self.dir(sub))
  139. else:
  140. result.append(sub.name)
  141. return result
  142. def draw_plot(self, start_time, interval=300, **kwargs):
  143. logger = logging.getLogger('app')
  144. hfive = h5py.File(self._file_name, 'r')
  145. try:
  146. filer_text, paths = self.datasets(hfive, start_time, **kwargs)
  147. day_stamp = self.day_stamp(start_time)
  148. start_pos = start_time - day_stamp
  149. cur_day = self.day_stamp(stime.time())
  150. if day_stamp == cur_day:
  151. end_pos = int(stime.time()) - day_stamp
  152. else:
  153. end_pos = -1
  154. fig = Figure(figsize=(16, 8))
  155. ax = fig.subplots()
  156. predata = np.zeros((5, 86400))
  157. x = np.arange(0, 86400, interval)
  158. for path, data in self.read_data(hfive, paths):
  159. data = np.array(data)
  160. predata = predata + data
  161. self._draw_plot(ax, x, day_stamp, start_pos,end_pos, data, interval, path)
  162. logger.info("path=%s", path)
  163. self._draw_plot(ax, x, day_stamp, start_pos,end_pos, predata, interval, filer_text)
  164. ax.legend()
  165. ax.grid()
  166. ax.set_title('success ratio')
  167. ax.set(xlabel='time', ylabel='ratio')
  168. fig.autofmt_xdate()
  169. fig.subplots_adjust(left=0.05, right=0.999, top=0.95, bottom=0.1)
  170. buf = BytesIO()
  171. fig.savefig(buf, format="png")
  172. return buf
  173. except Exception as ex:
  174. print(ex)
  175. finally:
  176. hfive.close()
  177. def read_data(self, hfive, paths):
  178. for path in paths:
  179. yield path, hfive[path]
  180. def datasets(self, hfive, start_time, **kwargs):
  181. logger = logging.getLogger('app')
  182. day_stamp = self.day_stamp(start_time)
  183. sday = f'{day_stamp}'
  184. root = hfive.require_group('/')
  185. days = self._days(root)
  186. if sday not in days:
  187. return False
  188. group = hfive.require_group(sday)
  189. dsets = self.dir(group)
  190. chname = quality = card_type = amount = None
  191. for key, val in kwargs.items():
  192. if val is None:
  193. continue
  194. if key == 'chname':
  195. chname = val
  196. elif key == 'quality':
  197. quality = f'{val}'
  198. elif key == 'card_type':
  199. card_type = f'{val}'
  200. elif key == 'amount':
  201. amount = f'{val}'
  202. else:
  203. continue
  204. return self._filter(dsets, chname=chname, quality=quality, card_type=card_type, amount=amount)
  205. def _filter(self, dsets, chname=None, quality=None, card_type=None, amount=None):
  206. filer_text = ''
  207. if chname is not None:
  208. filer_text = chname
  209. if quality is not None:
  210. filer_text = filer_text + f"-qua:{quality}"
  211. if card_type is not None:
  212. filer_text = filer_text + f"-type:{card_type}"
  213. if amount is not None:
  214. filer_text = filer_text + f"-amount:{amount}"
  215. paths = []
  216. for text in dsets:
  217. items = re.split(r'/', text)
  218. if len(items) != 6:
  219. return False
  220. (_, _sday, _chname, _quality, _card_type, _amount) = items
  221. if (chname is not None) and (_chname != chname):
  222. continue
  223. if (quality is not None) and (_quality != quality):
  224. continue
  225. if (card_type is not None) and (_card_type != card_type):
  226. continue
  227. if (amount is not None) and (_amount != amount):
  228. continue
  229. paths.append(text)
  230. return filer_text, paths
  231. def _draw_plot(self, ax, x, day_stamp, start_pos,end_pos, data, interval=300, path=''):
  232. import matplotlib.dates as mdate
  233. # 'commit-succ': 0, 'commit-fail': 1, 'notify-succ': 2, 'notify-fail': 3, 'user_succ': 4
  234. all = data[2] + data[3]
  235. all = all.reshape((-1, interval))
  236. all = np.sum(all, axis=1)
  237. ySucc = data[2]
  238. ySucc = ySucc.reshape((-1, interval))
  239. ySucc = np.sum(ySucc, axis=1)
  240. ySucc = ySucc / (all + 0.00000001)
  241. if end_pos == -1:
  242. pos = np.where(x >= start_pos)
  243. x = x[pos]
  244. ySucc = ySucc[pos]
  245. else:
  246. pos = np.where(start_pos <= x)
  247. x = x[pos]
  248. ySucc = ySucc[pos]
  249. pos = np.where(x < end_pos)
  250. x = x[pos]
  251. ySucc = ySucc[pos]
  252. xs = np.array([stime.strftime('%H:%M', stime.localtime(d + day_stamp)) for d in x])
  253. ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=0))
  254. ax.plot(xs, ySucc, ls='--', marker='o', label=path)
  255. dataCenter = DataCenter()