Skip to content

Instantly share code, notes, and snippets.

@rom1504
Last active May 13, 2022 07:38
Show Gist options
  • Save rom1504/c6182f07d77eaa299c8166264ffa69f5 to your computer and use it in GitHub Desktop.
Save rom1504/c6182f07d77eaa299c8166264ffa69f5 to your computer and use it in GitHub Desktop.
fsspec sync
from multiprocessing.pool import ThreadPool
import fsspec
from tqdm import tqdm
import sys
# Command line: <m> <t>. Only shards whose numeric id satisfies
# ``id % t == m`` are handled by this invocation.
m = int(sys.argv[1])
t = int(sys.argv[2])

path_s3 = "s3://laion5b/data/laion1B-nolang"
path_juwels = "/p/fastdata/mmlaion/laion1B-nolang"

# Key/value views of the source (a) and the destination (b).
a = fsspec.get_mapper(path_s3)
b = fsspec.get_mapper(path_juwels)

# --- Source side: find the shards to consider and the size of their tar ---
src_fs, src_root = fsspec.core.url_to_fs(path_s3)
src_listing = src_fs.ls(src_root, detail=True)

# A shard id is taken from every entry whose name contains ".json": it is
# the part of the file name before the first "_" (presumably the
# "<id>_stats.json" files -- confirm against the bucket layout).
finished_set = set()
for src_entry in src_listing:
    if ".json" not in src_entry['name']:
        continue
    shard_id = src_entry['name'].split("/")[-1].split("_")[0]
    if int(shard_id) % t == m:
        finished_set.add(shard_id)

# Size of each ".tar" entry, keyed by file stem, restricted to the ids above.
finished_sizes = {}
for src_entry in src_listing:
    src_stem = src_entry['name'].split("/")[-1].split(".")[0]
    if ".tar" in src_entry['name'] and src_stem in finished_set:
        finished_sizes[src_stem] = src_entry['size']

# --- Destination side: sizes of the tar files that already exist ---
dst_fs, dst_root = fsspec.core.url_to_fs(path_juwels)
already_sizes = {}
for dst_entry in dst_fs.ls(dst_root, detail=True):
    if ".tar" in dst_entry['name']:
        dst_stem = dst_entry['name'].split("/")[-1].split(".")[0]
        already_sizes[dst_stem] = dst_entry['size']

# A shard needs copying when it is absent at the destination or when the
# destination size differs from the source size.
to_copy = sorted(
    stem
    for stem, src_size in finished_sizes.items()
    if stem not in already_sizes or already_sizes[stem] != src_size
)
def f(e):
    """Copy the ``.tar`` object for shard stem *e* from mapper ``a`` to mapper ``b``.

    The whole object is read from ``a`` and then stored in ``b`` under the
    same key (``e + ".tar"``).
    """
    # Only the tar archive is transferred; copying the companion objects is
    # currently disabled:
    #   b[e+".parquet"]=a[e+".parquet"]
    #   b[e+"_stats.json"]=a[e+"_stats.json"]
    tar_key = e + ".tar"
    payload = a[tar_key]
    b[tar_key] = payload
# Show the work list, then run the copies on 8 worker threads.
print(to_copy)
with ThreadPool(8) as pool:
    results = pool.imap_unordered(f, to_copy)
    progress = tqdm(results, total=len(to_copy))
    # Iterating the results is what advances the progress bar; the return
    # values of f are discarded.
    for _ in progress:
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment