123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- # from . import DataHandler #此时是导入文件
- from .DataStream import DataWriteStream, day_stamp
- from .DataStream import EMchPosmap as pos_map
- import numpy as np
- __all__ = ['MerchantWriter']
- import logging
- log = logging.getLogger('writer')
- class MerchantWriter(DataWriteStream):
- def write(self, method, params):
- flush = True
- if method == 'mch_submit':
- self._onSubmit(params)
- elif method == 'mch_succ':
- self._onSucc(params)
- elif method == 'mch_fail':
- self._onFail(params)
- else:
- flush = False
- # if flush:
- # hfive = self.file
- # hfive.flush()
- def _onSubmit(self, params):
- def parse(input):
- return params['mchid'], params['time'], params['spec'], params['card_type'], params['mch_amount']
- mchid, time, spec, card_type, mch_amount = parse(params)
- dset, pos = self.path_pos(mchid, time, spec, card_type)
- rows = [pos_map.submit_count, pos_map.submit_amounts]
- vals = [1, mch_amount]
- # dset[pos,rows] += vals
- dset[rows,pos] += vals
- # dset[pos_map.submit_count, pos] += 1
- # dset[pos_map.submit_amounts, pos] += mch_amount
- pass
- def _onSucc(self, params):
- def parse(input):
- return params['mchid'], params['time'], params['spec'], params['card_type'], params['mch_amount'], params['channel_amount']
- mchid, time, spec, card_type, mch_amount, channel_amount = parse(params)
- dset, pos = self.path_pos(mchid, time, spec, card_type)
- rows = [pos_map.succ_count, pos_map.succ_mch_amounts, pos_map.succ_ch_amounts]
- vals = [1, mch_amount, channel_amount]
- dset[rows, pos] += vals
- # dset[pos_map.succ_count, pos] += 1
- # dset[pos_map.succ_mch_amounts, pos] += mch_amount
- # dset[pos_map.succ_ch_amounts, pos] += channel_amount
- pass
- def _onFail(self, params):
- def parse(input):
- return params['mchid'], params['time'], params['spec'], params['card_type'], params['mch_amount']
- mchid, time, spec, card_type, mch_amount = parse(params)
- dset, pos = self.path_pos(mchid, time, spec, card_type)
- rows = [pos_map.fail_count, pos_map.fail_mch_amounts]
- vals = [1, mch_amount]
- dset[pos,rows] += vals
- # dset[pos_map.fail_count, pos] += 1
- # dset[pos_map.fail_mch_amounts, pos] += mch_amount
- pass
- def path_pos(self, mchid, time, spec, card_type):
- today = day_stamp(time)
- path = f'/{self._version}/{today}/{mchid}/{card_type}/{spec}'
- log.debug("%s,%s",'MerchantWriter',path)
- hfive = self.file
- if path not in hfive:
- dim = pos_map.dim()
- # dset = hfive.create_dataset(path, (86400,dim), chunks=(3600,dim))
- # dset[:, :] = np.zeros((86400,dim))
- dset = hfive.create_dataset(path, (dim,86400), chunks=(dim,3600))
- dset[:, :] = np.zeros((dim,86400))
- hfive.flush()
- else:
- dset = hfive[path]
- return dset, time - today
|