Parcourir la source

add order speed big data monitor

stanley-king il y a 3 ans
Parent
commit
65742deb00
1 fichiers modifiés avec 659 ajouts et 0 suppressions
  1. 659 0
      plot/SpeedDataCenter.py

+ 659 - 0
plot/SpeedDataCenter.py

@@ -0,0 +1,659 @@
+import os
+import time as stime
+import redis
+import h5py
+from os import path
+import re
+from datetime import timedelta
+import numpy as np
+from matplotlib.figure import Figure
+from matplotlib import ticker
+from io import BytesIO
+import logging
+
+class SpeedDataCenter(object):
+    """Copy counters from Redis into an HDF5 file and derive plots and ratios from them."""
+    # Row index, inside each stored (len(pos_map), 86400) dataset, for every counter kind.
+    pos_map = {
+        'commit': 0, 'success': 1, 'fail': 2
+    }
+
+    def __init__(self):
+        self._mquit = False
+        self._mRHost = ''
+        self._mRPort = 6379
+        self._file_name = '/var/www/html/data/stdata/speed.hdf5'
+
+    def set_redis(self, rhost, rport):
+        self._mRHost = rhost
+        self._mRPort = rport
+
+    def stop(self):
+        self._mquit = True
+        pass
+
+    def prepare_data(self):
+        while self._mquit == False:
+            try:
+                pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
+                r = redis.Redis(connection_pool=pool)
+
+                if path.exists(self._file_name):
+                    hfive = h5py.File(self._file_name, 'a')
+                else:
+                    hfive = h5py.File(self._file_name, 'w')
+
+                self.read_redis(hfive, r, 'nc_channel_monitor_commit')
+                hfive.close()
+                self.del_redis(r, 'nc_channel_monitor_commit')
+            except Exception as ex:
+                print(ex)
+            finally:
+                for i in range(60):
+                    if self._mquit:
+                        break
+                    else:
+                        stime.sleep(1)
+
+    def del_redis(self, redis, name):
+        latest_time = int(stime.time()) - 300
+        for item in redis.hscan_iter(name):
+            key = str(item[0], encoding="utf-8")
+            items = re.split(r'-', key)
+
+            fdel = True
+            if len(items) == 5:
+                (mchid, quality, card_type, amount, time) = items
+                time = int(time)
+                if latest_time <= time:
+                    fdel = False
+
+            if fdel:
+                redis.hdel(name, key)
+        pass
+
+    def read_redis(self, hfive, redis, name, prefix):
+        i = 0
+        for item in redis.hscan_iter(name):
+            key = str(item[0], encoding="utf-8")
+            val = str(item[1], encoding="utf-8")
+            print(f'{prefix}:{i}')
+            i += 1
+            self.parase(hfive, key, val, prefix)
+
+    def parase(self, hfive, text, val, prefix):
+        items = re.split(r'-', text)
+        if len(items) != 5:
+            return False
+
+        (mchid, quality, card_type, amount, time) = items
+        pos = self.pos_map[f'{prefix}']
+
+        time = int(time)
+        today = self.day_stamp(time)
+        path = f'/{today}/{mchid}/{quality}/{card_type}/{amount}'
+        if path not in hfive:
+            dim = len(self.pos_map)
+            hfive[path] = np.zeros((dim, 86400))
+
+        diff = time - today
+        if diff < 0:
+            print(diff)
+        hfive[path][pos, diff] = val
+        print(path, pos, diff, val, hfive[path][pos, diff])
+        pass
+
+    def day_stamp(self, stamp):
+        stamp = int(stamp)
+        x = stime.gmtime(stamp + 8 * 3600)
+        diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
+        today = stamp - diff.total_seconds()
+        return int(today)
+
+    def _days(self, root):
+        result = []
+        try:
+            for name, sub in root.items():
+                if isinstance(sub, h5py.Group):
+                    result.append(name)
+        except Exception as ex:
+            print(ex)
+        finally:
+            return result
+
+    def days(self):
+        try:
+            hfive = h5py.File(self._file_name, 'r')
+            root = hfive.require_group('/')
+            days = self._days(root)
+            hfive.close()
+            return days
+        except Exception as ex:
+            print(ex)
+            return []
+
+    def paths(self, time_stamp):
+        try:
+            day_stamp = self.day_stamp(time_stamp)
+            hfive = h5py.File(self._file_name, 'r')
+            group = hfive.require_group(f'/{day_stamp}')
+            paths = self.dir(group)
+            hfive.close()
+            return paths
+        except Exception as ex:
+            print(ex)
+            return []
+
+    def dir(self, group):
+        result = []
+        for name, sub in group.items():
+            if isinstance(sub, h5py.Group):
+                result.extend(self.dir(sub))
+            else:
+                result.append(sub.name)
+        return result
+
+    def _all_none(self, **kwargs):
+        for key, val in kwargs.items():
+            if val is not None:
+                return False
+        return True
+
+    def _merge_path(self,paths):
+        result = {}
+        for path in paths:
+            items = re.split(r'/', path)
+            if len(items) != 6:
+                continue
+            (_, _sday, _mchid, _quality, _card_type, _amount) = items
+
+            _mchid = int(_mchid)
+            if _mchid not in result:
+                result[_mchid] = []
+            result[_mchid].append(path)
+
+        return result
+
+    def draw_plot(self, start_time, interval=300, **kwargs):
+        logger = logging.getLogger('app')
+        hfive = h5py.File(self._file_name, 'r')
+        try:
+            day_stamp = self.day_stamp(start_time)
+            start_pos = start_time - day_stamp
+
+            cur_day = self.day_stamp(stime.time())
+            if day_stamp == cur_day:
+                end_pos = int(stime.time()) - day_stamp
+            else:
+                end_pos = -1
+
+            fig = Figure(figsize=(16, 8))
+            ax = fig.subplots()
+
+            dim = len(self.pos_map)
+            predata = np.zeros((dim, 86400))
+            x = np.arange(0, 86400, interval)
+
+            sub_count = 0
+            filer_text, paths = self.datasets(hfive, start_time, **kwargs)
+            if self._all_none(**kwargs):
+                paths = self._merge_path(paths)
+                for mchid, data in self._read_dict_data(hfive, paths):
+                    predata = predata + data
+                    path = f'{mchid}'
+                    ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
+                    if ret:
+                        sub_count += 1
+                pass
+            else:
+                for path, data in self.read_data(hfive, paths):
+                    data = np.array(data)
+                    predata = predata + data
+                    ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
+                    if ret:
+                        sub_count += 1
+            if sub_count > 1:
+                self._draw_plot(ax, x, day_stamp, start_pos, end_pos, predata, interval, filer_text)
+
+            ax.legend()
+            ax.grid()
+            ax.set_title('success ratio')
+            ax.set(xlabel='time', ylabel='ratio')
+            fig.autofmt_xdate()
+            fig.subplots_adjust(left=0.05, right=0.999, top=0.95, bottom=0.1)
+
+            buf = BytesIO()
+            fig.savefig(buf, format="png")
+            return buf
+        except Exception as ex:
+            print(ex)
+        finally:
+            hfive.close()
+
+    def read_data(self, hfive, paths):
+        for path in paths:
+            yield path, hfive[path]
+
+    def _read_dict_data(self, hfive, mchPaths):
+        for mchid, paths in mchPaths.items():
+            dim = len(self.pos_map)
+            predata = np.zeros((dim, 86400))
+            for path in paths:
+                predata += hfive[path]
+            yield mchid, predata
+
+    def datasets(self, hfive, start_time, **kwargs):
+        logger = logging.getLogger('app')
+
+        day_stamp = self.day_stamp(start_time)
+        sday = f'{day_stamp}'
+        root = hfive.require_group('/')
+        days = self._days(root)
+        if sday not in days:
+            return False
+
+        group = hfive.require_group(sday)
+        dsets = self.dir(group)
+
+        mchid = quality = card_type = amount = None
+        for key, val in kwargs.items():
+            if val is None:
+                continue
+            if key == 'mchid':
+                mchid = val
+            elif key == 'quality':
+                quality = f'{val}'
+            elif key == 'card_type':
+                card_type = f'{val}'
+            elif key == 'amount':
+                amount = f'{val}'
+            else:
+                continue
+        return self._filter(dsets, mchid=mchid, quality=quality, card_type=card_type, amount=amount)
+
+    def _filter(self, dsets, mchid=None, quality=None, card_type=None, amount=None):
+        filer_text = ''
+        if mchid is not None:
+            filer_text = mchid
+        if quality is not None:
+            filer_text = filer_text + f"-qua:{quality}"
+        if card_type is not None:
+            filer_text = filer_text + f"-type:{card_type}"
+        if amount is not None:
+            filer_text = filer_text + f"-amount:{amount}"
+
+        paths = []
+        for text in dsets:
+            items = re.split(r'/', text)
+            if len(items) != 6:
+                return False
+            (_, _sday, _mchid, _quality, _card_type, _amount) = items
+            if (mchid is not None) and (_mchid != mchid):
+                continue
+            if (quality is not None) and (_quality != quality):
+                continue
+            if (card_type is not None) and (_card_type != card_type):
+                continue
+            if (amount is not None) and (_amount != amount):
+                continue
+            paths.append(text)
+
+        return filer_text, paths
+
+    def _draw_plot(self, ax, x, day_stamp, start_pos, end_pos, data, interval=300, path=''):
+        # 'commit-succ': 0, 'notify-succ': 1, 'notify-fail': 2
+        logging.getLogger('app').debug("path=%s", path)
+
+        all = data[1] + data[2]
+        all = all.reshape((-1, interval))
+        all = np.sum(all, axis=1)
+
+        ySucc = data[1]
+        ySucc = ySucc.reshape((-1, interval))
+        ySucc = np.sum(ySucc, axis=1)
+
+        if end_pos == -1:
+            pos = np.where(x >= start_pos)
+            x = x[pos]
+            ySucc = ySucc[pos]
+            all = all[pos]
+        else:
+            pos = np.where(start_pos <= x)
+            x = x[pos]
+            ySucc = ySucc[pos]
+            all = all[pos]
+
+            pos = np.where(x < end_pos)
+            x = x[pos]
+            ySucc = ySucc[pos]
+            all = all[pos]
+
+        succ_count = int(np.sum(ySucc))
+        all_count = int(np.sum(all))
+
+        if all_count < 1:
+            return False
+
+        pos = np.where(ySucc > all)
+        ySucc[pos] = all[pos]
+
+        ySucc = ySucc / (all + 0.00000001)
+        xs = np.array([stime.strftime('%H:%M', stime.localtime(d + day_stamp)) for d in x])
+        ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=0))
+        ax.plot(xs, ySucc, ls='--', marker='o', label=self._label(path, succ_count, all_count))
+        return True
+
+    def _label(self, path, count, all):
+        ratio = 0.00
+        if all > 0:
+            ratio = round(count * 100 / all, 2)
+
+        items = re.split(r'/', path)
+        if len(items) == 6:
+            (_, _sday, _chname, _quality, _card_type, _amount) = items
+            card_type = ''
+            if _card_type == '1':
+                card_type = 'SY'
+            elif _card_type == '2':
+                card_type = 'SH'
+            elif _card_type == '4':
+                card_type = 'YD'
+            elif _card_type == '5':
+                card_type = 'LT'
+            elif _card_type == '6':
+                card_type = 'DX'
+            elif _card_type == '7':
+                card_type = 'TH'
+            return f"{_chname}-{_quality}-{card_type}-{_amount}:{count}/{all} = {ratio}%"
+        else:
+            if path == '' or path is None:
+                path = 'average'
+            return f"{path}:{count}/{all} = {ratio}%"
+        pass
+
+    # Per-merchant success ratios over a series of look-back windows ending at the current time.
+    def _merge_mobile_path(self,paths):
+        result = {}
+        for path in paths:
+            items = re.split(r'/', path)
+            if len(items) != 6:
+                continue
+            (_, _sday, _mchid, _quality, _card_type, _amount) = items
+
+            _card_type = int(_card_type)
+            if _card_type not in [4, 5, 6]:
+                continue
+
+            _mchid = int(_mchid)
+            if _mchid not in result:
+                result[_mchid] = []
+            result[_mchid].append(path)
+        return result
+
+    def _merge_data(self, hfive, paths):
+        dim = len(self.pos_map)
+        predata = np.zeros((dim, 86400))
+        for path in paths:
+            predata += hfive[path]
+        return predata
+
+    def _calc_mratio(self,data,startes,end):
+        succ = data[1]
+        fail = data[2]
+        x = np.arange(0, 86400, 1)
+
+        result = {}
+        for start in startes:
+            if end - start < 0:
+                start_pos = 0
+            else:
+                start_pos = end - start
+
+            pos = np.where(x >= start_pos)
+            t = x[pos]
+            _fail = fail[pos]
+            _succ = succ[pos]
+
+            pos = np.where(t < end)
+            _fail = _fail[pos]
+            _succ = _succ[pos]
+
+            succs = int(np.sum(_succ))
+            fails  = int(np.sum(_fail))
+            ratio = round((succs + 0.00001) / (succs + fails + 0.00001), 4)
+            result[start] = [succs, fails, ratio]
+        return result
+
+    def mratios(self, time_stamp,presecs):
+        paths = self.paths(time_stamp)
+        mchid_paths = self._merge_mobile_path(paths)
+        day_stamp = self.day_stamp(time_stamp)
+
+        mratios = {}
+        hfive = h5py.File(self._file_name, 'r')
+        for mchid, paths in mchid_paths.items():
+            mdata = self._merge_data(hfive,paths)
+            result = self._calc_mratio(mdata,presecs,time_stamp - day_stamp)
+            mratios[mchid] = result
+        hfive.close()
+        return mratios
+
+    def calc_ratio(self):
+        import json
+
+        r = None
+        try:
+            pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
+            r = redis.Redis(connection_pool=pool)
+        except Exception as ex:
+            print(ex)
+
+        while True:
+            try:
+                time_sec = int(stime.time())
+                presecs = [900, 1800, 3600, 7200, 86400]
+                mratios = self.mratios(time_sec, presecs)
+
+                if len(mratios) != 0:
+                    r.set(f"nc_merchant_ratios", json.dumps(mratios))
+                    r.publish('refill',json.dumps({'type':'mch_ratio','value':0}))
+                    print('push msg=',mratios)
+            except Exception as ex:
+                print(ex)
+            finally:
+                stime.sleep(2)
+
+    # The code below computes success ratios broken down by card type.
+    def _merge_mobile_type_path(self,paths,card_type=None):
+        result = {}
+        for path in paths:
+            items = re.split(r'/', path)
+            if len(items) != 6:
+                continue
+            (_, _sday, _mchid, _quality, _card_type, _amount) = items
+
+            _card_type = int(_card_type)
+            if _card_type not in [4, 5, 6]:
+                continue
+            if card_type is not None and _card_type != card_type:
+                continue
+
+            _mchid = int(_mchid)
+            if _mchid not in result:
+                result[_mchid] = []
+            result[_mchid].append(path)
+        return result
+
+    def mratio_types(self, time_stamp,presecs):
+        paths = self.paths(time_stamp)
+
+        mchid_paths = self._merge_mobile_type_path(paths)
+        day_stamp = self.day_stamp(time_stamp)
+
+        card_types = {None: 'ALL', 4: 'YD', 5: 'LT', 6: 'DX'}
+
+        mratios = {}
+        hfive = h5py.File(self._file_name, 'r')
+        for mchid, paths in mchid_paths.items():
+            mch_ratios = {}
+            for card_type, name in card_types.items():
+                print('card_type=', card_type, 'name=', name)
+                if card_type is None:
+                    cur_paths = paths
+                else:
+                    cur_paths = self._merge_mobile_type_path(paths, card_type)
+                    if len(cur_paths) == 0:
+                        continue
+                    cur_paths = cur_paths[mchid]
+                mdata = self._merge_data(hfive,cur_paths)
+                result = self._calc_mratio(mdata,presecs,time_stamp - day_stamp)
+                mch_ratios[name] = result
+            mratios[mchid] = mch_ratios
+        hfive.close()
+        return mratios
+
+    def calc_ratios(self):
+        import json
+
+        r = None
+        try:
+            pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
+            r = redis.Redis(connection_pool=pool)
+        except Exception as ex:
+            print(ex)
+
+        while True:
+            try:
+                time_sec = int(stime.time())
+                presecs = [900, 1800, 3600, 7200, 86400]
+                mratios = self.mratio_types(time_sec, presecs)
+
+                if len(mratios) != 0:
+                    r.set(f"nc_merchant_card_type_ratios", json.dumps(mratios))
+                    # r.publish('refill',json.dumps({'type':'mch_ratio','value':0}))
+                    print('push msg=', mratios)
+            except Exception as ex:
+                print(ex)
+            finally:
+                stime.sleep(2)
+
+    ####################################################################################################################
+    ####
+    def _calc_mcount(self,data,startes,end):
+        succ = data[1]
+        fail = data[2]
+        x = np.arange(0, 86400, 1)
+
+        result = {}
+        for start in startes:
+            if end - start < 0:
+                start_pos = 0
+            else:
+                start_pos = end - start
+
+            pos = np.where(x >= start_pos)
+            t = x[pos]
+            _succ = succ[pos]
+            _fail = fail[pos]
+
+            pos = np.where(t < end)
+            _succ = _succ[pos]
+            _fail = _fail[pos]
+
+            succs = int(np.sum(_succ))
+            fails = int(np.sum(_fail))
+            ratio = round((succs + 0.00001) / (succs + fails + 0.00001), 4)
+            result[start] = [succs, fails, ratio]
+        return result
+
+    def merchant_rmobile_path(self, paths):
+        result = {}
+        for path in paths:
+            items = re.split(r'/', path)
+            if len(items) != 6:
+                continue
+            (_, _sday, _mchid, _quality, _card_type, _amount) = items
+
+            _card_type = int(_card_type)
+            if _card_type not in [4, 5, 6]:
+                continue
+
+            _mchid = int(_mchid)
+            if _mchid not in result:
+                result[_mchid] = []
+            result[_mchid].append(path)
+        return result
+
+    def rmobile_path(self, paths):
+        result = []
+        for path in paths:
+            items = re.split(r'/', path)
+            if len(items) != 6:
+                continue
+            (_, _sday, _mchid, _quality, _card_type, _amount) = items
+
+            _card_type = int(_card_type)
+            if _card_type not in [4, 5, 6]:
+                continue
+            result.append(path)
+        return result
+
+    def mch_count(self, paths, presecs,time_stamp):
+        hfive = h5py.File(self._file_name, 'r')
+        mdata = self._merge_data(hfive,paths)
+        day_stamp = self.day_stamp(time_stamp)
+        result = self._calc_mcount(mdata,presecs,time_stamp - day_stamp)
+        hfive.close()
+
+        return result
+
+    def mch_detail_count(self, paths, presecs,time_stamp):
+        hfive = h5py.File(self._file_name, 'r')
+
+        result = {}
+        for path in paths:
+            items = re.split(r'/', path)
+            if len(items) != 6:
+                continue
+            (_, _sday, _mchid, _quality, _card_type, _amount) = items
+            key = f"{_mchid}-{_quality}-{_card_type}-{_amount}"
+            mdata = hfive[path]
+            day_stamp = self.day_stamp(time_stamp)
+            result[key] = self._calc_mcount(mdata,presecs,time_stamp - day_stamp)
+        hfive.close()
+        return result
+
+    def mch_counts(self):
+        import json
+
+        r = None
+        try:
+            pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
+            r = redis.Redis(connection_pool=pool)
+        except Exception as ex:
+            print(ex)
+
+        while True:
+            try:
+                time_stamp = int(stime.time())
+                presecs = [900, 1800, 3600, 7200, 86400]
+                all_paths = self.paths(time_stamp)
+                mchid_paths = self.merchant_rmobile_path(all_paths)
+
+                gross = {}
+                for mchid, paths in mchid_paths.items():
+                    counts = self.mch_count(paths, presecs, time_stamp)
+                    gross[mchid] = counts
+
+                paths = self.rmobile_path(all_paths)
+                detail = self.mch_detail_count(paths, presecs, time_stamp)
+
+                result = {'gross': gross, 'detail': detail}
+                if len(result) != 0:
+                    r.set(f"nc_merchant_refill_counts", json.dumps(result))
+                    r.publish('refill',json.dumps({'type':'mch_counts','value':0}))
+                    print('push msg=', result)
+            except Exception as ex:
+                print(ex)
+            finally:
+                stime.sleep(2)
+
+# Module-level SpeedDataCenter instance, created when this module is imported.
+mchDataCenter = SpeedDataCenter()