stanley-king 2 anni fa
parent
commit
5daa067c96

+ 19 - 0
plot/channel_speed.py

@@ -0,0 +1,19 @@
+from refill import opt_parse, ChSpeedCalc
+import signal as sig
+import sys,getopt
+
+import logging
+logging.basicConfig(filename='/var/www/html/data/log/statcalc.log',
+                    format='%(levelname)10s  %(asctime)s  %(name)10s %(thread)d %(message)s',
+                    level=logging.DEBUG)
+logger = logging.getLogger('channel_speed')
+
+if __name__ == '__main__':
+    try:
+        rhost, rport = opt_parse()
+        calc = ChSpeedCalc()
+        calc.set_redis(rhost, rport)
+        sig.signal(sig.SIGINT, lambda: calc.stop())
+        calc.run()
+    except Exception as ex:
+        logger.error(ex)

+ 5 - 6
plot/refill/CalcBase.py

@@ -20,7 +20,7 @@ class CalcBase(object):
         self._mquit = True
 
     def _calc_handler(self, rclient):
-        pass
+        return 1
 
     def _reader(self):
         return None
@@ -32,19 +32,18 @@ class CalcBase(object):
             return client
 
         client = None
-        loop = 0;
+        sleep_time = 1
         while self._mQuit == False:
             try:
                 if client is None:
                     client = redis_client()
-                self._calc_handler(client)
+                sleep_time = self._calc_handler(client)
             except redis.RedisError as ex:
                 logger.error(ex)
             except Exception as ex:
                 logger.error(ex)
             finally:
-                time.sleep(1)
-                loop += 1
+                time.sleep(sleep_time)
 
     def calc_time(self, reader, start_time: int, end_time: int):
         end_time = reader.near_stamp(end_time, False)
@@ -59,7 +58,7 @@ class CalcBase(object):
         logger.debug("near_stamp start_time %s end_time=%s", strtime(start_time), strtime(end_time))
 
         if start_time >= end_time:
-            raise Exception('start_time equal endtime')
+            raise Exception('start_time >= endtime')
 
         days = span_days(start_time, end_time)
         strtime = lambda t: time.strftime('%d-%H:%M:%S', time.localtime(t))

+ 32 - 0
plot/refill/ChSpeedCalc.py

@@ -0,0 +1,32 @@
+from .ChannelCalc import ChannelCalc, detail_paths
+from .algorithm import calc_chspeed
+from .DataStream import EChPosmap as pos_map
+from .ChannelReader import ChannelReader
+import time as time
+import json
+import redis
+
+import logging
+logger = logging.getLogger('ChSpeedCalc')
+
+class ChSpeedCalc(ChannelCalc):
+    def _calc_handler(self, rclient):
+        logger.debug('_calc_handler')
+
+        reader = self._reader()
+        end_time = int(time.time() - 86400)
+        period = 300
+        days, start_time, end_time = self.calc_time(reader, end_time - period, end_time)
+
+        day_stamp = days[0]
+        tuple_pathes = reader.many_tuple_path(days, card_types=set([4, 5, 6]))
+        gen = detail_paths(reader, tuple_pathes, days)
+
+        mamounts = dict()
+        for _name, _card_type, _spec, _data in gen:
+            speed = calc_chspeed(_data, pos_map, start_time - day_stamp, end_time - day_stamp)
+            logger.debug("_name")
+
+
+
+        return 0.1

+ 26 - 0
plot/refill/ChannelCalc.py

@@ -0,0 +1,26 @@
+from .CalcBase import CalcBase
+from .DataStream import EChPosmap as pos_map, span_days
+from .ChannelReader import ChannelReader
+
+import logging
+logger = logging.getLogger('ChannelCalc')
+
+
+def detail_pathes(reader: ChannelReader, tuple_pathes: dict, days: list):
+    count = len(days)
+    for name, tup in tuple_pathes.items():
+        for _card_type, _spec in tup:
+            detail_datas = reader.init_data(count)
+            for i, day in enumerate(days):
+                data = reader.read(day, name, _card_type, _spec)
+                if data is not None:
+                    column_pos = i * 86400
+                    view = detail_datas[:, column_pos:column_pos + 86400]
+                    view += data
+            yield name, _card_type, _spec, detail_datas
+
+
+class ChannelCalc(CalcBase):
+    def _reader(self):
+        return ChannelReader()
+    pass

+ 6 - 4
plot/refill/MAmountCalc.py

@@ -1,4 +1,4 @@
-from .MerchantCalcBase import MerchantCalcBase, mch_detail_paths, mch_paths, detail_paths
+from .MerchantCalc import MerchantCalc, mch_detail_paths, mch_paths, detail_paths
 from .algorithm import calc_morder_lack
 from .DataStream import EMchPosmap as pos_map
 import time as time
@@ -6,7 +6,7 @@ import json
 import redis
 
 import logging
-logger = logging.getLogger('mch_amount_calc')
+logger = logging.getLogger('MAmountCalc')
 
 def earliest_time(rclient: redis.client):
     result = {}
@@ -25,7 +25,7 @@ def earliest_time(rclient: redis.client):
     else:
         return result, min_time
 
-class MAmountCalc(MerchantCalcBase):
+class MAmountCalc(MerchantCalc):
     def _calc_handler(self, rclient):
         mchid_times,earliest = earliest_time(rclient)
         end_time = int(time.time())
@@ -58,4 +58,6 @@ class MAmountCalc(MerchantCalcBase):
 
         val = json.dumps({'send_amounts': result, 'time': int(time.time())})
         rclient.set('nc_refill-stat-merchant-sendamount',val)
-        logger.debug(result)
+        logger.debug(result)
+
+        return 1

+ 7 - 5
plot/refill/MProfitRatioCalc.py

@@ -1,4 +1,4 @@
-from .MerchantCalcBase import MerchantCalcBase, mch_detail_paths, mch_paths
+from .MerchantCalc import MerchantCalc, mch_detail_paths, mch_paths
 from .algorithm import calc_mch_profit
 from .DataStream import EMchPosmap as pos_map
 
@@ -7,7 +7,7 @@ import time as time
 import logging
 import json
 
-logger = logging.getLogger('mch_profit_ratio')
+logger = logging.getLogger('MProfitRatioCalc')
 
 
 def mixed_ratio(rclient):
@@ -23,9 +23,9 @@ def mixed_ratio(rclient):
     return result
 
 
-class MProfitRatioCalc(MerchantCalcBase):
+class MProfitRatioCalc(MerchantCalc):
     def _calc_handler(self, rclient):
-        logger.debug('MProfitRatioCalc _calc_handler')
+        logger.debug('_calc_handler')
         mixed_ratios = mixed_ratio(rclient)
         reader = self._reader()
 
@@ -62,4 +62,6 @@ class MProfitRatioCalc(MerchantCalcBase):
         result = {'gross': gross, 'detail': detail}
         if len(gross) != 0 or len(detail) != 0:
             rclient.set(f"nc_refill_merchant_profit_ratio", json.dumps(result))
-            rclient.publish('refill', json.dumps({'type': 'mch_profit_ratio', 'value': 0}))
+            rclient.publish('refill', json.dumps({'type': 'mch_profit_ratio', 'value': 0}))
+
+        return 1

+ 2 - 2
plot/refill/MerchantCalcBase.py

@@ -4,7 +4,7 @@ from .MerchantReader import MerchantReader
 from .algorithm import calc_morder_lack
 
 import logging
-logger = logging.getLogger('MerchantCalcBase')
+logger = logging.getLogger('MerchantCalc')
 
 def detail_paths(reader: MerchantReader, tuple_pathes: dict, days: list):
     count = len(days)
@@ -88,7 +88,7 @@ def allpathes(reader: MerchantReader, tuple_pathes: dict, days: list, spec=None)
     if show_detail == False:
         yield 'all', None, None, all_datas
 
-class MerchantCalcBase(CalcBase):
+class MerchantCalc(CalcBase):
     def _reader(self):
         return MerchantReader()
     pass

+ 3 - 1
plot/refill/__init__.py

@@ -13,6 +13,7 @@ from .MerchantPainter import MerchantPainter,get_mchids
 from .helper import filter_chname, filter_cardtype, filter_mchids
 from .MAmountCalc import MAmountCalc
 from .MProfitRatioCalc import MProfitRatioCalc
+from .ChSpeedCalc import ChSpeedCalc
 from .WriterConsumer import WriterConsumer
 from .server_util import opt_parse
 
@@ -22,5 +23,6 @@ __all__ = ['DataWriteStream', 'DataReadStream',
            'ChannelPainter', 'MerchantPainter', 'get_channels', 'get_mchids',
            'queueListener', 'open_hdf5', 'day_stamp', 'time_border',
            'filter_chname', 'filter_cardtype', 'filter_mchids',
-           'MAmountCalc','MProfitRatioCalc'
+           'MAmountCalc','MProfitRatioCalc',
+           'ChSpeedCalc',
            'opt_parse']

+ 8 - 0
plot/refill/algorithm.py

@@ -17,6 +17,14 @@ def calc_chratios(data, pos_map, start, end):
     y = y.ravel()
     return int(all[0, -1]), int(all[0, -1] + all[1, -1]), y
 
+def calc_chspeed(data, pos_map, start, end):
+    view = data[[pos_map.commit_count], :]
+    view = view[:, start:end]
+
+    all = np.sum(view, axis=1)
+    all = all.ravel()
+
+
 
 def calc_mchratios(data, pos_map, start, end):
     view = data[[pos_map.succ_count, pos_map.fail_count, pos_map.submit_count], :]