stanley-king 2 년 전
부모
커밋
8fa8c0347c

+ 4 - 0
plot/refill/CalcBase.py

@@ -3,6 +3,9 @@ import time as time
 import logging
 import json
 from .DataStream import span_days
+
+import traceback
+
 import logging
 logger = logging.getLogger('CalcBase')
 
@@ -42,6 +45,7 @@ class CalcBase(object):
                 logger.error(ex)
             except Exception as ex:
                 logger.error(ex)
+                logger.error(traceback.format_exc())
             finally:
                 time.sleep(sleep_time)
 

+ 10 - 30
plot/refill/ChSpeedControlCalc.py

@@ -3,8 +3,8 @@ from .algorithm import calc_chspeed, calc_chratio, calc_commit, calc_chprice, ca
 from .DataStream import EChPosmap as pos_map
 from .DataStream import ch_calc_cfgs
 from .ChannelReader import ChannelReader
-from .speed_item import speed_item, order_pmf
-from .speed_manager import speed_manager
+from .chs_type_spec import type_spec_chs, ch_type_spec
+from .chs_manager import chs_manager
 import numpy as np
 import time as time
 import json
@@ -16,9 +16,6 @@ logger = logging.getLogger('ChSpeedControlCalc')
 
 
 class ChSpeedControlCalc(ChannelCalc):
-    # def __init__(self):
-    #     pass
-
     def _calc_pmf(self, _commits, _succs):
         pass
 
@@ -26,38 +23,21 @@ class ChSpeedControlCalc(ChannelCalc):
         start_period, ratio_period, speed_period, monitor_period, cdf_speed_period = ch_calc_cfgs()
 
         reader = self._reader()
-        end_time = int(time.time()) - 86400 * 2
+        end_time = int(time.time()) - 86400 * 5
         days, start_time, end_time = self.calc_time(reader, end_time - start_period, end_time)
 
         day_stamp = days[0]
+        end_pos = end_time - day_stamp
+
         tuple_pathes = reader.many_tuple_path(days, card_types=set([4, 5, 6]))
         gen = detail_pathes(reader, tuple_pathes, days)
 
-        start_pos = start_time - day_stamp
-        end_pos = end_time - day_stamp
-
+        manager = chs_manager()
         for _name, _card_type, _spec, _data in gen:
-            speed = calc_chspeed(_data, pos_map, end_pos - speed_period, end_pos) / 5
-            price, pratio = calc_chprice(_data, pos_map, end_pos - speed_period, end_pos)
-            ratio, ratio_commit, notify_time, succ_time = calc_chratio(_data, pos_map, end_pos - start_period, end_pos)
-            monitor_commit = calc_commit(_data, pos_map, end_pos - monitor_period, end_pos) / 5
-            _commits, _succs, _ratios = calc_chspeed_ratio(_data, pos_map, end_pos - start_period, end_pos, cdf_speed_period)
-
-            x = _succs.astype(int).tolist()
-            succs = ','.join(str(i) for i in x)
-
-            x = _commits.astype(int).tolist()
-            commits = ','.join(str(i) for i in x)
-
-            logger.debug(f'commits={commits},succs={succs}')
-
-            # pmf = order_pmf()
-            # pmf.calculate(_commits,_succs)
-
-            # key = f'{_spec}-{_card_type}'
-            # result[key] = [speed, ratio, ratio_commit, notify_time, monitor_commit]
-            # logger.debug("%s-%d-%d speed=%d ratio=%.5f commit_count=%d notify_time=%.5f succ_time=%.5f monitor_commit=%d", _name, _card_type, _spec, speed, ratio,
-            #              ratio_commit, notify_time, succ_time, monitor_commit)
+            manager.add(_name, _card_type, _spec, _data, end_pos)
+        manager.speed()
+        manager.optimize()
+        pass
 
     def _calc_handler(self, rclient):
         self._collect_data()

+ 3 - 3
plot/refill/DataStream.py

@@ -129,9 +129,9 @@ class DataReadStream(metaclass=ABCMeta):
     def __init__(self, hfive):
         self._hfive = hfive
         self._days = self._getdays()
-        stime = lambda t: time.strftime('%y-%m-%d %H:%M:%S', time.localtime(t))
-        sdays = [stime(day) for day in self._days]
-        logger.debug(sdays)
+        # stime = lambda t: time.strftime('%y-%m-%d', time.localtime(t))
+        # sdays = [stime(day) for day in self._days]
+        # logger.debug(sdays)
 
     def __del__(self):
         pass

+ 1 - 1
plot/refill/__init__.py

@@ -30,7 +30,7 @@ from .ChSpeedControlCalc import ChSpeedControlCalc
 
 from .WriterConsumer import WriterConsumer
 
-from .speed_item import order_pmf,speed_item
+from .chs_type_spec import ch_type_spec,type_spec_chs
 
 
 from .server_util import opt_parse

+ 2 - 1
plot/refill/algorithm.py

@@ -106,6 +106,7 @@ def calc_chprice(data, pos_map, start, end):
     else:
         return price,None
 
+    profit = None
     pratio = None
     succs = data[pos_map.succ_count, :]
     pos_succ = np.where(succs > 0)
@@ -117,7 +118,7 @@ def calc_chprice(data, pos_map, start, end):
         ch_amounts  = sums[pos_map.succ_amounts]
         profit = mch_amounts - ch_amounts
         pratio = round(profit / ch_amounts, 4)
-    return price, pratio
+    return price, profit, pratio
 
 
 def calc_chratio(data, pos_map, start, end):

+ 182 - 0
plot/refill/ch_type_spec.py

@@ -0,0 +1,182 @@
+from .algorithm import calc_chspeed, calc_chratio, calc_commit, calc_chprice, calc_chspeed_ratio
+from .DataStream import EChPosmap as pos_map
+from .DataStream import ch_calc_cfgs
+import numpy as np
+from itertools import product
+from enum import IntEnum
+
+import logging
+logger = logging.getLogger('speedctl')
+
+# 总提单量,总成功单量,成功率,每分钟提单平均数,每分钟成单量
+class EDetailPosmap(IntEnum):
+    cur_commit = 0
+    cur_succ = 1
+    per_commit = 2
+    per_succ = 3
+    per_profit = 4
+
+    @staticmethod
+    def dim():
+        return 5
+    pass
+
+class ch_type_spec(object):
+    def __init__(self, name):
+        self._name = name
+        self._succs = None
+        self._commits = None
+        self._detail = {}
+        self._reckon_speed = 0
+        pass
+
+    def succ_count(self):
+        return self._succs
+
+    def commit_count(self):
+        return self._commits
+
+    def price(self):
+        return self._price
+
+    def profit(self):
+        return self._profit
+
+    def pratio(self):
+        return self._pratio
+
+    def cur_speed(self):
+        return self._speed
+
+    def set_reckon_speed(self,speed):
+        self._reckon_speed = int(round(speed,0))
+    def reckon_speed(self):
+        return self._reckon_speed
+
+    def detail_dim(self):
+        result = []
+        for key,item in self._detail.items():
+            result.append(key)
+        return result
+
+    def is_dim_zero(self):
+        if len(self._detail) == 1 and 0 in self._detail:
+            return True
+        else:
+            return False
+
+    def detail(self,dim):
+        if dim in self._detail:
+            return self._detail[dim]
+        else:
+            return None
+
+    def detail_speed(self,dim):
+        if dim in self._detail:
+            return self._detail[dim][EDetailPosmap.per_commit]
+        else:
+            return None
+
+    def per_commit(self,dim):
+        if dim in self._detail:
+            return self._detail[dim][EDetailPosmap.per_commit]
+        else:
+            return None
+
+    def init_data(self, data, end_pos):
+        logger.debug(f'chname={self._name}')
+        start_period, ratio_period, speed_period, monitor_period, cdf_speed_period = ch_calc_cfgs()
+        self._price, self._rofit, self._pratio = calc_chprice(data, pos_map, end_pos - speed_period, end_pos)
+        self._speed = calc_chspeed(data, pos_map, end_pos - speed_period, end_pos) / 5
+        ratio, ratio_commit, self._notify_time, succ_time = calc_chratio(data, pos_map, end_pos - start_period, end_pos)
+        monitor_commit = calc_commit(data, pos_map, end_pos - monitor_period, end_pos) / 5
+        _commits, _succs, _ratios = calc_chspeed_ratio(data, pos_map, end_pos - start_period, end_pos, cdf_speed_period)
+        self.calculate(_commits,_succs)
+        pass
+
+    def _prepare_data(self, datas):
+        commits, succs = self._unique(datas)
+        result = np.concatenate([[commits], [succs]], axis=0)
+        return result
+
+    def calculate(self, commits, succs):
+        def tostr(x):
+            x = x.tolist()
+            strx = ','.join(str(i) for i in x)
+            return strx
+
+        commits = commits.astype(int)
+        succs = succs.astype(int)
+
+        logger.debug(f'commits={tostr(commits)} succs={tostr(succs)}')
+
+        datas = np.concatenate([[commits], [succs]], axis=0)
+        indexer = datas[0].argsort()
+        datas = datas[:,indexer]
+
+        avg = np.average(datas, axis=1)
+        self._commit_min = round(avg[0], 5)
+        self._succs_min = round(avg[1], 5)
+
+        sumview = np.sum(datas, axis=1)
+        self._commits = sumview[0]
+        self._succs = sumview[1]
+
+        cum = np.cumsum(datas, axis=1)
+        scales = list(set([int(self._succs * i * 0.2) for i in range(0, 6, 1)]))
+        scales.sort()
+
+        detail = {}
+        start = 0
+        last = 0
+        for i, succ in enumerate(scales):
+            pos = np.where(cum[1] <= succ)
+            if len(pos[0]) > 0:
+                end = pos[0][-1]
+                if start > end:
+                    continue
+                delta = cum[:, end] - cum[:, last]
+                view = datas[0:, start:end + 1]
+
+                avg = np.average(view, axis=1)
+                max = np.max(view, axis=1)
+
+                start = end + 1
+                last = end
+                # 总提单量,总成功单量,成功率,每分钟提单平均数,每分钟成单量
+                cur_commit = delta[0]
+                cur_succ = delta[1]
+                cur_ratio = round(delta[1] / (delta[0] + 0.000001), 5)
+                per_commit = round(avg[0], 5)
+                per_succ   = round(avg[1], 5)
+
+                per_pratio = 0 if self._pratio is None else self._pratio
+                per_profit = per_pratio * per_succ
+                #每分钟提单平均数,每分钟成单量,每分钟利润
+                detail[i] = [cur_commit, cur_succ, per_commit, per_succ, per_profit]
+                logger.debug(f'i={i} detail={detail[i]}')
+        self._detail = detail
+        pass
+
+    def _unique(self, datas):
+        succs = []
+        commits = []
+
+        last_commit = datas[0, 0]
+        last_succ = datas[1, 0]
+        for i in range(datas.shape[1]):
+            commit = datas[0, i]
+            succ = datas[1, i]
+            if commit != last_commit:
+                succs.append(last_succ)
+                commits.append(last_commit)
+                last_commit = commit
+                last_succ = succ
+            elif i == 0:
+                pass
+            else:
+                last_succ += succ
+
+        succs.append(last_succ)
+        commits.append(last_commit)
+        return commits, succs

+ 31 - 0
plot/refill/chs_manager.py

@@ -0,0 +1,31 @@
+from .chs_type_spec import type_spec_chs
+
+import logging
+logger = logging.getLogger('speedctl')
+
+class chs_manager(object):
+    def __init__(self):
+        self.channels = {}
+        pass
+
+    def add(self,name, card_type, spec, data,end_pos):
+        key_gen = lambda card_type,spec: f'{card_type}-{spec}'
+        key = key_gen(card_type,spec)
+        if key in self.channels:
+            subch = self.channels[key]
+        else:
+            subch = type_spec_chs(card_type, spec)
+            self.channels[key] = subch
+        subch.add(name, data, end_pos)
+
+    def optimize(self):
+        for key,item in self.channels.items():
+            item.optimize()
+
+    def speed(self):
+        total = 0
+        for key,item in self.channels.items():
+            total += item.cur_speed()
+        logger.debug(f'total = {total}')
+        return total
+    pass

+ 175 - 0
plot/refill/chs_type_spec.py

@@ -0,0 +1,175 @@
+from .algorithm import calc_chspeed, calc_chratio, calc_commit, calc_chprice, calc_chspeed_ratio
+from .DataStream import EChPosmap as pos_map
+from .DataStream import ch_calc_cfgs
+from .ch_type_spec import ch_type_spec
+from .ch_type_spec import EDetailPosmap as dposmap
+import numpy as np
+from itertools import product
+
+import logging
+
+logger = logging.getLogger('speedctl')
+
+def is_fs(chname):
+    if len(chname) < 2:
+        return False;
+    ext = chname[-2:]
+    return True if ext == 'fs' else False
+
+class type_spec_chs(object):
+    def __init__(self, card_type, spec):
+        self._card_type = card_type
+        self._spec = spec
+        self._normal_price = [] #普充通道
+        self._over_price = []   #溢价通道
+        self._fs = []           #分省通道
+        self._chs = {}
+        pass
+
+    def _normal_optimize(self):
+        def looper():
+            chpos  = []
+            feeds  = []
+            looper = []
+            for chname in self._normal_price:
+                ch_item = self._chs[chname]
+                if ch_item.is_dim_zero():
+                    feeds.append(ch_item)
+                    continue
+
+                dim = ch_item.detail_dim()
+                if len(dim) > 0:
+                    looper.append(dim)
+                    chpos.append(chname)
+                else:
+                    feeds.append(chname)
+
+            combination = []
+            for i in product(*looper):
+                combination.append(i)
+            return feeds, chpos, combination
+        
+        def handle_feed(feed,left):
+            length = len(feed)
+            per_comit = int(round(left / length + 0.000001, 1))
+            per_comit = per_comit if per_comit > 0 else 0
+
+            for ch_name in feed:
+                ch_item = self._chs[chname]
+                ch_item.set_reckon_speed(per_comit)
+
+        feeds, chpos, combination = looper()
+        if len(combination) > 0:
+            used = self._op_maxsuccs(combination, chpos)
+        else:
+            used = 0
+
+        left = self.speed() - used
+        handle_feed(left)
+        pass
+
+    def _op_maxsuccs(self,combination,chpos):
+        def fill_data(combination, chpos, detail_length):
+            lenth = len(combination)
+            datas = np.zeros((len(chpos), detail_length, len(combination)))
+            for col,item in enumerate(combination):
+                for row,dim in enumerate(item):
+                    chname = chpos[row]
+                    ch_item = self._chs[chname]
+                    detail = ch_item.detail(dim)
+                    datas[row, :, col] = detail  # 总提单量,总成功单量,成功率,每分钟提单平均数,每分钟成单量
+            return datas
+            pass
+
+        def sort_per_profit(datas, combination):
+            view = np.sum(datas,axis=0)
+            indexer = view[dposmap.per_profit].argsort()
+            combination = [combination[i] for i in indexer]
+            view = view[:, indexer]
+            return view, combination
+
+        def sort_nearst(sumview, combination, total):
+            def splice(view, total):
+                min = total * 0.9
+                max = total
+                minpos = np.where(view[dposmap.per_commit] >= min)
+                maxpos = np.where(view[dposmap.per_commit] <= max)
+
+                pos = set(minpos[0]) & set(maxpos[0])
+                pos = list(pos)
+                if len(pos) == 0:
+                    if len(minpos[0]) == 0:
+                        return [maxpos[0][0]]
+                    else:
+                        return [minpos[0][-1]]
+                else:
+                    return pos
+
+            def max_pratio(matches_view):
+                pratio = matches_view[dposmap.per_profit] / matches_view[dposmap.per_commit]
+                index = np.argmax(pratio)
+                return index
+                pass
+
+            indexer = splice(sumview, total)
+            combination = [combination[i] for i in indexer]
+            match_view = sumview[:, indexer]
+            index = max_pratio(match_view)
+
+            logger.debug(f'card_type={self._card_type} spec={self._spec}')
+            item = combination[index]
+
+            used = 0
+            for i,dim in enumerate(item):
+                chname = chpos[i]
+                ch_item = self._chs[chname]
+                speed = ch_item.detail_speed(dim)
+                if speed is not None:
+                    speed = int(round(speed,0))
+                    ch_item.set_reckon_speed(speed)
+                    used += speed
+                detail = ch_item.detail(dim)
+                logger.debug(f'{chname} dim={dim} detail={detail}')
+            logger.debug(f'cursum={match_view[:, index]}')
+            return used
+        ##############################################################################################################################################
+        datas = fill_data(combination,chpos,dposmap.dim())
+        #按找利润排序,再在里面找满足提单量的
+        total = self.speed()
+        sumview, combination = sort_per_profit(datas, combination)
+        used = sort_nearst(sumview, combination, total)
+        return used
+
+    def optimize(self):
+        self._normal_optimize()
+        
+        pass
+
+    def add(self, chname, data, end_pos):
+        ch_item = ch_type_spec(chname)
+        ch_item.init_data(data, end_pos)
+
+        price = ch_item.price()
+        pratio = ch_item.pratio()
+        if price is not None:
+            price = int(price * 1000)
+
+        if price >= self._spec * 1000 or (pratio is not None and pratio <= 0):
+            self._over_price.append(chname)
+        elif is_fs(chname):
+            self._fs.append(chname)
+        else:
+            self._normal_price.append(chname)
+
+        if chname not in self._chs:
+            self._chs[chname] = ch_item
+        else:
+            logger.info(f'{chname} duplicate')
+        pass
+
+    def speed(self):
+        total = 0
+        for key, item in self._chs.items():
+            total += item.cur_speed()
+        logger.debug(f'{self._card_type}-{self._spec} speed={total}')
+        return total

+ 0 - 47
plot/refill/speed_item.py

@@ -1,47 +0,0 @@
-
-import numpy as np
-class speed_item(object):
-    def __init__(item):
-        pass
-
-
-class order_pmf(object):
-
-    def calculate(self, commits, succs):
-        succs = succs.astype(int)
-        commits = commits.astype(int)
-
-        self._succs = np.sum(succs)
-        self._commits = np.sum(commits)
-
-        min = np.min(commits)
-        max = np.max(commits)
-
-        mcommits = [i for i in range(0, max + 1, 1)]
-        msuccs = []
-
-        succ = self._succ
-        scales = [succ * i for i in range(0,1.0,0.2)]
-
-        for i in mcommits:
-            pos = np.where(commits == i)
-            succ = succs[pos]
-            count = np.sum(succ)
-            msuccs.append(count)
-
-        msuccs = np.array(msuccs)
-        msuccs = np.cumsum(msuccs)
-
-
-
-        pos = np.where(msuccs == succ)
-        if len(pos[0]) > 1:
-            end = pos[0][1]
-            mcommits = mcommits[:end]
-            ratio = ratio[:end]
-        pass
-
-    def succ_count(self):
-        return self._succs
-    def commit_count(self):
-        return self._commits

+ 0 - 3
plot/refill/speed_manager.py

@@ -1,3 +0,0 @@
-
-class speed_manager(object):
-    pass

+ 68 - 4
plot/testSpeed.py

@@ -1,17 +1,81 @@
 import unittest
-from refill import order_pmf
+from refill import ch_type_spec
 import numpy as np
 
+import logging
+logging.basicConfig(filename='/var/www/html/data/log/tspeed.log', level=logging.DEBUG)
+log = logging.getLogger('SpeedTestCase')
 
 class SpeedTestCase(unittest.TestCase):
     def test_calculate(self):
-        commits = [23,8,82,72,157,228,119,164,119,226,166,127,349,220,267,133,57,257,307,83,14,25,172,428,145,379,219,335,231,630]
-        succs=[0,0,0,0,0,0,0,0,1,1,0,0,0,0,2,0,0,0,0,1,1,0,0,1,0,2,0,1,0,0]
-        pmf = order_pmf()
+        # commits = []
+        # succs=[]
+        # commits = [23,8,82,72,157,228,119,164,119,226,166,127,349,220,267,133,57,257,307,83,14,25,172,428,145,379,219,335,231,630]
+        # succs=[0,0,0,0,0,0,0,0,1,1,0,0,0,0,2,0,0,0,0,1,1,0,0,1,0,2,0,1,0,0]
+        # commits = [1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 2, 0, 0, 1, 0, 1, 1, 0, 0, 0, 1, 1, 0, 1, 0, 0, 1]
+        # succs = [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
+
+        # commits = [41, 64, 42, 40, 35, 35, 26, 36, 17, 16, 0, 0, 0, 3, 13, 7, 0, 0, 2, 0, 0, 0, 4, 0, 1, 6, 0, 0, 0, 0]
+        # succs   = [0,  2,  0,  0,  1,  0,  0,  0,  0,  1,  0, 0, 0, 0, 0,  0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
+        commits = [67, 31, 0, 146, 207, 156, 157, 0, 41, 286, 333, 166, 76, 292, 32, 472, 53, 102, 0, 62, 115, 104, 107, 200, 190, 66, 94, 411, 259,
+                   105]
+        succs = [0, 0, 0, 2, 0, 1, 0, 0, 0, 0, 2, 2, 1, 1, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3, 1, 0, 0, 4, 3, 0]
+
+        pmf = ch_type_spec('bodian')
         pmf.calculate(np.array(commits),np.array(succs))
 
+
         pass
 
+    def test_npsort(self):
+        commits = [23, 8, 82, 72, 157, 228, 119, 164, 119, 226, 166, 127, 349, 220, 267, 133, 57, 257, 307, 83, 14, 25, 172, 428, 145, 379, 219, 335,
+                   231, 630]
+        succs = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 2, 0, 0, 0, 0, 1, 1, 0, 0, 1, 0, 2, 0, 1, 0, 0]
+        datas = np.concatenate([[commits],[succs]],axis=0)
+        # datas[0,:].sort()
+        indexer = datas[0].argsort()
+        view = datas[:,indexer]
+
+        def unique(datas):
+            succs = []
+            commits = []
+
+            last_commit = datas[0,0]
+            last_succ = datas[1,0]
+            for i in range(datas.shape[1]):
+                commit = datas[0,i]
+                succ = datas[1,i]
+                if commit != last_commit:
+                    succs.append(last_succ)
+                    commits.append(last_commit)
+                    last_commit = commit
+                    last_succ = succ
+                elif i == 0:
+                    pass
+                else:
+                    last_succ += succ
+
+            succs.append(last_succ)
+            commits.append(last_commit)
+            result = np.concatenate([[commits],[succs]],axis=0)
+            return result
+        
+        unique(datas)
+
+    def test_fsch(self):
+        def is_fs(chname):
+            if len(chname) < 2:
+                return False;
+            ext = chname[-2:]
+            return True if ext == 'fs' else False
+
+        x = 'yongchonggongfs'
+        k = is_fs(x)
+        
+        x = int(round(2.55,0))
+        pass
+
+