stanley-king 3 年之前
父節點
當前提交
a395986015
共有 3 個文件被更改,包括 138 次插入4 次删除
  1. 76 0
      plot/Fetcher.py
  2. 51 0
      plot/UserFetcher.py
  3. 11 4
      plot/mchreader.py

+ 76 - 0
plot/Fetcher.py

@@ -0,0 +1,76 @@
+import time as stime
+import redis
+import h5py
+from os import path
+from datetime import timedelta
+
+class Fetcher(object):
+    latest_delta = 2
+
+    def __init__(self, file_name):
+        self._mquit = False
+        self._mRHost = ''
+        self._mRPort = 6379
+        self._file_name = file_name
+
+    def set_redis(self, rhost, rport):
+        self._mRHost = rhost
+        self._mRPort = rport
+
+    def stop(self):
+        self._mquit = True
+        pass
+
+    def redis_name_prefix(self) -> dict:
+        pass
+
+    def run(self):
+        while self._mquit == False:
+            fAllEmpty = True
+            try:
+                pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
+                r = redis.Redis(connection_pool=pool)
+
+                if path.exists(self._file_name):
+                    hfive = h5py.File(self._file_name, 'a')
+                else:
+                    hfive = h5py.File(self._file_name, 'w')
+
+                latest_time = int(stime.time()) - self.latest_delta
+                name_prefix = self.redis_name_prefix()
+
+                for name, prefix in name_prefix.items():
+                    fEmpty = self.fetch_hset(hfive, r, name, prefix, latest_time)
+                    if fEmpty == False:
+                        fAllEmpty = False
+                hfive.close()
+            except Exception as ex:
+                print(ex)
+            finally:
+                if fAllEmpty:
+                    stime.sleep(1)
+
+    def parase(self, hfive, key, val, prefix, latest_time):
+        pass
+
+    def fetch_hset(self, hfive, redis, name, prefix, latest_time):
+        i = 0
+        fEmpty = True
+        for item in redis.hscan_iter(name):
+            fEmpty = False
+            key = str(item[0], encoding="utf-8")
+            val = str(item[1], encoding="utf-8")
+            print(f'{prefix}:{i}')
+            i += 1
+            fDel = self.parase(hfive, key, val, prefix, latest_time)
+            if fDel:
+                redis.hdel(name, key)
+
+        return fEmpty
+
+    def day_stamp(self, stamp):
+        stamp = int(stamp)
+        x = stime.gmtime(stamp + 8 * 3600)
+        diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
+        today = stamp - diff.total_seconds()
+        return int(today)

+ 51 - 0
plot/UserFetcher.py

@@ -0,0 +1,51 @@
+
+import Fetcher
+import re
+import numpy as np
+
+class UserFetcher(Fetcher):
+    pos_map = {
+        'commit': 0, 'success': 1, 'fail': 2 #0纬放所有数据,1纬放成功,2纬放失败
+    }
+
+    name_prefix = {
+        'nc_user_monitor_commit': 'commit',
+        'nc_user_monitor_success': 'success',
+        'nc_user_monitor_fail': 'fail'
+    }
+
+    def __init__(self):
+        filename = '/var/www/html/data/stdata/user.hdf5'
+        super(Fetcher, self).__init__(filename)
+        pass
+
+    def redis_name_prefix(self) -> dict:
+        return self.name_prefix
+
+    def parase(self, hfive, key, val, prefix,latest_time):
+        items = re.split(r'-', key)
+        if len(items) != 5:
+            return True
+
+        (mchid, quality, card_type, amount, time) = items
+        pos = self.pos_map[f'{prefix}']
+
+        time = int(time)
+        today = self.day_stamp(time)
+        path = f'/{today}/{mchid}/{quality}/{card_type}/{amount}'
+        if path not in hfive:
+            dim = len(self.pos_map)
+            hfive[path] = np.zeros((dim, 86400))
+
+        diff = time - today
+        if diff < 0:
+            print(diff)
+        hfive[path][pos, diff] = val
+        print(path, pos, diff, val, hfive[path][pos, diff])
+
+        if time > latest_time:
+            return False
+        else:
+            return True
+        pass
+    pass

+ 11 - 4
plot/mchreader.py

@@ -1,7 +1,8 @@
 import signal as sig
 import sys,getopt
 
-from MchDataCenter import mchDataCenter
+# from MchDataCenter import mchDataCenter
+from UserFetcher import UserFetcher
 
 if __name__ == '__main__':
     try:
@@ -19,6 +20,12 @@ if __name__ == '__main__':
             rport = int(val)
         else:
             print("Err argv")
-    mchDataCenter.set_redis(rhost,rport)
-    sig.signal(sig.SIGINT, lambda: mchDataCenter.stop())
-    mchDataCenter.prepare_data()
+            
+    # mchDataCenter.set_redis(rhost,rport)
+    # sig.signal(sig.SIGINT, lambda: mchDataCenter.stop())
+    # mchDataCenter.prepare_data()
+
+    fetcher  = UserFetcher()
+    sig.signal(sig.SIGINT, lambda: fetcher.stop())
+    fetcher.run()
+