testPlot.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. import time
  2. import unittest
  3. import logging
  4. from refill import time_border
  # Module-wide logger used by the tests below. The root logger is configured
  # to write records at DEBUG level and above to the qreader log file.
  log = logging.getLogger('reader')
  logging.basicConfig(
      level=logging.DEBUG,
      filename='/var/www/html/data/log/qreader.log',
  )
  7. class MyTestCase(unittest.TestCase):
  8. # __redis_host = '192.168.3.104'
  9. __redis_host = '192.168.3.46'
  10. def test_something(self):
  11. self.assertEqual(True, False) # add assertion here
  12. def test_listener(self):
  13. try:
  14. from refill import queueListener
  15. queueListener.set_redis(self.__redis_host, '6379')
  16. queueListener.prepare_data()
  17. except Exception as ex:
  18. log.error(ex)
  19. pass
  20. def test_timestart(self):
  21. stamp = int(time.time())
  22. for i in range(300):
  23. x = stamp + i
  24. l = time_border(300, x, True)
  25. r = time_border(300, x, False)
  26. print('l=', l, 'r=', r)
  27. def testChannel(self):
  28. from refill import ChannelReader
  29. reader = ChannelReader()
  30. days = reader.days()
  31. print('days=', days)
  32. for day in days:
  33. x = reader.tuple_path(day, card_types={4, 5, 6})
  34. x1 = reader.tuple_path(day, {'chizeng', 'ainika'}, {4, 5, 6})
  35. x2 = reader.tuple_path(day, {'chizeng', 'ainika'}, {4, 5, 6}, 50)
  36. def test_chpainter(self):
  37. from refill import ChannelCumPainter
  38. start_time = int(time.time()) - 10 * 86400 - 3600
  39. end_time = int(time.time()) - 8 * 86400
  40. painter = ChannelCumPainter(start_time=start_time, end_time=end_time, chnames=set(), card_types={4, 5, 6})
  41. painter.paint()
  42. def test_ch_speed_analyze_painter(self):
  43. from refill import ChannelSpeedAnalyzePainter
  44. start_time = 1664632800
  45. end_time = 1664640000
  46. painter = ChannelSpeedAnalyzePainter(start_time=start_time, end_time=end_time, chnames=set(['feimingyunew']), card_types={4}, spec=100)
  47. painter.paint()
  48. def test_chcov_ratio(self):
  49. from refill import ChannelCovPainter
  50. start_time = int(time.time()) - 10 * 86400 - 3600
  51. end_time = int(time.time()) - 10 * 86400
  52. painter = ChannelCovPainter(start_time=start_time, end_time=end_time, chnames=set(), card_types={4, 5, 6}, filter_wave=3600)
  53. painter.paint()
  54. def test_chcov_succ(self):
  55. from refill import ChannelCovSuccPainter
  56. start_time = int(time.time()) - 20 * 86400 - 3600
  57. end_time = int(time.time()) - 20 * 86400
  58. painter = ChannelCovSuccPainter(start_time=start_time, end_time=end_time, chnames=set(), card_types={4, 5, 6}, filter_wave=3600)
  59. painter.paint()
  60. def test_mch_ratio_painter(self):
  61. from refill import MerchantCumRatioPainter
  62. start_time = int(time.time()) - 21 * 86400 - 3600
  63. end_time = int(time.time()) - 20 * 86400
  64. painter = MerchantCumRatioPainter(start_time=start_time, end_time=end_time, mchids=set(), card_types={4, 5, 6})
  65. painter.paint()
  66. def test_mch_covratio_painter(self):
  67. from refill import MerchantCovRatioPainter
  68. start_time = int(time.time()) - 21 * 86400 - 3600
  69. end_time = int(time.time()) - 20 * 86400
  70. painter = MerchantCovRatioPainter(start_time=start_time, end_time=end_time, mchids=set(), card_types={4, 5, 6})
  71. painter.paint()
  72. def test_mch_amount_painter(self):
  73. from refill import MerchantAmountPainter
  74. start_time = int(time.time()) - 20 * 86400 - 3600
  75. end_time = int(time.time()) - 20 * 86400
  76. painter = MerchantAmountPainter(start_time=start_time, end_time=end_time, mchids=set(), card_types={4, 5, 6})
  77. painter.paint()
  78. def test_ChannelWriter(self):
  79. from refill import ChannelWriter, open_hdf5
  80. import time
  81. hfive = open_hdf5('/var/www/html/data/stdata/channel.hdf5', True)
  82. chwriter = ChannelWriter(hfive)
  83. itema = {'channel_name':'zero', 'time':int(time.time()), 'spec':50, 'card_type':4, 'channel_amount':49, 'period':30}
  84. itemb = {'channel_name':'xxxxxx', 'time':int(time.time()), 'spec':50, 'card_type':4, 'channel_amount':49, 'period':30,'mch_amount':49.625}
  85. chwriter.write('ch_succ',itema)
  86. chwriter.write('ch_succ',itemb)
  87. def test_netcheck(self):
  88. from refill import NetchkReader
  89. import time
  90. from refill import day_stamp
  91. day = day_stamp(int(time.time()) - 4 * 86400)
  92. days = [day, day - 86400]
  93. net = NetchkReader()
  94. channels = net.tuple_path(day)
  95. channels = net.many_tuple_path(days)
  96. def test_net_painter(self):
  97. from refill import NetcheckCovPainter
  98. start_time = int(time.time()) - 5 * 86400
  99. end_time = int(time.time()) - 4 * 86400
  100. painter = NetcheckCovPainter(start_time=start_time, end_time=end_time)
  101. painter.paint()
  102. def testDays(self):
  103. from refill import MerchantReader
  104. from refill import ChannelReader
  105. try:
  106. chreader = ChannelReader()
  107. chdays = chreader.days()
  108. xlables = [time.strftime('%d-%H:%M:%S', time.localtime(d)) for d in chdays]
  109. print(xlables)
  110. reader = MerchantReader()
  111. days = reader.days()
  112. except Exception as ex:
  113. log.error(ex)
  114. pass
  115. def test_jsonLoads(self):
  116. import json
  117. try:
  118. str = 4
  119. x = json.loads(str)
  120. print(x)
  121. except Exception as ex:
  122. print(ex)
  123. def test_partial(self):
  124. from functools import partial
  125. add_five = partial()
  126. def test_key(self):
  127. def person(name, age, *, city, job):
  128. print(name, age, city, job)
  129. person('Jack', 24, city='Beijing', job='Engineer')
  130. person('Jack', 24, 'Beijing', job='Engineer')
  131. person('Jack', 24, 'Beijing', 'Engineer')
  132. def test_none(self):
  133. x = None
  134. len = len(x)
  135. print(len)
  136. def test_set(self):
  137. x = set([(4, 50), (4, 100)])
  138. y = set([(4, 50), (5, 100)])
  139. z = x | y
  140. print(z)
  141. def test_env(self):
  142. import os
  143. x = os.getenv("PYCHARM_DISPLAY_PORT", "-1")
  144. print(x)
  145. def test_matplot(self):
  146. import matplotlib
  147. x = matplotlib.__version__
  148. print(x)
  149. def test_time(self):
  150. x = int(time.time())
  151. print(x)
  152. def test_rpop(self):
  153. import redis
  154. import json
  155. pool = redis.ConnectionPool(host=self.__redis_host, port=6379, db=0)
  156. r = redis.Redis(connection_pool=pool)
  157. item = r.rpop('REFILL_MONITOR_QUEUE')
  158. if item is None:
  159. print('hello')
  160. else:
  161. try:
  162. val = json.loads(item)
  163. method = val['method']
  164. params = val['params']
  165. print(method, params)
  166. except Exception as ex:
  167. log.error(ex)
  168. def test_consumer(self):
  169. from refill import WriterConsumer
  170. class PrintHandler:
  171. def write(self, method, msg):
  172. log.debug(msg)
  173. time.sleep(0.01)
  174. handler = PrintHandler()
  175. consumer = WriterConsumer(handler, 'PrintHandler')
  176. consumer.start()
  177. for i in range(100000):
  178. consumer.put('test', f'index = {i}')
  179. consumer.quit()
  180. consumer.join()
  181. def test_merge(self):
  182. from collections import defaultdict
  183. import time as time
  184. def merge(l, r):
  185. for name, ls in l.items():
  186. if name in r:
  187. ls.extend(r[name])
  188. k = set(ls)
  189. r[name] = list(k)
  190. else:
  191. r[name] = ls
  192. return r
  193. all = defaultdict(list)
  194. a = {'yunchonggongfs': [(4, 100), (4, 30), (4, 50)]}
  195. b = {'yunchonggongfs': [(4, 100), (4, 200), (4, 50)]}
  196. all = merge(a, all)
  197. all = merge(b, all)
  198. x = 0;
  199. def test_split(self):
  200. def split_card(card_specs):
  201. result = dict()
  202. for card_type, spec in card_specs:
  203. if card_type not in result:
  204. result[card_type] = []
  205. result[card_type].append(spec)
  206. return result
  207. tups = [(4, 100), (4, 50), (5, 10), (4, 30), (5, 30), (6, 100), (5, 100), ]
  208. x = split_card(tups)
  209. y = 1
  # Run every test in this module when the file is executed directly.
  if __name__ == '__main__':
      unittest.main()