from .DataStream import EChPosmap as pos_map, day_stamp, span_days, time_border, calc_interval from .ChannelReader import ChannelReader from collections import defaultdict from matplotlib.figure import Figure from matplotlib import ticker from io import BytesIO import numpy as np from .algorithm import calc_chratios import time as time import logging logger = logging.getLogger('painter') _all_channels = set() def add_channel(channel): if channel not in _all_channels: _all_channels.add(channel) def get_channels(): return list(_all_channels) def ratio_pathes(reader: ChannelReader, tuple_pathes: dict, days: list, spec=None): count = len(days) show_detail = True if len(list(tuple_pathes.keys())) == 1 else False if show_detail == False: all_datas = reader.init_data(count) else: all_datas = None for name, tup in tuple_pathes.items(): add_channel(name) mch_datas = reader.init_data(count) for _card_type, _spec in tup: if spec is not None and _spec != spec: continue if show_detail: detail_datas = reader.init_data(count) else: detail_datas = None for i, day in enumerate(days): data = reader.read(day, name, _card_type, _spec) if data is not None: column_pos = i * 86400 view = mch_datas[:, column_pos:column_pos + 86400] view += data if show_detail: view = detail_datas[:, column_pos:column_pos + 86400] view += data if show_detail: yield name, _card_type, _spec, detail_datas if all_datas is not None: all_datas += mch_datas yield name, None, None, mch_datas if show_detail == False: yield 'all', None, None, all_datas class ChannelPainter(object): def __init__(self, start_time: int, end_time: int, chnames: set = None, card_types: set = None, spec: int = None, filter_wave: int = None): self._reader = ChannelReader() _start_time, _end_time, self._chnames, self._card_types, self._spec, self._filter_wave = start_time, end_time, chnames, card_types, spec, filter_wave if _end_time is None: _end_time = int(time.time()) end_time = self._reader.near_stamp(_end_time, False) if end_time is None: raise Exception('data is empty') if _start_time is None or start_time > end_time: _start_time = end_time - 7200 start_time = self._reader.near_stamp(_start_time, True) if start_time is None: raise Exception('data is empty') stime = lambda t: time.strftime('%d-%H:%M:%S', time.localtime(t)) logger.debug("start_time=%s end_time=%s",stime(start_time) ,stime(end_time)) interval = calc_interval(start_time, end_time) start_time = time_border(interval, start_time, True) end_time = time_border(interval, end_time, False) self._days = span_days(start_time, end_time) self._start_time = start_time self._end_time = end_time self._interval = interval pass def _fig_funs(self): def create(): fig = Figure(figsize=(19, 8)) ax = fig.subplots() ax.set_title('success ratio') ax.set(xlabel='time', ylabel='ratio') return ax, fig def flush(ax, fig, ticks, lables): ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=4)) ax.set_xticks(ticks=ticks) ax.set_xticklabels(lables) fig.autofmt_xdate() ax.grid() fig.subplots_adjust(left=0.1, right=0.8, top=0.95, bottom=0.1) ax.legend(bbox_to_anchor=(1, 1), loc='upper left') buf = BytesIO() fig.savefig(buf, format="png") return buf return create, flush def paint(self): reader = ChannelReader() tuple_pathes = reader.many_tuple_path(self._days, self._chnames, self._card_types, self._spec) gen = ratio_pathes(self._reader, tuple_pathes, self._days, self._spec) if len(self._days) == 0: return BytesIO() day_stamp = self._days[0] fig_create, fig_flush = self._fig_funs() ax, fig = fig_create() x = np.array([d - self._start_time for d in range(self._start_time, self._end_time)]) if self._filter_wave is not None and self._filter_wave > 1: window = np.ones(self._filter_wave) / self._filter_wave else: window = None chname_ratios = [] for _chname, _card_type, _spec, _data in gen: succ, count, y = calc_chratios(_data, pos_map, self._start_time - day_stamp, self._end_time - day_stamp) y = np.convolve(y, window, 'same') if window is not None else y label, ratio = self._label(chname=_chname, succ=succ, count=count, card_type=_card_type, spec=_spec) ax.plot(x, y, ls='-', label=label) if _card_type is None and _spec is None and _chname != 'all': chname_ratios.append((_chname, ratio)) ticks = [d - self._start_time for d in range(self._start_time, self._end_time + 1, self._interval)] xlables = [time.strftime('%d-%H:%M:%S', time.localtime(d + self._start_time)) for d in ticks] buf = fig_flush(ax, fig, ticks, xlables) chname_ratios = sorted(chname_ratios, key=lambda x: (x[1], x[0]), reverse=True) result = [] for name, ratio in chname_ratios: result.append(f'{name}:{ratio}') return buf, result def _label(self, chname, succ, count, card_type=None, spec=None): _card_type = None if card_type == 1: _card_type = 'SY' elif card_type == 2: _card_type = 'SH' elif card_type == 4: _card_type = 'YD' elif card_type == 5: _card_type = 'LT' elif card_type == 6: _card_type = 'DX' elif card_type == 7: _card_type = 'TH' lable = f"{chname}" if _card_type is not None: lable += f"-{_card_type}" if spec is not None: lable += f"-{spec}" if count > 0: ratio = round(succ * 100 / count, 2) else: ratio = 0.00 lable += f":{succ}/{count}={ratio}%" return lable, ratio