import time import unittest import logging from refill import time_border logging.basicConfig(filename='/var/www/html/data/log/qreader.log', level=logging.DEBUG) log = logging.getLogger('reader') class MyTestCase(unittest.TestCase): # __redis_host = '192.168.3.104' __redis_host = '192.168.3.46' def test_something(self): self.assertEqual(True, False) # add assertion here def test_listener(self): try: from refill import queueListener queueListener.set_redis(self.__redis_host, '6379') queueListener.prepare_data() except Exception as ex: log.error(ex) pass def test_timestart(self): stamp = int(time.time()) for i in range(300): x = stamp + i l = time_border(300, x, True) r = time_border(300, x, False) print('l=', l, 'r=', r) def testChannel(self): from refill import ChannelReader reader = ChannelReader() days = reader.days() print('days=', days) for day in days: x = reader.tuple_path(day, card_types={4, 5, 6}) x1 = reader.tuple_path(day, {'chizeng', 'ainika'}, {4, 5, 6}) x2 = reader.tuple_path(day, {'chizeng', 'ainika'}, {4, 5, 6}, 50) def test_chpainter(self): from refill import ChannelCumPainter start_time = int(time.time()) - 10 * 86400 - 3600 end_time = int(time.time()) - 8 * 86400 painter = ChannelCumPainter(start_time=start_time, end_time=end_time, chnames=set(), card_types={4, 5, 6}) painter.paint() def test_ch_speed_analyze_painter(self): from refill import ChannelSpeedAnalyzePainter start_time = 1664632800 end_time = 1664640000 painter = ChannelSpeedAnalyzePainter(start_time=start_time, end_time=end_time, chnames=set(['feimingyunew']), card_types={4}, spec=100) painter.paint() def test_chcov_ratio(self): from refill import ChannelCovPainter start_time = int(time.time()) - 10 * 86400 - 3600 end_time = int(time.time()) - 10 * 86400 painter = ChannelCovPainter(start_time=start_time, end_time=end_time, chnames=set(), card_types={4, 5, 6}, filter_wave=3600) painter.paint() def test_chcov_succ(self): from refill import ChannelCovSuccPainter start_time = int(time.time()) - 20 * 86400 - 3600 end_time = int(time.time()) - 20 * 86400 painter = ChannelCovSuccPainter(start_time=start_time, end_time=end_time, chnames=set(), card_types={4, 5, 6}, filter_wave=3600) painter.paint() def test_mch_ratio_painter(self): from refill import MerchantCumRatioPainter start_time = int(time.time()) - 21 * 86400 - 3600 end_time = int(time.time()) - 20 * 86400 painter = MerchantCumRatioPainter(start_time=start_time, end_time=end_time, mchids=set(), card_types={4, 5, 6}) painter.paint() def test_mch_covratio_painter(self): from refill import MerchantCovRatioPainter start_time = int(time.time()) - 21 * 86400 - 3600 end_time = int(time.time()) - 20 * 86400 painter = MerchantCovRatioPainter(start_time=start_time, end_time=end_time, mchids=set(), card_types={4, 5, 6}) painter.paint() def test_mch_amount_painter(self): from refill import MerchantAmountPainter start_time = int(time.time()) - 20 * 86400 - 3600 end_time = int(time.time()) - 20 * 86400 painter = MerchantAmountPainter(start_time=start_time, end_time=end_time, mchids=set(), card_types={4, 5, 6}) painter.paint() def test_ChannelWriter(self): from refill import ChannelWriter, open_hdf5 import time hfive = open_hdf5('/var/www/html/data/stdata/channel.hdf5', True) chwriter = ChannelWriter(hfive) itema = {'channel_name': 'zero', 'time': int(time.time()), 'spec': 50, 'card_type': 4, 'channel_amount': 49, 'period': 30} itemb = {'channel_name': 'xxxxxx', 'time': int(time.time()), 'spec': 50, 'card_type': 4, 'channel_amount': 49, 'period': 30, 'mch_amount': 49.625} chwriter.write('ch_succ', itema) chwriter.write('ch_succ', itemb) def test_netcheck(self): from refill import NetchkReader import time from refill import day_stamp day = day_stamp(int(time.time()) - 4 * 86400) days = [day, day - 86400] net = NetchkReader() channels = net.tuple_path(day) channels = net.many_tuple_path(days) def test_net_painter(self): from refill import NetcheckCovPainter start_time = int(time.time()) - 5 * 86400 end_time = int(time.time()) - 4 * 86400 painter = NetcheckCovPainter(start_time=start_time, end_time=end_time) painter.paint() def testDays(self): from refill import MerchantReader from refill import ChannelReader try: chreader = ChannelReader() chdays = chreader.days() xlables = [time.strftime('%d-%H:%M:%S', time.localtime(d)) for d in chdays] print(xlables) reader = MerchantReader() days = reader.days() except Exception as ex: log.error(ex) pass def test_jsonLoads(self): import json try: str = 4 x = json.loads(str) print(x) except Exception as ex: print(ex) def test_partial(self): from functools import partial add_five = partial() def test_key(self): def person(name, age, *, city, job): print(name, age, city, job) person('Jack', 24, city='Beijing', job='Engineer') person('Jack', 24, 'Beijing', job='Engineer') person('Jack', 24, 'Beijing', 'Engineer') def test_none(self): x = None len = len(x) print(len) def test_set(self): x = set([(4, 50), (4, 100)]) y = set([(4, 50), (5, 100)]) z = x | y print(z) def test_env(self): import os x = os.getenv("PYCHARM_DISPLAY_PORT", "-1") print(x) def test_matplot(self): import matplotlib x = matplotlib.__version__ print(x) def test_time(self): x = int(time.time()) print(x) def test_rpop(self): import redis import json pool = redis.ConnectionPool(host=self.__redis_host, port=6379, db=0) r = redis.Redis(connection_pool=pool) item = r.rpop('REFILL_MONITOR_QUEUE') if item is None: print('hello') else: try: val = json.loads(item) method = val['method'] params = val['params'] print(method, params) except Exception as ex: log.error(ex) def test_consumer(self): from refill import WriterConsumer class PrintHandler: def write(self, method, msg): log.debug(msg) time.sleep(0.01) handler = PrintHandler() consumer = WriterConsumer(handler, 'PrintHandler') consumer.start() for i in range(100000): consumer.put('test', f'index = {i}') consumer.quit() consumer.join() def test_merge(self): from collections import defaultdict import time as time def merge(l, r): for name, ls in l.items(): if name in r: ls.extend(r[name]) k = set(ls) r[name] = list(k) else: r[name] = ls return r all = defaultdict(list) a = {'yunchonggongfs': [(4, 100), (4, 30), (4, 50)]} b = {'yunchonggongfs': [(4, 100), (4, 200), (4, 50)]} all = merge(a, all) all = merge(b, all) x = 0; def test_split(self): def split_card(card_specs): result = dict() for card_type, spec in card_specs: if card_type not in result: result[card_type] = [] result[card_type].append(spec) return result tups = [(4, 100), (4, 50), (5, 10), (4, 30), (5, 30), (6, 100), (5, 100)] x = split_card(tups) y = 1 def testFindLowest(self): # ratios = {'feimingyu_high': 5 / 215, 'lechong': 4 / 2108, 'lechong_high': 4 / 2223, 'weixue': 13 / 17101, 'yunchonggong': 7 / 23276, # 'yunchonggongx': 1 / 232760} # ratios = {'feimingyu_high': 0.05,'feimingyu_high': 5 / 215, 'lechong': 4 / 2108, 'lechong_high': 4 / 2223, 'weixue': 0.1, 'yunchonggong': 1 / 23276,'yunchonggongx': 1 / 232760} ratios = {'feimingyu_high': 0.3, 'lechong': 4 / 2108, 'lechong_high': 4 / 2223, 'weixue': 13 / 17101, 'yunchonggong': 1 / 23276} # ratios = {'lechong': 4 / 2108, 'lechong_high': 4 / 2223, 'weixue': 13 / 17101, 'yunchonggong': 1 / 23276} # ratios = {'feimingyu_high': 5 / 215, 'lechong': 4 / 2108, 'lechong_high': 4 / 2223, 'weixue': 13 / 17101, 'yunchonggong': 1 / 23276} # ratios = {'feimingyu_high': 0.33, 'lechong': 4 / 2108, 'lechong_high': 4 / 2223, 'weixue': 13 / 17101, 'yunchonggong': 1 / 23276} import math, cmath def classify_ratio(ratios): cls = {} indexer = [] for name, ratio in ratios.items(): index = round(math.log(ratio)) log.debug(f'index={index} ratio={ratio}') if index not in cls: cls[index] = [] indexer.append(index) cls[index].append(name) max_index = max(indexer) min_index = max_index - 5 max_index = -7 if max_index < -7 else max_index min_index = -7 if min_index < -7 else min_index high = [] low = [] for index, names in cls.items(): if index >= min_index: high.extend(names) else: low.extend(names) return high, low high, low = classify_ratio(ratios) log.debug(f'high={high},low={low}') if __name__ == '__main__': unittest.main()