ChannelPainter.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. from .DataStream import EChPosmap as pos_map, day_stamp, span_days, time_border, calc_interval
  2. from .ChannelReader import ChannelReader, ChPathFilter
  3. from .PainterBase import PainterBase
  4. from collections import defaultdict
  5. from matplotlib.figure import Figure
  6. from matplotlib import ticker
  7. from io import BytesIO
  8. import numpy as np
  9. from .algorithm import calc_chratios
  10. import time as time
  11. import logging
# Module-level logger, registered under the fixed name 'ChannelPainter'.
logger = logging.getLogger('ChannelPainter')
# Process-wide registry of channel names; filled by _add_channel() and
# read back through get_channels().
_all_channels = set()
  14. def _add_channel(channel):
  15. if channel not in _all_channels:
  16. _all_channels.add(channel)
  17. def get_channels():
  18. logger.debug(_all_channels)
  19. return list(_all_channels)
  20. def ratio_pathes(reader: ChannelReader, tuple_pathes: dict, days: list, filter: ChPathFilter = None):
  21. count = len(days)
  22. all_datas = None if filter.show_detail() else reader.init_data(count)
  23. show_channel = filter.show_channel()
  24. for name, tup in tuple_pathes.items():
  25. _add_channel(name)
  26. mch_datas = reader.init_data(count)
  27. for _card_type, _spec in tup:
  28. detail_datas = reader.init_data(count) if filter.show_detail() else None
  29. for i, day in enumerate(days):
  30. data = reader.read(day, name, _card_type, _spec)
  31. if data is not None:
  32. column_pos = i * 86400
  33. if mch_datas is not None:
  34. view = mch_datas[:, column_pos:column_pos + 86400]
  35. view += data
  36. if detail_datas is not None:
  37. view = detail_datas[:, column_pos:column_pos + 86400]
  38. view += data
  39. if detail_datas is not None:
  40. yield name, _card_type, _spec, detail_datas
  41. if all_datas is not None and mch_datas is not None:
  42. all_datas += mch_datas
  43. if mch_datas is not None and show_channel:
  44. yield name, None, None, mch_datas
  45. if all_datas is not None:
  46. yield 'all', None, None, all_datas
  47. def detail_pathes(reader: ChannelReader, tuple_pathes: dict, days: list):
  48. count = len(days)
  49. for name, tup in tuple_pathes.items():
  50. _add_channel(name)
  51. for _card_type, _spec in tup:
  52. detail_datas = reader.init_data(count)
  53. for i, day in enumerate(days):
  54. data = reader.read(day, name, _card_type, _spec)
  55. if data is not None:
  56. column_pos = i * 86400
  57. view = detail_datas[:, column_pos:column_pos + 86400]
  58. view += data
  59. yield name, _card_type, _spec, detail_datas
  60. class ChannelPainter(PainterBase):
  61. def _fig_funs(self):
  62. def create():
  63. fig = Figure(figsize=(16, 4))
  64. ax = fig.subplots()
  65. ax.set_title('success ratio')
  66. ax.set(xlabel='time', ylabel='ratio')
  67. return ax, fig
  68. def flush(ax, fig, ticks, lables):
  69. ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=4))
  70. ax.set_xticks(ticks=ticks)
  71. ax.set_xticklabels(lables)
  72. fig.autofmt_xdate()
  73. ax.grid()
  74. ax.legend(bbox_to_anchor=(1, 1), loc='upper left')
  75. fig.tight_layout()
  76. buf = BytesIO()
  77. fig.savefig(buf, format="png")
  78. return buf
  79. return create, flush
  80. def _label(self, chname, succ, count, card_type=None, spec=None):
  81. _card_type = None
  82. if card_type == 1:
  83. _card_type = 'SY'
  84. elif card_type == 2:
  85. _card_type = 'SH'
  86. elif card_type == 4:
  87. _card_type = 'YD'
  88. elif card_type == 5:
  89. _card_type = 'LT'
  90. elif card_type == 6:
  91. _card_type = 'DX'
  92. elif card_type == 7:
  93. _card_type = 'TH'
  94. lable = f"{chname}"
  95. if _card_type is not None:
  96. lable += f"-{_card_type}"
  97. if spec is not None:
  98. lable += f"-{spec}"
  99. if count > 0:
  100. ratio = round(succ * 100 / count, 5)
  101. else:
  102. ratio = 0.00
  103. lable += f":{succ}/{count}={ratio}%"
  104. return lable, ratio
  105. def _label_simple(self, chname, filter, card_type=None, spec=None):
  106. def scard_type(card_type):
  107. if card_type == 1:
  108. _card_type = 'SY'
  109. elif card_type == 2:
  110. _card_type = 'SH'
  111. elif card_type == 4:
  112. _card_type = 'YD'
  113. elif card_type == 5:
  114. _card_type = 'LT'
  115. elif card_type == 6:
  116. _card_type = 'DX'
  117. elif card_type == 7:
  118. _card_type = 'TH'
  119. else:
  120. _card_type = ""
  121. return _card_type
  122. if card_type is None and spec is None:
  123. return chname
  124. channels = filter.channels()
  125. if channels is None or len(channels) > 1:
  126. return f'{chname}-{scard_type(card_type)}'
  127. else:
  128. return f'{scard_type(card_type)}-{spec}'