# from . import DataHandler #此时是导入文件 from .DataStream import DataWriteStream, day_stamp from .DataStream import EChPosmap as pos_map import numpy as np __all__ = ['ChannelWriter'] import logging log = logging.getLogger('writer') class ChannelWriter(DataWriteStream): def write(self, method, params): flush = True if method == 'ch_commit': self._onCommit(params) elif method == 'ch_succ': self._onSucc(params) elif method == 'ch_fail': self._onFail(params) else: flush = False if flush: hfive = self.file hfive.flush() pass def _onCommit(self, params): def parse(input): return input['channel_name'], input['time'], input['spec'], input['card_type'], input['channel_amount'] chname, time, spec, card_type, channel_amount = parse(params) dset, pos = self.path_pos(chname, time, spec, card_type) rows = [pos_map.commit_count, pos_map.commit_amounts] vals = [1, channel_amount] dset[rows, pos] += vals # dset[pos_map.commit_count, pos] += 1 # dset[pos_map.commit_amounts, pos] += channel_amount pass def _onSucc(self, params): def parse(input): return input['channel_name'], input['time'], input['spec'], input['card_type'], input['channel_amount'], input['period'] chname, time, spec, card_type, channel_amount, period = parse(params) dset, pos = self.path_pos(chname, time, spec, card_type) rows = [pos_map.succ_count, pos_map.succ_amounts, pos_map.succ_periods] vals = [1, channel_amount, period] dset[rows, pos] += vals # dset[pos_map.succ_count, pos] += 1 # dset[pos_map.succ_amounts, pos] += channel_amount # dset[pos_map.succ_periods, pos] += period pass def _onFail(self, params): def parse(input): return input['channel_name'], input['time'], input['spec'], input['card_type'], input['channel_amount'], input['period'] chname, time, spec, card_type, channel_amount, period = parse(params) dset, pos = self.path_pos(chname, time, spec, card_type) rows = [pos_map.fail_count, pos_map.fail_amounts, pos_map.fail_periods] vals = [1, channel_amount, period] dset[rows, pos] += vals # dset[pos_map.fail_count, pos] += 1 # dset[pos_map.fail_amounts, pos] += channel_amount # dset[pos_map.fail_periods, pos] += period pass def path_pos(self, chname, time, spec, card_type): today = day_stamp(time) path = f'/{self._version}/{today}/{chname}/{card_type}/{spec}' log.debug("%s,%s", 'ChannelWriter', path) hfive = self.file if path not in hfive: dim = pos_map.dim() dset = hfive.create_dataset(path, (dim, 86400), chunks=(dim, 3600)) dset[:, :] = np.zeros((dim, 86400)) hfive.flush() else: dset = hfive[path] return dset, time - today