Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/local/bin/python3
- import os
- import argparse
- import pickle
- import time
- from typing import Set, Iterable
- import chainlet
- from chainlet.concurrency import threads
# Command line interface of the cleanup script.
# Both paths are converted to bytes, so that all later path handling
# (os.path.join, os.listdir, whitelist membership tests) works on bytes.
# The descriptive text must be passed as ``description``: the first positional
# parameter of ArgumentParser is ``prog``, the program name shown in usage.
CLI = argparse.ArgumentParser(description='Clean an ALICE SE Namespace based on a whitelist')
CLI.add_argument(
    'WHITELIST',
    help='whitelist base path',
    type=lambda val: bytes(val, 'utf-8'),
)
CLI.add_argument(
    'SEBASE',
    help='SE namespace base path',
    type=lambda val: bytes(val, 'utf-8'),
)
CLI.add_argument(
    '--ignore-after',
    help='ignore any file created after this epoch date',
    # Without an explicit type, a value given on the command line stays a str
    # and cannot be compared against the numeric modification time of a file.
    type=int,
    default=1512082800,
)
@chainlet.forklet
@chainlet.genlet(prime=False)
def walk_namespace(se_base_path: bytes):
    """
    Walk the two top levels of the SE namespace

    For each of the 16 first-level directories ``00`` to ``15`` below
    ``se_base_path``, the content of the directory is listed and an iterable
    of second-level paths of the form ``b'<first>/<second>'`` is yielded.
    The yielded paths are relative to ``se_base_path``.

    :param se_base_path: base path of the SE namespace
    """
    for index in range(16):
        top_level = b'%02d' % index
        # only files that actually exist need cleaning up, so the SE itself is listed
        mid_levels = os.listdir(os.path.join(se_base_path, top_level))
        yield (os.path.join(top_level, mid_level) for mid_level in mid_levels)
@chainlet.funclet
def whitelist_files(value: bytes, whitelist_path: bytes, se_base_path: bytes):
    """
    Select the entries of one SE directory which are not whitelisted

    The whitelist of a directory is a pickled set stored at the same relative
    path below ``whitelist_path``. A missing whitelist file is treated as an
    empty whitelist.

    :param value: directory path, relative to ``whitelist_path`` and ``se_base_path``
    :param whitelist_path: base path of the pickled whitelists
    :param se_base_path: base path of the SE namespace
    :return: iterable of paths relative to ``se_base_path`` which are not in the whitelist
    :raises chainlet.StopTraversal: if the directory does not exist below ``se_base_path``
    """
    directory = value
    # NOTE(review): unpickling can execute arbitrary code - whitelist files must be trusted
    try:
        with open(os.path.join(whitelist_path, directory), 'rb') as pickled:
            allowed = pickle.load(pickled)  # type: Set[bytes]
    except FileNotFoundError:
        allowed = set()
    try:
        entries = os.listdir(os.path.join(se_base_path, directory))
    except FileNotFoundError:
        raise chainlet.StopTraversal
    return (
        os.path.join(directory, entry)
        for entry in entries
        if entry not in allowed
    )
@chainlet.forklet
@chainlet.funclet
def cull_new(value: Iterable[bytes], se_base_path: bytes, ignore_after: int):
    """
    Report and pass on only files last modified before ``ignore_after``

    Every file passed on is printed as one ``path,size,modification time`` line.
    Files modified at or after ``ignore_after`` are dropped without output.

    :param value: file paths relative to ``se_base_path``
    :param se_base_path: base path of the SE namespace
    :param ignore_after: epoch timestamp compared against the modification time
    """
    for candidate in value:
        info = os.stat(os.path.join(se_base_path, candidate))
        if info.st_mtime >= ignore_after:
            continue
        name_field = candidate.decode()
        size_field = '%dB' % info.st_size
        mtime_field = time.strftime('%Y-%m-%d %H:%M:%S %Z', time.localtime(info.st_mtime))
        print(name_field, size_field, mtime_field, sep=',')
        yield candidate
def main():
    """Parse the command line, assemble the processing chain and run it until exhausted"""
    options = CLI.parse_args()
    se_base = options.SEBASE
    whitelist = options.WHITELIST
    ignore_after = options.ignore_after
    # elements are created in the order in which they appear in the chain
    source = walk_namespace(se_base_path=se_base)
    select_stage = whitelist_files(whitelist_path=whitelist, se_base_path=se_base)
    cull_stage = cull_new(se_base_path=se_base, ignore_after=ignore_after)
    chain = source >> threads(select_stage >> cull_stage)
    print(chain)
    # the chain is run only for the report it prints, its results are discarded
    for _result in chain:
        pass


if __name__ == '__main__':
    main()
Add Comment
Please, Sign In to add comment