stanley-king 2 lat temu
rodzic
commit
75dac153cb

+ 0 - 0
__init__.py


+ 19 - 8
docker/compose/homecuda/stat/docker-compose.yml

@@ -10,7 +10,7 @@ services:
       - /mnt/shoplog:/var/www/html/data/log
       - /mnt/stdata:/var/www/html/data/stdata
     container_name: "panda-reader"
-    command: ['python','reader.py', '-h', '192.168.1.130', '-p', '6379']
+    command: ['python','reader.py', '-h', '192.168.3.104', '-p', '6379']
 
   mchreadersrv:
     image: pycpu:3.7.10
@@ -21,7 +21,7 @@ services:
       - /mnt/shoplog:/var/www/html/data/log
       - /mnt/stdata:/var/www/html/data/stdata
     container_name: "panda-mchreader"
-    command: ['python','mchreader.py', '-h', '192.168.1.130', '-p', '6379']
+    command: ['python','mchreader.py', '-h', '192.168.3.104', '-p', '6379']
 
   speedreader:
     image: pycpu:3.7.10
@@ -32,7 +32,7 @@ services:
       - /mnt/shoplog:/var/www/html/data/log
       - /mnt/stdata:/var/www/html/data/stdata
     container_name: "panda-speedreader"
-    command: ['python','speed_reader.py', '-h', '192.168.1.130', '-p', '6379']
+    command: ['python','speed_reader.py', '-h', '192.168.3.104', '-p', '6379']
     deploy:
       resources:
         limits:
@@ -47,7 +47,7 @@ services:
       - /mnt/shoplog:/var/www/html/data/log
       - /mnt/stdata:/var/www/html/data/stdata
     container_name: "panda-chspeed"
-    command: ['python','chspeed.py','-h', '192.168.1.130', '-p', '6379']
+    command: ['python','chspeed.py','-h', '192.168.3.104', '-p', '6379']
 
   ratiosrv:
     image: pycpu:3.7.10
@@ -57,7 +57,7 @@ services:
       - /mnt/shoplog:/var/www/html/data/log
       - /mnt/stdata:/var/www/html/data/stdata
     container_name: "panda-ratio"
-    command: ['python','ratio.py', '-h', '192.168.1.130', '-p', '6379']
+    command: ['python','ratio.py', '-h', '192.168.3.104', '-p', '6379']
 
   flasksrv:
     image: pycpu:3.7.10
@@ -81,7 +81,7 @@ services:
       - /mnt/shoplog:/var/www/html/data/log
       - /mnt/stdata:/var/www/html/data/stdata
     container_name: "panda-mratio"
-    command: ['python','mratio.py','-h', '192.168.1.130', '-p', '6379']
+    command: ['python','mratio.py','-h', '192.168.3.104', '-p', '6379']
 
   mratios:
     image: pycpu:3.7.10
@@ -92,7 +92,7 @@ services:
       - /mnt/shoplog:/var/www/html/data/log
       - /mnt/stdata:/var/www/html/data/stdata
     container_name: "panda-mratios"
-    command: ['python','mratios.py','-h', '192.168.1.130', '-p', '6379']
+    command: ['python','mratios.py','-h', '192.168.3.104', '-p', '6379']
 
   mcounts:
     image: pycpu:3.7.10
@@ -103,4 +103,15 @@ services:
       - /mnt/shoplog:/var/www/html/data/log
       - /mnt/stdata:/var/www/html/data/stdata
     container_name: "panda-mcounts"
-    command: ['python','mcounts.py','-h', '192.168.1.130', '-p', '6379']
+    command: ['python','mcounts.py','-h', '192.168.3.104', '-p', '6379']
+
+  qreader:
+    image: pycpu:3.7.10
+    volumes:
+      - ../../../../:/var/www/html
+      - ../conf/etc/localtime:/etc/localtime:ro
+      - /mnt/upload:/var/www/html/data/upload
+      - /mnt/shoplog:/var/www/html/data/log
+      - /mnt/stdata:/var/www/html/data/stdata
+    container_name: "panda-qreader"
+    command: ['python','qreader.py','-h', '192.168.3.104', '-p', '6379']

+ 13 - 6
helper/queue/monitor.php

@@ -33,18 +33,19 @@ class MonitorClient extends IJsonClient
         parent::__construct($db);
     }
 
-    public function onSubmit($mchid,$time,$mch_amount)
+    public function onSubmit($mchid,$time,$spec,$card_type,$mch_amount)
     {
-        $params = ['mchid' => $mchid,'time' => $time,'mch_amount' => $mch_amount];
+        $params = ['mchid' => $mchid,'time' => $time,'spec' => $spec,'card_type' => $card_type,'mch_amount' => $mch_amount];
         $this->push('refill_submit',$params);
     }
 
-    public function onCallback($mchid,$time,$mch_amount,$channel_amount,$succ)
+    public function onCallback($mchid,$time,$spec,$card_type,$mch_amount,$channel_amount,$succ)
     {
-        $params = ['mchid' => $mchid,'time' => $time,'mch_amount' => $mch_amount,'channel_amount' => $channel_amount];
         if($succ) {
+            $params = ['mchid' => $mchid,'time' => $time,'spec' => $spec,'card_type' => $card_type,'mch_amount' => $mch_amount,'channel_amount' => $channel_amount];
             $this->push('refill_succ',$params);
         } else {
+            $params = ['mchid' => $mchid,'time' => $time,'spec' => $spec,'card_type' => $card_type,'mch_amount' => $mch_amount];
             $this->push('refill_fail',$params);
         }
     }
@@ -55,9 +56,15 @@ class MonitorClient extends IJsonClient
         $this->push('channel_neterr',$params);
     }
 
-    public function onNotify($chname,$time,$spec,$card_type,$period,$succ)
+    public function onCommit($chname,$time,$spec,$card_type,$channel_amount)
     {
-        $params = ['channel_name' => $chname,'time' => $time,'spec' => $spec,'card_type' => $card_type,'period' => $period];
+        $params = ['channel_name' => $chname, 'time' => $time, 'spec' => $spec, 'card_type' => $card_type, 'channel_amount' => $channel_amount];
+        $this->push('refill_commit', $params);
+    }
+
+    public function onNotify($chname,$time,$spec,$card_type,$channel_amount,$period,$succ)
+    {
+        $params = ['channel_name' => $chname, 'time' => $time, 'spec' => $spec, 'card_type' => $card_type, 'channel_amount' => $channel_amount, 'period' => $period];
         if ($succ) {
             $this->push('notify_succ', $params);
         } else {

+ 4 - 5
helper/refill/RefillBase.php

@@ -128,7 +128,7 @@ class RefillBase
             $mch_order = $refill_info['mch_order'];
 
             $period = time() - intval($refill_info['commit_time']);
-            util::monitor_notify($chname,$spec,$card_type,$period,$success);
+            util::monitor_notify($chname,$spec,$card_type,$refill_info['channel_amount'],$period,$success);
 
             if ($success) {
                 $logic_vr_order->changeOrderStateSuccess($order_id,true);
@@ -137,7 +137,7 @@ class RefillBase
                 util::incr_notify($chname, $card_type, $spec, $quality, true);
                 util::incr_user_success($mchid,$card_type, $spec,$quality);
                 util::onOrderSuccess($refill_info,$order_info);
-                util::monitor_callback($mchid, $refill_info['mch_amount'], $refill_info['channel_amount'], true);
+                util::monitor_callback($mchid, $spec,$card_type,$refill_info['mch_amount'], $refill_info['channel_amount'], true);
             }
             elseif ($can_try)
             {
@@ -156,7 +156,6 @@ class RefillBase
                     }
                 }
                 util::incr_user_fail($mchid,$card_type, $spec,$quality);
-                util::monitor_callback($mchid, $refill_info['mch_amount'], 0, false);
             }
             else {
                 $logic_vr_order->changeOrderStateCancel($order_info, '', "{$chname}接口回调通知失败,不可重试.",true,true);
@@ -165,9 +164,9 @@ class RefillBase
                 util::incr_notify($chname, $card_type, $spec, $quality, false);
                 util::incr_amount_lock($mchid,$card_type,$spec);
                 util::incr_user_fail($mchid,$card_type, $spec,$quality);
-                util::monitor_callback($mchid, $refill_info['mch_amount'], 0, false);
             }
 
+            util::monitor_callback($mchid, $spec, $card_type, $refill_info['mch_amount'], 0, false);
             $mod_refill->edit($order_id, ['notify_time' => time(), 'is_retrying' => 0,'notify_state' => 1]);
             util::pop_queue_order($mchid,$mch_order);
             QueueClient::push("NotifyMerchantComplete", ['order_id' => $order_id,'manual' => false]);
@@ -406,6 +405,7 @@ class RefillBase
             {
                 $chfilters->add_channel($channel_name,true);
                 util::incr_commit($channel_name,$card_type,$spec,$quality,true);
+                util::monitor_commit($channel_name, $spec, $card_type, $channel_amount);
                 $trade_no = $errmsg;
 
                 $refill_type = $provider->refill_type();
@@ -444,7 +444,6 @@ class RefillBase
                 if(!empty($neterr) && util::need_check($net_errno)) {
                     $mod_refill->edit($order_id, ['commit_time' => time(),'neterr' => 1,'err_msg' => "neterr={$net_errno}"]);
                     util::monitor_neterr($channel_name);
-
                     break;
                 } else {
                     $neterr = false;

+ 10 - 6
helper/refill/util.php

@@ -515,14 +515,14 @@ class util
         }
     }
     ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-    public static function monitor_submit($mchid,$mch_amount)
+    public static function monitor_submit($mchid,$spec,$card_type,$mch_amount)
     {
-        queue\MonitorClient::instance()->onSubmit($mchid,time(),$mch_amount);
+        queue\MonitorClient::instance()->onSubmit($mchid,time(),$spec,$card_type,$mch_amount);
     }
 
-    public static function monitor_callback($mchid,$mch_amount,$channel_amount,$succ)
+    public static function monitor_callback($mchid,$spec,$card_type,$mch_amount,$channel_amount,$succ)
     {
-        queue\MonitorClient::instance()->onCallback($mchid,time(),floatval($mch_amount),floatval($channel_amount),$succ);
+        queue\MonitorClient::instance()->onCallback($mchid,time(),$spec,$card_type,floatval($mch_amount),floatval($channel_amount),$succ);
     }
 
     public static function monitor_neterr($chname)
@@ -530,11 +530,15 @@ class util
         queue\MonitorClient::instance()->onNetError($chname, time());
     }
 
-    public static function monitor_notify($chname,$spec,$card_type,$period,$succ)
+    public static function monitor_commit($chname, $spec, $card_type, $channel_amount)
     {
-        queue\MonitorClient::instance()->onNotify($chname, time(),$spec,$card_type,$period,$succ);
+        queue\MonitorClient::instance()->onCommit($chname, time(), $spec, $card_type, $channel_amount);
     }
 
+    public static function monitor_notify($chname, $spec, $card_type, $channel_amount, $period, $succ)
+    {
+        queue\MonitorClient::instance()->onNotify($chname, time(), $spec, $card_type, floatval($channel_amount), $period, $succ);
+    }
     ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
     public static function set_order_channels($mchid,$mchorder,$datas)
     {

+ 0 - 675
plot/QueueListener.py

@@ -1,675 +0,0 @@
-import json
-import os
-import time as stime
-import redis
-import h5py
-from os import path
-import re
-from datetime import timedelta
-from datetime import datetime
-import numpy as np
-from matplotlib.figure import Figure
-from matplotlib import ticker
-from io import BytesIO
-import logging
-
-from refill import ProfitHandler
-
-class QueueListener(object):
-    queue_name = 'REFILL_MONITOR_QUEUE'
-    latest_delta = 10
-    pos_map = {
-        'commit': 0, 'success': 1, 'fail': 2
-    }
-
-    def __init__(self):
-        self._mquit = False
-        self._mRHost = ''
-        self._mRPort = 6379
-        self._file_names = {
-            'profit': {
-                'name': '/var/www/html/data/stdata/profit.hdf5',
-                'handler': None
-            }
-        }
-
-    def set_redis(self, rhost, rport):
-        self._mRHost = rhost
-        self._mRPort = rport
-
-    def stop(self):
-        self._mquit = True
-        pass
-
-    def prepare_data(self):
-        def open(file):
-            if path.exists(file):
-                hfive = h5py.File(file, 'a')
-            else:
-                hfive = h5py.File(file, 'w')
-            return hfive
-
-        while self._mquit == False:
-            try:
-                pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
-                r = redis.Redis(connection_pool=pool)
-
-                for _key,val in self._file_names.items():
-                    hfive = open(val['name'])
-                    if _key == 'profit':
-                        val['handler'] = ProfitHandler(hfive)
-
-                self.read(self.queue_name,r)
-            except Exception as ex:
-                print(ex)
-            finally:
-                stime.sleep(1)
-
-    def read(self,queue,redis):
-        while self._mquit == False:
-            item = redis.brpop(queue, 1)
-            if item is None:
-                continue
-            else:
-                val = json.loads(item[1])
-                self._parse(val)
-
-    def _parse(self,val):
-        method = val['method']
-        params = val['params']
-
-
-    def parase(self, hfive, text, val, prefix):
-        items = re.split(r'-', text)
-        if len(items) != 5:
-            return False
-
-        (mchid, quality, card_type, amount, time) = items
-        pos = self.pos_map[f'{prefix}']
-
-        time = int(time)
-        today = self.day_stamp(time)
-        path = f'/{today}/{mchid}/{quality}/{card_type}/{amount}'
-        if path not in hfive:
-            dim = len(self.pos_map)
-            hfive[path] = np.zeros((dim, 86400))
-
-        diff = time - today
-        if diff < 0:
-            print(diff)
-        hfive[path][pos, diff] = val
-        print(path, pos, prefix, diff, time, val, hfive[path][pos, diff])
-        pass
-
-    def day_stamp(self, stamp):
-        stamp = int(stamp)
-        x = stime.gmtime(stamp + 8 * 3600)
-        diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
-        today = stamp - diff.total_seconds()
-        return int(today)
-
-    def _days(self, root):
-        result = []
-        try:
-            for name, sub in root.items():
-                if isinstance(sub, h5py.Group):
-                    result.append(name)
-        except Exception as ex:
-            print(ex)
-        finally:
-            return result
-
-    def days(self):
-        try:
-            hfive = h5py.File(self._file_name, 'r')
-            root = hfive.require_group('/')
-            days = self._days(root)
-            hfive.close()
-            return days
-        except Exception as ex:
-            print(ex)
-            return []
-
-    def paths(self, time_stamp):
-        try:
-            day_stamp = self.day_stamp(time_stamp)
-            hfive = h5py.File(self._file_name, 'r')
-            group = hfive.require_group(f'/{day_stamp}')
-            paths = self.dir(group)
-            hfive.close()
-            return paths
-        except Exception as ex:
-            print(ex)
-            return []
-
-    def dir(self, group):
-        result = []
-        for name, sub in group.items():
-            if isinstance(sub, h5py.Group):
-                result.extend(self.dir(sub))
-            else:
-                result.append(sub.name)
-        return result
-
-    def _all_none(self, **kwargs):
-        for key, val in kwargs.items():
-            if val is not None:
-                return False
-        return True
-
-    def _merge_path(self,paths):
-        result = {}
-        for path in paths:
-            items = re.split(r'/', path)
-            if len(items) != 6:
-                continue
-            (_, _sday, _mchid, _quality, _card_type, _amount) = items
-
-            _mchid = int(_mchid)
-            if _mchid not in result:
-                result[_mchid] = []
-            result[_mchid].append(path)
-
-        return result
-
-    def draw_plot(self, start_time, interval=300, **kwargs):
-        logger = logging.getLogger('app')
-        hfive = h5py.File(self._file_name, 'r')
-        try:
-            day_stamp = self.day_stamp(start_time)
-            start_pos = start_time - day_stamp
-
-            cur_day = self.day_stamp(stime.time())
-            if day_stamp == cur_day:
-                end_pos = int(stime.time()) - day_stamp
-            else:
-                end_pos = -1
-
-            fig = Figure(figsize=(16, 8))
-            ax = fig.subplots()
-
-            dim = len(self.pos_map)
-            predata = np.zeros((dim, 86400))
-            x = np.arange(0, 86400, interval)
-
-            sub_count = 0
-            filer_text, paths = self.datasets(hfive, start_time, **kwargs)
-            if self._all_none(**kwargs):
-                paths = self._merge_path(paths)
-                for mchid, data in self._read_dict_data(hfive, paths):
-                    predata = predata + data
-                    path = f'{mchid}'
-                    ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
-                    if ret:
-                        sub_count += 1
-                pass
-            else:
-                for path, data in self.read_data(hfive, paths):
-                    data = np.array(data)
-                    predata = predata + data
-                    ret = self._draw_plot(ax, x, day_stamp, start_pos, end_pos, data, interval, path)
-                    if ret:
-                        sub_count += 1
-            if sub_count > 1:
-                self._draw_plot(ax, x, day_stamp, start_pos, end_pos, predata, interval, filer_text)
-
-            ax.legend()
-            ax.grid()
-            ax.set_title('success ratio')
-            ax.set(xlabel='time', ylabel='ratio')
-            fig.autofmt_xdate()
-            fig.subplots_adjust(left=0.05, right=0.999, top=0.95, bottom=0.1)
-
-            buf = BytesIO()
-            fig.savefig(buf, format="png")
-            return buf
-        except Exception as ex:
-            print(ex)
-        finally:
-            hfive.close()
-
-    def read_data(self, hfive, paths):
-        for path in paths:
-            yield path, hfive[path]
-
-    def _read_dict_data(self, hfive, mchPaths):
-        for mchid, paths in mchPaths.items():
-            dim = len(self.pos_map)
-            predata = np.zeros((dim, 86400))
-            for path in paths:
-                predata += hfive[path]
-            yield mchid, predata
-
-    def datasets(self, hfive, start_time, **kwargs):
-        logger = logging.getLogger('app')
-
-        day_stamp = self.day_stamp(start_time)
-        sday = f'{day_stamp}'
-        root = hfive.require_group('/')
-        days = self._days(root)
-        if sday not in days:
-            return False
-
-        group = hfive.require_group(sday)
-        dsets = self.dir(group)
-
-        mchid = quality = card_type = amount = None
-        for key, val in kwargs.items():
-            if val is None:
-                continue
-            if key == 'mchid':
-                mchid = val
-            elif key == 'quality':
-                quality = f'{val}'
-            elif key == 'card_type':
-                card_type = f'{val}'
-            elif key == 'amount':
-                amount = f'{val}'
-            else:
-                continue
-        return self._filter(dsets, mchid=mchid, quality=quality, card_type=card_type, amount=amount)
-
-    def _filter(self, dsets, mchid=None, quality=None, card_type=None, amount=None):
-        filer_text = ''
-        if mchid is not None:
-            filer_text = mchid
-        if quality is not None:
-            filer_text = filer_text + f"-qua:{quality}"
-        if card_type is not None:
-            filer_text = filer_text + f"-type:{card_type}"
-        if amount is not None:
-            filer_text = filer_text + f"-amount:{amount}"
-
-        paths = []
-        for text in dsets:
-            items = re.split(r'/', text)
-            if len(items) != 6:
-                return False
-            (_, _sday, _mchid, _quality, _card_type, _amount) = items
-            if (mchid is not None) and (_mchid != mchid):
-                continue
-            if (quality is not None) and (_quality != quality):
-                continue
-            if (card_type is not None) and (_card_type != card_type):
-                continue
-            if (amount is not None) and (_amount != amount):
-                continue
-            paths.append(text)
-
-        return filer_text, paths
-
-    def _draw_plot(self, ax, x, day_stamp, start_pos, end_pos, data, interval=300, path=''):
-        # 'commit': 0, 'succ': 1, 'fail': 2
-        logging.getLogger('app').debug("path=%s", path)
-
-        all = data[1] + data[2]
-        all = all.reshape((-1, interval))
-        all = np.sum(all, axis=1)
-
-        commit = data[0]
-        commit = commit.reshape((-1, interval))
-        commit = np.sum(commit, axis=1)
-
-        ySucc = data[1]
-        ySucc = ySucc.reshape((-1, interval))
-        ySucc = np.sum(ySucc, axis=1)
-
-        yFail = data[2]
-        yFail = yFail.reshape((-1, interval))
-        yFail = np.sum(yFail, axis=1)
-
-        if end_pos == -1:
-            pos = np.where(x >= start_pos)
-            x = x[pos]
-            ySucc = ySucc[pos]
-            all = all[pos]
-            commit = commit[pos]
-            yFail = yFail[pos]
-        else:
-            pos = np.where(start_pos <= x)
-            x = x[pos]
-            ySucc = ySucc[pos]
-            all = all[pos]
-            commit = commit[pos]
-            yFail = yFail[pos]
-
-            pos = np.where(x < end_pos)
-            x = x[pos]
-            ySucc = ySucc[pos]
-            all = all[pos]
-            commit = commit[pos]
-            yFail = yFail[pos]
-
-        succ_count = int(np.sum(ySucc))
-        all_count  = int(np.sum(all))
-        commit_count = int(np.sum(commit))
-        fail_count = int(np.sum(yFail))
-
-        if all_count < 1:
-            return False
-
-        pos = np.where(ySucc > all)
-        ySucc[pos] = all[pos]
-
-        ySucc = ySucc / (all + 0.00000001)
-        xs = np.array([stime.strftime('%H:%M', stime.localtime(d + day_stamp)) for d in x])
-        ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=0))
-        ax.plot(xs, ySucc, ls='--', marker='o', label=self._label(path, succ_count, all_count,commit_count,fail_count))
-        return True
-
-    def _label(self, path, succ_count, all, commit_count, fail_count):
-        ratio = 0.00
-        if all > 0:
-            ratio = round(succ_count * 100 / all, 2)
-
-        items = re.split(r'/', path)
-        if len(items) == 6:
-            (_, _sday, _chname, _quality, _card_type, _amount) = items
-            card_type = ''
-            if _card_type == '1':
-                card_type = 'SY'
-            elif _card_type == '2':
-                card_type = 'SH'
-            elif _card_type == '4':
-                card_type = 'YD'
-            elif _card_type == '5':
-                card_type = 'LT'
-            elif _card_type == '6':
-                card_type = 'DX'
-            elif _card_type == '7':
-                card_type = 'TH'
-            return f"{_chname}-{_quality}-{card_type}-{_amount}:{succ_count}/{all} = {ratio}% {commit_count}:{fail_count}"
-        else:
-            if path == '' or path is None:
-                path = 'average'
-            return f"{path}:{succ_count}/{all} = {ratio}% {commit_count}:{fail_count}"
-        pass
-
-    #统计机构当前时间之前序列时间成功率
-    def _merge_mobile_path(self,paths):
-        result = {}
-        for path in paths:
-            items = re.split(r'/', path)
-            if len(items) != 6:
-                continue
-            (_, _sday, _mchid, _quality, _card_type, _amount) = items
-
-            _card_type = int(_card_type)
-            if _card_type not in [4, 5, 6]:
-                continue
-
-            _mchid = int(_mchid)
-            if _mchid not in result:
-                result[_mchid] = []
-            result[_mchid].append(path)
-        return result
-
-    def _merge_data(self, hfive, paths):
-        dim = len(self.pos_map)
-        predata = np.zeros((dim, 86400))
-        for path in paths:
-            predata += hfive[path]
-        return predata
-
-    def _calc_mratio(self,data,startes,end):
-        succ = data[1]
-        fail = data[2]
-        x = np.arange(0, 86400, 1)
-
-        result = {}
-        for start in startes:
-            if end - start < 0:
-                start_pos = 0
-            else:
-                start_pos = end - start
-
-            pos = np.where(x >= start_pos)
-            t = x[pos]
-            _fail = fail[pos]
-            _succ = succ[pos]
-
-            pos = np.where(t < end)
-            _fail = _fail[pos]
-            _succ = _succ[pos]
-
-            succs = int(np.sum(_succ))
-            fails  = int(np.sum(_fail))
-            ratio = round((succs + 0.00001) / (succs + fails + 0.00001), 4)
-            result[start] = [succs, fails, ratio]
-        return result
-
-    def mratios(self, time_stamp,presecs):
-        paths = self.paths(time_stamp)
-        mchid_paths = self._merge_mobile_path(paths)
-        day_stamp = self.day_stamp(time_stamp)
-
-        mratios = {}
-        hfive = h5py.File(self._file_name, 'r')
-        for mchid, paths in mchid_paths.items():
-            mdata = self._merge_data(hfive,paths)
-            result = self._calc_mratio(mdata,presecs,time_stamp - day_stamp)
-            mratios[mchid] = result
-        hfive.close()
-        return mratios
-
-    def calc_ratio(self):
-        import json
-
-        r = None
-        try:
-            pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
-            r = redis.Redis(connection_pool=pool)
-        except Exception as ex:
-            print(ex)
-
-        while True:
-            try:
-                time_sec = int(stime.time())
-                presecs = [900, 1800, 3600, 7200, 86400]
-                mratios = self.mratios(time_sec, presecs)
-
-                if len(mratios) != 0:
-                    r.set(f"nc_merchant_ratios", json.dumps(mratios))
-                    r.publish('refill',json.dumps({'type':'mch_ratio','value':0}))
-                    print('push msg=',mratios)
-            except Exception as ex:
-                print(ex)
-            finally:
-                stime.sleep(2)
-
-    #以下为按照卡类型计算成功率代码
-    def _merge_mobile_type_path(self,paths,card_type=None):
-        result = {}
-        for path in paths:
-            items = re.split(r'/', path)
-            if len(items) != 6:
-                continue
-            (_, _sday, _mchid, _quality, _card_type, _amount) = items
-
-            _card_type = int(_card_type)
-            if _card_type not in [4, 5, 6]:
-                continue
-            if card_type is not None and _card_type != card_type:
-                continue
-
-            _mchid = int(_mchid)
-            if _mchid not in result:
-                result[_mchid] = []
-            result[_mchid].append(path)
-        return result
-
-    def mratio_types(self, time_stamp,presecs):
-        paths = self.paths(time_stamp)
-
-        mchid_paths = self._merge_mobile_type_path(paths)
-        day_stamp = self.day_stamp(time_stamp)
-
-        card_types = {None: 'ALL', 4: 'YD', 5: 'LT', 6: 'DX'}
-
-        mratios = {}
-        hfive = h5py.File(self._file_name, 'r')
-        for mchid, paths in mchid_paths.items():
-            mch_ratios = {}
-            for card_type, name in card_types.items():
-                print('card_type=', card_type, 'name=', name)
-                if card_type is None:
-                    cur_paths = paths
-                else:
-                    cur_paths = self._merge_mobile_type_path(paths, card_type)
-                    if len(cur_paths) == 0:
-                        continue
-                    cur_paths = cur_paths[mchid]
-                mdata = self._merge_data(hfive,cur_paths)
-                result = self._calc_mratio(mdata,presecs,time_stamp - day_stamp)
-                mch_ratios[name] = result
-            mratios[mchid] = mch_ratios
-        hfive.close()
-        return mratios
-
-    def calc_ratios(self):
-        import json
-
-        r = None
-        try:
-            pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
-            r = redis.Redis(connection_pool=pool)
-        except Exception as ex:
-            print(ex)
-
-        while True:
-            try:
-                time_sec = int(stime.time())
-                presecs = [900, 1800, 3600, 7200, 86400]
-                mratios = self.mratio_types(time_sec, presecs)
-
-                if len(mratios) != 0:
-                    r.set(f"nc_merchant_card_type_ratios", json.dumps(mratios))
-                    # r.publish('refill',json.dumps({'type':'mch_ratio','value':0}))
-                    print('push msg=', mratios)
-            except Exception as ex:
-                print(ex)
-            finally:
-                stime.sleep(2)
-
-    ####################################################################################################################
-    ####
-    def _calc_mcount(self,data,startes,end):
-        succ = data[1]
-        fail = data[2]
-        x = np.arange(0, 86400, 1)
-
-        result = {}
-        for start in startes:
-            if end - start < 0:
-                start_pos = 0
-            else:
-                start_pos = end - start
-
-            pos = np.where(x >= start_pos)
-            t = x[pos]
-            _succ = succ[pos]
-            _fail = fail[pos]
-
-            pos = np.where(t < end)
-            _succ = _succ[pos]
-            _fail = _fail[pos]
-
-            succs = int(np.sum(_succ))
-            fails = int(np.sum(_fail))
-            ratio = round((succs + 0.00001) / (succs + fails + 0.00001), 4)
-            result[start] = [succs, fails, ratio]
-        return result
-
-    def merchant_rmobile_path(self, paths):
-        result = {}
-        for path in paths:
-            items = re.split(r'/', path)
-            if len(items) != 6:
-                continue
-            (_, _sday, _mchid, _quality, _card_type, _amount) = items
-
-            _card_type = int(_card_type)
-            if _card_type not in [4, 5, 6]:
-                continue
-
-            _mchid = int(_mchid)
-            if _mchid not in result:
-                result[_mchid] = []
-            result[_mchid].append(path)
-        return result
-
-    def rmobile_path(self, paths):
-        result = []
-        for path in paths:
-            items = re.split(r'/', path)
-            if len(items) != 6:
-                continue
-            (_, _sday, _mchid, _quality, _card_type, _amount) = items
-
-            _card_type = int(_card_type)
-            if _card_type not in [4, 5, 6]:
-                continue
-            result.append(path)
-        return result
-
-    def mch_count(self, paths, presecs,time_stamp):
-        hfive = h5py.File(self._file_name, 'r')
-        mdata = self._merge_data(hfive,paths)
-        day_stamp = self.day_stamp(time_stamp)
-        result = self._calc_mcount(mdata,presecs,time_stamp - day_stamp)
-        hfive.close()
-
-        return result
-
-    def mch_detail_count(self, paths, presecs,time_stamp):
-        hfive = h5py.File(self._file_name, 'r')
-
-        result = {}
-        for path in paths:
-            items = re.split(r'/', path)
-            if len(items) != 6:
-                continue
-            (_, _sday, _mchid, _quality, _card_type, _amount) = items
-            key = f"{_mchid}-{_quality}-{_card_type}-{_amount}"
-            mdata = hfive[path]
-            day_stamp = self.day_stamp(time_stamp)
-            result[key] = self._calc_mcount(mdata,presecs,time_stamp - day_stamp)
-        hfive.close()
-        return result
-
-    def mch_counts(self):
-        import json
-
-        r = None
-        try:
-            pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
-            r = redis.Redis(connection_pool=pool)
-        except Exception as ex:
-            print(ex)
-
-        while True:
-            try:
-                time_stamp = int(stime.time())
-                presecs = [900, 1800, 3600, 7200, 86400]
-                all_paths = self.paths(time_stamp)
-                mchid_paths = self.merchant_rmobile_path(all_paths)
-
-                gross = {}
-                for mchid, paths in mchid_paths.items():
-                    counts = self.mch_count(paths, presecs, time_stamp)
-                    gross[mchid] = counts
-
-                paths = self.rmobile_path(all_paths)
-                detail = self.mch_detail_count(paths, presecs, time_stamp)
-
-                result = {'gross': gross, 'detail': detail}
-                if len(result) != 0:
-                    r.set(f"nc_merchant_refill_counts", json.dumps(result))
-                    r.publish('refill',json.dumps({'type':'mch_counts','value':0}))
-                    print('push msg=', result)
-            except Exception as ex:
-                print(ex)
-            finally:
-                stime.sleep(2)
-
-queueListener = QueueListener()

+ 0 - 0
plot/__init__.py


+ 32 - 0
plot/qreader.py

@@ -0,0 +1,32 @@
+from .refill import queueListener
+import signal as sig
+import sys,getopt
+import logging
+
+logging.basicConfig(filename='/var/www/html/data/log/qreader.log', level=logging.DEBUG)
+log = logging.getLogger('reader')
+
+def handle_sigterm(*args):
+    queueListener.stop()
+    pass
+
+if __name__ == '__main__':
+    try:
+        opts, args = getopt.getopt(sys.argv[1:],"h:p:",["host=",'port='])
+    except Exception as ex:
+        print(ex)
+        sys.exit(2)
+    rhost = ''
+    rport = 6379
+    for o, val in opts:
+        if o in ("-h", "--host"):
+            rhost = val
+        elif o in ('-p', "--port"):
+            rport = int(val)
+        else:
+            print("Err argv")
+    queueListener.set_redis(rhost,rport)
+
+    # sig.signal(sig.SIGTERM, queueListener.stop) #无法监听到退出消息
+    sig.signal(sig.SIGTERM, handle_sigterm) #可以监听到退出消息
+    queueListener.prepare_data()

+ 0 - 23
plot/queue_start.py

@@ -1,23 +0,0 @@
-from QueueListener import profitCenter
-import signal as sig
-import sys,getopt
-
-if __name__ == '__main__':
-    try:
-        opts, args = getopt.getopt(sys.argv[1:],"h:p:",["host=",'port='])
-    except Exception as ex:
-        print(ex)
-        sys.exit(2)
-
-    rhost = ''
-    rport=6379
-    for o, val in opts:
-        if o in ("-h", "--host"):
-            rhost = val
-        elif o in ('-p', "--port"):
-            rport = int(val)
-        else:
-            print("Err argv")
-    profitCenter.set_redis(rhost,rport)
-    sig.signal(sig.SIGINT, lambda: profitCenter.stop())
-    profitCenter.prepare_data()

+ 63 - 0
plot/refill/ChannelWriter.py

@@ -0,0 +1,63 @@
+# from . import DataHandler #此时是导入文件
+from .DataStream import DataWriteStream, ChPosmap, day_stamp
+import numpy as np
+
+__all__ = ['ChannelWriter']
+
+
+class ChannelWriter(DataWriteStream, ChPosmap):
+    def write(self, method, params):
+        if method == 'refill_commit':
+            self._onCommit(params)
+        elif method == 'notify_succ':
+            self._onSucc(params)
+        elif method == 'notify_fail':
+            self._onFail(params)
+        else:
+            pass
+
+    def _onCommit(self, params):
+        def parse(input):
+            return input['channel_name'], input['time'], input['spec'], input['card_type'], input['channel_amount']
+
+        chname, time, spec, card_type, channel_amount = parse(params)
+        path, pos = self.path_pos(chname, time, spec, card_type)
+
+        self.file[path][self.pos_map['commit_count'], pos] += 1
+        self.file[path][self.pos_map['commit_amounts'], pos] += channel_amount
+        pass
+
+    def _onSucc(self, params):
+        def parse(input):
+            return input['channel_name'], input['time'], input['spec'], input['card_type'], input['channel_amount'], input['period']
+
+        chname, time, spec, card_type, channel_amount, period = parse(params)
+        path, pos = self.path_pos(chname, time, spec, card_type)
+
+        self.file[path][self.pos_map['fail_count'], pos] += 1
+        self.file[path][self.pos_map['fail_amounts'], pos] += channel_amount
+        self.file[path][self.pos_map['fail_periods'], pos] += period
+        pass
+
+    def _onFail(self, params):
+        def parse(input):
+            return input['channel_name'], input['time'], input['spec'], input['card_type'], input['channel_amount'], input['period']
+
+        chname, time, spec, card_type, channel_amount, period = parse(params)
+        path, pos = self.path_pos(chname, time, spec, card_type)
+
+        self.file[path][self.pos_map['succ_count'], pos] += 1
+        self.file[path][self.pos_map['succ_amounts'], pos] += channel_amount
+        self.file[path][self.pos_map['succ_periods'], pos] += period
+        pass
+
+    def path_pos(self, chname, time, spec, card_type):
+        today = day_stamp(time)
+        path = f'/{today}/{chname}/{card_type}/{spec}'
+
+        hfive = self.file
+        if path not in hfive:
+            dim = len(self.pos_map)
+            hfive[path] = np.zeros((dim, 86400))
+
+        return path, time - today

+ 0 - 21
plot/refill/DataHandler.py

@@ -1,21 +0,0 @@
-from abc import ABCMeta, abstractmethod
-
-__all__ = ['DataHandler']
-
-class DataHandler(metaclass=ABCMeta):
-    def __init__(self,hfive):
-        self._hfive = hfive
-
-    # Getter function
-    @property
-    def file(self):
-        return self._hfive
-
-    # Setter function
-    @file.setter
-    def file(self, value):
-        self._hfive = value
-
-    @abstractmethod
-    def read(self):
-        pass

+ 79 - 0
plot/refill/DataStream.py

@@ -0,0 +1,79 @@
+from abc import ABCMeta, abstractmethod,ABC
+from datetime import timedelta
+
+__all__ = ['DataWriteStream', 'DataReadStream', 'day_stamp', 'MchPosmap','ChPosmap']
+
+def day_stamp(stamp):
+    import time as stime
+
+    stamp = int(stamp)
+    st_time = stime.gmtime(stamp + 8 * 3600)
+    diff = timedelta(hours=st_time.tm_hour, minutes=st_time.tm_min, seconds=st_time.tm_sec)
+    today = stamp - diff.total_seconds()
+    return int(today)
+
+class DataWriteStream(metaclass=ABCMeta):
+    def __init__(self,hfive):
+        self._hfive = hfive
+
+    # Getter function
+    @property
+    def file(self):
+        return self._hfive
+
+    # Setter function
+    @file.setter
+    def file(self, value):
+        self._hfive = value
+
+    @abstractmethod
+    def read(self):
+        pass
+
+    @abstractmethod
+    def write(self,method,params):
+        pass
+
+    def close(self):
+        if self._hfive is not None:
+            self._hfive.close()
+
+class DataReadStream(metaclass=ABCMeta):
+    def __init__(self,hfive):
+        self._hfive = hfive
+
+    # Getter function
+    @property
+    def file(self):
+        return self._hfive
+
+    # Setter function
+    @file.setter
+    def file(self, value):
+        self._hfive = value
+
+    @abstractmethod
+    def write(self,method,params):
+        pass
+
+    def close(self):
+        if self._hfive is not None:
+            self._hfive.close()
+
+class MchPosmap(metaclass=ABCMeta):
+    pos_map = {
+        'submit_count': 0, 'submit_amounts': 1,
+        'succ_count': 2, 'succ_mch_amounts': 3, 'succ_ch_amounts': 4,
+        'fail_count': 5, 'fail_mch_amounts': 6
+    }
+    pass
+
+class ChPosmap(metaclass=ABCMeta):
+    pos_map = {
+        'commit_count': 0, 'commit_amounts': 1,
+        'succ_count': 2,'succ_amounts': 3, 'succ_periods': 4,
+        'fail_count': 5, 'fail_amounts': 6,'fail_periods': 7
+    }
+    pass
+
+

+ 60 - 0
plot/refill/MerchantWriter.py

@@ -0,0 +1,60 @@
+# from . import DataHandler #此时是导入文件
+from . DataStream import DataWriteStream,MchPosmap,day_stamp
+import numpy as np
+
+__all__ = ['MerchantWriter']
+
+class MerchantWriter(DataWriteStream, MchPosmap):
+    def write(self,method,params):
+        if method == 'refill_submit':
+            self._onSubmit(params)
+        elif method == 'refill_succ':
+            self._onSucc(params)
+        else:
+            self._onFail(params)
+        pass
+
+    def _onSubmit(self,params):
+        def parse(input):
+            return  params['mchid'],params['time'],params['spec'],params['card_type'],params['mch_amount']
+
+        mchid,time,spec,card_type,mch_amount = parse(params)
+        path,pos = self.path_pos(mchid,time,spec,card_type)
+
+        self.file[path][self.pos_map['submit_count'],pos] += 1
+        self.file[path][self.pos_map['submit_amounts'],pos] += mch_amount
+        pass
+
+    def _onSucc(self,params):
+        def parse(input):
+            return  params['mchid'],params['time'],params['spec'],params['card_type'],params['mch_amount'],params['channel_amount']
+
+        mchid,time,spec,card_type,mch_amount,channel_amount = parse(params)
+        path,pos = self.path_pos(mchid,time,spec,card_type)
+
+        self.file[path][self.pos_map['succ_count'],pos] += 1
+        self.file[path][self.pos_map['succ_mch_amounts'],pos] += mch_amount
+        self.file[path][self.pos_map['succ_ch_amounts'],pos] += channel_amount
+        pass
+
+    def _onFail(self,params):
+        def parse(input):
+            return  params['mchid'],params['time'],params['spec'],params['card_type'],params['mch_amount']
+
+        mchid,time,spec,card_type,mch_amount = parse(params)
+        path,pos = self.path_pos(mchid,time,spec,card_type)
+
+        self.file[path][self.pos_map['fail_count'],pos] += 1
+        self.file[path][self.pos_map['fail_mch_amounts'],pos] += mch_amount
+        pass
+
+    def path_pos(self,mchid,time,spec,card_type):
+        today = day_stamp(time)
+        path = f'/{today}/{mchid}/{card_type}/{spec}'
+
+        hfive = self.file
+        if path not in hfive:
+            dim = len(self.pos_map)
+            hfive[path] = np.zeros((dim, 86400))
+
+        return path, time - today

+ 39 - 0
plot/refill/NeterrlWriter.py

@@ -0,0 +1,39 @@
+# from . import DataHandler #此时是导入文件
+from .DataStream import DataWriteStream, ChPosmap, day_stamp
+import numpy as np
+
+__all__ = ['NeterrWriter']
+
+
+class NeterrWriter(DataWriteStream, ChPosmap):
+    def write(self, method, params):
+        if method == 'channel_neterr':
+            self._onCommit(params)
+        elif method == 'notify_succ':
+            self._onSucc(params)
+        elif method == 'notify_fail':
+            self._onFail(params)
+        else:
+            pass
+
+    def _onNeterr(self, params):
+        def parse(input):
+            return input['channel_name'], input['time'], input['spec'], input['card_type'], input['channel_amount']
+
+        chname, time, spec, card_type, channel_amount = parse(params)
+        path, pos = self.path_pos(chname, time, spec, card_type)
+
+        self.file[path][self.pos_map['commit_count'], pos] += 1
+        self.file[path][self.pos_map['commit_amounts'], pos] += channel_amount
+        pass
+
+    def path_pos(self, chname, time, spec, card_type):
+        today = day_stamp(time)
+        path = f'/{today}/{chname}/{card_type}/{spec}'
+
+        hfive = self.file
+        if path not in hfive:
+            dim = len(self.pos_map)
+            hfive[path] = np.zeros((dim, 86400))
+
+        return path, time - today

+ 0 - 8
plot/refill/ProfitHandler.py

@@ -1,8 +0,0 @@
-# from . import DataHandler #此时是导入文件
-from . DataHandler import DataHandler
-
-__all__ = ['ProfitHandler']
-
-class ProfitHandler(DataHandler):
-    def read(self):
-        pass

+ 106 - 0
plot/refill/QueueListener.py

@@ -0,0 +1,106 @@
+import json
+import os
+import time as stime
+import redis
+from mpi4py import MPI
+import h5py
+import logging
+from .MerchantWriter import MerchantWriter
+from .ChannelWriter import ChannelWriter
+
+log = logging.getLogger('listener')
+
+class QueueListener(object):
+    _mQueueName = 'REFILL_MONITOR_QUEUE'
+    def __init__(self):
+        self._mQuit = False
+        self._mRHost = ''
+        self._mRPort = 6379
+        self._mHandlers = {
+            'merchant': {
+                'name': '/var/www/html/data/stdata/merchant.hdf5',
+                'handler': None
+            },
+            'channel': {
+                'name': '/var/www/html/data/stdata/channel.hdf5',
+                'handler': None
+            },
+            'neterr': {
+                'name': '/var/www/html/data/stdata/neterr.hdf5',
+                'handler': None
+            }
+        }
+
+    def set_redis(self, rhost, rport):
+        self._mRHost = rhost
+        self._mRPort = rport
+
+    def stop(self):
+        log.debug('stop')
+        self._mQuit = True
+        pass
+
+    def prepare_data(self):
+        def open(file):
+            hfive = h5py.File(file, 'a')
+            return hfive
+
+        while self._mQuit == False:
+            try:
+                pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
+                r = redis.Redis(connection_pool=pool)
+
+                for _key,val in self._mHandlers.items():
+                    hfive = open(val['name'])
+                    if _key == 'merchant':
+                        val['handler'] = MerchantWriter(hfive)
+                    elif _key == 'channel':
+                        val['handler'] = ChannelWriter(hfive)
+                log.debug('init file ok')
+                self.read(self._mQueueName, r)
+                r.close()
+            except Exception as ex:
+                log.error(ex)
+                print(ex)
+            finally:
+                self._close()
+                stime.sleep(1)
+
+        self._close()
+        log.debug('quit prepare data')
+
+    def _close(self):
+        for _key,val in self._mHandlers.items():
+            handler = val['handler']
+            if handler is not None:
+                handler.close()
+                val['handler'] = None
+                log.info(handler)
+
+    def read(self,queue,redis):
+        def find_handler(method):
+            if method in ['refill_submit','refill_succ','refill_fail']:
+                file_type = 'merchant'
+            elif method in ['notify_succ','notify_fail','channel_neterr','refill_commit']:
+                file_type = 'channel'
+            else:
+                return None
+            return self._mHandlers[file_type]['handler']
+
+        while self._mQuit == False:
+            item = redis.brpop(queue, 1)
+            if item is None:
+                continue
+            else:
+                try:
+                    val = json.loads(item[1])
+                    method = val['method']
+                    params = val['params']
+                    handler = find_handler(method)
+                    if handler is not None:
+                        handler.write(method,params)
+                except Exception as ex:
+                    print(ex)
+        pass
+
+queueListener = QueueListener()

+ 6 - 3
plot/refill/__init__.py

@@ -1,5 +1,8 @@
 
-from .DataHandler import DataHandler
-from .ProfitHandler import ProfitHandler
+from .DataStream import DataWriteStream
+from .DataStream import DataReadStream
+from .MerchantWriter import MerchantWriter
+from .ChannelWriter import ChannelWriter
+from .QueueListener import queueListener
 
-__all__ = ['DataHandler','ProfitHandler']
+__all__ = ['DataWriteStream', 'DataReadStream', 'MerchantWriter', 'ChannelWriter', 'queueListener']

+ 0 - 17
plot/testModule.py

@@ -1,17 +0,0 @@
-import unittest
-from refill import *
-# from refill.DataHandler import DataHandler
-# from refill.ProfitHandler import ProfitHandler
-
-class MyTestCase(unittest.TestCase):
-    def test_something(self):
-        self.assertEqual(True, False)  # add assertion here
-
-    def test_handler(self):
-        handler = ProfitHandler(None)
-        pass
-
-
-
-if __name__ == '__main__':
-    unittest.main()

+ 30 - 0
plot/testPlot.py

@@ -0,0 +1,30 @@
+import unittest
+from refill.MerchantWriter import MerchantWriter
+
+
+class MyTestCase(unittest.TestCase):
+    __redis_host = '192.168.3.104'
+    # __redis_host = '192.168.3.46'
+    def test_something(self):
+        self.assertEqual(True, False)  # add assertion here
+
+    def test_handler(self):
+        handler = MerchantWriter(None)
+        pass
+    def test_listener(self):
+        from refill import queueListener
+        queueListener.set_redis(self.__redis_host,'6379')
+        queueListener.prepare_data()
+
+    def test_jsonLoads(self):
+        import json
+        try:
+            str = 4
+            x = json.loads(str)
+            print(x)
+        except Exception as ex:
+            print(ex)
+
+
+if __name__ == '__main__':
+    unittest.main()

+ 3 - 9
plot/thdf5.py

@@ -2,26 +2,21 @@ import unittest
 import redis
 import h5py
 import time
-from datetime import datetime
 from datetime import timedelta
 import re
-import threading
 import numpy as np
 from DataCenter import dataCenter
 from MchDataCenter import mchDataCenter
 from SpeedDataCenter import speedDataCenter
-from QueueListener import queueListener
+from . refill import queueListener
 
-from matplotlib.figure import Figure
-from PIL import Image
-from io import BytesIO
 from PIL import Image
 import json
 
 
 class DataTest(unittest.TestCase):
-    # __redis_host = '192.168.3.104'
-    __redis_host = '192.168.3.46'
+    __redis_host = '192.168.3.104'
+    # __redis_host = '192.168.3.46'
     def test_parase(self):
         try:
             dataCenter.parase('succ-lingzh-1-4-50-1618184676', '1')
@@ -184,7 +179,6 @@ class DataTest(unittest.TestCase):
             pass
         
     def test_setcache(self):
-        import json
         r = None
         try:
             pool = redis.ConnectionPool(host='192.168.1.220', port=6379, db=0)

+ 2 - 2
rdispatcher/proxy.php

@@ -147,7 +147,7 @@ class proxy
             } else {
                 refill\util::incr_user_commit($mchid, $order->card_type(), $order->spec(), $org_quality);
                 $mch_amount = refill\RefillFactory::instance()->mch_amount($order);
-                refill\util::monitor_submit($order->mchid(),$mch_amount);
+                refill\util::monitor_submit($order->mchid(), $order->spec(), $order->card_type(), $mch_amount);
             }
         }
         elseif($this->need_transfer($order) && $this->transfer($order)) {
@@ -309,7 +309,7 @@ class proxy
         $refill_order->edit_detail($mchid,$mch_order,['order_state' => ORDER_STATE_SEND]);
         refill\util::incr_user_commit($mchid,$card_type,$spec,$org_quality);
         $mch_amount = refill\RefillFactory::instance()->mch_amount($order);
-        refill\util::monitor_submit($mchid,$mch_amount);
+        refill\util::monitor_submit($mchid, $order->spec(), $order->card_type(), $mch_amount);
 
         [$errcode, $errmsg, $order_id, $neterr,$net_errno] = refill\RefillFactory::instance()->add($order);
         if($errcode !== true)

+ 4 - 3
test/TestRefillMonitor.php

@@ -23,8 +23,9 @@ class TestRefillMonitor extends TestCase
 
     public function testPushMessage()
     {
-//        queue\MonitorClient::instance()->onSubmit(1,time(),98.5);
-        refill\util::monitor_submit(1092,98.5);
+        refill\util::monitor_submit(1092,100,4,98.5);
+        refill\util::monitor_submit(1092,100,4,98.5);
+        refill\util::monitor_submit(1092,100,4,98.5);
+        refill\util::monitor_submit(1092,100,4,98.5);
     }
-
 }