stanley-king 2 gadi atpakaļ
vecāks
revīzija
0d1a08974e

+ 77 - 13
plot/refill/ChannelWriter.py

@@ -9,6 +9,11 @@ import logging
 log = logging.getLogger('writer')
 
 class ChannelWriter(DataWriteStream):
+    def __init__(self, hfive):
+        DataWriteStream.__init__(self, hfive)
+        dim = pos_map.dim()
+        self._cache = np.zeros((dim, 86400))
+
     def write(self, method, params):
         flush = True
         if method == 'ch_commit':
@@ -25,6 +30,45 @@ class ChannelWriter(DataWriteStream):
             hfive.flush()
         pass
 
+    def write_batch(self, batches):
+        for path, items in batches.items():
+            self._write_batch(path, items)
+
+    def _write_batch(self, path, messages):
+        def _commit(params):
+            return input['time'], input['channel_amount']
+        def _succ(input):
+            return input['time'], input['channel_amount'], input['period']
+        def _fail(params):
+            return input['time'], input['channel_amount'], input['period']
+        def _pos(time):
+            today = day_stamp(time)
+            return time - today
+
+        dset = self._data_set(path=path)
+        self._cache[:, :] = dset
+
+        for method, params in messages:
+            if method == 'ch_commit':
+                time, channel_amount = _commit(params)
+                pos = _pos(time)
+                rows = [pos_map.commit_count, pos_map.commit_amounts]
+                self._cache[rows, pos] += [1, channel_amount]
+            elif method == 'ch_succ':
+                time, channel_amount, period = _succ(params)
+                pos = _pos(time)
+                rows = [pos_map.succ_count, pos_map.succ_amounts, pos_map.succ_periods]
+                self._cache[rows, pos] += [1, channel_amount, period]
+            elif method == 'ch_fail':
+                time, channel_amount, period = _fail(params)
+                pos = _pos(time)
+                rows = [pos_map.fail_count, pos_map.fail_amounts, pos_map.fail_periods]
+                self._cache[rows, pos] += [1, channel_amount, period]
+
+        dset[:, :] = self._cache
+        hfive = self.file
+        hfive.flush()
+
     def _onCommit(self, params):
         def parse(input):
             return input['channel_name'], input['time'], input['spec'], input['card_type'], input['channel_amount']
@@ -35,9 +79,6 @@ class ChannelWriter(DataWriteStream):
         rows = [pos_map.commit_count, pos_map.commit_amounts]
         vals = [1, channel_amount]
         dset[rows, pos] += vals
-
-        # dset[pos_map.commit_count, pos] += 1
-        # dset[pos_map.commit_amounts, pos] += channel_amount
         pass
 
     def _onSucc(self, params):
@@ -50,10 +91,6 @@ class ChannelWriter(DataWriteStream):
         rows = [pos_map.succ_count, pos_map.succ_amounts, pos_map.succ_periods]
         vals = [1, channel_amount, period]
         dset[rows, pos] += vals
-
-        # dset[pos_map.succ_count, pos] += 1
-        # dset[pos_map.succ_amounts, pos] += channel_amount
-        # dset[pos_map.succ_periods, pos] += period
         pass
 
     def _onFail(self, params):
@@ -66,17 +103,45 @@ class ChannelWriter(DataWriteStream):
         rows = [pos_map.fail_count, pos_map.fail_amounts, pos_map.fail_periods]
         vals = [1, channel_amount, period]
         dset[rows, pos] += vals
-
-        # dset[pos_map.fail_count, pos] += 1
-        # dset[pos_map.fail_amounts, pos] += channel_amount
-        # dset[pos_map.fail_periods, pos] += period
         pass
 
+    def get_path(self, method, params):
+        def _path(chname, time, spec, card_type):
+            today = day_stamp(time)
+            path = f'/{self._version}/{today}/{chname}/{card_type}/{spec}'
+            return path
+        def _submit(input):
+            return input['channel_name'], input['time'], input['spec'], input['card_type']
+        def _succ(input):
+            return input['channel_name'], input['time'], input['spec'], input['card_type']
+        def _fail(input):
+            return input['channel_name'], input['time'], input['spec'], input['card_type']
+
+        if method == 'ch_commit':
+            func = _submit
+        elif method == 'ch_succ':
+            func = _succ
+        elif method == 'ch_fail':
+            func = _fail
+        else:
+            func = None
+
+        if func is not None:
+            chname, time, spec, card_type = func(params)
+            path = _path(chname, time, spec, card_type)
+            return path
+        else:
+            return None
+
     def path_pos(self, chname, time, spec, card_type):
         today = day_stamp(time)
         path = f'/{self._version}/{today}/{chname}/{card_type}/{spec}'
         log.debug("%s,%s", 'ChannelWriter', path)
 
+        dset = self._data_set(path=path)
+        return dset, time - today
+
+    def _data_set(self, path):
         hfive = self.file
         if path not in hfive:
             dim = pos_map.dim()
@@ -85,5 +150,4 @@ class ChannelWriter(DataWriteStream):
             hfive.flush()
         else:
             dset = hfive[path]
-
-        return dset, time - today
+        return dset

+ 9 - 16
plot/refill/MerchantWriter.py

@@ -13,6 +13,8 @@ log = logging.getLogger('writer')
 class MerchantWriter(DataWriteStream):
     def __init__(self, hfive):
         DataWriteStream.__init__(self, hfive)
+        dim = pos_map.dim()
+        self._cache = np.zeros((dim, 86400))
 
     def write(self, method, params):
         flush = True
@@ -36,42 +38,35 @@ class MerchantWriter(DataWriteStream):
     def _write_batch(self, path, messages):
         def _submit(params):
             return params['time'], params['mch_amount']
-
         def _succ(params):
             return params['time'], params['mch_amount'], params['channel_amount']
-
         def _fail(params):
             return params['time'], params['mch_amount']
-
         def _pos(time):
             today = day_stamp(time)
             return time - today
 
         dset = self._data_set(path=path)
-        dim = pos_map.dim()
-        cache = np.zeros((dim, 86400))
-        cache[:, :] = dset
+        self._cache[:, :] = dset
 
         for method, params in messages:
             if method == 'mch_submit':
                 time, mch_amount = _submit(params)
                 pos = _pos(time)
                 rows = [pos_map.submit_count, pos_map.submit_amounts]
-                vals = [1, mch_amount]
-                cache[rows, pos] += vals
+                self._cache[rows, pos] += [1, mch_amount]
             elif method == 'mch_succ':
                 time, mch_amount, channel_amount = _succ(params)
                 pos = _pos(time)
                 rows = [pos_map.succ_count, pos_map.succ_mch_amounts, pos_map.succ_ch_amounts]
-                vals = [1, mch_amount, channel_amount]
-                cache[rows, pos] += vals
+                self._cache[rows, pos] += [1, mch_amount, channel_amount]
             elif method == 'mch_fail':
                 time, mch_amount = _fail(params)
                 pos = _pos(time)
                 rows = [pos_map.fail_count, pos_map.fail_mch_amounts]
-                vals = [1, mch_amount]
-                cache[rows, pos] += vals
-        dset[:, :] = cache
+                self._cache[rows, pos] += [1, mch_amount]
+
+        dset[:, :] = self._cache
         hfive = self.file
         hfive.flush()
 
@@ -118,10 +113,8 @@ class MerchantWriter(DataWriteStream):
 
         def _submit(params):
             return params['mchid'], params['time'], params['spec'], params['card_type']
-
         def _succ(params):
             return params['mchid'], params['time'], params['spec'], params['card_type']
-
         def _fail(params):
             return params['mchid'], params['time'], params['spec'], params['card_type']
 
@@ -158,4 +151,4 @@ class MerchantWriter(DataWriteStream):
             hfive.flush()
         else:
             dset = hfive[path]
-        return dset
+        return dset

+ 64 - 2
plot/refill/NetchkWriter.py

@@ -11,6 +11,11 @@ log = logging.getLogger('writer')
 
 
 class NetchkWriter(DataWriteStream):
+    def __init__(self, hfive):
+        DataWriteStream.__init__(self, hfive)
+        dim = pos_map.dim()
+        self._cache = np.zeros((dim, 86400))
+
     def write(self, method, params):
         flush = True
         if method == 'net_succ':
@@ -24,6 +29,36 @@ class NetchkWriter(DataWriteStream):
             hfive = self.file
             hfive.flush()
 
+    def write_batch(self, batches):
+        for path, items in batches.items():
+            self._write_batch(path, items)
+
+    def _write_batch(self, path, messages):
+        def _succ(input):
+            return input['time']
+        def _fail(input):
+            return input['time']
+        def _pos(time):
+            today = day_stamp(time)
+            return time - today
+
+        dset = self._data_set(path=path)
+        self._cache[:, :] = dset
+
+        for method, params in messages:
+            if method == 'net_succ':
+                time = _succ(params)
+                pos = _pos(time)
+                self._cache[pos_map.succ_count, pos] += 1
+            elif method == 'net_fail':
+                time = _fail(params)
+                pos = _pos(time)
+                self._cache[pos_map.fail_count, pos] += 1
+
+        dset[:, :] = self._cache
+        hfive = self.file
+        hfive.flush()
+
     def _onSucc(self, params):
         def parse(input):
             return input['channel_name'], input['time']
@@ -44,11 +79,39 @@ class NetchkWriter(DataWriteStream):
         dset[pos_map.fail_count, pos] += 1
         pass
 
+    def get_path(self, method, params):
+        def _path(chname, time):
+            today = day_stamp(time)
+            path = f'/{self._version}/{today}/{chname}'
+            return path
+        def _succ(input):
+            return input['channel_name'], input['time']
+        def _fail(input):
+            return input['channel_name'], input['time']
+
+        if method == 'net_succ':
+            func = _succ
+        elif method == 'net_fail':
+            func = _fail
+        else:
+            func = None
+
+        if func is not None:
+            chname, time = func(params)
+            path = _path(chname, time)
+            return path
+        else:
+            return None
+
     def path_pos(self, chname, time):
         today = day_stamp(time)
         path = f'/{self._version}/{today}/{chname}'
         log.debug("%s,%s", 'NetchkWriter', path)
 
+        dset = self._data_set(path=path)
+        return dset, time - today
+
+    def _data_set(self, path):
         hfive = self.file
         if path not in hfive:
             dim = pos_map.dim()
@@ -57,5 +120,4 @@ class NetchkWriter(DataWriteStream):
             hfive.flush()
         else:
             dset = hfive[path]
-
-        return dset, time - today
+        return dset

+ 3 - 3
plot/refill/QueueListener.py

@@ -73,13 +73,13 @@ class QueueListener(object):
                     hfive = open_hdf5(val['name'], True)
                     if _key == 'merchant':
                         val['handler'] = MerchantWriter(hfive)
-                        self._threads['merchant'] = WriterConsumer(val['handler'])
+                        self._threads['merchant'] = WriterConsumer(val['handler'],'merchant')
                     elif _key == 'channel':
                         val['handler'] = ChannelWriter(hfive)
-                        self._threads['channel'] = WriterConsumer(val['handler'])
+                        self._threads['channel'] = WriterConsumer(val['handler'],'channel')
                     elif _key == 'netchk':
                         val['handler'] = NetchkWriter(hfive)
-                        self._threads['netchk'] = WriterConsumer(val['handler'])
+                        self._threads['netchk'] = WriterConsumer(val['handler'],'netchk')
                     else:
                         pass
 

+ 27 - 21
plot/refill/WriterConsumer.py

@@ -5,40 +5,47 @@ import logging
 log = logging.getLogger('consumer')
 
 class WriterConsumer(Thread):
-    def __init__(self, handler):
+    def __init__(self, handler, name):
         Thread.__init__(self)
-        self._messages = Queue(maxsize=10000)
+        self._messages = Queue()
         self._stopped = False
         self._handler = handler
-        self._max_threshold = 1000
+        self._max_threshold = 0 #10000
+        self._name = name
 
     def run(self):
-        while self._stopped == False:
-            while True:
-                size = self._messages.qsize()
-                log.debug("messages size=%d",size)
-                if size > self._max_threshold:
-                    self._batch()
-                else:
-                    self._single()
+        while True:
+            size = self._messages.qsize()
+            log.debug("%s messages size=%d", self._name, size)
+            if size > self._max_threshold:
+                self._batch()
+            else:
+                self._single()
+            if self._stopped and self._messages.empty():
+                break
+
     def _batch(self):
-        batches = dict()
+        log.debug("%s start collect batch count", self._name)
         try:
-            while True:
-                method, params = self._messages.get(timeout=1)
+            batches = dict()
+            count = 0;
+            while count < 200000:
+                method, params = self._messages.get(False)
                 self._messages.task_done()
                 path = self._handler.get_path(method, params)
                 if path is None:
                     continue
-
                 if path not in batches:
                     batches[path] = list()
                 batches[path].append((method,params))
+                count += 1
         except Empty as ex:
-            log.info('Consumer Queue is Empty')
+            log.info("%s consumer queue is empty", self._name)
         finally:
-            self._handler.write_batch(batches)
-        pass
+            log.info("%s collect batch count=%d", self._name, count)
+            if count > 0:
+                self._handler.write_batch(batches)
+            log.info("%s write batch count=%d", self._name, count)
 
     def _single(self):
         try:
@@ -46,14 +53,13 @@ class WriterConsumer(Thread):
             self._handler.write(method, params)
             self._messages.task_done()
         except Empty as ex:
-            log.info('Consumer Queue is Empty')
-        pass
+            log.info("%s consumer queue is empty", self._name)
 
     def put(self, method,params):
             self._messages.put((method, params))
 
     def quit(self):
-        log.debug("thread has quit")
+        log.debug("%s thread has quit", self._name)
         self._stopped = True
         self._messages.join()
 

+ 1 - 1
plot/refill/WriterConsumerEx.py

@@ -2,7 +2,7 @@ from threading import Thread, Condition
 from queue import Queue
 
 
-class WriterConsumer(Thread):
+class WriterConsumerEx(Thread):
     def __init__(self, handler):
         Thread.__init__(self)
         self._cond = Condition()

+ 1 - 1
plot/testPlot.py

@@ -143,7 +143,7 @@ class MyTestCase(unittest.TestCase):
                 time.sleep(0.01)
 
         handler = PrintHandler()
-        consumer = WriterConsumer(handler)
+        consumer = WriterConsumer(handler,'PrintHandler')
         consumer.start()
 
         for i in range(100000):

+ 12 - 0
test/TestRefillMonitor.php

@@ -58,6 +58,18 @@ class TestRefillMonitor extends TestCase
         }
     }
 
+    public function testPushEqualMsg()
+    {
+        $time = strtotime("2022-07-27");
+        $cur_time = $time + 10;
+
+        for($i = 0; $i < 10; $i++) {
+            $mchid = 1092;
+            refill\util::monitor_submit($mchid, 100, 5, 98.5, $cur_time);
+        }
+
+    }
+
     public function testAddMTimes()
     {
         $mtimes = [