testCommand.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. import unittest
  2. from refill import queueListener, ChannelReader, ChannelWriter, open_hdf5, mktime,day_stamp
  3. from refill import MerchantReader,MerchantWriter
  4. import signal as sig
  5. import sys,getopt
  6. import logging
  7. class TestCommand(unittest.TestCase):
  8. def test_copy_channel(self):
  9. def copy(reader,writer,paths):
  10. for path in paths:
  11. data = reader.read_path(path)
  12. writer.write_set(path,data)
  13. pass
  14. latest_time = mktime('2023-03-01 00:00:00')
  15. lastest_day = day_stamp(latest_time)
  16. reader = ChannelReader()
  17. hDFive = open_hdf5('/var/www/html/data/stdata/channel_bak.hdf5', True)
  18. writer = ChannelWriter(hDFive)
  19. days = reader.days()
  20. for day in days:
  21. day = int(day)
  22. if day < lastest_day:
  23. continue
  24. paths = reader.datasets(day)
  25. copy(reader,writer,paths)
  26. print(paths)
  27. pass
  28. def test_copy_mch(self):
  29. def copy(reader,writer,paths):
  30. for path in paths:
  31. data = reader.read_path(path)
  32. writer.write_set(path,data)
  33. pass
  34. latest_time = mktime('2023-03-01 00:00:00')
  35. lastest_day = day_stamp(latest_time)
  36. reader = MerchantReader()
  37. hDFive = open_hdf5('/var/www/html/data/stdata/merchant_bak.hdf5', True)
  38. writer = MerchantWriter(hDFive)
  39. days = reader.days()
  40. for day in days:
  41. day = int(day)
  42. if day < lastest_day:
  43. continue
  44. paths = reader.datasets(day)
  45. copy(reader,writer,paths)
  46. print(paths)
  47. pass
  48. if __name__ == '__main__':
  49. unittest.main()