Created
February 9, 2018 13:30
-
-
Save maxfischer2781/0c7328a1f84ab7ba440936b51c21ef7f to your computer and use it in GitHub Desktop.
Compare an ALICE namespace to a catalogue whitelist
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/local/bin/python3 | |
import os | |
import argparse | |
import pickle | |
import time | |
from typing import Set, Iterable | |
import chainlet | |
from chainlet.concurrency import threads | |
# Command line interface of the script.
# Both base paths are converted to ``bytes`` so that all later ``os.path`` and
# ``os.listdir`` calls operate on (and return) raw ``bytes`` file names.
CLI = argparse.ArgumentParser(
    # the first positional parameter of ArgumentParser is ``prog``, not the
    # description - pass the text by keyword so the usage line stays sane
    description='Clean an ALICE SE Namespace based on a whitelist',
)
CLI.add_argument(
    'WHITELIST',
    help='whitelist base path',
    type=lambda val: bytes(val, 'utf-8'),
)
CLI.add_argument(
    'SEBASE',
    help='SE namespace base path',
    type=lambda val: bytes(val, 'utf-8'),
)
CLI.add_argument(
    '--ignore-after',
    help='ignore any file created after this epoch date',
    # without an explicit type, a value given on the command line stays a
    # ``str`` and cannot be compared against the numeric ``st_mtime`` later on
    type=int,
    default=1512082800,
)
@chainlet.forklet
@chainlet.genlet(prime=False)
def walk_namespace(se_base_path: bytes):
    """
    Walk the two top directory levels of an SE namespace

    For each of the first-level directories ``00`` to ``15`` below
    ``se_base_path``, yield one batch holding the paths of all its entries.
    The paths are relative to ``se_base_path``, i.e. of the form
    ``b'NN/<entry>'``.

    :param se_base_path: base path of the SE namespace
    :raises OSError: if one of the first-level directories cannot be listed
    """
    for base_path in (('%02d' % base).encode() for base in range(16)):
        # we only need to clean up files that exist
        # Yield a list rather than a lazy generator expression: a generator
        # looks up ``base_path`` only when it is consumed, and would produce
        # paths with the wrong prefix if that happens after this loop moved on.
        yield [
            os.path.join(base_path, mid_path)
            for mid_path
            in os.listdir(os.path.join(se_base_path, base_path))
        ]
@chainlet.funclet
def whitelist_files(value: bytes, whitelist_path: bytes, se_base_path: bytes):
    """
    Provide the entries of a namespace directory which are not whitelisted

    The whitelist for a directory is a pickle stored at the same relative
    path below ``whitelist_path``. A missing whitelist is treated as empty,
    so that every entry of the directory is reported.

    :param value: directory path, relative to ``se_base_path``
    :param whitelist_path: base path of the whitelist pickles
    :param se_base_path: base path of the SE namespace
    :return: lazy iterable of the non-whitelisted entries, each as a path
             relative to ``se_base_path``
    :raises chainlet.StopTraversal: if the namespace directory does not exist
    """
    pickle_path = os.path.join(whitelist_path, value)
    try:
        with open(pickle_path, 'rb') as pickle_file:
            # NOTE(review): unpickling can execute arbitrary code - only point
            # this at whitelists from a trusted source
            allowed = pickle.load(pickle_file)  # type: Set[bytes]
    except FileNotFoundError:
        # no whitelist for this directory: nothing in it is protected
        allowed = set()
    try:
        entries = os.listdir(os.path.join(se_base_path, value))
    except FileNotFoundError:
        raise chainlet.StopTraversal
    return (
        os.path.join(value, entry)
        for entry in entries
        if entry not in allowed
    )
@chainlet.forklet
@chainlet.funclet
def cull_new(value: Iterable[bytes], se_base_path: bytes, ignore_after: int):
    """
    Drop files that are too new, and report the remaining ones

    Every file whose modification time lies before ``ignore_after`` is printed
    as a ``path,size,mtime`` line and passed on. Files modified at or after
    ``ignore_after`` and files which no longer exist are silently skipped.

    :param value: file paths, relative to ``se_base_path``
    :param se_base_path: base path of the SE namespace
    :param ignore_after: epoch timestamp; files modified at or after it are skipped
    :return: iterable over the relative paths of the reported files
    """
    file_paths = value
    for rel_path in file_paths:
        file_path = os.path.join(se_base_path, rel_path)
        try:
            file_stat = os.stat(file_path)
        except FileNotFoundError:
            # the file vanished after its directory was listed - there is
            # nothing left to report, and it must not abort the whole run
            continue
        if file_stat.st_mtime < ignore_after:
            print(
                rel_path.decode(), '%dB' % file_stat.st_size,
                time.strftime('%Y-%m-%d %H:%M:%S %Z', time.localtime(file_stat.st_mtime)), sep=','
            )
            yield rel_path
def main():
    """Parse the command line, assemble the processing chain and run it to exhaustion"""
    args = CLI.parse_args()
    namespace_root = args.SEBASE
    whitelist_root = args.WHITELIST
    cutoff = args.ignore_after
    # source of directory paths, followed by the per-directory processing
    # which is handed to ``threads``
    source = walk_namespace(se_base_path=namespace_root)
    per_directory = whitelist_files(
        whitelist_path=whitelist_root, se_base_path=namespace_root
    ) >> cull_new(se_base_path=namespace_root, ignore_after=cutoff)
    pipeline = source >> threads(per_directory)
    print(pipeline)
    # ``cull_new`` prints its findings itself; iterate only to drive the chain
    for _ in pipeline:
        pass


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment