stanley-king 2 年之前
父节点
当前提交
55ef01d591

+ 1 - 1
docker/compose/homecuda/stat/docker-compose.yml

@@ -114,4 +114,4 @@ services:
       - /mnt/shoplog:/var/www/html/data/log
       - /mnt/stdata:/var/www/html/data/stdata
     container_name: "panda-qreader"
-    command: ['python','qreader.py','-h', '192.168.3.104', '-p', '6379']
+    command: ['mpiexec', '-n','4','python','qreader.py','-h', '192.168.3.104', '-p', '6379']

+ 6 - 2
helper/queue/monitor.php

@@ -50,10 +50,14 @@ class MonitorClient extends IJsonClient
         }
     }
 
-    public function onNetError($chname,$time)
+    public function onNetCheck($chname, $time, $succ)
     {
         $params = ['channel_name' => $chname,'time' => $time];
-        $this->push('channel_neterr',$params);
+        if($succ) {
+            $this->push('net_succ',$params);
+        } else {
+            $this->push('net_fail',$params);
+        }
     }
 
     public function onCommit($chname,$time,$spec,$card_type,$channel_amount)

+ 7 - 2
helper/refill/RefillBase.php

@@ -443,7 +443,7 @@ class RefillBase
                 $chfilters->add_channel($channel_name,false);
                 if(!empty($neterr) && util::need_check($net_errno)) {
                     $mod_refill->edit($order_id, ['commit_time' => time(),'neterr' => 1,'err_msg' => "neterr={$net_errno}"]);
-                    util::monitor_neterr($channel_name);
+                    util::monitor_netchk($channel_name,false);
                     break;
                 } else {
                     $neterr = false;
@@ -718,10 +718,11 @@ class RefillBase
             [$state, $order_state] = $provider->query($refill_info);
             if(!$state) {
                 QueueClient::async_push("QueryOrderNeterr",['order_id' => $order_id],30);
-                return false;
+                $neterr = true;
             }
             elseif($order_state == ORDER_STATE_SUCCESS || $order_state == ORDER_STATE_CANCEL)
             {
+                $neterr = false;
                 $logic_vr_order = Logic("vr_order");
                 $logic_vr_order->changeOrderStateSend($order_id, true);
 
@@ -730,6 +731,7 @@ class RefillBase
                 QueueClient::async_push("QueryRefillState", ['order_id' => $order_id], 1);
             }
             elseif ($order_state == ORDER_STATE_NOEXIST) {
+                $neterr = false;
                 $logic_vr_order = Logic("vr_order");
                 $logic_vr_order->changeOrderStateCancel($order_info, '', "{$chname}查询订单不存在.",true,true);
 
@@ -738,8 +740,11 @@ class RefillBase
                 QueueClient::push("NotifyMerchantComplete", ['order_id' => $order_id, 'manual' => false]);
             }
             else {
+                $neterr = true;
                 QueueClient::async_push("QueryOrderNeterr",['order_id' => $order_id],30);
             }
+
+            util::monitor_netchk($chname,$neterr);
         }
 
         return true;

+ 2 - 2
helper/refill/util.php

@@ -525,9 +525,9 @@ class util
         queue\MonitorClient::instance()->onCallback($mchid,time(),$spec,$card_type,floatval($mch_amount),floatval($channel_amount),$succ);
     }
 
-    public static function monitor_neterr($chname)
+    public static function monitor_netchk($chname,$succ)
     {
-        queue\MonitorClient::instance()->onNetError($chname, time());
+        queue\MonitorClient::instance()->onNetCheck($chname, time(),$succ);
     }
 
     public static function monitor_commit($chname, $spec, $card_type, $channel_amount)

+ 1 - 1
plot/qreader.py

@@ -1,4 +1,4 @@
-from .refill import queueListener
+from refill import queueListener
 import signal as sig
 import sys,getopt
 import logging

+ 10 - 10
plot/refill/ChannelWriter.py

@@ -23,8 +23,8 @@ class ChannelWriter(DataWriteStream, ChPosmap):
         chname, time, spec, card_type, channel_amount = parse(params)
         path, pos = self.path_pos(chname, time, spec, card_type)
 
-        self.file[path][self.pos_map['commit_count'], pos] += 1
-        self.file[path][self.pos_map['commit_amounts'], pos] += channel_amount
+        self.file[path][self._pos_map['commit_count'], pos] += 1
+        self.file[path][self._pos_map['commit_amounts'], pos] += channel_amount
         pass
 
     def _onSucc(self, params):
@@ -34,9 +34,9 @@ class ChannelWriter(DataWriteStream, ChPosmap):
         chname, time, spec, card_type, channel_amount, period = parse(params)
         path, pos = self.path_pos(chname, time, spec, card_type)
 
-        self.file[path][self.pos_map['fail_count'], pos] += 1
-        self.file[path][self.pos_map['fail_amounts'], pos] += channel_amount
-        self.file[path][self.pos_map['fail_periods'], pos] += period
+        self.file[path][self._pos_map['succ_count'], pos] += 1
+        self.file[path][self._pos_map['succ_amounts'], pos] += channel_amount
+        self.file[path][self._pos_map['succ_periods'], pos] += period
         pass
 
     def _onFail(self, params):
@@ -46,18 +46,18 @@ class ChannelWriter(DataWriteStream, ChPosmap):
         chname, time, spec, card_type, channel_amount, period = parse(params)
         path, pos = self.path_pos(chname, time, spec, card_type)
 
-        self.file[path][self.pos_map['succ_count'], pos] += 1
-        self.file[path][self.pos_map['succ_amounts'], pos] += channel_amount
-        self.file[path][self.pos_map['succ_periods'], pos] += period
+        self.file[path][self._pos_map['fail_count'], pos] += 1
+        self.file[path][self._pos_map['fail_amounts'], pos] += channel_amount
+        self.file[path][self._pos_map['fail_periods'], pos] += period
         pass
 
     def path_pos(self, chname, time, spec, card_type):
         today = day_stamp(time)
-        path = f'/{today}/{chname}/{card_type}/{spec}'
+        path = f'/{self._version}/{today}/{chname}/{card_type}/{spec}'
 
         hfive = self.file
         if path not in hfive:
-            dim = len(self.pos_map)
+            dim = len(self._pos_map)
             hfive[path] = np.zeros((dim, 86400))
 
         return path, time - today

+ 20 - 8
plot/refill/DataStream.py

@@ -1,7 +1,9 @@
 from abc import ABCMeta, abstractmethod,ABC
 from datetime import timedelta
+from mpi4py import MPI
+import h5py
 
-__all__ = ['DataWriteStream', 'DataReadStream', 'day_stamp', 'MchPosmap','ChPosmap']
+__all__ = ['DataWriteStream', 'DataReadStream', 'day_stamp', 'MchPosmap','ChPosmap','open_hdf5']
 
 def day_stamp(stamp):
     import time as stime
@@ -12,6 +14,12 @@ def day_stamp(stamp):
     today = stamp - diff.total_seconds()
     return int(today)
 
+def open_hdf5(file,is_wirte):
+    if is_wirte:
+        return h5py.File(file, 'a', driver='mpio', comm=MPI.COMM_WORLD)
+    else:
+        return h5py.File(file, 'r', driver='mpio', comm=MPI.COMM_WORLD)
+
 class DataWriteStream(metaclass=ABCMeta):
     def __init__(self,hfive):
         self._hfive = hfive
@@ -25,11 +33,6 @@ class DataWriteStream(metaclass=ABCMeta):
     @file.setter
     def file(self, value):
         self._hfive = value
-
-    @abstractmethod
-    def read(self):
-        pass
-
     @abstractmethod
     def write(self,method,params):
         pass
@@ -61,7 +64,8 @@ class DataReadStream(metaclass=ABCMeta):
             self._hfive.close()
 
 class MchPosmap(metaclass=ABCMeta):
-    pos_map = {
+    _version = 20200618
+    _pos_map = {
         'submit_count': 0, 'submit_amounts': 1,
         'succ_count': 2, 'succ_mch_amounts': 3, 'succ_ch_amounts': 4,
         'fail_count': 5, 'fail_mch_amounts': 6
@@ -69,11 +73,19 @@ class MchPosmap(metaclass=ABCMeta):
     pass
 
 class ChPosmap(metaclass=ABCMeta):
-    pos_map = {
+    _version = 20200618
+    _pos_map = {
         'commit_count': 0, 'commit_amounts': 1,
         'succ_count': 2,'succ_amounts': 3, 'succ_periods': 4,
         'fail_count': 5, 'fail_amounts': 6,'fail_periods': 7
     }
     pass
 
+class NetPosmap(metaclass=ABCMeta):
+    _version = 20200618
+    _pos_map = {
+        'succ_count': 0, 'fail_count': 1
+    }
+    pass
+
 

+ 29 - 27
plot/refill/MerchantWriter.py

@@ -1,60 +1,62 @@
 # from . import DataHandler #此时是导入文件
-from . DataStream import DataWriteStream,MchPosmap,day_stamp
+from .DataStream import DataWriteStream, MchPosmap, day_stamp
 import numpy as np
 
 __all__ = ['MerchantWriter']
 
+
 class MerchantWriter(DataWriteStream, MchPosmap):
-    def write(self,method,params):
+    def write(self, method, params):
         if method == 'refill_submit':
             self._onSubmit(params)
         elif method == 'refill_succ':
             self._onSucc(params)
-        else:
+        elif method == 'refill_fail':
             self._onFail(params)
-        pass
+        else:
+            pass
 
-    def _onSubmit(self,params):
+    def _onSubmit(self, params):
         def parse(input):
-            return  params['mchid'],params['time'],params['spec'],params['card_type'],params['mch_amount']
+            return params['mchid'], params['time'], params['spec'], params['card_type'], params['mch_amount']
 
-        mchid,time,spec,card_type,mch_amount = parse(params)
-        path,pos = self.path_pos(mchid,time,spec,card_type)
+        mchid, time, spec, card_type, mch_amount = parse(params)
+        path, pos = self.path_pos(mchid, time, spec, card_type)
 
-        self.file[path][self.pos_map['submit_count'],pos] += 1
-        self.file[path][self.pos_map['submit_amounts'],pos] += mch_amount
+        self.file[path][self._pos_map['submit_count'], pos] += 1
+        self.file[path][self._pos_map['submit_amounts'], pos] += mch_amount
         pass
 
-    def _onSucc(self,params):
+    def _onSucc(self, params):
         def parse(input):
-            return  params['mchid'],params['time'],params['spec'],params['card_type'],params['mch_amount'],params['channel_amount']
+            return params['mchid'], params['time'], params['spec'], params['card_type'], params['mch_amount'], params['channel_amount']
 
-        mchid,time,spec,card_type,mch_amount,channel_amount = parse(params)
-        path,pos = self.path_pos(mchid,time,spec,card_type)
+        mchid, time, spec, card_type, mch_amount, channel_amount = parse(params)
+        path, pos = self.path_pos(mchid, time, spec, card_type)
 
-        self.file[path][self.pos_map['succ_count'],pos] += 1
-        self.file[path][self.pos_map['succ_mch_amounts'],pos] += mch_amount
-        self.file[path][self.pos_map['succ_ch_amounts'],pos] += channel_amount
+        self.file[path][self._pos_map['succ_count'], pos] += 1
+        self.file[path][self._pos_map['succ_mch_amounts'], pos] += mch_amount
+        self.file[path][self._pos_map['succ_ch_amounts'], pos] += channel_amount
         pass
 
-    def _onFail(self,params):
+    def _onFail(self, params):
         def parse(input):
-            return  params['mchid'],params['time'],params['spec'],params['card_type'],params['mch_amount']
+            return params['mchid'], params['time'], params['spec'], params['card_type'], params['mch_amount']
 
-        mchid,time,spec,card_type,mch_amount = parse(params)
-        path,pos = self.path_pos(mchid,time,spec,card_type)
+        mchid, time, spec, card_type, mch_amount = parse(params)
+        path, pos = self.path_pos(mchid, time, spec, card_type)
 
-        self.file[path][self.pos_map['fail_count'],pos] += 1
-        self.file[path][self.pos_map['fail_mch_amounts'],pos] += mch_amount
+        self.file[path][self._pos_map['fail_count'], pos] += 1
+        self.file[path][self._pos_map['fail_mch_amounts'], pos] += mch_amount
         pass
 
-    def path_pos(self,mchid,time,spec,card_type):
+    def path_pos(self, mchid, time, spec, card_type):
         today = day_stamp(time)
-        path = f'/{today}/{mchid}/{card_type}/{spec}'
+        path = f'/{self._version}/{today}/{mchid}/{card_type}/{spec}'
 
         hfive = self.file
         if path not in hfive:
-            dim = len(self.pos_map)
+            dim = len(self._pos_map)
             hfive[path] = np.zeros((dim, 86400))
 
-        return path, time - today
+        return path, time - today

+ 24 - 15
plot/refill/NeterrlWriter.py

@@ -1,39 +1,48 @@
 # from . import DataHandler #此时是导入文件
-from .DataStream import DataWriteStream, ChPosmap, day_stamp
+from .DataStream import DataWriteStream, NetPosmap, day_stamp
 import numpy as np
 
 __all__ = ['NeterrWriter']
 
 
-class NeterrWriter(DataWriteStream, ChPosmap):
+class NeterrWriter(DataWriteStream, NetPosmap):
     def write(self, method, params):
-        if method == 'channel_neterr':
-            self._onCommit(params)
-        elif method == 'notify_succ':
+        if method == 'net_succ':
             self._onSucc(params)
-        elif method == 'notify_fail':
+        elif method == 'net_fail':
             self._onFail(params)
         else:
             pass
 
-    def _onNeterr(self, params):
+    def _onSucc(self, params):
         def parse(input):
-            return input['channel_name'], input['time'], input['spec'], input['card_type'], input['channel_amount']
+            return input['channel_name'], input['time']
 
-        chname, time, spec, card_type, channel_amount = parse(params)
-        path, pos = self.path_pos(chname, time, spec, card_type)
+        chname, time = parse(params)
+        path, pos = self.path_pos(chname, time)
 
-        self.file[path][self.pos_map['commit_count'], pos] += 1
-        self.file[path][self.pos_map['commit_amounts'], pos] += channel_amount
+        self.file[path][self._pos_map['succ_count'], pos] += 1
         pass
 
-    def path_pos(self, chname, time, spec, card_type):
+    def _onFail(self, params):
+        def parse(input):
+            return input['channel_name'], input['time']
+
+        chname, time = parse(params)
+        path, pos = self.path_pos(chname, time)
+
+        self.file[path][self._pos_map['fail_count'], pos] += 1
+        pass
+
+    def path_pos(self, chname, time):
         today = day_stamp(time)
-        path = f'/{today}/{chname}/{card_type}/{spec}'
+        path = f'/{self._version}/{today}/{chname}'
 
         hfive = self.file
         if path not in hfive:
-            dim = len(self.pos_map)
+            dim = len(self._pos_map)
             hfive[path] = np.zeros((dim, 86400))
 
         return path, time - today
+
+    pass

+ 22 - 16
plot/refill/QueueListener.py

@@ -2,16 +2,18 @@ import json
 import os
 import time as stime
 import redis
-from mpi4py import MPI
-import h5py
 import logging
+from .DataStream import open_hdf5
 from .MerchantWriter import MerchantWriter
 from .ChannelWriter import ChannelWriter
+from .NeterrlWriter import NeterrWriter
 
 log = logging.getLogger('listener')
 
+
 class QueueListener(object):
     _mQueueName = 'REFILL_MONITOR_QUEUE'
+
     def __init__(self):
         self._mQuit = False
         self._mRHost = ''
@@ -25,8 +27,8 @@ class QueueListener(object):
                 'name': '/var/www/html/data/stdata/channel.hdf5',
                 'handler': None
             },
-            'neterr': {
-                'name': '/var/www/html/data/stdata/neterr.hdf5',
+            'netchk': {
+                'name': '/var/www/html/data/stdata/netchk.hdf5',
                 'handler': None
             }
         }
@@ -41,27 +43,27 @@ class QueueListener(object):
         pass
 
     def prepare_data(self):
-        def open(file):
-            hfive = h5py.File(file, 'a')
-            return hfive
-
         while self._mQuit == False:
             try:
                 pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
                 r = redis.Redis(connection_pool=pool)
 
-                for _key,val in self._mHandlers.items():
-                    hfive = open(val['name'])
+                for _key, val in self._mHandlers.items():
+                    hfive = open_hdf5(val['name'],True)
                     if _key == 'merchant':
                         val['handler'] = MerchantWriter(hfive)
                     elif _key == 'channel':
                         val['handler'] = ChannelWriter(hfive)
+                    elif _key == 'netchk':
+                        val['handler'] = NeterrWriter(hfive)
+                    else:
+                        pass
+
                 log.debug('init file ok')
                 self.read(self._mQueueName, r)
                 r.close()
             except Exception as ex:
                 log.error(ex)
-                print(ex)
             finally:
                 self._close()
                 stime.sleep(1)
@@ -70,21 +72,24 @@ class QueueListener(object):
         log.debug('quit prepare data')
 
     def _close(self):
-        for _key,val in self._mHandlers.items():
+        for _key, val in self._mHandlers.items():
             handler = val['handler']
             if handler is not None:
                 handler.close()
                 val['handler'] = None
                 log.info(handler)
 
-    def read(self,queue,redis):
+    def read(self, queue, redis):
         def find_handler(method):
-            if method in ['refill_submit','refill_succ','refill_fail']:
+            if method in ['refill_submit', 'refill_succ', 'refill_fail']:
                 file_type = 'merchant'
-            elif method in ['notify_succ','notify_fail','channel_neterr','refill_commit']:
+            elif method in ['notify_succ', 'notify_fail', 'refill_commit']:
                 file_type = 'channel'
+            elif method in ['net_succ', 'net_fail']:
+                file_type = 'netchk'
             else:
                 return None
+
             return self._mHandlers[file_type]['handler']
 
         while self._mQuit == False:
@@ -98,9 +103,10 @@ class QueueListener(object):
                     params = val['params']
                     handler = find_handler(method)
                     if handler is not None:
-                        handler.write(method,params)
+                        handler.write(method, params)
                 except Exception as ex:
                     print(ex)
         pass
 
+
 queueListener = QueueListener()

+ 2 - 3
plot/refill/__init__.py

@@ -1,8 +1,7 @@
 
-from .DataStream import DataWriteStream
-from .DataStream import DataReadStream
+from .DataStream import DataWriteStream,DataReadStream,open_hdf5
 from .MerchantWriter import MerchantWriter
 from .ChannelWriter import ChannelWriter
 from .QueueListener import queueListener
 
-__all__ = ['DataWriteStream', 'DataReadStream', 'MerchantWriter', 'ChannelWriter', 'queueListener']
+__all__ = ['DataWriteStream', 'DataReadStream', 'MerchantWriter', 'ChannelWriter', 'queueListener','open_hdf5']