123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- from .DataStream import EChPosmap as pos_map, day_stamp, span_days, time_border, calc_interval
- from .ChannelReader import ChannelReader
- from collections import defaultdict
- from matplotlib.figure import Figure
- from matplotlib import ticker
- from io import BytesIO
- import numpy as np
- from .algorithm import calc_chratios
- import time as time
- import logging
- logger = logging.getLogger('painter')
- def allpathes(reader: ChannelReader, tuple_pathes: dict, days: list, spec=None):
- count = len(days)
- show_detail = True if len(list(tuple_pathes.keys())) == 1 else False
- if show_detail == False:
- all_datas = reader.init_data(count)
- else:
- all_datas = None
- for name, tup in tuple_pathes.items():
- ch_datas = reader.init_data(count)
- for _card_type, _spec in tup:
- if spec is not None and _spec != spec:
- continue
- if show_detail:
- detail_datas = reader.init_data(count)
- else:
- detail_datas = None
- for i, day in enumerate(days):
- data = reader.read(day, name, _card_type, _spec)
- if data is not None:
- column_pos = i * 86400
- view = ch_datas[:, column_pos:column_pos + 86400]
- view += data
- if show_detail:
- view = detail_datas[:, column_pos:column_pos + 86400]
- view += data
- if show_detail:
- yield name, _card_type, _spec, detail_datas
- if all_datas is not None:
- all_datas += ch_datas
- yield name, None, None, ch_datas
- if show_detail == False:
- yield 'all', None, None, all_datas
- class ChannelPainter(object):
- def __init__(self, start_time: int, end_time: int, chnames: set = None, card_types: set = None, spec: int = None):
- self._reader = ChannelReader()
- _start_time, _end_time, self._chnames, self._card_types, self._spec = start_time, end_time, chnames, card_types, spec
- start_time = self._reader.near_stamp(_start_time,True)
- end_time = self._reader.near_stamp(_end_time,False)
- stime = lambda t: time.strftime('%d-%H:%M:%S', time.localtime(t))
- logger.debug("start_time=%s end_time=%s",stime(start_time) ,stime(end_time))
- interval = calc_interval(start_time, end_time)
- start_time = time_border(interval, start_time, True)
- end_time = time_border(interval, end_time, False)
- self._days = span_days(start_time, end_time)
- self._start_time = start_time
- self._end_time = end_time
- self._interval = interval
- pass
- def _fig_funs(self):
- def create():
- fig = Figure(figsize=(19, 8))
- ax = fig.subplots()
- ax.set_title('success ratio')
- ax.set(xlabel='time', ylabel='ratio')
- return ax, fig
- def flush(ax, fig, ticks, lables):
- ax.yaxis.set_major_formatter(ticker.PercentFormatter(xmax=1, decimals=4))
- ax.set_xticks(ticks=ticks)
- ax.set_xticklabels(lables)
- fig.autofmt_xdate()
- ax.grid()
- fig.subplots_adjust(left=0.1, right=0.8, top=0.95, bottom=0.1)
- ax.legend(bbox_to_anchor=(1, 1), loc='upper left')
- buf = BytesIO()
- fig.savefig(buf, format="png")
- return buf
- return create, flush
- def paint(self):
- reader = ChannelReader()
- tuple_pathes = reader.many_tuple_path(self._days, self._chnames, self._card_types, self._spec)
- gen = allpathes(self._reader, tuple_pathes, self._days, self._spec)
- if len(self._days) == 0:
- return []
- day_stamp = self._days[0]
- fig_create, fig_flush = self._fig_funs()
- ax, fig = fig_create()
- x = np.array([d - self._start_time for d in range(self._start_time, self._end_time)])
- window = np.ones(10) / 10
- for _chname, _card_type, _spec, _data in gen:
- succ, count, y = calc_chratios(_data, pos_map, self._start_time - day_stamp, self._end_time - day_stamp)
- print(_chname)
- # y = np.convolve(y, window, 'same')
- ax.plot(x, y, ls='-', label=self._label(chname=_chname, succ=succ, count=count, card_type=_card_type, spec=_spec))
- ticks = [d - self._start_time for d in range(self._start_time, self._end_time + 1, self._interval)]
- xlables = [time.strftime('%d-%H:%M:%S', time.localtime(d + self._start_time)) for d in ticks]
- return fig_flush(ax, fig, ticks, xlables)
- def _label(self, chname, succ, count, card_type=None, spec=None):
- _card_type = None
- if card_type == 1:
- _card_type = 'SY'
- elif card_type == 2:
- _card_type = 'SH'
- elif card_type == 4:
- _card_type = 'YD'
- elif card_type == 5:
- _card_type = 'LT'
- elif card_type == 6:
- _card_type = 'DX'
- elif card_type == 7:
- _card_type = 'TH'
- lable = f"{chname}"
- if _card_type is not None:
- lable += f"-{_card_type}"
- if spec is not None:
- lable += f"-{spec}"
- if count > 0:
- ratio = round(succ * 100 / count, 2)
- else:
- ratio = 0.00
- lable += f":{ratio}%={succ}/{count}"
- return lable
|