Last active
February 6, 2018 17:16
-
-
Save maxfischer2781/df05633b316b52eaf208aac160031321 to your computer and use it in GitHub Desktop.
Transform an ALICE catalogue dump to a hierarchy of whitelists
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/local/bin/python3 | |
import argparse | |
import chainlet | |
from chainlet.concurrency import threads | |
import pickle | |
import os | |
import time | |
import datetime | |
# Command-line interface: positional input/output paths plus a batching knob.
CLI = argparse.ArgumentParser(
    "Transform an ALICE catalogue dump to a hierarchy of whitelists"
)
CLI.add_argument('IN', help='input file path')
CLI.add_argument('OUT', help='output directory')
CLI.add_argument(
    '--buffer',
    type=int,
    default=50000,
    help='lfns to pack at a time',
)
@chainlet.genlet(prime=False)
def read_lfns(catalogue_path, buffer_size=500):
    """Yield batches of LFN tuples parsed from an ALICE catalogue dump.

    Each non-comment line is split on the first comma; the PFN field is
    reduced to its last three path components (base, mid, uuid tuple of
    bytes).  Batches of up to ``buffer_size`` LFNs are yielded, and a
    progress line is printed after every batch.
    """
    pending = []
    total_bytes = os.stat(catalogue_path).st_size
    started = time.time()
    emitted = 0
    with open(catalogue_path, 'rb') as catalogue:
        for raw in catalogue:
            # '#' marks comment lines in the dump
            if raw[:1] == b'#':
                continue
            pfn = raw.split(b',', 1)[0]
            pending.append(tuple(pfn.split(b'/')[-3:]))
            if len(pending) < buffer_size:
                continue
            emitted += len(pending)
            yield pending
            pending = []
            report_progress(emitted, catalogue, total_bytes, started)
        # flush the final, possibly short batch
        emitted += len(pending)
        yield pending
        report_progress(emitted, catalogue, total_bytes, started)
    print('')
def report_progress(records, catalogue, max_bytes, start_time):
    """Print a one-line progress report, overwriting itself via ``\\r``.

    :param records: number of records processed so far
    :param catalogue: open file object; ``tell()`` gives bytes consumed
    :param max_bytes: total size of the input file in bytes
    :param start_time: ``time.time()`` when processing began

    ETA and totals are linear extrapolations from bytes consumed so far.
    """
    # Guard against division by zero: an empty input file, or a call
    # before any bytes were consumed, previously raised ZeroDivisionError.
    if max_bytes <= 0:
        return
    progress = catalogue.tell() / max_bytes
    if progress <= 0:
        return
    elapsed = time.time() - start_time
    print('{:6.1%} [{:10d}r], ETA {}, total {} [{:10d}r]'.format(
        progress,
        records,
        datetime.timedelta(seconds=round(elapsed * (1 - progress) / progress)),
        datetime.timedelta(seconds=round(elapsed / progress)),
        int(records / progress),
    ), end='\r')
@chainlet.forklet
@chainlet.funclet
def order_lfns(value):
    """Group a batch of (base, mid, uuid) LFN tuples by base and mid.

    :param value: iterable of ``(base, mid, uuid)`` byte-string tuples
    :return: list of ``(base, {mid: [uuid, ...]})`` pairs covering the
             16 fixed base buckets ``b'00'`` .. ``b'15'``

    Every bucket appears in the output even if it received no LFNs.
    """
    hierarchy = {b'%02d' % base: {} for base in range(16)}
    for base, mid, uuid in value:
        # setdefault replaces the original try/except-KeyError idiom,
        # which raised a confusing *chained* KeyError from inside the
        # handler whenever `base` itself was not a precomputed bucket.
        hierarchy[base].setdefault(mid, []).append(uuid)
    return list(hierarchy.items())
@chainlet.forklet
@chainlet.funclet
def partition_lfns(value, chunks=10):
    """Split one ``(base, {mid: uuids})`` mapping into ~``chunks`` pieces.

    :param value: pair of ``base`` and a ``{mid: [uuid, ...]}`` mapping
    :param chunks: desired number of partitions
    :return: generator of ``[base, {mid: uuids, ...}]`` partitions
    """
    base, mid_uuids = value
    # round() can yield 0 when there are fewer mids than chunks, which
    # made `len(buffer) >= chunk_size` true immediately and degenerated
    # into one partition per mid; clamp to at least 1.
    chunk_size = max(1, round(len(mid_uuids) / chunks))
    buffer = []
    for mid, uuids in mid_uuids.items():
        buffer.append((mid, uuids))
        if len(buffer) >= chunk_size:
            yield [base, dict(buffer)]
            buffer = []
    # flush the remainder, but do not emit an empty trailing partition
    if buffer:
        yield [base, dict(buffer)]
@chainlet.funclet
def store_lfns(value, output_directory):
    """Persist one ``(base, {mid: uuids})`` partition below *output_directory*.

    Each mid is merged into the pickle file ``<output_directory>/<base>/<mid>``
    via :func:`update_chunk`.
    """
    base, mid_uuids = value
    base_dir = os.path.join(output_directory, base.decode())
    for mid, uuids in mid_uuids.items():
        update_chunk(os.path.join(base_dir, mid.decode()), uuids)
def update_chunk(path, lfns):
    """Merge *lfns* into the pickled set stored at *path*, creating it if absent.

    :param path: filesystem path of the pickle file holding a ``set``
    :param lfns: iterable of items to add to the stored set

    Bug fix: the original called ``data.union(lfns)``, which returns a
    *new* set and discards it — nothing was ever accumulated on disk.
    ``set.update`` mutates ``data`` in place before it is re-pickled.
    """
    try:
        with open(path, 'rb') as chunk:
            data = pickle.load(chunk)
    except FileNotFoundError:
        # first write for this chunk: start from an empty set
        data = set()
    data.update(lfns)
    with open(path, 'wb') as chunk:
        pickle.dump(data, chunk)
def main():
    """Parse CLI options, build the processing chain and drive it to completion.

    The chain reads LFN batches from ``IN``, fans them out to threads
    that group, partition and finally pickle them below ``OUT``.
    """
    options = CLI.parse_args()
    chain = read_lfns(catalogue_path=options.IN, buffer_size=options.buffer) >> threads(
        order_lfns() >> partition_lfns(chunks=5) >> store_lfns(output_directory=options.OUT)
    )
    print(chain)
    # pre-create the 16 fixed base output directories (b'00' .. b'15')
    for base_dir in range(16):
        os.makedirs(os.path.join(options.OUT, '%02d' % base_dir), exist_ok=True)
    # NOTE(review): a stray `global threasd` statement (typo, never used)
    # was removed here as dead code.
    # Exhaust the chain; results are discarded, storage is a side effect.
    for _ in chain:
        pass
# Entry point when executed as a script.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment