1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556 |
- import unittest
- from refill import queueListener, ChannelReader, ChannelWriter, open_hdf5, mktime,day_stamp
- from refill import MerchantReader,MerchantWriter
- import signal as sig
- import sys,getopt
- import logging
- class TestCommand(unittest.TestCase):
- def test_copy_channel(self):
- def copy(reader,writer,paths):
- for path in paths:
- data = reader.read_path(path)
- writer.write_set(path,data)
- pass
- latest_time = mktime('2023-03-01 00:00:00')
- lastest_day = day_stamp(latest_time)
- reader = ChannelReader()
- hDFive = open_hdf5('/var/www/html/data/stdata/channel_bak.hdf5', True)
- writer = ChannelWriter(hDFive)
- days = reader.days()
- for day in days:
- day = int(day)
- if day < lastest_day:
- continue
- paths = reader.datasets(day)
- copy(reader,writer,paths)
- print(paths)
- pass
- def test_copy_mch(self):
- def copy(reader,writer,paths):
- for path in paths:
- data = reader.read_path(path)
- writer.write_set(path,data)
- pass
- latest_time = mktime('2023-03-01 00:00:00')
- lastest_day = day_stamp(latest_time)
- reader = MerchantReader()
- hDFive = open_hdf5('/var/www/html/data/stdata/merchant_bak.hdf5', True)
- writer = MerchantWriter(hDFive)
- days = reader.days()
- for day in days:
- day = int(day)
- if day < lastest_day:
- continue
- paths = reader.datasets(day)
- copy(reader,writer,paths)
- print(paths)
- pass
- if __name__ == '__main__':
- unittest.main()
|