ChannelPainter.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. from .DataStream import EChPosmap as pos_map, day_stamp, span_days, time_border, calc_interval
  2. from .ChannelReader import ChannelReader, ChPathFilter
  3. from .PainterBase import PainterBase
  4. from collections import defaultdict
  5. from matplotlib.figure import Figure
  6. from matplotlib import ticker
  7. from io import BytesIO
  8. import numpy as np
  9. from .algorithm import calc_chratios
  10. import time as time
  11. import logging
  12. logger = logging.getLogger('ChannelPainter')
  13. _all_channels = set()
  14. def _add_channel(channel):
  15. if channel not in _all_channels:
  16. _all_channels.add(channel)
  17. def get_channels():
  18. logger.debug(_all_channels)
  19. return list(_all_channels)
  20. def chratios_pathes(reader: ChannelReader, tuple_pathes: dict, days: list):
  21. count = len(days)
  22. all_datas = reader.init_data(count)
  23. for name, tup in tuple_pathes.items():
  24. _add_channel(name)
  25. mch_datas = reader.init_data(count)
  26. for _card_type, _spec in tup:
  27. detail_datas = reader.init_data(count)
  28. for i, day in enumerate(days):
  29. data = reader.read(day, name, _card_type, _spec)
  30. if data is not None:
  31. column_pos = i * 86400
  32. if mch_datas is not None:
  33. view = mch_datas[:, column_pos:column_pos + 86400]
  34. view += data
  35. if detail_datas is not None:
  36. view = detail_datas[:, column_pos:column_pos + 86400]
  37. view += data
  38. yield name, _card_type, _spec, detail_datas
  39. all_datas += mch_datas
  40. yield name, None, None, mch_datas
  41. yield 'all', None, None, all_datas
  42. def ratio_pathes(reader: ChannelReader, tuple_pathes: dict, days: list, filter: ChPathFilter = None):
  43. count = len(days)
  44. all_datas = None if filter.show_detail() else reader.init_data(count)
  45. show_channel = filter.show_channel()
  46. for name, tup in tuple_pathes.items():
  47. _add_channel(name)
  48. mch_datas = reader.init_data(count)
  49. for _card_type, _spec in tup:
  50. detail_datas = reader.init_data(count) if filter.show_detail() else None
  51. for i, day in enumerate(days):
  52. data = reader.read(day, name, _card_type, _spec)
  53. if data is not None:
  54. column_pos = i * 86400
  55. if mch_datas is not None:
  56. view = mch_datas[:, column_pos:column_pos + 86400]
  57. view += data
  58. if detail_datas is not None:
  59. view = detail_datas[:, column_pos:column_pos + 86400]
  60. view += data
  61. if detail_datas is not None:
  62. yield name, _card_type, _spec, detail_datas
  63. if all_datas is not None and mch_datas is not None:
  64. all_datas += mch_datas
  65. if mch_datas is not None and show_channel:
  66. yield name, None, None, mch_datas
  67. if all_datas is not None:
  68. yield 'all', None, None, all_datas
  69. def detail_pathes(reader: ChannelReader, tuple_pathes: dict, days: list):
  70. count = len(days)
  71. for name, tup in tuple_pathes.items():
  72. _add_channel(name)
  73. for _card_type, _spec in tup:
  74. detail_datas = reader.init_data(count)
  75. for i, day in enumerate(days):
  76. data = reader.read(day, name, _card_type, _spec)
  77. if data is not None:
  78. column_pos = i * 86400
  79. view = detail_datas[:, column_pos:column_pos + 86400]
  80. view += data
  81. yield name, _card_type, _spec, detail_datas
  82. class ChannelPainter(PainterBase):
  83. def _fig_funs(self):
  84. def create():
  85. fig = Figure(figsize=(16, 4))
  86. ax = fig.subplots()
  87. ax.set_title('success ratio')
  88. ax.set(xlabel='time', ylabel='ratio')
  89. return ax, fig
  90. def flush(ax, fig, ticks, lables):
  91. ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=4))
  92. ax.set_xticks(ticks=ticks)
  93. ax.set_xticklabels(lables)
  94. fig.autofmt_xdate()
  95. ax.grid()
  96. ax.legend(bbox_to_anchor=(1, 1), loc='upper left')
  97. fig.tight_layout()
  98. buf = BytesIO()
  99. fig.savefig(buf, format="png")
  100. return buf
  101. return create, flush
  102. def _label(self, chname, succ, count, card_type=None, spec=None):
  103. _card_type = None
  104. if card_type == 1:
  105. _card_type = 'SY'
  106. elif card_type == 2:
  107. _card_type = 'SH'
  108. elif card_type == 4:
  109. _card_type = 'YD'
  110. elif card_type == 5:
  111. _card_type = 'LT'
  112. elif card_type == 6:
  113. _card_type = 'DX'
  114. elif card_type == 7:
  115. _card_type = 'TH'
  116. lable = f"{chname}"
  117. if _card_type is not None:
  118. lable += f"-{_card_type}"
  119. if spec is not None:
  120. lable += f"-{spec}"
  121. if count > 0:
  122. ratio = round(succ * 100 / count, 5)
  123. else:
  124. ratio = 0.00
  125. lable += f":{succ}/{count}={ratio}%"
  126. return lable, ratio
  127. def _label_simple(self, chname, filter, card_type=None, spec=None):
  128. def scard_type(card_type):
  129. if card_type == 1:
  130. _card_type = 'SY'
  131. elif card_type == 2:
  132. _card_type = 'SH'
  133. elif card_type == 4:
  134. _card_type = 'YD'
  135. elif card_type == 5:
  136. _card_type = 'LT'
  137. elif card_type == 6:
  138. _card_type = 'DX'
  139. elif card_type == 7:
  140. _card_type = 'TH'
  141. else:
  142. _card_type = ""
  143. return _card_type
  144. if card_type is None and spec is None:
  145. return chname
  146. channels = filter.channels()
  147. if channels is None or len(channels) > 1:
  148. return f'{chname}-{scard_type(card_type)}'
  149. else:
  150. return f'{scard_type(card_type)}-{spec}'