stanley-king %!s(int64=2) %!d(string=hai) anos
pai
achega
4e15b85628

+ 2 - 1
core/framework/cache/cache.redis.php

@@ -316,11 +316,12 @@ class Cacheredis extends Cache
         }
     }
 
-    public function hIncrBy($name, $prefix, $key, $value)
+    public function hIncrBy($name, $key, $value)
     {
         $this->init_master();
         if (!$this->enable) return false;
 
+        $prefix = '';
         return $this->handler->hIncrBy($this->_key($name, $prefix), $key, $value);
     }
 

+ 1 - 1
data/config/dev/base.ini.php

@@ -132,7 +132,7 @@ $config['redis']['slave']['host']     	= SLAVE_REDISHOST;
 $config['redis']['slave']['port']     	= 6379;
 $config['redis']['slave']['pconnect'] 	= 0;
 
-$config['coroutine']['redis_host'] = 'host.docker.internal';
+$config['coroutine']['redis_host'] = COROUTINE_REDISHOST;
 $config['coroutine']['redis_port'] = 6379;
 
 //$config['fullindexer']['open']      = false;

+ 11 - 1
docker/compose/workcuda/conf/php/php-swoole.ini

@@ -1941,4 +1941,14 @@ ldap.max_links = -1
 ; tab-width: 4
 ; End:
 
-output_buffering = On
+output_buffering = On
+
+[xdebug]
+xdebug.default_enable=1
+xdebug.remote_enable=1
+xdebug.remote_handler=dbgp
+xdebug.remote_host=192.168.3.220
+xdebug.remote_port=9300
+xdebug.remote_mode=req
+xdebug.remote_autostart=1
+xdebug.idekey=PHPSTORM

+ 13 - 0
docker/compose/workcuda/statsec/docker-compose.yml

@@ -0,0 +1,13 @@
+version: "3.7"
+
+services:
+  qreader:
+    image: pycpu:3.7.10
+    volumes:
+      - ../../../../:/var/www/html
+      - ../conf/etc/localtime:/etc/localtime:ro
+      - /mnt/upload:/var/www/html/data/upload
+      - /mnt/shoplog:/var/www/html/data/log
+      - /mnt/stdatasec:/var/www/html/data/stdata
+    container_name: "panda-qreader"
+    command: ['mpiexec', '-n','4','python','qreader.py','-h', '192.168.3.46', '-p', '6379']

+ 1 - 1
docker/compose/workcuda/worker/docker-compose.yml

@@ -10,7 +10,7 @@ services:
       - /mnt/upload:/var/www/html/data/upload
       - /mnt/shoplog:/var/www/html/data/log
     container_name: "panda-codispatcher"
-    command: [php,"/var/www/html/rdispatcher/codispatcher.php","32"]
+    command: [php,"/var/www/html/rdispatcher/codispatcher.php","1"]
     deploy:
       resources:
         limits:

+ 6 - 6
helper/queue/monitor.php

@@ -36,17 +36,17 @@ class MonitorClient extends IJsonClient
     public function onSubmit($mchid,$time,$spec,$card_type,$mch_amount)
     {
         $params = ['mchid' => $mchid,'time' => $time,'spec' => $spec,'card_type' => $card_type,'mch_amount' => $mch_amount];
-        $this->push('refill_submit',$params);
+        $this->push('mch_submit',$params);
     }
 
     public function onCallback($mchid,$time,$spec,$card_type,$mch_amount,$channel_amount,$succ)
     {
         if($succ) {
             $params = ['mchid' => $mchid,'time' => $time,'spec' => $spec,'card_type' => $card_type,'mch_amount' => $mch_amount,'channel_amount' => $channel_amount];
-            $this->push('refill_succ',$params);
+            $this->push('mch_succ',$params);
         } else {
             $params = ['mchid' => $mchid,'time' => $time,'spec' => $spec,'card_type' => $card_type,'mch_amount' => $mch_amount];
-            $this->push('refill_fail',$params);
+            $this->push('mch_fail',$params);
         }
     }
 
@@ -63,16 +63,16 @@ class MonitorClient extends IJsonClient
     public function onCommit($chname,$time,$spec,$card_type,$channel_amount)
     {
         $params = ['channel_name' => $chname, 'time' => $time, 'spec' => $spec, 'card_type' => $card_type, 'channel_amount' => $channel_amount];
-        $this->push('refill_commit', $params);
+        $this->push('ch_commit', $params);
     }
 
     public function onNotify($chname,$time,$spec,$card_type,$channel_amount,$period,$succ)
     {
         $params = ['channel_name' => $chname, 'time' => $time, 'spec' => $spec, 'card_type' => $card_type, 'channel_amount' => $channel_amount, 'period' => $period];
         if ($succ) {
-            $this->push('notify_succ', $params);
+            $this->push('ch_succ', $params);
         } else {
-            $this->push('notify_fail', $params);
+            $this->push('ch_fail', $params);
         }
     }
 }

+ 1 - 1
plot/qreader.py

@@ -1,4 +1,4 @@
-from refill import queueListener
+from pandashop.plot.refill import queueListener
 import signal as sig
 import sys,getopt
 import logging

+ 17 - 14
plot/refill/ChannelWriter.py

@@ -1,17 +1,20 @@
 # from . import DataHandler #此时是导入文件
-from .DataStream import DataWriteStream, ChPosmap, day_stamp
+from .DataStream import DataWriteStream, day_stamp
+from .DataStream import EChPosmap as pos_map
 import numpy as np
 
 __all__ = ['ChannelWriter']
 
+import logging
+log = logging.getLogger('writer')
 
-class ChannelWriter(DataWriteStream, ChPosmap):
+class ChannelWriter(DataWriteStream):
     def write(self, method, params):
-        if method == 'refill_commit':
+        if method == 'ch_commit':
             self._onCommit(params)
-        elif method == 'notify_succ':
+        elif method == 'ch_succ':
             self._onSucc(params)
-        elif method == 'notify_fail':
+        elif method == 'ch_fail':
             self._onFail(params)
         else:
             pass
@@ -23,8 +26,8 @@ class ChannelWriter(DataWriteStream, ChPosmap):
         chname, time, spec, card_type, channel_amount = parse(params)
         path, pos = self.path_pos(chname, time, spec, card_type)
 
-        self.file[path][self._pos_map['commit_count'], pos] += 1
-        self.file[path][self._pos_map['commit_amounts'], pos] += channel_amount
+        self.file[path][pos_map.commit_count, pos] += 1
+        self.file[path][pos_map.commit_amounts, pos] += channel_amount
         pass
 
     def _onSucc(self, params):
@@ -34,9 +37,9 @@ class ChannelWriter(DataWriteStream, ChPosmap):
         chname, time, spec, card_type, channel_amount, period = parse(params)
         path, pos = self.path_pos(chname, time, spec, card_type)
 
-        self.file[path][self._pos_map['succ_count'], pos] += 1
-        self.file[path][self._pos_map['succ_amounts'], pos] += channel_amount
-        self.file[path][self._pos_map['succ_periods'], pos] += period
+        self.file[path][pos_map.succ_count, pos] += 1
+        self.file[path][pos_map.succ_amounts, pos] += channel_amount
+        self.file[path][pos_map.succ_periods, pos] += period
         pass
 
     def _onFail(self, params):
@@ -46,9 +49,9 @@ class ChannelWriter(DataWriteStream, ChPosmap):
         chname, time, spec, card_type, channel_amount, period = parse(params)
         path, pos = self.path_pos(chname, time, spec, card_type)
 
-        self.file[path][self._pos_map['fail_count'], pos] += 1
-        self.file[path][self._pos_map['fail_amounts'], pos] += channel_amount
-        self.file[path][self._pos_map['fail_periods'], pos] += period
+        self.file[path][pos_map.fail_count, pos] += 1
+        self.file[path][pos_map.fail_amounts, pos] += channel_amount
+        self.file[path][pos_map.fail_periods, pos] += period
         pass
 
     def path_pos(self, chname, time, spec, card_type):
@@ -57,7 +60,7 @@ class ChannelWriter(DataWriteStream, ChPosmap):
 
         hfive = self.file
         if path not in hfive:
-            dim = len(self._pos_map)
+            dim = pos_map.dim()
             hfive[path] = np.zeros((dim, 86400))
 
         return path, time - today

+ 81 - 33
plot/refill/DataStream.py

@@ -1,9 +1,11 @@
-from abc import ABCMeta, abstractmethod,ABC
+from abc import ABCMeta, abstractmethod, ABC
 from datetime import timedelta
 from mpi4py import MPI
 import h5py
+from enum import IntEnum
+
+__all__ = ['DataWriteStream', 'DataReadStream', 'day_stamp', 'EMchPosmap', 'EChPosmap', 'ENetPosmap', 'open_hdf5']
 
-__all__ = ['DataWriteStream', 'DataReadStream', 'day_stamp', 'MchPosmap','ChPosmap','open_hdf5']
 
 def day_stamp(stamp):
     import time as stime
@@ -14,14 +16,18 @@ def day_stamp(stamp):
     today = stamp - diff.total_seconds()
     return int(today)
 
-def open_hdf5(file,is_wirte):
+
+def open_hdf5(file, is_wirte):
     if is_wirte:
         return h5py.File(file, 'a', driver='mpio', comm=MPI.COMM_WORLD)
     else:
         return h5py.File(file, 'r', driver='mpio', comm=MPI.COMM_WORLD)
 
+
 class DataWriteStream(metaclass=ABCMeta):
-    def __init__(self,hfive):
+    _version = 20200618
+
+    def __init__(self, hfive):
         self._hfive = hfive
 
     # Getter function
@@ -33,16 +39,20 @@ class DataWriteStream(metaclass=ABCMeta):
     @file.setter
     def file(self, value):
         self._hfive = value
+
     @abstractmethod
-    def write(self,method,params):
+    def write(self, method, params):
         pass
 
     def close(self):
         if self._hfive is not None:
             self._hfive.close()
+pass
+
 
 class DataReadStream(metaclass=ABCMeta):
-    def __init__(self,hfive):
+    _version = 20200618
+    def __init__(self, hfive):
         self._hfive = hfive
 
     # Getter function
@@ -56,36 +66,74 @@ class DataReadStream(metaclass=ABCMeta):
         self._hfive = value
 
     @abstractmethod
-    def write(self,method,params):
+    def read(self, path):
         pass
 
     def close(self):
         if self._hfive is not None:
             self._hfive.close()
 
-class MchPosmap(metaclass=ABCMeta):
-    _version = 20200618
-    _pos_map = {
-        'submit_count': 0, 'submit_amounts': 1,
-        'succ_count': 2, 'succ_mch_amounts': 3, 'succ_ch_amounts': 4,
-        'fail_count': 5, 'fail_mch_amounts': 6
-    }
-    pass
-
-class ChPosmap(metaclass=ABCMeta):
-    _version = 20200618
-    _pos_map = {
-        'commit_count': 0, 'commit_amounts': 1,
-        'succ_count': 2,'succ_amounts': 3, 'succ_periods': 4,
-        'fail_count': 5, 'fail_amounts': 6,'fail_periods': 7
-    }
-    pass
-
-class NetPosmap(metaclass=ABCMeta):
-    _version = 20200618
-    _pos_map = {
-        'succ_count': 0, 'fail_count': 1
-    }
-    pass
-
-
+    def _days(self, root):
+        result = []
+        try:
+            for name, sub in root.items():
+                if isinstance(sub, h5py.Group):
+                    result.append(name)
+        except Exception as ex:
+            print(ex)
+        finally:
+            return result
+
+    def _root_path(self):
+        return f'/{self._version}'
+
+    def days(self):
+        try:
+            root_ptah = self._root_path()
+            root = self.file.require_group(root_ptah)
+            days = self._days(root)
+            return days
+        except Exception as ex:
+            print(ex)
+            return []
+pass
+
+
+class EMchPosmap(IntEnum):
+    submit_count = 0
+    submit_amounts = 1
+    succ_count = 2
+    succ_mch_amounts = 3
+    succ_ch_amounts = 4
+    fail_count = 5
+    fail_mch_amounts = 6
+
+    @staticmethod
+    def dim():
+        return 7
+pass
+
+
+class EChPosmap(IntEnum):
+    commit_count = 0
+    commit_amounts = 1
+    succ_count = 2
+    succ_amounts = 3
+    succ_periods = 4
+    fail_count = 5
+    fail_amounts = 6
+    fail_periods = 7
+
+    @staticmethod
+    def dim():
+        return 8
+pass
+
+class ENetPosmap(IntEnum):
+    succ_count = 0
+    fail_count = 1
+
+    @staticmethod
+    def dim():
+        return 2
+pass

+ 23 - 0
plot/refill/MerchantReader.py

@@ -0,0 +1,23 @@
+# from . import DataHandler #此时是导入文件
+from .DataStream import DataReadStream, day_stamp,open_hdf5
+from .DataStream import EMchPosmap as pos_map
+import numpy as np
+
+__all__ = ['MerchantReader']
+
+import logging
+log = logging.getLogger('writer')
+
+class MerchantReader(DataReadStream):
+    def __init__(self):
+        file = '/var/www/html/data/stdata/merchant.hdf5'
+        hfive = open_hdf5(file,True)
+        super(MerchantReader,self).__init__(hfive)
+
+    # def __del__(self):
+    #     self.close()
+    #     pass
+
+    def read(self, path):
+        pass
+pass

+ 16 - 13
plot/refill/MerchantWriter.py

@@ -1,17 +1,20 @@
 # from . import DataHandler #此时是导入文件
-from .DataStream import DataWriteStream, MchPosmap, day_stamp
+from .DataStream import DataWriteStream, day_stamp
+from .DataStream import EMchPosmap as pos_map
 import numpy as np
 
 __all__ = ['MerchantWriter']
 
+import logging
+log = logging.getLogger('writer')
 
-class MerchantWriter(DataWriteStream, MchPosmap):
+class MerchantWriter(DataWriteStream):
     def write(self, method, params):
-        if method == 'refill_submit':
+        if method == 'mch_submit':
             self._onSubmit(params)
-        elif method == 'refill_succ':
+        elif method == 'mch_succ':
             self._onSucc(params)
-        elif method == 'refill_fail':
+        elif method == 'mch_fail':
             self._onFail(params)
         else:
             pass
@@ -23,8 +26,8 @@ class MerchantWriter(DataWriteStream, MchPosmap):
         mchid, time, spec, card_type, mch_amount = parse(params)
         path, pos = self.path_pos(mchid, time, spec, card_type)
 
-        self.file[path][self._pos_map['submit_count'], pos] += 1
-        self.file[path][self._pos_map['submit_amounts'], pos] += mch_amount
+        self.file[path][pos_map.submit_count, pos] += 1
+        self.file[path][pos_map.submit_amounts, pos] += mch_amount
         pass
 
     def _onSucc(self, params):
@@ -34,9 +37,9 @@ class MerchantWriter(DataWriteStream, MchPosmap):
         mchid, time, spec, card_type, mch_amount, channel_amount = parse(params)
         path, pos = self.path_pos(mchid, time, spec, card_type)
 
-        self.file[path][self._pos_map['succ_count'], pos] += 1
-        self.file[path][self._pos_map['succ_mch_amounts'], pos] += mch_amount
-        self.file[path][self._pos_map['succ_ch_amounts'], pos] += channel_amount
+        self.file[path][pos_map.succ_count, pos] += 1
+        self.file[path][pos_map.succ_mch_amounts, pos] += mch_amount
+        self.file[path][pos_map.succ_ch_amounts, pos] += channel_amount
         pass
 
     def _onFail(self, params):
@@ -46,8 +49,8 @@ class MerchantWriter(DataWriteStream, MchPosmap):
         mchid, time, spec, card_type, mch_amount = parse(params)
         path, pos = self.path_pos(mchid, time, spec, card_type)
 
-        self.file[path][self._pos_map['fail_count'], pos] += 1
-        self.file[path][self._pos_map['fail_mch_amounts'], pos] += mch_amount
+        self.file[path][pos_map.fail_count, pos] += 1
+        self.file[path][pos_map.fail_mch_amounts, pos] += mch_amount
         pass
 
     def path_pos(self, mchid, time, spec, card_type):
@@ -56,7 +59,7 @@ class MerchantWriter(DataWriteStream, MchPosmap):
 
         hfive = self.file
         if path not in hfive:
-            dim = len(self._pos_map)
+            dim = pos_map.dim()
             hfive[path] = np.zeros((dim, 86400))
 
         return path, time - today

+ 9 - 6
plot/refill/NeterrlWriter.py

@@ -1,11 +1,14 @@
 # from . import DataHandler #此时是导入文件
-from .DataStream import DataWriteStream, NetPosmap, day_stamp
+from .DataStream import DataWriteStream, day_stamp
+from .DataStream import ENetPosmap as pos_map
 import numpy as np
 
-__all__ = ['NeterrWriter']
+__all__ = ['NetchkWriter']
 
+import logging
+log = logging.getLogger('writer')
 
-class NeterrWriter(DataWriteStream, NetPosmap):
+class NetchkWriter(DataWriteStream):
     def write(self, method, params):
         if method == 'net_succ':
             self._onSucc(params)
@@ -21,7 +24,7 @@ class NeterrWriter(DataWriteStream, NetPosmap):
         chname, time = parse(params)
         path, pos = self.path_pos(chname, time)
 
-        self.file[path][self._pos_map['succ_count'], pos] += 1
+        self.file[path][pos_map.succ_count, pos] += 1
         pass
 
     def _onFail(self, params):
@@ -31,7 +34,7 @@ class NeterrWriter(DataWriteStream, NetPosmap):
         chname, time = parse(params)
         path, pos = self.path_pos(chname, time)
 
-        self.file[path][self._pos_map['fail_count'], pos] += 1
+        self.file[path][pos_map.fail_count, pos] += 1
         pass
 
     def path_pos(self, chname, time):
@@ -40,7 +43,7 @@ class NeterrWriter(DataWriteStream, NetPosmap):
 
         hfive = self.file
         if path not in hfive:
-            dim = len(self._pos_map)
+            dim = pos_map.dim()
             hfive[path] = np.zeros((dim, 86400))
 
         return path, time - today

+ 8 - 8
plot/refill/QueueListener.py

@@ -2,15 +2,15 @@ import json
 import os
 import time as stime
 import redis
-import logging
+
 from .DataStream import open_hdf5
 from .MerchantWriter import MerchantWriter
 from .ChannelWriter import ChannelWriter
-from .NeterrlWriter import NeterrWriter
+from .NetchkWriter import NetchkWriter
 
+import logging
 log = logging.getLogger('listener')
 
-
 class QueueListener(object):
     _mQueueName = 'REFILL_MONITOR_QUEUE'
 
@@ -55,11 +55,10 @@ class QueueListener(object):
                     elif _key == 'channel':
                         val['handler'] = ChannelWriter(hfive)
                     elif _key == 'netchk':
-                        val['handler'] = NeterrWriter(hfive)
+                        val['handler'] = NetchkWriter(hfive)
                     else:
                         pass
 
-                log.debug('init file ok')
                 self.read(self._mQueueName, r)
                 r.close()
             except Exception as ex:
@@ -81,9 +80,10 @@ class QueueListener(object):
 
     def read(self, queue, redis):
         def find_handler(method):
-            if method in ['refill_submit', 'refill_succ', 'refill_fail']:
+            log.debug(method)
+            if method in ['mch_submit', 'mch_succ', 'mch_fail']:
                 file_type = 'merchant'
-            elif method in ['notify_succ', 'notify_fail', 'refill_commit']:
+            elif method in ['ch_commit','ch_succ', 'ch_fail']:
                 file_type = 'channel'
             elif method in ['net_succ', 'net_fail']:
                 file_type = 'netchk'
@@ -105,7 +105,7 @@ class QueueListener(object):
                     if handler is not None:
                         handler.write(method, params)
                 except Exception as ex:
-                    print(ex)
+                    log.error(ex)
         pass
 
 

+ 8 - 2
plot/refill/__init__.py

@@ -1,7 +1,13 @@
 
+from .QueueListener import queueListener
 from .DataStream import DataWriteStream,DataReadStream,open_hdf5
 from .MerchantWriter import MerchantWriter
 from .ChannelWriter import ChannelWriter
-from .QueueListener import queueListener
+from .NetchkWriter import NetchkWriter
+
+from .MerchantReader import MerchantReader
 
-__all__ = ['DataWriteStream', 'DataReadStream', 'MerchantWriter', 'ChannelWriter', 'queueListener','open_hdf5']
+__all__ = ['DataWriteStream', 'DataReadStream',
+           'MerchantWriter', 'ChannelWriter','NetchkWriter',
+           'MerchantReader',
+           'queueListener','open_hdf5']

+ 15 - 6
plot/testPlot.py

@@ -1,21 +1,30 @@
 import unittest
-from refill.MerchantWriter import MerchantWriter
+import logging
 
+logging.basicConfig(filename='/var/www/html/data/log/qreader.log', level=logging.DEBUG)
+log = logging.getLogger('reader')
 
 class MyTestCase(unittest.TestCase):
-    __redis_host = '192.168.3.104'
-    # __redis_host = '192.168.3.46'
+    # __redis_host = '192.168.3.104'
+    __redis_host = '192.168.3.46'
     def test_something(self):
         self.assertEqual(True, False)  # add assertion here
 
-    def test_handler(self):
-        handler = MerchantWriter(None)
-        pass
     def test_listener(self):
         from refill import queueListener
         queueListener.set_redis(self.__redis_host,'6379')
         queueListener.prepare_data()
 
+    def testDays(self):
+        from refill import MerchantReader
+
+        try:
+            reader = MerchantReader()
+            days = reader.days()
+        except Exception as ex:
+            log.error(ex)
+        pass
+
     def test_jsonLoads(self):
         import json
         try:

+ 1 - 1
plot/thdf5.py

@@ -8,7 +8,7 @@ import numpy as np
 from DataCenter import dataCenter
 from MchDataCenter import mchDataCenter
 from SpeedDataCenter import speedDataCenter
-from . refill import queueListener
+from pandashop.plot.refill import queueListener
 
 from PIL import Image
 import json