123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308 |
- import time
- import unittest
- import logging
- from refill import time_border
- logging.basicConfig(filename='/var/www/html/data/log/qreader.log', level=logging.DEBUG)
- log = logging.getLogger('reader')
- class MyTestCase(unittest.TestCase):
- # __redis_host = '192.168.3.104'
- __redis_host = '192.168.3.46'
- def test_something(self):
- self.assertEqual(True, False) # add assertion here
- def test_listener(self):
- try:
- from refill import queueListener
- queueListener.set_redis(self.__redis_host, '6379')
- queueListener.prepare_data()
- except Exception as ex:
- log.error(ex)
- pass
- def test_timestart(self):
- stamp = int(time.time())
- for i in range(300):
- x = stamp + i
- l = time_border(300, x, True)
- r = time_border(300, x, False)
- print('l=', l, 'r=', r)
- def testChannel(self):
- from refill import ChannelReader
- reader = ChannelReader()
- days = reader.days()
- print('days=', days)
- for day in days:
- x = reader.tuple_path(day, card_types={4, 5, 6})
- x1 = reader.tuple_path(day, {'chizeng', 'ainika'}, {4, 5, 6})
- x2 = reader.tuple_path(day, {'chizeng', 'ainika'}, {4, 5, 6}, 50)
- def test_chpainter(self):
- from refill import ChannelCumPainter
- start_time = int(time.time()) - 10 * 86400 - 3600
- end_time = int(time.time()) - 8 * 86400
- painter = ChannelCumPainter(start_time=start_time, end_time=end_time, chnames=set(), card_types={4, 5, 6})
- painter.paint()
- def test_ch_speed_analyze_painter(self):
- from refill import ChannelSpeedAnalyzePainter
- start_time = 1664632800
- end_time = 1664640000
- painter = ChannelSpeedAnalyzePainter(start_time=start_time, end_time=end_time, chnames=set(['feimingyunew']), card_types={4}, spec=100)
- painter.paint()
- def test_chcov_ratio(self):
- from refill import ChannelCovPainter
- start_time = int(time.time()) - 10 * 86400 - 3600
- end_time = int(time.time()) - 10 * 86400
- painter = ChannelCovPainter(start_time=start_time, end_time=end_time, chnames=set(), card_types={4, 5, 6}, filter_wave=3600)
- painter.paint()
- def test_chcov_succ(self):
- from refill import ChannelCovSuccPainter
- start_time = int(time.time()) - 20 * 86400 - 3600
- end_time = int(time.time()) - 20 * 86400
- painter = ChannelCovSuccPainter(start_time=start_time, end_time=end_time, chnames=set(), card_types={4, 5, 6}, filter_wave=3600)
- painter.paint()
- def test_mch_ratio_painter(self):
- from refill import MerchantCumRatioPainter
- start_time = int(time.time()) - 21 * 86400 - 3600
- end_time = int(time.time()) - 20 * 86400
- painter = MerchantCumRatioPainter(start_time=start_time, end_time=end_time, mchids=set(), card_types={4, 5, 6})
- painter.paint()
- def test_mch_covratio_painter(self):
- from refill import MerchantCovRatioPainter
- start_time = int(time.time()) - 21 * 86400 - 3600
- end_time = int(time.time()) - 20 * 86400
- painter = MerchantCovRatioPainter(start_time=start_time, end_time=end_time, mchids=set(), card_types={4, 5, 6})
- painter.paint()
- def test_mch_amount_painter(self):
- from refill import MerchantAmountPainter
- start_time = int(time.time()) - 20 * 86400 - 3600
- end_time = int(time.time()) - 20 * 86400
- painter = MerchantAmountPainter(start_time=start_time, end_time=end_time, mchids=set(), card_types={4, 5, 6})
- painter.paint()
- def test_ChannelWriter(self):
- from refill import ChannelWriter, open_hdf5
- import time
- hfive = open_hdf5('/var/www/html/data/stdata/channel.hdf5', True)
- chwriter = ChannelWriter(hfive)
- itema = {'channel_name': 'zero', 'time': int(time.time()), 'spec': 50, 'card_type': 4, 'channel_amount': 49, 'period': 30}
- itemb = {'channel_name': 'xxxxxx', 'time': int(time.time()), 'spec': 50, 'card_type': 4, 'channel_amount': 49, 'period': 30,
- 'mch_amount': 49.625}
- chwriter.write('ch_succ', itema)
- chwriter.write('ch_succ', itemb)
- def test_netcheck(self):
- from refill import NetchkReader
- import time
- from refill import day_stamp
- day = day_stamp(int(time.time()) - 4 * 86400)
- days = [day, day - 86400]
- net = NetchkReader()
- channels = net.tuple_path(day)
- channels = net.many_tuple_path(days)
- def test_net_painter(self):
- from refill import NetcheckCovPainter
- start_time = int(time.time()) - 5 * 86400
- end_time = int(time.time()) - 4 * 86400
- painter = NetcheckCovPainter(start_time=start_time, end_time=end_time)
- painter.paint()
- def testDays(self):
- from refill import MerchantReader
- from refill import ChannelReader
- try:
- chreader = ChannelReader()
- chdays = chreader.days()
- xlables = [time.strftime('%d-%H:%M:%S', time.localtime(d)) for d in chdays]
- print(xlables)
- reader = MerchantReader()
- days = reader.days()
- except Exception as ex:
- log.error(ex)
- pass
- def test_jsonLoads(self):
- import json
- try:
- str = 4
- x = json.loads(str)
- print(x)
- except Exception as ex:
- print(ex)
- def test_partial(self):
- from functools import partial
- add_five = partial()
- def test_key(self):
- def person(name, age, *, city, job):
- print(name, age, city, job)
- person('Jack', 24, city='Beijing', job='Engineer')
- person('Jack', 24, 'Beijing', job='Engineer')
- person('Jack', 24, 'Beijing', 'Engineer')
- def test_none(self):
- x = None
- len = len(x)
- print(len)
- def test_set(self):
- x = set([(4, 50), (4, 100)])
- y = set([(4, 50), (5, 100)])
- z = x | y
- print(z)
- def test_env(self):
- import os
- x = os.getenv("PYCHARM_DISPLAY_PORT", "-1")
- print(x)
- def test_matplot(self):
- import matplotlib
- x = matplotlib.__version__
- print(x)
- def test_time(self):
- x = int(time.time())
- print(x)
- def test_rpop(self):
- import redis
- import json
- pool = redis.ConnectionPool(host=self.__redis_host, port=6379, db=0)
- r = redis.Redis(connection_pool=pool)
- item = r.rpop('REFILL_MONITOR_QUEUE')
- if item is None:
- print('hello')
- else:
- try:
- val = json.loads(item)
- method = val['method']
- params = val['params']
- print(method, params)
- except Exception as ex:
- log.error(ex)
- def test_consumer(self):
- from refill import WriterConsumer
- class PrintHandler:
- def write(self, method, msg):
- log.debug(msg)
- time.sleep(0.01)
- handler = PrintHandler()
- consumer = WriterConsumer(handler, 'PrintHandler')
- consumer.start()
- for i in range(100000):
- consumer.put('test', f'index = {i}')
- consumer.quit()
- consumer.join()
- def test_merge(self):
- from collections import defaultdict
- import time as time
- def merge(l, r):
- for name, ls in l.items():
- if name in r:
- ls.extend(r[name])
- k = set(ls)
- r[name] = list(k)
- else:
- r[name] = ls
- return r
- all = defaultdict(list)
- a = {'yunchonggongfs': [(4, 100), (4, 30), (4, 50)]}
- b = {'yunchonggongfs': [(4, 100), (4, 200), (4, 50)]}
- all = merge(a, all)
- all = merge(b, all)
- x = 0;
- def test_split(self):
- def split_card(card_specs):
- result = dict()
- for card_type, spec in card_specs:
- if card_type not in result:
- result[card_type] = []
- result[card_type].append(spec)
- return result
- tups = [(4, 100), (4, 50), (5, 10), (4, 30), (5, 30), (6, 100), (5, 100)]
- x = split_card(tups)
- y = 1
- def testFindLowest(self):
- # ratios = {'feimingyu_high': 5 / 215, 'lechong': 4 / 2108, 'lechong_high': 4 / 2223, 'weixue': 13 / 17101, 'yunchonggong': 7 / 23276,
- # 'yunchonggongx': 1 / 232760}
- # ratios = {'feimingyu_high': 0.05,'feimingyu_high': 5 / 215, 'lechong': 4 / 2108, 'lechong_high': 4 / 2223, 'weixue': 0.1, 'yunchonggong': 1 / 23276,'yunchonggongx': 1 / 232760}
- ratios = {'feimingyu_high': 0.3, 'lechong': 4 / 2108, 'lechong_high': 4 / 2223, 'weixue': 13 / 17101, 'yunchonggong': 1 / 23276}
- # ratios = {'lechong': 4 / 2108, 'lechong_high': 4 / 2223, 'weixue': 13 / 17101, 'yunchonggong': 1 / 23276}
- # ratios = {'feimingyu_high': 5 / 215, 'lechong': 4 / 2108, 'lechong_high': 4 / 2223, 'weixue': 13 / 17101, 'yunchonggong': 1 / 23276}
- # ratios = {'feimingyu_high': 0.33, 'lechong': 4 / 2108, 'lechong_high': 4 / 2223, 'weixue': 13 / 17101, 'yunchonggong': 1 / 23276}
- import math, cmath
- def classify_ratio(ratios):
- cls = {}
- indexer = []
- for name, ratio in ratios.items():
- index = round(math.log(ratio))
- log.debug(f'index={index} ratio={ratio}')
- if index not in cls:
- cls[index] = []
- indexer.append(index)
- cls[index].append(name)
- max_index = max(indexer)
- min_index = max_index - 5
- max_index = -7 if max_index < -7 else max_index
- min_index = -7 if min_index < -7 else min_index
- high = []
- low = []
- for index, names in cls.items():
- if index >= min_index:
- high.extend(names)
- else:
- low.extend(names)
- return high, low
- high, low = classify_ratio(ratios)
- log.debug(f'high={high},low={low}')
- if __name__ == '__main__':
- unittest.main()
|