DataCenter.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. from threading import Thread
  2. import os
  3. import time
  4. from Singleton import Singleton
  5. import redis
  6. import h5py
  7. from os import path
  8. import re
  9. from datetime import timedelta
  10. import numpy as np
  11. class DataCenter(object):
  12. pos_map = {
  13. 'commit-succ': 0, 'commit-fail': 1, 'notify_succ': 2, 'notify-fail': 3, 'user_succ': 4
  14. }
  15. def __init__(self):
  16. self._mquit = False
  17. file_name = '/var/www/html/data/stdata/data.hdf5'
  18. if path.exists(file_name):
  19. self._mHfive = h5py.File(file_name, 'r+')
  20. else:
  21. self._mHfive = h5py.File(file_name, 'a')
  22. def __del__(self):
  23. self._mHfive.close()
  24. def start(self):
  25. self._mTrdReader = Thread(target=DataCenter.prepare_data, args=(self,))
  26. self._mTrdReader.start()
  27. self._mTrdReader.join()
  28. pass
  29. def stop(self):
  30. self._mquit = True
  31. pass
  32. def wait(self):
  33. self._mTrdReader.join()
  34. def prepare_data(self):
  35. try:
  36. # pool = redis.ConnectionPool(host='121.89.223.81', port=57649, db=0)
  37. pool = redis.ConnectionPool(host='172.26.105.125', port=6379, db=0)
  38. r = redis.Redis(connection_pool=pool)
  39. self.read_redis(r, 'nc_channel_monitor_commit', 'commit')
  40. self.read_redis(r, 'nc_channel_monitor_notify', 'notify')
  41. except Exception as ex:
  42. print(ex)
  43. # while not self._mquit:
  44. # try:
  45. # # pool = redis.ConnectionPool(host='121.89.223.81', port=57649, db=0)
  46. # pool = redis.ConnectionPool(host='172.26.105.125', port=6379, db=0)
  47. # r = redis.Redis(connection_pool=pool)
  48. # self.read_redis(r, 'nc_channel_monitor_commit', 'commit')
  49. # self.read_redis(r, 'nc_channel_monitor_notify', 'notify')
  50. # break
  51. # except Exception as ex:
  52. # print(ex)
  53. # finally:
  54. # time.sleep(60)
  55. pass
  56. def read_redis(self, redis, name, prefix):
  57. i = 0
  58. for item in redis.hscan_iter(name):
  59. key = str(item[0], encoding="utf-8")
  60. val = str(item[1], encoding="utf-8")
  61. print(f'{prefix}:{i}')
  62. i += 1
  63. self.parase(key, val, prefix)
  64. def parase(self, text, val, prefix):
  65. items = re.split(r'-', text)
  66. if len(items) != 6:
  67. return False
  68. (stype, chname, quality, card_type, amount, time) = items
  69. if stype == 'succ':
  70. pos = self.pos_map[f'{prefix}-succ']
  71. elif stype == 'fail':
  72. pos = self.pos_map[f'{prefix}-fail']
  73. else:
  74. return False
  75. time = int(time)
  76. today = self.day_stamp(time)
  77. path = f'/{today}/{chname}/{quality}/{card_type}/{amount}'
  78. if path not in self._mHfive:
  79. self._mHfive[path] = np.zeros((5, 86400))
  80. diff = time - today
  81. if diff < 0:
  82. print(diff)
  83. self._mHfive[path][pos][diff] = int(val)
  84. print(path, pos, diff)
  85. pass
  86. def day_stamp(self, stamp):
  87. stamp = int(stamp)
  88. x = time.gmtime(stamp + 8 * 3600)
  89. diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
  90. today = stamp - diff.total_seconds()
  91. return int(today)