stanley-king 2 năm trước cách đây
mục cha
commit
ca763afe8f

+ 1 - 1
docker/compose/homecuda/statwriter/docker-compose.yml

@@ -10,7 +10,7 @@ services:
       - /mnt/shoplog:/var/www/html/data/log
       - /mnt/stdata:/var/www/html/data/stdata
     container_name: "panda-qreader"
-    command: ['python','qreader.py','-h', '192.168.3.46', '-p', '6379']
+    command: ['python','qreader.py','-h', '192.168.3.104', '-p', '6379']
     deploy:
       resources:
         limits:

+ 1 - 1
plot/refill/DataStream.py

@@ -26,7 +26,7 @@ def day_stamp(stamp):
 
 def open_hdf5(file, is_wirte):
     if is_wirte:
-        return h5py.File(file, 'a', libver='latest') #, swmr=True
+        return h5py.File(file, 'a', libver='latest')  # , swmr=True
     else:
         return h5py.File(file, 'r', libver='latest') #plot/test_h5py.py
 

+ 8 - 7
plot/refill/MerchantWriter.py

@@ -10,7 +10,6 @@ log = logging.getLogger('writer')
 
 class MerchantWriter(DataWriteStream):
     def write(self, method, params):
-        # self._lock.acquire()
         flush = True
         if method == 'mch_submit':
             self._onSubmit(params)
@@ -24,7 +23,6 @@ class MerchantWriter(DataWriteStream):
         # if flush:
         #     hfive = self.file
         #     hfive.flush()
-        # self._lock.release()
 
     def _onSubmit(self, params):
         def parse(input):
@@ -35,7 +33,8 @@ class MerchantWriter(DataWriteStream):
 
         rows = [pos_map.submit_count, pos_map.submit_amounts]
         vals = [1, mch_amount]
-        dset[rows, pos] += vals
+        # dset[pos,rows]  += vals
+        dset[rows,pos]  += vals
 
         # dset[pos_map.submit_count, pos] += 1
         # dset[pos_map.submit_amounts, pos] += mch_amount
@@ -66,7 +65,7 @@ class MerchantWriter(DataWriteStream):
 
         rows = [pos_map.fail_count, pos_map.fail_mch_amounts]
         vals = [1, mch_amount]
-        dset[rows, pos] += vals
+        dset[pos,rows] += vals
 
         # dset[pos_map.fail_count, pos] += 1
         # dset[pos_map.fail_mch_amounts, pos] += mch_amount
@@ -76,12 +75,14 @@ class MerchantWriter(DataWriteStream):
         today = day_stamp(time)
         path = f'/{self._version}/{today}/{mchid}/{card_type}/{spec}'
 
-        # log.debug("%s,%s",'MerchantWriter',path)
+        log.debug("%s,%s",'MerchantWriter',path)
         hfive = self.file
         if path not in hfive:
             dim = pos_map.dim()
-            dset = hfive.create_dataset(path, (dim, 86400), chunks=(dim, 3600))
-            dset[:, :] = np.zeros((dim, 86400))
+            # dset = hfive.create_dataset(path, (86400,dim), chunks=(3600,dim))
+            # dset[:, :] = np.zeros((86400,dim))
+            dset = hfive.create_dataset(path, (dim,86400), chunks=(dim,3600))
+            dset[:, :] = np.zeros((dim,86400))
             hfive.flush()
         else:
             dset = hfive[path]

+ 41 - 58
plot/refill/QueueListener.py

@@ -8,13 +8,16 @@ from .DataStream import open_hdf5
 from .MerchantWriter import MerchantWriter
 from .ChannelWriter import ChannelWriter
 from .NetchkWriter import NetchkWriter
+from .WriterConsumer import WriterConsumer
 
 import logging
+
 log = logging.getLogger('listener')
 
+
 class QueueListener(object):
-    _COUNT = 1
     _mQueueName = 'REFILL_MONITOR_QUEUE'
+
     def __init__(self):
         self._mQuit = False
         self._mRHost = ''
@@ -33,7 +36,7 @@ class QueueListener(object):
                 'handler': None
             }
         }
-        self._threads = list()
+        self._threads = dict()
         self._redis = list()
 
     def set_redis(self, rhost, rport):
@@ -47,60 +50,76 @@ class QueueListener(object):
         log.debug('stop')
         self._mQuit = True
 
+    def _read_queue(self, rclient, queue):
+        while self._mQuit == False:
+            item = rclient.brpop(queue, 1)
+            if item is None:
+                continue
+            else:
+                try:
+                    val = json.loads(item[1])
+                    method = val['method']
+                    params = val['params']
+                    self._dispatch(method, params)
+                except Exception as ex:
+                    log.error(ex)
+        pass
+
     def prepare_data(self):
+        while self._mQuit == False:
             try:
+                self._threads.clear()
                 for _key, val in self._mHandlers.items():
                     hfive = open_hdf5(val['name'], True)
                     if _key == 'merchant':
                         val['handler'] = MerchantWriter(hfive)
+                        self._threads['merchant'] = WriterConsumer(val['handler'])
                     elif _key == 'channel':
                         val['handler'] = ChannelWriter(hfive)
+                        self._threads['channel'] = WriterConsumer(val['handler'])
                     elif _key == 'netchk':
                         val['handler'] = NetchkWriter(hfive)
+                        self._threads['netchk'] = WriterConsumer(val['handler'])
                     else:
                         pass
 
-                self._pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
-                for i in range(0, self._COUNT):
-                    r = redis.Redis(connection_pool=self._pool)
-                    thread = Thread(target=read_queue,args=(self,r,self._mQueueName))
-                    self._redis.append(r)
-                    self._threads.append(thread)
+                for name, thread in self._threads.items():
                     thread.start()
 
-                log.debug('quit prepare data')
-                for thread in self._threads:
+                self._pool = redis.ConnectionPool(host=self._mRHost, port=self._mRPort, db=0)
+                r = redis.Redis(connection_pool=self._pool)
+                self._read_queue(r, self._mQueueName)
+
+                for name, thread in self._threads.items():
                     thread.join()
-                    log.debug('join')
+                    log.debug("thread %s has quit", name)
             except Exception as ex:
                 log.error(ex)
             finally:
                 self._close()
 
-    def write(self,method, params):
+    def _dispatch(self, method, params):
         def find_handler(method):
-            # log.debug(method)
             if method in ['mch_submit', 'mch_succ', 'mch_fail']:
                 file_type = 'merchant'
-            elif method in ['ch_commit','ch_succ', 'ch_fail']:
+            elif method in ['ch_commit', 'ch_succ', 'ch_fail']:
                 file_type = 'channel'
             elif method in ['net_succ', 'net_fail']:
                 file_type = 'netchk'
             else:
-                return None
-            return self._mHandlers[file_type]['handler']
+                file_type = None
+
+            return file_type
 
         try:
-            # log.debug('start write')
-            handler = find_handler(method)
-            if handler is not None:
-                handler.write(method, params)
-            # log.debug('end write')
+            file_type = find_handler(method)
+            if file_type is not None:
+                self._threads[file_type].put(method, params)
         except Exception as ex:
             log.error(ex)
 
     def flush(self):
-        for key,val in self._mHandlers.items():
+        for key, val in self._mHandlers.items():
             handler = val['handler']
             if handler is not None:
                 handler.flush()
@@ -114,40 +133,4 @@ class QueueListener(object):
                 log.info(handler)
 
 
-def bread_queue(listener: QueueListener, redis,queue):
-    while listener.has_stop() == False:
-        item = redis.brpop(queue, 1)
-        if item is None:
-            continue
-        else:
-            try:
-                val = json.loads(item[1])
-                method = val['method']
-                params = val['params']
-                listener.write(method, params)
-            except Exception as ex:
-                log.error(ex)
-    pass
-
-def read_queue(listener: QueueListener, redis,queue):
-    flush_time = 0
-    while listener.has_stop() == False:
-        item = redis.rpop(queue) #测试每秒性能在三万多.
-        if item is None:
-            time.sleep(1)
-        else:
-            try:
-                val = json.loads(item)
-                method = val['method']
-                params = val['params']
-                listener.write(method, params)
-                cur_time = int(time.time())
-
-                if cur_time > flush_time:
-                    listener.flush()
-                    flush_time = cur_time
-            except Exception as ex:
-                log.error(ex)
-    pass
-
 queueListener = QueueListener()

+ 33 - 0
plot/refill/WriterConsumer.py

@@ -0,0 +1,33 @@
+from threading import Thread, Condition
+from queue import Queue
+
+
+class WriterConsumer(Thread):
+    def __init__(self, handler):
+        Thread.__init__(self)
+        self._cond = Condition()
+        self._messages = Queue()
+        self._stopped = False
+        self._handler = handler
+
+    def run(self):
+        while True:
+            with self._cond:
+                while self._messages.empty():
+                    self._cond.wait()
+                while self._messages.empty() is False:
+                    method, params = self._messages.get()
+                    self._handler.write(method, params)
+                    self._messages.task_done()
+                if self._stopped == True:
+                    break
+
+    def put(self, method,params):
+        with self._cond:
+            self._messages.put((method, params))
+            self._cond.notify()
+
+    def quit(self):
+        with self._cond:
+            self._stopped = True
+            self._cond.notify()

+ 2 - 1
plot/refill/__init__.py

@@ -12,9 +12,10 @@ from .ChannelPainter import ChannelPainter,get_channels
 from .MerchantPainter import MerchantPainter,get_mchids
 from .helper import filter_chname, filter_cardtype, filter_mchids
 from .MerchantCalc import MerchantCalc
+from .WriterConsumer import WriterConsumer
 
 __all__ = ['DataWriteStream', 'DataReadStream',
-           'MerchantWriter', 'ChannelWriter', 'NetchkWriter',
+           'MerchantWriter', 'ChannelWriter', 'NetchkWriter','WriteConsumer',
            'MerchantReader', 'NetchkReader', 'ChannelReader',
            'ChannelPainter', 'MerchantPainter', 'get_channels', 'get_mchids',
            'queueListener', 'open_hdf5', 'day_stamp', 'time_border',

+ 15 - 0
plot/testPlot.py

@@ -134,6 +134,21 @@ class MyTestCase(unittest.TestCase):
                 log.error(ex)
 
 
+    def test_consumer(self):
+        from refill import WriterConsumer
+
+        class PrintHandler:
+            def write(self,msg):
+                log.debug(msg)
+
+        handler = PrintHandler()
+        consumer = WriterConsumer(handler)
+        consumer.start()
+
+        for i in range(1000000):
+            consumer.put(f'index = {i}')
+        consumer.quit()
+        consumer.join()
 
 
 if __name__ == '__main__':

+ 8 - 2
plot/thdf5.py

@@ -11,9 +11,15 @@ from SpeedDataCenter import speedDataCenter
 from PIL import Image
 import json
 
+import logging
+logging.basicConfig(filename='/var/www/html/data/log/qreader.log',
+                    format='%(levelname)10s  %(asctime)s  %(name)10s %(thread)d %(message)s',
+                    level=logging.DEBUG)
+log = logging.getLogger('starter')
+
 class DataTest(unittest.TestCase):
-    # __redis_host = '192.168.3.104'
-    __redis_host = '192.168.3.46'
+    __redis_host = '192.168.3.104'
+    # __redis_host = '192.168.3.46'
     def test_parase(self):
         try:
             dataCenter.parase('succ-lingzh-1-4-50-1618184676', '1')

+ 27 - 6
test/TestRefillMonitor.php

@@ -26,12 +26,33 @@ class TestRefillMonitor extends TestCase
 
     public function testPushMessage()
     {
-        $time = strtotime("2022-06-28");
-        foreach (range(1, 1000000) as $i) {
-            refill\util::monitor_submit(1092, 100, 4, 98.5, $time + $i);
-            refill\util::monitor_submit(1092, 100, 4, 98.5, $time + $i);
-            refill\util::monitor_submit(1092, 100, 4, 98.5, $time + $i);
-            refill\util::monitor_submit(1092, 100, 4, 98.5, $time + $i);
+        $gen_mchids = function ($start,$count)
+        {
+            $result = [];
+            for($i = $start; $i < $start + $count; ++$i) {
+                $result[] = $i;
+            }
+
+            return $result;
+        };
+
+        $time = strtotime("2022-07-27");
+        $mchids = $gen_mchids(1092,200);
+
+        foreach (range(1, 100000) as $i)
+        {
+            $cur_time = $time + rand(0, 86400);
+            $pos = rand(0, 200);
+            $mchid = $mchids[$pos];
+
+            refill\util::monitor_submit($mchid, 10, 4, 98.5, $cur_time);
+            refill\util::monitor_submit($mchid, 20, 5, 98.5, $cur_time);
+            refill\util::monitor_submit($mchid, 30, 6, 98.5, $cur_time);
+            refill\util::monitor_submit($mchid, 50, 4, 98.5, $cur_time);
+            refill\util::monitor_submit($mchid, 100, 5, 98.5, $cur_time);
+            refill\util::monitor_submit($mchid, 200, 6, 98.5, $cur_time);
+            refill\util::monitor_submit($mchid, 300, 5, 98.5, $cur_time);
+            refill\util::monitor_submit($mchid, 500, 6, 98.5, $cur_time);
         }
     }