stanley-king 2 vuotta sitten
vanhempi
commit
ff4f54ec02
3 muutettua tiedostoa jossa 127 lisäystä ja 45 poistoa
  1. 90 36
      plot/refill/MerchantWriter.py
  2. 35 7
      plot/refill/WriterConsumer.py
  3. 2 2
      plot/thdf5.py

+ 90 - 36
plot/refill/MerchantWriter.py

@@ -6,13 +6,13 @@ import numpy as np
 __all__ = ['MerchantWriter']
 
 import logging
+
 log = logging.getLogger('writer')
 
+
 class MerchantWriter(DataWriteStream):
     def __init__(self, hfive):
-        DataWriteStream.__init__(self,hfive)
-        dim = pos_map.dim()
-        # self._cache = np.zeros((dim, 86400))
+        # Forward ``hfive`` unchanged to the DataWriteStream base initialiser.
+        DataWriteStream.__init__(self, hfive)
 
     def write(self, method, params):
         flush = True
@@ -29,8 +29,54 @@ class MerchantWriter(DataWriteStream):
             hfive = self.file
             hfive.flush()
 
+    def write_batch(self, batches):
+        """Write every group of queued messages to its dataset.
+
+        ``batches`` maps a dataset path to the list of ``(method, params)``
+        messages collected for that path; each entry is handed to
+        ``_write_batch``.
+        """
+        for dataset_path in batches:
+            self._write_batch(dataset_path, batches[dataset_path])
+
+    def _write_batch(self, path, messages):
+        """Accumulate ``(method, params)`` messages into the dataset at ``path``.
+
+        The dataset is copied into an in-memory array, every message is added
+        to that array, and the array is written back with a single assignment
+        before the file is flushed. Messages whose method is not
+        ``mch_submit``, ``mch_succ`` or ``mch_fail`` are ignored.
+        """
+        dset = self._data_set(path=path)
+        cache = np.zeros((pos_map.dim(), 86400))
+        cache[:, :] = dset
+
+        # method -> (rows to update, params keys that supply the amounts)
+        layout = {
+            'mch_submit': (
+                [pos_map.submit_count, pos_map.submit_amounts],
+                ('mch_amount',),
+            ),
+            'mch_succ': (
+                [pos_map.succ_count, pos_map.succ_mch_amounts, pos_map.succ_ch_amounts],
+                ('mch_amount', 'channel_amount'),
+            ),
+            'mch_fail': (
+                [pos_map.fail_count, pos_map.fail_mch_amounts],
+                ('mch_amount',),
+            ),
+        }
+
+        for method, params in messages:
+            entry = layout.get(method)
+            if entry is None:
+                continue
+            rows, amount_keys = entry
+            stamp = params['time']
+            # The leading 1 increments the counter row; the rest are amounts.
+            vals = [1] + [params[key] for key in amount_keys]
+            column = stamp - day_stamp(stamp)
+            cache[rows, column] += vals
+
+        dset[:, :] = cache
+        self.file.flush()
+
+
     def _onSubmit(self, params):
-        def parse(input):
+        def parse(params):
             return params['mchid'], params['time'], params['spec'], params['card_type'], params['mch_amount']
 
         mchid, time, spec, card_type, mch_amount = parse(params)
@@ -38,22 +84,10 @@ class MerchantWriter(DataWriteStream):
         rows = [pos_map.submit_count, pos_map.submit_amounts]
         vals = [1, mch_amount]
         dset[rows, pos] += vals
-
-        #read_direct 方式,效果不好
-        # dim = pos_map.dim()
-        # x = np.s_[pos:dim * 86400:86400]
-        # dset.read_direct(self._cache, source_sel=np.s_[pos:dim * 86400:86400],dest_sel=np.s_[pos:dim * 86400:86400])
-        # dset.read_direct(self._cache)
-
-        #数据维度调换试验
-        # dset[pos,rows]  += vals
-
-        # dset[pos_map.submit_count, pos] += 1
-        # dset[pos_map.submit_amounts, pos] += mch_amount
         pass
 
     def _onSucc(self, params):
-        def parse(input):
+        def parse(params):
             return params['mchid'], params['time'], params['spec'], params['card_type'], params['mch_amount'], params['channel_amount']
 
         mchid, time, spec, card_type, mch_amount, channel_amount = parse(params)
@@ -62,14 +96,10 @@ class MerchantWriter(DataWriteStream):
         rows = [pos_map.succ_count, pos_map.succ_mch_amounts, pos_map.succ_ch_amounts]
         vals = [1, mch_amount, channel_amount]
         dset[rows, pos] += vals
-
-        # dset[pos_map.succ_count, pos] += 1
-        # dset[pos_map.succ_mch_amounts, pos] += mch_amount
-        # dset[pos_map.succ_ch_amounts, pos] += channel_amount
         pass
 
     def _onFail(self, params):
-        def parse(input):
+        def parse(params):
             return params['mchid'], params['time'], params['spec'], params['card_type'], params['mch_amount']
 
         mchid, time, spec, card_type, mch_amount = parse(params)
@@ -78,30 +108,54 @@ class MerchantWriter(DataWriteStream):
         rows = [pos_map.fail_count, pos_map.fail_mch_amounts]
         vals = [1, mch_amount]
         dset[rows, pos] += vals
-
-        # dset[pos_map.fail_count, pos] += 1
-        # dset[pos_map.fail_mch_amounts, pos] += mch_amount
         pass
 
+    def get_path(self, method, params):
+        """Return the dataset path a message belongs to, or None.
+
+        For ``mch_submit``, ``mch_succ`` and ``mch_fail`` the path is built
+        from ``self._version``, ``day_stamp(params['time'])`` and the
+        ``mchid``, ``card_type`` and ``spec`` entries of ``params``. Any other
+        method yields None.
+        """
+        if method not in ('mch_submit', 'mch_succ', 'mch_fail'):
+            return None
+
+        mchid = params['mchid']
+        stamp = params['time']
+        spec = params['spec']
+        card_type = params['card_type']
+        today = day_stamp(stamp)
+        return f'/{self._version}/{today}/{mchid}/{card_type}/{spec}'
+
     def path_pos(self, mchid, time, spec, card_type):
+        # Build the dataset path from self._version, day_stamp(time), mchid,
+        # card_type and spec, then return (dataset, time - day_stamp(time)).
         today = day_stamp(time)
         path = f'/{self._version}/{today}/{mchid}/{card_type}/{spec}'
 
         log.debug("%s,%s", 'MerchantWriter', path)
+        dset = self._data_set(path=path)
+        return dset, time - today
+
+    def _data_set(self, path):
+        # Return the dataset stored at ``path`` in ``self.file``. When the path
+        # is missing, a (pos_map.dim(), 86400) dataset with (dim, 900) chunks
+        # is created, filled with zeros and flushed before it is returned.
         hfive = self.file
         if path not in hfive:
             dim = pos_map.dim()
-            # dset = hfive.create_dataset(path, (dim,86400), chunks=(dim,300))
-            dset = hfive.create_dataset(path, (dim,86400), chunks=(dim,900))
-            # # dset = hfive.create_dataset(path, (dim,86400), chunks=True) # 系统配置的是(1,5400)
-
-            dset[:, :] = np.zeros((dim,86400))
-
-            # 数据维度调换试验
-            # dset = hfive.create_dataset(path, (86400,dim), chunks=(900,dim))
-            # dset[:, :] = np.zeros((86400,dim))
+            dset = hfive.create_dataset(path, (dim, 86400), chunks=(dim, 900))
+            dset[:, :] = np.zeros((dim, 86400))
             hfive.flush()
         else:
             dset = hfive[path]
-
-        return dset, time - today
+        return dset

+ 35 - 7
plot/refill/WriterConsumer.py

@@ -10,16 +10,44 @@ class WriterConsumer(Thread):
         self._messages = Queue(maxsize=10000)
         self._stopped = False
         self._handler = handler
+        self._max_threshold = 1000
 
     def run(self):
+        """Consume queued messages until the stop flag is set.
+
+        Each pass logs the current queue size and then either drains the queue
+        in bulk (size above ``self._max_threshold``) or handles one message.
+        """
         while self._stopped == False:
-            try:
-                while True:
-                    method, params = self._messages.get(timeout=1)
-                    self._handler.write(method, params)
-                    self._messages.task_done()
-            except Empty as ex:
-                log.info('Consumer Queue is Empty')
+            # No inner ``while True`` here: _batch and _single handle Empty
+            # themselves, so an inner loop would never exit and the stop flag
+            # above would never be looked at again.
+            size = self._messages.qsize()
+            log.debug("messages size=%d", size)
+            if size > self._max_threshold:
+                self._batch()
+            else:
+                self._single()
+
+    def _batch(self):
+        """Drain the queue, group messages by dataset path and write them together.
+
+        Messages are pulled until the queue stays empty for one second. Those
+        for which the handler's ``get_path`` returns None are dropped. The
+        grouped messages are passed to the handler's ``write_batch`` even when
+        the drain loop is interrupted by an error. Pulled messages are
+        acknowledged with ``task_done`` only after that call, so
+        ``Queue.join`` cannot return while the batch is still unwritten.
+        """
+        batches = dict()
+        pulled = 0
+        try:
+            while True:
+                method, params = self._messages.get(timeout=1)
+                pulled += 1
+                path = self._handler.get_path(method, params)
+                if path is None:
+                    continue
+                batches.setdefault(path, []).append((method, params))
+        except Empty:
+            log.info('Consumer Queue is Empty')
+        finally:
+            try:
+                self._handler.write_batch(batches)
+            finally:
+                # Acknowledge after the write, as _single does. The finally
+                # keeps join() from hanging if the write raises.
+                for _ in range(pulled):
+                    self._messages.task_done()
+
+    def _single(self):
+        """Take one message from the queue and write it through the handler.
+
+        Waits up to one second for a message; when none arrives, the empty
+        queue is logged and the method returns.
+        """
+        queue = self._messages
+        try:
+            kind, payload = queue.get(timeout=1)
+            self._handler.write(kind, payload)
+            queue.task_done()
+        except Empty:
+            log.info('Consumer Queue is Empty')
 
     def put(self, method,params):
             self._messages.put((method, params))

+ 2 - 2
plot/thdf5.py

@@ -18,8 +18,8 @@ logging.basicConfig(filename='/var/www/html/data/log/qreader.log',
 log = logging.getLogger('starter')
 
 class DataTest(unittest.TestCase):
-    __redis_host = '192.168.3.104'
-    # __redis_host = '192.168.3.46'
+    # __redis_host = '192.168.3.104'
+    __redis_host = '192.168.3.46'
     def test_parase(self):
         try:
             dataCenter.parase('succ-lingzh-1-4-50-1618184676', '1')