Skip to content

Instantly share code, notes, and snippets.

@maxfischer2781
Created February 9, 2018 13:30
Show Gist options
  • Save maxfischer2781/0c7328a1f84ab7ba440936b51c21ef7f to your computer and use it in GitHub Desktop.
Save maxfischer2781/0c7328a1f84ab7ba440936b51c21ef7f to your computer and use it in GitHub Desktop.
Compare an ALICE namespace to a catalogue whitelist
#!/usr/local/bin/python3
import os
import argparse
import pickle
import time
from typing import Set, Iterable
import chainlet
from chainlet.concurrency import threads
# Command line interface of the script.
#
# The text is passed as ``description`` (the first positional parameter of
# ``ArgumentParser`` is ``prog``, which would put the sentence into the usage
# line as if it were the program name).
CLI = argparse.ArgumentParser(
    description='Clean an ALICE SE Namespace based on a whitelist',
)
# Both base paths are converted to ``bytes`` so that every path derived from
# them (``os.path.join``, ``os.listdir``) is ``bytes`` as well.
CLI.add_argument(
    'WHITELIST',
    help='whitelist base path',
    type=lambda val: bytes(val, 'utf-8'),
)
CLI.add_argument(
    'SEBASE',
    help='SE namespace base path',
    type=lambda val: bytes(val, 'utf-8'),
)
# ``type=int`` is required: without it a value given on the command line
# stays a ``str`` and cannot be compared against a file's ``st_mtime``.
CLI.add_argument(
    '--ignore-after',
    help='ignore any file created after this epoch date',
    type=int,
    default=1512082800,
)
def _prefix_entries(prefix: bytes, names: Iterable[bytes]) -> Iterable[bytes]:
    """Lazily join every name in ``names`` onto ``prefix``."""
    # ``prefix`` is bound per call, so the result stays correct even if it is
    # consumed after the caller's loop variable has moved on.
    return (os.path.join(prefix, name) for name in names)


@chainlet.forklet
@chainlet.genlet(prime=False)
def walk_namespace(se_base_path: bytes):
    """
    Walk the two top levels of the namespace below ``se_base_path``

    For each of the 16 top-level directories ``00`` .. ``15`` this yields one
    iterable of relative paths ``<top>/<entry>``, one per entry found in that
    directory. The directory is listed at the moment its iterable is yielded.

    :param se_base_path: base path of the SE namespace
    :raises FileNotFoundError: if one of the top-level directories is missing
        (``os.listdir`` is not guarded here)

    NOTE(review): presumably ``forklet`` makes each yielded iterable fan out
    into individual items for the next chain element - confirm against the
    chainlet documentation.
    """
    for base_path in (('%02d' % base).encode() for base in range(16)):
        # we only need to clean up files that exist
        yield _prefix_entries(
            base_path,
            os.listdir(os.path.join(se_base_path, base_path)),
        )
@chainlet.funclet
def whitelist_files(value: bytes, whitelist_path: bytes, se_base_path: bytes):
    """
    Select the entries of one namespace directory that are not whitelisted

    :param value: path of the directory, relative to both base paths
    :param whitelist_path: base path below which the pickled whitelists live
    :param se_base_path: base path of the SE namespace
    :return: lazy iterable of ``value/<name>`` for every name in the
        directory that is not contained in the whitelist
    :raises chainlet.StopTraversal: if the namespace directory does not exist

    A missing whitelist file is treated as an empty whitelist, i.e. every
    entry of the directory is reported.
    """
    # NOTE: unpickling can execute arbitrary code - the whitelist location
    # must only ever contain trusted files.
    try:
        with open(os.path.join(whitelist_path, value), 'rb') as stream:
            keep = pickle.load(stream)  # type: Set[bytes]
    except FileNotFoundError:
        keep = set()
    try:
        names = os.listdir(os.path.join(se_base_path, value))
    except FileNotFoundError:
        raise chainlet.StopTraversal
    return (os.path.join(value, name) for name in names if name not in keep)
@chainlet.forklet
@chainlet.funclet
def cull_new(value: Iterable[bytes], se_base_path: bytes, ignore_after: int):
    """
    Report and yield all files last modified before ``ignore_after``

    Every file that passes is printed as one comma separated line
    ``<relative path>,<size>B,<local modification time>`` and then yielded.

    :param value: paths of candidate files, relative to ``se_base_path``
    :param se_base_path: base path of the SE namespace
    :param ignore_after: epoch timestamp; files whose ``st_mtime`` is not
        strictly smaller than this are dropped
    :return: generator of the relative paths that passed the age check
    """
    for rel_path in value:
        try:
            file_stat = os.stat(os.path.join(se_base_path, rel_path))
        except FileNotFoundError:
            # The file vanished between listing its directory and this stat
            # call; there is nothing left to clean up, so skip it instead of
            # aborting the whole scan.
            continue
        if file_stat.st_mtime < ignore_after:
            print(
                rel_path.decode(), '%dB' % file_stat.st_size,
                time.strftime('%Y-%m-%d %H:%M:%S %Z', time.localtime(file_stat.st_mtime)), sep=','
            )
            yield rel_path
def main():
    """
    Parse the command line, assemble the processing chain and run it

    The chain is printed once before it is run. Its results are discarded:
    the report lines are written by ``cull_new`` while the chain is consumed.
    """
    args = CLI.parse_args()
    se_root = args.SEBASE
    # Construction order matches a single chained expression:
    # source first, then the worker pipeline that is handed to ``threads``.
    source = walk_namespace(se_base_path=se_root)
    worker = whitelist_files(
        whitelist_path=args.WHITELIST, se_base_path=se_root
    ) >> cull_new(
        se_base_path=se_root, ignore_after=args.ignore_after
    )
    chain = source >> threads(worker)
    print(chain)
    # exhaust the chain; only its side effects are of interest
    for _ in chain:
        pass


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment