MerchantReader.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. # from . import DataHandler #此时是导入文件
  2. from .DataStream import DataReadStream, day_stamp,open_hdf5
  3. from .DataStream import EMchPosmap as pos_map
  4. import numpy as np
  5. import re
  6. from collections import defaultdict
  7. __all__ = ['MerchantReader']
  8. import logging
  9. log = logging.getLogger('reader')
  10. class MerchantReader(DataReadStream):
  11. def __init__(self):
  12. file = '/var/www/html/data/stdata/merchant.hdf5'
  13. hfive = open_hdf5(file,False)
  14. super(MerchantReader,self).__init__(hfive)
  15. def __del__(self):
  16. self.close()
  17. super(MerchantReader, self).__del__()
  18. pass
  19. def tuple_path(self, day: int, mchids: set = None, card_types: set = None, spec: int = None):
  20. def parse(path):
  21. items = re.split(r'/', path)
  22. (_prifix, _version, today, mchid, card_type, spec) = items
  23. return int(mchid), int(card_type), int(spec)
  24. tuples = defaultdict(list)
  25. pathes = self.datasets(day)
  26. for path in pathes:
  27. _mchid, _card_type, _spec = parse(path)
  28. tuples[_mchid].append((_card_type, _spec))
  29. def name_filter(tuples, mchids):
  30. all = True if mchids is None or len(mchids) == 0 else False
  31. if all:
  32. return tuples
  33. else:
  34. result = defaultdict(list)
  35. for mchid, tup in tuples.items():
  36. if mchid in mchids:
  37. result[mchid] = tup
  38. return result
  39. tuples = name_filter(tuples, mchids)
  40. def typespec_filter(tuples, card_types, spec):
  41. result = defaultdict(list)
  42. for name, ls in tuples.items():
  43. for tup in ls:
  44. _card_type, _spec = tup
  45. if _card_type in card_types:
  46. if spec is None or _spec == spec:
  47. result[name].append((_card_type, _spec))
  48. return result
  49. tuples = typespec_filter(tuples,card_types,spec)
  50. return tuples
  51. def many_tuple_path(self, days: list, mchids: set = None, card_types: set = None, spec: int = None):
  52. def merge(l,r):
  53. for mchid,ls in l.items():
  54. if mchid in r:
  55. r[mchid].union(set(ls))
  56. else:
  57. r[mchid] = set(ls)
  58. return r
  59. all = defaultdict(set)
  60. for day in days:
  61. tuples = self.tuple_path(day, mchids, card_types, spec)
  62. all = merge(tuples,all)
  63. return all
  64. def init_data(self, days):
  65. dim = pos_map.dim()
  66. return np.zeros((dim, 86400 * days))
  67. def read(self, day_stamp, mchid, card_type, spec):
  68. path = f'/{self._version}/{day_stamp}/{mchid}/{card_type}/{spec}'
  69. hfive = self.file
  70. if path in hfive:
  71. return hfive[path]
  72. else:
  73. return None
  74. pass