# from . import DataHandler  # (this form would import the module file itself)
import logging

import numpy as np

from .DataStream import DataWriteStream, day_stamp
from .DataStream import EMchPosmap as pos_map

__all__ = ['MerchantWriter']

log = logging.getLogger('writer')

# Width of every dataset's second axis. 86400 is the number of seconds in a
# day; presumably one column per second of the day -- confirm against
# `day_stamp`, which is defined outside this file.
_SECONDS_PER_DAY = 86400

# Chunk width (in columns) used when a dataset is created.
_CHUNK_COLUMNS = 900

# Message kinds that this writer reacts to; anything else is ignored.
_MERCHANT_METHODS = ('mch_submit', 'mch_succ', 'mch_fail')


def _counter_layout(method):
    """Return ``(rows, amount_keys)`` for a message kind, or ``None``.

    ``rows`` is the list of ``pos_map`` row indices to increment: the first
    row receives ``1`` (a counter) and each following row receives the value
    found in ``params`` under the matching entry of ``amount_keys``.
    ``None`` is returned for a method this writer does not handle.
    """
    if method == 'mch_submit':
        return [pos_map.submit_count, pos_map.submit_amounts], ('mch_amount',)
    if method == 'mch_succ':
        return (
            [pos_map.succ_count, pos_map.succ_mch_amounts, pos_map.succ_ch_amounts],
            ('mch_amount', 'channel_amount'),
        )
    if method == 'mch_fail':
        return [pos_map.fail_count, pos_map.fail_mch_amounts], ('mch_amount',)
    return None


def _dataset_path(version, today, mchid, card_type, spec):
    """Build the dataset path ``/<version>/<today>/<mchid>/<card_type>/<spec>``."""
    return f'/{version}/{today}/{mchid}/{card_type}/{spec}'


class MerchantWriter(DataWriteStream):
    """Accumulate merchant submit / success / failure counters into datasets.

    Each dataset has shape ``(pos_map.dim(), 86400)``. A message increments a
    counter row and one or two amount rows at the column ``time -
    day_stamp(time)``.

    NOTE(review): ``self.file`` and ``self._version`` are expected to be
    provided by ``DataWriteStream``; the file object looks like an h5py file
    (``create_dataset``, ``flush``, ``in``) -- confirm.
    """

    def __init__(self, hfive):
        """Initialise the base stream and allocate the batch scratch buffer.

        Args:
            hfive: Passed straight through to ``DataWriteStream.__init__``.
        """
        super().__init__(hfive)
        # Scratch array reused by `_write_batch` so that a whole dataset can be
        # updated in memory and written back in one go.
        self._cache = np.zeros((pos_map.dim(), _SECONDS_PER_DAY))

    def write(self, method, params):
        """Apply a single message and flush the file if it was handled.

        Args:
            method: Message kind: ``'mch_submit'``, ``'mch_succ'`` or
                ``'mch_fail'``. Any other value is ignored (no flush).
            params: Mapping with the keys ``'mchid'``, ``'time'``, ``'spec'``,
                ``'card_type'``, ``'mch_amount'`` and, for ``'mch_succ'``,
                ``'channel_amount'``.
        """
        if method == 'mch_submit':
            self._onSubmit(params)
        elif method == 'mch_succ':
            self._onSucc(params)
        elif method == 'mch_fail':
            self._onFail(params)
        else:
            # Unknown message kind: nothing was written, so nothing to flush.
            return
        self.file.flush()

    def write_batch(self, batches):
        """Apply many messages, grouped by dataset path.

        Args:
            batches: Mapping of dataset path to an iterable of
                ``(method, params)`` pairs destined for that dataset.
        """
        for path, items in batches.items():
            self._write_batch(path, items)

    def _write_batch(self, path, messages):
        """Apply all ``messages`` to the dataset at ``path`` in one round trip.

        The dataset is copied into ``self._cache``, every message is applied
        to the in-memory copy, and the result is written back and flushed.
        Only ``'time'`` and the amount keys are read from each ``params``; the
        target dataset is determined solely by ``path``. Messages with an
        unknown method are skipped.
        """
        dset = self._data_set(path=path)
        cache = self._cache
        cache[:, :] = dset

        for method, params in messages:
            layout = _counter_layout(method)
            if layout is None:
                continue
            rows, amount_keys = layout
            time = params['time']
            vals = [1] + [params[key] for key in amount_keys]
            pos = time - day_stamp(time)
            # Fancy-indexed read-modify-write; repeated hits on the same
            # column accumulate because messages are applied one at a time.
            cache[rows, pos] += vals

        dset[:, :] = cache
        self.file.flush()

    def _apply_message(self, method, params):
        """Increment the rows for ``method`` in the dataset ``params`` maps to.

        All required keys are read from ``params`` before the dataset is
        looked up, so a malformed message raises ``KeyError`` without creating
        a dataset.
        """
        mchid, time, spec, card_type = (
            params['mchid'], params['time'], params['spec'], params['card_type'],
        )
        rows, amount_keys = _counter_layout(method)
        vals = [1] + [params[key] for key in amount_keys]
        dset, pos = self.path_pos(mchid, time, spec, card_type)
        dset[rows, pos] += vals

    def _onSubmit(self, params):
        """Record a submit: +1 on the submit counter, +``mch_amount`` on amounts."""
        self._apply_message('mch_submit', params)

    def _onSucc(self, params):
        """Record a success: +1 on the counter, plus merchant and channel amounts."""
        self._apply_message('mch_succ', params)

    def _onFail(self, params):
        """Record a failure: +1 on the fail counter, +``mch_amount`` on amounts."""
        self._apply_message('mch_fail', params)

    def get_path(self, method, params):
        """Return the dataset path a message belongs to.

        Args:
            method: Message kind.
            params: Mapping with ``'mchid'``, ``'time'``, ``'spec'`` and
                ``'card_type'``.

        Returns:
            The path string for a handled method, otherwise ``None``.
        """
        if method not in _MERCHANT_METHODS:
            return None
        mchid, time, spec, card_type = (
            params['mchid'], params['time'], params['spec'], params['card_type'],
        )
        today = day_stamp(time)
        return _dataset_path(self._version, today, mchid, card_type, spec)

    def path_pos(self, mchid, time, spec, card_type):
        """Return ``(dataset, column)`` for the given merchant key and time.

        The dataset is created on first use (see ``_data_set``). The column is
        ``time - day_stamp(time)``.
        """
        today = day_stamp(time)
        path = _dataset_path(self._version, today, mchid, card_type, spec)
        log.debug("%s,%s", 'MerchantWriter', path)
        dset = self._data_set(path=path)
        return dset, time - today

    def _data_set(self, path):
        """Return the dataset at ``path``, creating and zero-filling it if absent."""
        hfive = self.file
        if path in hfive:
            return hfive[path]

        dim = pos_map.dim()
        dset = hfive.create_dataset(
            path, (dim, _SECONDS_PER_DAY), chunks=(dim, _CHUNK_COLUMNS)
        )
        # Explicitly write zeros so every cell is materialised before use.
        dset[:, :] = np.zeros((dim, _SECONDS_PER_DAY))
        hfive.flush()
        return dset