"""Writer that accumulates merchant order statistics into a day-bucketed store."""
# from . import DataHandler  # note: written this way, the import brings in the module file itself
import logging

import numpy as np

from .DataStream import DataWriteStream, day_stamp
from .DataStream import EMchPosmap as pos_map

__all__ = ['MerchantWriter']

log = logging.getLogger('writer')


class MerchantWriter(DataWriteStream):
    """Accumulate per-merchant submit/success/failure counters.

    Each event is added into a 2-D array stored in ``self.file`` under a
    path built from the writer version, the day stamp of the event, the
    merchant id, the card type and the spec.  Rows of the array are selected
    through ``pos_map`` (``EMchPosmap``); the column is ``time - day_stamp(time)``.
    """

    def write(self, method, params):
        """Dispatch one event to the matching handler.

        Args:
            method: Event name; ``'mch_submit'``, ``'mch_succ'`` and
                ``'mch_fail'`` are handled, anything else is ignored silently.
            params: Mapping holding the event fields (see the handlers for
                the keys each one reads).
        """
        if method == 'mch_submit':
            self._onSubmit(params)
        elif method == 'mch_succ':
            self._onSucc(params)
        elif method == 'mch_fail':
            self._onFail(params)
        # Unknown methods are intentionally a no-op.

    def _accumulate(self, params, count_row, amount_rows):
        """Add one event to the counter row and to each amount row.

        Args:
            params: Mapping with keys ``'mchid'``, ``'time'``, ``'spec'``,
                ``'card_type'`` plus every key named in ``amount_rows``.
            count_row: Row index that is incremented by 1.
            amount_rows: Iterable of ``(row_index, params_key)`` pairs; the
                value ``params[params_key]`` is added to ``row_index``.

        Raises:
            KeyError: If a required key is missing from ``params``; raised
                before anything is written.
        """
        # Read every required field up front so a missing key fails before
        # the store is touched (or a new dataset is created).
        mchid = params['mchid']
        time = params['time']
        spec = params['spec']
        card_type = params['card_type']
        increments = [(row, params[key]) for row, key in amount_rows]

        path, pos = self.path_pos(mchid, time, spec, card_type)
        dataset = self.file[path]  # look the dataset up once per event
        dataset[count_row, pos] += 1
        for row, amount in increments:
            dataset[row, pos] += amount

    def _onSubmit(self, params):
        """Record a submitted order: bump the submit count and add ``mch_amount``."""
        self._accumulate(
            params,
            pos_map.submit_count,
            [(pos_map.submit_amounts, 'mch_amount')],
        )

    def _onSucc(self, params):
        """Record a successful order: bump the success count and add both
        ``mch_amount`` and ``channel_amount`` to their rows."""
        self._accumulate(
            params,
            pos_map.succ_count,
            [
                (pos_map.succ_mch_amounts, 'mch_amount'),
                (pos_map.succ_ch_amounts, 'channel_amount'),
            ],
        )

    def _onFail(self, params):
        """Record a failed order: bump the failure count and add ``mch_amount``."""
        self._accumulate(
            params,
            pos_map.fail_count,
            [(pos_map.fail_mch_amounts, 'mch_amount')],
        )

    def path_pos(self, mchid, time, spec, card_type):
        """Return the storage path and column index for an event.

        If nothing is stored under the path yet, a zero-filled array of shape
        ``(pos_map.dim(), 86400)`` is created there first.

        Args:
            mchid: Merchant id, used as a path component.
            time: Event time; passed to ``day_stamp`` and used to derive the
                column index.
            spec: Spec value, used as a path component.
            card_type: Card type, used as a path component.

        Returns:
            Tuple ``(path, pos)`` where ``pos`` is ``time - day_stamp(time)``.
        """
        # NOTE(review): day_stamp presumably returns the timestamp of the
        # start of the day containing `time`, making `pos` the second of the
        # day (hence 86400 columns) - confirm against DataStream.
        today = day_stamp(time)
        path = f'/{self._version}/{today}/{mchid}/{card_type}/{spec}'
        log.debug("%s,%s", 'MerchantWriter', path)
        store = self.file
        if path not in store:
            rows = pos_map.dim()
            store[path] = np.zeros((rows, 86400))
        return path, time - today