123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- from .DataStream import EChPosmap as pos_map, day_stamp, span_days, time_border, calc_interval
- from .ChannelReader import ChannelReader, ChPathFilter
- from .PainterBase import PainterBase
- from collections import defaultdict
- from matplotlib.figure import Figure
- from matplotlib import ticker
- from io import BytesIO
- import numpy as np
- from .algorithm import calc_chratios
- import time as time
- import logging
# Logger used by this module; registered under the fixed name 'ChannelPainter'.
logger = logging.getLogger('ChannelPainter')
# Module-level registry of channel names: filled through _add_channel() and
# read back through get_channels().
_all_channels = set()
def _add_channel(channel):
    """Record ``channel`` in the module-level channel registry.

    ``set.add`` leaves the set unchanged when the element is already
    present, so no separate membership test is needed.

    Args:
        channel: Hashable channel identifier to remember.
    """
    _all_channels.add(channel)
def get_channels():
    """Return the channel names registered so far as a new list.

    The registry is logged at DEBUG level before the snapshot is taken.
    The list is built from a set, so its order is not defined.
    """
    logger.debug(_all_channels)
    snapshot = [*_all_channels]
    return snapshot
def chratios_pathes(reader: ChannelReader, tuple_pathes: dict, days: list):
    """Yield accumulated data at detail, channel and overall level.

    For every channel in ``tuple_pathes`` and every ``(card_type, spec)``
    pair listed for it, the per-day results of ``reader.read`` are added
    into consecutive 86400-column windows (one window per entry of
    ``days``) of buffers obtained from ``reader.init_data``.  The buffers
    are indexed as ``buf[:, start:stop]``, so they must have at least two
    dimensions.

    Args:
        reader: Source of the per-day data (``read``) and of the empty
            buffers (``init_data``).
        tuple_pathes: Maps a channel name to an iterable of
            ``(card_type, spec)`` pairs.
        days: Days to read, in the order their windows are laid out.

    Yields:
        ``(name, card_type, spec, data)`` once per ``(card_type, spec)``
        pair, then ``(name, None, None, data)`` once per channel, and
        finally ``('all', None, None, data)`` for the sum of all channels.
    """
    day_width = 86400  # columns reserved per day (presumably one per second)
    n_days = len(days)
    grand_total = reader.init_data(n_days)
    for name, pairs in tuple_pathes.items():
        _add_channel(name)
        channel_total = reader.init_data(n_days)
        for card_type, spec in pairs:
            detail = reader.init_data(n_days)
            for index, day in enumerate(days):
                chunk = reader.read(day, name, card_type, spec)
                if chunk is None:
                    # Nothing was read for this day; its window stays untouched.
                    continue
                start = index * day_width
                stop = start + day_width
                # Channel buffer first, then the detail buffer.
                for target in (channel_total, detail):
                    if target is not None:
                        window = target[:, start:stop]
                        window += chunk
            yield name, card_type, spec, detail
        grand_total += channel_total
        yield name, None, None, channel_total
    yield 'all', None, None, grand_total
def ratio_pathes(reader: ChannelReader, tuple_pathes: dict, days: list, filter: ChPathFilter = None):
    """Yield accumulated data for the levels selected by ``filter``.

    For every channel in ``tuple_pathes`` and every ``(card_type, spec)``
    pair listed for it, the per-day results of ``reader.read`` are added
    into consecutive 86400-column windows (one window per entry of
    ``days``) of buffers obtained from ``reader.init_data``.

    Args:
        reader: Source of the per-day data (``read``) and of the empty
            buffers (``init_data``).
        tuple_pathes: Maps a channel name to an iterable of
            ``(card_type, spec)`` pairs.
        days: Days to read, in the order their windows are laid out.
        filter: Selects which levels are emitted.  ``show_detail()``
            enables the per-pair series and suppresses the ``'all'``
            series; ``show_channel()`` enables the per-channel series.
            Each is queried once per call.  ``None`` (the default)
            applies no restriction, so every level is emitted, the same
            as ``chratios_pathes`` -- NOTE(review): confirm this is the
            intended meaning of "no filter".

    Yields:
        ``(name, card_type, spec, data)`` per ``(card_type, spec)`` pair
        when details are shown, ``(name, None, None, data)`` per channel
        when channels are shown, and a final ``('all', None, None, data)``
        when the overall series is kept.
    """
    day_width = 86400  # columns reserved per day (presumably one per second)
    count = len(days)
    if filter is None:
        # The signature advertises None as the default, so it has to work
        # instead of failing on ``None.show_detail()``.
        show_detail = show_channel = True
        all_datas = reader.init_data(count)
    else:
        show_detail = filter.show_detail()
        all_datas = None if show_detail else reader.init_data(count)
        show_channel = filter.show_channel()

    for name, tup in tuple_pathes.items():
        _add_channel(name)
        mch_datas = reader.init_data(count)
        for _card_type, _spec in tup:
            detail_datas = reader.init_data(count) if show_detail else None
            for i, day in enumerate(days):
                data = reader.read(day, name, _card_type, _spec)
                if data is None:
                    # Nothing was read for this day; its window stays untouched.
                    continue
                column_pos = i * day_width
                # Channel buffer first, then the (optional) detail buffer.
                for target in (mch_datas, detail_datas):
                    if target is not None:
                        view = target[:, column_pos:column_pos + day_width]
                        view += data
            if detail_datas is not None:
                yield name, _card_type, _spec, detail_datas
        if mch_datas is not None:
            if all_datas is not None:
                all_datas += mch_datas
            if show_channel:
                yield name, None, None, mch_datas
    if all_datas is not None:
        yield 'all', None, None, all_datas
def detail_pathes(reader: ChannelReader, tuple_pathes: dict, days: list):
    """Yield one accumulated buffer per channel / card type / spec.

    Each buffer comes from ``reader.init_data(len(days))`` and receives
    the result of ``reader.read`` for every day in its own 86400-column
    window, laid out in the order of ``days``.

    Args:
        reader: Source of the per-day data (``read``) and of the empty
            buffers (``init_data``).
        tuple_pathes: Maps a channel name to an iterable of
            ``(card_type, spec)`` pairs.
        days: Days to read, in the order their windows are laid out.

    Yields:
        ``(name, card_type, spec, data)`` for every ``(card_type, spec)``
        pair of every channel.
    """
    day_width = 86400  # columns reserved per day (presumably one per second)
    n_days = len(days)
    for name, pairs in tuple_pathes.items():
        _add_channel(name)
        for card_type, spec in pairs:
            detail = reader.init_data(n_days)
            for index, day in enumerate(days):
                chunk = reader.read(day, name, card_type, spec)
                if chunk is None:
                    # Nothing was read for this day; its window stays untouched.
                    continue
                start = index * day_width
                window = detail[:, start:start + day_width]
                window += chunk
            yield name, card_type, spec, detail
class ChannelPainter(PainterBase):
    """PainterBase subclass with the figure setup and legend labels of the
    'success ratio' chart."""

    # Display codes for the numeric card types.  Kept in one table so that
    # _label() and _label_simple() cannot drift apart.
    _CARD_TYPE_CODES = {
        1: 'SY',
        2: 'SH',
        4: 'YD',
        5: 'LT',
        6: 'DX',
        7: 'TH',
    }

    def _fig_funs(self):
        """Return the ``(create, flush)`` pair of figure callbacks.

        ``create()`` builds a new figure and returns ``(ax, fig)``;
        ``flush(ax, fig, ticks, lables)`` finishes the figure and returns
        it rendered as PNG in a ``BytesIO``.
        """

        def create():
            fig = Figure(figsize=(16, 4))
            ax = fig.subplots()
            ax.set_title('success ratio')
            ax.set(xlabel='time', ylabel='ratio')
            return ax, fig

        def flush(ax, fig, ticks, lables):
            # y values are fractions: 1.0 is rendered as 100 %, 4 decimals.
            ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=4))
            ax.set_xticks(ticks=ticks)
            ax.set_xticklabels(lables)
            fig.autofmt_xdate()
            ax.grid()
            # Anchor the legend's upper-left corner to the axes' upper-right
            # corner, i.e. place it outside the plot area.
            ax.legend(bbox_to_anchor=(1, 1), loc='upper left')
            fig.tight_layout()
            buf = BytesIO()
            fig.savefig(buf, format="png")
            # The buffer is handed back as written; it is not rewound here.
            return buf

        return create, flush

    def _label(self, chname, succ, count, card_type=None, spec=None):
        """Build a legend label that carries the success statistics.

        Args:
            chname: Channel name, used as the label prefix.
            succ: Numerator of the ratio.
            count: Denominator of the ratio; a value that is not greater
                than zero yields a ratio of 0.0.
            card_type: Numeric card type; appended as its display code
                when it is one of the known codes, otherwise omitted.
            spec: Appended to the label when it is not ``None``.

        Returns:
            ``(label, ratio)`` where ``label`` has the form
            ``chname[-code][-spec]:succ/count=ratio%`` and ``ratio`` is
            the percentage rounded to 5 decimals.
        """
        label = f"{chname}"
        code = self._CARD_TYPE_CODES.get(card_type)
        if code is not None:
            label += f"-{code}"
        if spec is not None:
            label += f"-{spec}"
        if count > 0:
            ratio = round(succ * 100 / count, 5)
        else:
            ratio = 0.00
        label += f":{succ}/{count}={ratio}%"
        return label, ratio

    def _label_simple(self, chname, filter, card_type=None, spec=None):
        """Build a short legend label without statistics.

        Args:
            chname: Channel name.
            filter: Object whose ``channels()`` result decides the label
                form (see Returns).
            card_type: Numeric card type; unknown values map to an empty
                code.
            spec: Spec shown in the single-channel form.

        Returns:
            ``chname`` itself when neither ``card_type`` nor ``spec`` is
            given; ``'<chname>-<code>'`` when ``filter.channels()`` is
            ``None`` or holds more than one entry; ``'<code>-<spec>'``
            otherwise.
        """
        if card_type is None and spec is None:
            return chname
        channels = filter.channels()
        code = self._CARD_TYPE_CODES.get(card_type, "")
        if channels is None or len(channels) > 1:
            return f'{chname}-{code}'
        return f'{code}-{spec}'
|