from .DataStream import EChPosmap as pos_map, day_stamp, span_days, time_border, calc_interval
from .ChannelReader import ChannelReader, ChPathFilter
from .PainterBase import PainterBase
from collections import defaultdict
from matplotlib.figure import Figure
from matplotlib import ticker
from io import BytesIO
import numpy as np
from .algorithm import calc_chratios
import time as time
import logging

logger = logging.getLogger('ChannelPainter')

# Names of every channel seen so far by ratio_pathes() / detail_pathes().
_all_channels = set()

# Width, in columns, of one day inside the buffers returned by
# reader.init_data(). 86400 is the number of seconds in a day.
# NOTE(review): assumes one column per second -- confirm against ChannelReader.
_DAY_COLUMNS = 86400

# Short display code for each known card_type id; ids not listed here have
# no code.
_CARD_TYPE_NAMES = {
    1: 'SY',
    2: 'SH',
    4: 'YD',
    5: 'LT',
    6: 'DX',
    7: 'TH',
}


def _add_channel(channel):
    """Remember *channel* in the module-level registry of seen channels."""
    _all_channels.add(channel)


def get_channels():
    """Return the channel names registered so far as a new list."""
    logger.debug(_all_channels)
    return list(_all_channels)


def _iter_day_data(reader, days, name, card_type, spec):
    """Yield ``(column_pos, data)`` for every day that has data.

    ``column_pos`` is the first column of that day's slot in a buffer laid
    out day after day; days for which ``reader.read`` returns ``None`` are
    skipped.
    """
    for index, day in enumerate(days):
        data = reader.read(day, name, card_type, spec)
        if data is not None:
            yield index * _DAY_COLUMNS, data


def ratio_pathes(reader: ChannelReader, tuple_pathes: dict, days: list, filter: ChPathFilter = None):
    """Yield accumulated data buffers per path, per channel and overall.

    Args:
        reader: Source of the data; ``init_data(count)`` supplies an empty
            buffer for ``count`` days and ``read(day, name, card_type, spec)``
            supplies one day of data or ``None``.
        tuple_pathes: Maps a channel name to an iterable of
            ``(card_type, spec)`` pairs.
        days: Days to read, in the order they are laid out in the buffers.
        filter: Selects what is yielded through ``show_detail()`` and
            ``show_channel()``. When omitted, no detail series are produced
            and per-channel series are shown.

    Yields:
        ``(name, card_type, spec, data)`` tuples:

        * one per ``(card_type, spec)`` path when detail is requested;
        * ``(name, None, None, data)`` per channel when channels are shown;
        * a final ``('all', None, None, data)`` total when detail is *not*
          requested.
    """
    count = len(days)
    # The declared default is None; fall back to the aggregate view instead
    # of failing with AttributeError on the first filter call.
    show_detail = filter.show_detail() if filter is not None else False
    show_channel = filter.show_channel() if filter is not None else True

    all_datas = None if show_detail else reader.init_data(count)
    for name, tup in tuple_pathes.items():
        _add_channel(name)
        mch_datas = reader.init_data(count)
        for _card_type, _spec in tup:
            detail_datas = reader.init_data(count) if show_detail else None
            for column_pos, data in _iter_day_data(reader, days, name, _card_type, _spec):
                # The in-place add on the slice updates the underlying buffer
                # (relies on the slice being a view, as with NumPy slicing).
                if mch_datas is not None:
                    view = mch_datas[:, column_pos:column_pos + _DAY_COLUMNS]
                    view += data
                if detail_datas is not None:
                    view = detail_datas[:, column_pos:column_pos + _DAY_COLUMNS]
                    view += data
            if detail_datas is not None:
                yield name, _card_type, _spec, detail_datas
        if all_datas is not None and mch_datas is not None:
            all_datas += mch_datas
        if mch_datas is not None and show_channel:
            yield name, None, None, mch_datas
    if all_datas is not None:
        yield 'all', None, None, all_datas


def detail_pathes(reader: ChannelReader, tuple_pathes: dict, days: list):
    """Yield one accumulated data buffer per ``(channel, card_type, spec)``.

    Args:
        reader: Source of the data (``init_data`` / ``read``).
        tuple_pathes: Maps a channel name to an iterable of
            ``(card_type, spec)`` pairs.
        days: Days to read, in the order they are laid out in the buffers.

    Yields:
        ``(name, card_type, spec, data)`` for every path, including paths
        for which no day had data (their buffer is left as initialised).
    """
    count = len(days)
    for name, tup in tuple_pathes.items():
        _add_channel(name)
        for _card_type, _spec in tup:
            detail_datas = reader.init_data(count)
            for column_pos, data in _iter_day_data(reader, days, name, _card_type, _spec):
                view = detail_datas[:, column_pos:column_pos + _DAY_COLUMNS]
                view += data
            yield name, _card_type, _spec, detail_datas


class ChannelPainter(PainterBase):
    """Painter that plots channel success ratios over time."""

    def _fig_funs(self):
        """Return the ``(create, flush)`` pair of figure callbacks.

        ``create()`` builds a new figure and returns ``(ax, fig)``;
        ``flush(ax, fig, ticks, lables)`` finishes the figure and returns it
        rendered as PNG in a ``BytesIO``.
        """

        def create():
            fig = Figure(figsize=(16, 4))
            ax = fig.subplots()
            ax.set_title('success ratio')
            ax.set(xlabel='time', ylabel='ratio')
            return ax, fig

        def flush(ax, fig, ticks, lables):
            # Plotted values are fractions in [0, 1]; show them as percents.
            ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=4))
            ax.set_xticks(ticks=ticks)
            ax.set_xticklabels(lables)
            fig.autofmt_xdate()
            ax.grid()
            # Anchor the legend outside the axes, to the right of the plot.
            ax.legend(bbox_to_anchor=(1, 1), loc='upper left')
            fig.tight_layout()
            buf = BytesIO()
            fig.savefig(buf, format="png")
            # The buffer is returned as written, i.e. not rewound.
            return buf

        return create, flush

    def _label(self, chname, succ, count, card_type=None, spec=None):
        """Build a legend label with the success statistics of a series.

        Args:
            chname: Channel name, used as the label prefix.
            succ: Number of successes.
            count: Total number of attempts.
            card_type: Optional card type id; appended as its short code
                when the id is known, otherwise left out.
            spec: Optional spec; appended when not ``None``.

        Returns:
            ``(label, ratio)`` where ``ratio`` is the success percentage
            rounded to 5 decimals, or ``0.0`` when ``count`` is not positive.
        """
        _card_type = _CARD_TYPE_NAMES.get(card_type)
        lable = f"{chname}"
        if _card_type is not None:
            lable += f"-{_card_type}"
        if spec is not None:
            lable += f"-{spec}"
        if count > 0:
            ratio = round(succ * 100 / count, 5)
        else:
            ratio = 0.00
        lable += f":{succ}/{count}={ratio}%"
        return lable, ratio

    def _label_simple(self, chname, filter, card_type=None, spec=None):
        """Build a short legend label without statistics.

        Args:
            chname: Channel name.
            filter: Object whose ``channels()`` returns the selected
                channels, or ``None``.
            card_type: Optional card type id; unknown ids render as an
                empty string.
            spec: Optional spec.

        Returns:
            ``chname`` when neither ``card_type`` nor ``spec`` is given;
            ``'<chname>-<card>'`` when ``filter.channels()`` is ``None`` or
            holds more than one channel; otherwise ``'<card>-<spec>'``.
        """
        if card_type is None and spec is None:
            return chname
        card_name = _CARD_TYPE_NAMES.get(card_type, "")
        channels = filter.channels()
        if channels is None or len(channels) > 1:
            return f'{chname}-{card_name}'
        return f'{card_name}-{spec}'