123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- # from . import DataHandler #此时是导入文件
- from .DataStream import DataWriteStream, day_stamp
- from .DataStream import EChPosmap as pos_map
- import numpy as np
- __all__ = ['ChannelWriter']
- import logging
- log = logging.getLogger('writer')
- class ChannelWriter(DataWriteStream):
- def __init__(self, hfive):
- DataWriteStream.__init__(self, hfive)
- dim = pos_map.dim()
- self._cache = np.zeros((dim, 86400))
- def write(self, method, params):
- flush = True
- if method == 'ch_commit':
- self._onCommit(params)
- elif method == 'ch_succ':
- self._onSucc(params)
- elif method == 'ch_fail':
- self._onFail(params)
- else:
- flush = False
- if flush:
- hfive = self.file
- hfive.flush()
- pass
- def write_batch(self, batches):
- for path, items in batches.items():
- self._write_batch(path, items)
- def _write_batch(self, path, messages):
- def _commit(input):
- return input['time'], input['channel_amount']
- def _succ(input):
- return input['time'], input['channel_amount'], input['period']
- def _fail(input):
- return input['time'], input['channel_amount'], input['period']
- def _pos(time):
- today = day_stamp(time)
- return time - today
- dset = self._data_set(path=path)
- self._cache[:, :] = dset
- for method, params in messages:
- if method == 'ch_commit':
- time, channel_amount = _commit(params)
- pos = _pos(time)
- rows = [pos_map.commit_count, pos_map.commit_amounts]
- self._cache[rows, pos] += [1, channel_amount]
- elif method == 'ch_succ':
- time, channel_amount, period = _succ(params)
- pos = _pos(time)
- rows = [pos_map.succ_count, pos_map.succ_amounts, pos_map.succ_periods]
- self._cache[rows, pos] += [1, channel_amount, period]
- elif method == 'ch_fail':
- time, channel_amount, period = _fail(params)
- pos = _pos(time)
- rows = [pos_map.fail_count, pos_map.fail_amounts, pos_map.fail_periods]
- self._cache[rows, pos] += [1, channel_amount, period]
- dset[:, :] = self._cache
- hfive = self.file
- hfive.flush()
- def _onCommit(self, params):
- def parse(input):
- return input['channel_name'], input['time'], input['spec'], input['card_type'], input['channel_amount']
- chname, time, spec, card_type, channel_amount = parse(params)
- dset, pos = self.path_pos(chname, time, spec, card_type)
- rows = [pos_map.commit_count, pos_map.commit_amounts]
- vals = [1, channel_amount]
- dset[rows, pos] += vals
- pass
- def _onSucc(self, params):
- def parse(input):
- return input['channel_name'], input['time'], input['spec'], input['card_type'], input['channel_amount'], input['period']
- chname, time, spec, card_type, channel_amount, period = parse(params)
- dset, pos = self.path_pos(chname, time, spec, card_type)
- rows = [pos_map.succ_count, pos_map.succ_amounts, pos_map.succ_periods]
- vals = [1, channel_amount, period]
- dset[rows, pos] += vals
- pass
- def _onFail(self, params):
- def parse(input):
- return input['channel_name'], input['time'], input['spec'], input['card_type'], input['channel_amount'], input['period']
- chname, time, spec, card_type, channel_amount, period = parse(params)
- dset, pos = self.path_pos(chname, time, spec, card_type)
- rows = [pos_map.fail_count, pos_map.fail_amounts, pos_map.fail_periods]
- vals = [1, channel_amount, period]
- dset[rows, pos] += vals
- pass
- def get_path(self, method, params):
- def _parse(input):
- return input['channel_name'], input['time'], input['spec'], input['card_type']
- def _path(input):
- chname, time, spec, card_type = _parse(input)
- today = day_stamp(time)
- path = f'/{self._version}/{today}/{chname}/{card_type}/{spec}'
- return path
- if method in ['ch_commit','ch_succ','ch_fail']:
- path = _path(params)
- return path
- else:
- return None
- def path_pos(self, chname, time, spec, card_type):
- today = day_stamp(time)
- path = f'/{self._version}/{today}/{chname}/{card_type}/{spec}'
- # log.debug("%s,%s", 'ChannelWriter', path)
- dset = self._data_set(path=path)
- return dset, time - today
- def _data_set(self, path):
- hfive = self.file
- if path not in hfive:
- dim = pos_map.dim()
- dset = hfive.create_dataset(path, (dim, 86400), chunks=(dim, 900))
- dset[:, :] = np.zeros((dim, 86400))
- hfive.flush()
- else:
- dset = hfive[path]
- return dset
|