"""Ad-hoc integration and scratch tests for the plotting/refill data pipeline.

Most tests here talk to external services (Redis, an HDF5 file on disk, the
project-local ``refill`` package) and therefore only make sense on a machine
where those resources are reachable.
"""
import json
import logging
import re
import time
import unittest
from datetime import timedelta

import h5py
import numpy as np
import redis
from PIL import Image

logging.basicConfig(
    filename='/var/www/html/data/log/plot_test.log',
    format='%(levelname)10s %(asctime)s %(name)10s %(thread)d %(message)s',
    level=logging.DEBUG,
)
logger = logging.getLogger('test_log')

# JSON payload written verbatim by ``DataTest.test_setcache``.  It is assembled
# from adjacent string literals purely for readability; the resulting value is
# a single-line JSON document.
_CHANNEL_RATIOS_JSON = (
    '{"time": 1624298880, "ratios": {'
    '"feinimoshu_hf-100-4-1": [0, 0, 0, 0], '
    '"feinimoshu_hf-200-4-1": [0, 0, 0, 0], '
    '"feinimoshu_hf-50-4-1": [0, 0, 0, 0], '
    '"feinimoshu_hf-100-6-1": [0, 0, 0, 0], '
    '"gftd-100-2-5": [0, 0, 38, 77], '
    '"gftd-200-2-5": [0, 0, 13, 45], '
    '"gftd-500-2-5": [0, 0, 14, 0], '
    '"gftdsinop-100-2-5": [6, 0, 0, 0], '
    '"gftdsinop-200-2-5": [2, 0, 0, 0], '
    '"gftdsinop-500-2-5": [0, 0, 0, 0], '
    '"lingzh-100-4-1": [0, 0, 0, 0], '
    '"lingzh-30-4-1": [6, 0, 0, 6], '
    '"lingzh-50-4-1": [0, 0, 0, 0], '
    '"lingzh-100-5-1": [0, 0, 0, 0], '
    '"lingzh-200-5-1": [0, 0, 0, 0], '
    '"lingzh-30-5-1": [4, 0, 1, 3], '
    '"lingzh-50-5-1": [2, 0, 0, 3], '
    '"lingzh-100-6-1": [0, 0, 0, 0], '
    '"lingzh-200-6-1": [0, 0, 0, 0], '
    '"lingzh-30-6-1": [0, 0, 0, 0], '
    '"lingzh-300-6-1": [0, 0, 0, 0], '
    '"lingzh-50-6-1": [0, 0, 0, 0], '
    '"lingzh-100-4-5": [0, 0, 0, 0], '
    '"lingzh-200-4-5": [0, 0, 0, 0], '
    '"lingzh-100-5-5": [0, 0, 0, 0], '
    '"lingzh-200-5-5": [0, 0, 0, 0], '
    '"lingzh-100-6-5": [0, 0, 0, 0], '
    '"lingzhoil-100-2-1": [0, 0, 0, 0], '
    '"lingzhoil-200-2-1": [1, 0, 0, 1], '
    '"moxj-30-5-1": [5, 0, 2, 1], '
    '"moxj-50-5-1": [3, 0, 3, 1], '
    '"moxj_fs-50-4-1": [0, 0, 0, 0], '
    '"qianqian-100-4-1": [0, 0, 0, 0], '
    '"qianqian-200-4-1": [0, 0, 0, 0], '
    '"qianqian-30-4-1": [7, 0, 0, 7], '
    '"qianqian-50-4-1": [0, 0, 0, 0], '
    '"qianqian-50-5-1": [0, 0, 0, 0], '
    '"qianqian-100-6-1": [0, 0, 0, 0], '
    '"weiyiwt-100-4-1": [0, 0, 0, 0], '
    '"weiyiwt-200-4-1": [0, 0, 0, 0], '
    '"weiyiwt-30-4-1": [8, 0, 0, 8], '
    '"weiyiwt-50-4-1": [0, 0, 0, 0], '
    '"weiyiwt-100-5-1": [0, 0, 0, 0], '
    '"weiyiwt-100-6-1": [0, 0, 0, 0], '
    '"weiyiwt-200-6-1": [0, 0, 0, 0], '
    '"weiyiwt-30-6-1": [1, 0, 0, 1], '
    '"weiyiwt-50-6-1": [0, 0, 0, 0], '
    '"yinteng-100-6-1": [0, 0, 0, 0], '
    '"yinteng-200-6-1": [0, 0, 0, 0], '
    '"yinteng-30-6-1": [0, 1, 0, 0], '
    '"yinteng-50-6-1": [0, 0, 0, 0], '
    '"yunling-100-4-1": [0, 0, 0, 0], '
    '"yunling-200-4-1": [0, 0, 0, 0], '
    '"yunling-30-4-1": [8, 0, 0, 9], '
    '"yunling-50-4-1": [0, 0, 0, 0], '
    '"yunling-100-5-1": [0, 0, 0, 0], '
    '"yunling-30-5-1": [6, 0, 2, 3], '
    '"yunling-50-5-1": [4, 0, 3, 3], '
    '"yunling-100-6-1": [0, 0, 0, 0], '
    '"yunling-50-6-1": [0, 0, 0, 0], '
    '"yunsuoyao-100-4-1": [0, 0, 0, 0], '
    '"yunsuoyao-30-4-1": [1, 0, 0, 0], '
    '"yunsuoyao-50-4-1": [0, 0, 0, 0], '
    '"zanzanquick-30-4-1": [0, 0, 0, 0], '
    '"zanzanquick-100-5-1": [0, 0, 0, 0], '
    '"zanzanquick-200-5-1": [0, 0, 0, 0], '
    '"zanzanquick-30-5-1": [0, 0, 0, 0], '
    '"zanzanquick-50-5-1": [0, 0, 0, 0], '
    '"zanzanquick-100-6-1": [0, 0, 0, 0], '
    '"zanzanquick-200-6-1": [0, 0, 0, 0], '
    '"zanzanquick-30-6-1": [0, 0, 0, 0]}}'
)


class DataTest(unittest.TestCase):
    """Scratch tests exercising Redis, HDF5, NumPy and the ``refill`` package."""

    # Alternate host used previously: '192.168.3.104'
    __redis_host = '192.168.3.226'

    def test_connect(self):
        """Print every field/value pair of the ``nc_channel_monitor_commit`` hash."""
        pool = redis.ConnectionPool(host=self.__redis_host, port=6379, db=0)
        r = redis.Redis(connection_pool=pool)
        for key, val in r.hscan_iter('nc_channel_monitor_commit'):
            # The client is created without decode_responses, so both items
            # are decoded from bytes here.
            print(str(key, encoding="utf-8"), str(val, encoding="utf-8"))

    def test_regex(self):
        """Split a dash-separated channel key into its components."""
        text = 'succm-lingzh-1-4-30-1618291260'
        result = re.split(r'-', text)
        self.assertEqual(
            result, ['succm', 'lingzh', '1', '4', '30', '1618291260'])

    def day(self, time):
        """Placeholder; not implemented."""
        pass

    def test_today(self):
        """Compute the timestamp of the most recent midnight at UTC+8."""
        stamp = 1617908681
        # Shift by eight hours so gmtime() yields the UTC+8 wall-clock fields.
        x = time.gmtime(stamp + 8 * 3600)
        diff = timedelta(hours=x.tm_hour, minutes=x.tm_min, seconds=x.tm_sec)
        today = stamp - diff.total_seconds()
        # Midnight at UTC+8, shifted back by the same offset, must fall on a
        # whole-day boundary.
        self.assertEqual((today + 8 * 3600) % 86400, 0)
        self.assertEqual(stamp - today, diff.total_seconds())

    def dir(self, group):
        """Recursively collect the names of all non-group members of *group*.

        Each member name is printed as it is found.

        Args:
            group: An ``h5py.Group`` (or any object whose ``items()`` yields
                ``(name, member)`` pairs) to walk.

        Returns:
            A list with the ``name`` attribute of every member that is not an
            ``h5py.Group``, in traversal order.
        """
        result = []
        for name, sub in group.items():
            if isinstance(sub, h5py.Group):
                result.extend(self.dir(sub))
            else:
                print(sub.name)
                result.append(sub.name)
        return result

    def test_h5pyr(self):
        """List every non-group member stored in the data HDF5 file."""
        file_name = '/var/www/html/data/stdata/data.hdf5'
        try:
            # The context manager closes the file even when the walk raises.
            with h5py.File(file_name, 'r') as f:
                root = f.require_group('/')
                self.dir(group=root)
        except Exception as ex:
            # Best effort: the file only exists on the deployment host.
            print(ex)

    def test_numpy(self):
        """Exercise reshape/sum and range filtering over one day of seconds."""
        x = np.arange(0, 86400)
        # Sum the seconds of the day in buckets of 300 (five minutes).
        y = x.reshape((-1, 300))
        y = np.sum(y, axis=1)
        self.assertEqual(y.shape, (288,))

        pos = np.where(x >= 25890)
        x = x[pos]
        pos = np.where(x < 85400)
        x = x[pos]
        print(x)
        self.assertEqual(x[0], 25890)
        self.assertEqual(x[-1], 85399)
        self.assertEqual(len(x), 85400 - 25890)

    def test_curmin(self):
        """Print the current time truncated to the start of the minute."""
        time_sec = int(time.time())
        cur_min = time_sec - time_sec % 60
        print('cur_min=', cur_min)
        self.assertEqual(cur_min % 60, 0)
        self.assertLessEqual(cur_min, time_sec)

    def test_profitPrepare(self):
        """Run ``queueListener.prepare_data`` against the test Redis host."""
        from refill import queueListener
        queueListener.set_redis(self.__redis_host, '6379')
        queueListener.prepare_data()

    def test_mamount(self):
        """Run ``MAmountCalc`` against the test Redis host."""
        from refill import MAmountCalc
        calc = MAmountCalc()
        calc.set_redis(self.__redis_host, '6379')
        logger.debug('start log.')
        calc.run()

    def test_json(self):
        """Round-trip a small request dict through ``json``."""
        x = {'method': 'commit', 'params': {'inprice': 4.5, 'outprice': 12}}
        print(x)
        data = json.dumps(x)
        print(data)
        y = json.loads(data)
        print(y)
        self.assertEqual(y, x)

    def test_setcache(self):
        """Write a fixed channel-ratios payload to ``nc_channel_ratios``."""
        try:
            pool = redis.ConnectionPool(host='192.168.1.220', port=6379, db=0)
            r = redis.Redis(connection_pool=pool)
            r.set("nc_channel_ratios", _CHANNEL_RATIOS_JSON)
        except Exception as ex:
            # Best effort: the Redis host is only reachable on the test LAN.
            print(ex)


if __name__ == '__main__':
    unittest.main()