Skip to content

Instantly share code, notes, and snippets.

@maxfischer2781
Last active February 6, 2018 17:16
Show Gist options
  • Save maxfischer2781/df05633b316b52eaf208aac160031321 to your computer and use it in GitHub Desktop.
Transform an ALICE catalogue dump to a hierarchy of whitelists
#!/usr/local/bin/python3
import argparse
import chainlet
from chainlet.concurrency import threads
import pickle
import os
import time
import datetime
# Command-line interface: one input dump, one output directory, and an
# optional batch size for how many LFNs to buffer per yielded chunk.
CLI = argparse.ArgumentParser(
    # Fix: the original passed this string positionally, which sets `prog`
    # (the program name shown in usage) instead of the help description.
    description="Transform an ALICE catalogue dump to a hierarchy of whitelists",
)
CLI.add_argument(
    'IN',
    help='input file path',
)
CLI.add_argument(
    'OUT',
    help='output directory',
)
CLI.add_argument(
    '--buffer',
    help='lfns to pack at a time',
    default=50000,
    type=int,
)
@chainlet.genlet(prime=False)
def read_lfns(catalogue_path, buffer_size=500):
    """
    Read LFNs from a catalogue dump, yielding them in batches.

    :param catalogue_path: path to the catalogue dump file
    :param buffer_size: number of LFNs to collect before yielding a batch
    :return: yields lists of ``(base, mid, uuid)`` byte-string triples

    Lines starting with ``#`` are skipped as comments.  Each LFN is the last
    three ``/``-separated components of the PFN (the first comma-separated
    field of a line) — presumably ``base/mid/uuid``; confirm against the
    dump format.
    """
    buffer = []
    max_bytes = os.stat(catalogue_path).st_size
    start_time, records = time.time(), 0
    with open(catalogue_path, 'rb') as catalogue:
        for line in catalogue:
            if line.startswith(b'#'):
                continue
            # Only the PFN (first field) is needed; ignore the rest of the line.
            PFN, *_ = line.split(b',', 1)
            lfn = tuple(PFN.split(b'/')[-3:])
            buffer.append(lfn)
            if len(buffer) >= buffer_size:
                records += len(buffer)
                yield buffer
                buffer = []
                report_progress(records, catalogue, max_bytes, start_time)
        # Fix: only flush the remainder if there is one — previously an empty
        # final batch was yielded when the record count was an exact multiple
        # of buffer_size (or the file was empty).
        if buffer:
            records += len(buffer)
            yield buffer
        report_progress(records, catalogue, max_bytes, start_time)
    # Progress is printed with end='\r'; emit a final newline.
    print('')
def report_progress(records, catalogue, max_bytes, start_time):
    """
    Print a single-line, carriage-return-terminated progress report.

    :param records: number of records processed so far
    :param catalogue: open file object; its ``tell()`` gives bytes consumed
    :param max_bytes: total size of the catalogue file in bytes
    :param start_time: ``time.time()`` timestamp when reading started

    Shows percent done, records so far, remaining/total time estimates and a
    projected total record count, extrapolated linearly from bytes consumed.
    """
    # Fix: guard against ZeroDivisionError for an empty file (max_bytes == 0)
    # or before any bytes have been consumed (progress == 0) — there is no
    # meaningful estimate to print in either case.
    if max_bytes <= 0:
        return
    progress = catalogue.tell() / max_bytes
    if progress <= 0:
        return
    elapsed = time.time() - start_time
    print('{:6.1%} [{:10d}r], ETA {}, total {} [{:10d}r]'.format(
        progress,
        records,
        datetime.timedelta(seconds=round(elapsed * (1 - progress) / progress)),
        datetime.timedelta(seconds=round(elapsed / progress)),
        int(records / progress),
    ), end='\r')
@chainlet.forklet
@chainlet.funclet
def order_lfns(value):
    """
    Group ``(base, mid, uuid)`` triples into a two-level hierarchy.

    :param value: iterable of ``(base, mid, uuid)`` byte-string triples
    :return: list of ``(base, {mid: [uuid, ...]})`` pairs, one per base

    All 16 bases ``b'00'`` .. ``b'15'`` are present in the result even when
    they received no triples.
    """
    hierarchy = {b'%02d' % index: {} for index in range(16)}
    for base, mid, uuid in value:
        hierarchy[base].setdefault(mid, []).append(uuid)
    return list(hierarchy.items())
@chainlet.forklet
@chainlet.funclet
def partition_lfns(value, chunks=10):
    """
    Split one base's ``{mid: uuids}`` mapping into roughly equal chunks.

    :param value: a ``(base, {mid: [uuid, ...]})`` pair
    :param chunks: approximate number of chunks to split the mapping into
    :return: yields ``[base, {mid: uuids, ...}]`` partial mappings

    Note: when ``len(mid_uuids) < chunks / 2``, ``round`` makes the chunk
    size 0 and every mid is yielded individually.
    """
    buffer = []
    base, mid_uuids = value
    chunk_size = round(len(mid_uuids) / chunks)
    for mid, uuids in mid_uuids.items():
        buffer.append((mid, uuids))
        if len(buffer) >= chunk_size:
            yield [base, dict(buffer)]
            buffer = []
    # Fix: only flush a non-empty remainder — previously an empty
    # ``[base, {}]`` chunk was emitted whenever the last item completed
    # a full chunk (or the mapping was empty).
    if buffer:
        yield [base, dict(buffer)]
@chainlet.funclet
def store_lfns(value, output_directory):
    """
    Persist one base's partial ``{mid: uuids}`` mapping to per-mid chunk files.

    :param value: a ``(base, {mid: [uuid, ...]})`` pair of byte strings
    :param output_directory: root directory for the whitelist hierarchy

    Each mid is merged into the pickle file ``output_directory/base/mid``.
    """
    base, mid_uuids = value
    base_directory = os.path.join(output_directory, base.decode())
    for mid, uuids in mid_uuids.items():
        update_chunk(os.path.join(base_directory, mid.decode()), uuids)
def update_chunk(path, lfns):
    """
    Merge ``lfns`` into the pickled set stored at ``path``.

    :param path: file path of the chunk; created if it does not exist
    :param lfns: iterable of LFN identifiers to add to the stored set
    """
    try:
        with open(path, 'rb') as chunk:
            data = pickle.load(chunk)
    except FileNotFoundError:
        data = set()
    # Fix: ``set.union`` returns a new set and the result was discarded, so
    # no LFN was ever actually stored; ``update`` mutates the set in place.
    data.update(lfns)
    with open(path, 'wb') as chunk:
        pickle.dump(data, chunk)
def main():
    """
    Entry point: parse the CLI options, build the processing chain, and
    drain it — all work happens as side effects of iterating the chain.
    """
    options = CLI.parse_args()
    # read -> (threaded) group by base -> partition -> persist to disk.
    chain = read_lfns(catalogue_path=options.IN, buffer_size=options.buffer) >> threads(
        order_lfns() >> partition_lfns(chunks=5) >> store_lfns(output_directory=options.OUT)
    )
    print(chain)
    # Pre-create the 16 base directories so concurrent writers need not race
    # to create them.
    for base in range(16):
        os.makedirs(os.path.join(options.OUT, '%02d' % base), exist_ok=True)
    # Fix: removed the stray ``global threasd`` statement — a typo'd,
    # never-assigned name with no effect.
    for _ in chain:
        pass
# Run the conversion only when executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment