Last active
May 13, 2022 07:38
-
-
Save rom1504/c6182f07d77eaa299c8166264ffa69f5 to your computer and use it in GitHub Desktop.
fsspec sync
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from multiprocessing.pool import ThreadPool | |
import fsspec | |
from tqdm import tqdm | |
import sys | |
# Worker selection from the command line: this process only handles shards
# whose numeric id satisfies `id % t == m`.
# NOTE(review): presumably `t` workers are started with m = 0 .. t-1 so the
# shards are divided between them -- confirm against the launcher.
m = int(sys.argv[1])
t = int(sys.argv[2])

path_s3 = "s3://laion5b/data/laion1B-nolang"
path_juwels = "/p/fastdata/mmlaion/laion1B-nolang"

# Key -> bytes mappings over both locations; the copy function reads from `a`
# and writes into `b`.
a = fsspec.get_mapper(path_s3)
b = fsspec.get_mapper(path_juwels)


def _file_name(entry):
    """Return the last path component of a detailed listing entry."""
    return entry['name'].split("/")[-1]


def _is_assigned(shard_id):
    """Return True when `shard_id` is numeric and satisfies `id % t == m`.

    Ids that cannot be parsed as an integer (for example the prefix of an
    unrelated JSON file) are reported as not assigned instead of aborting
    the whole run with a ValueError.
    """
    try:
        return int(shard_id) % t == m
    except ValueError:
        return False


def _tar_sizes(entries):
    """Map shard id -> size for every listing entry named `<id>.tar`.

    The extension is matched on the end of the file name, not as a substring
    of the full path, so names such as "<id>.tar.tmp" or a directory whose
    name contains ".tar" cannot overwrite the size of a real archive.
    """
    sizes = {}
    for entry in entries:
        file_name = _file_name(entry)
        if file_name.endswith(".tar"):
            sizes[file_name.split(".")[0]] = entry['size']
    return sizes


# Source side: collect the shard ids that have a "<id>_*.json" file and that
# belong to this worker.
# NOTE(review): the JSON file is presumably written once a shard is complete
# (hence "finished") -- confirm with the producer of the dataset.
fs_finished, p_finished = fsspec.core.url_to_fs(path_s3)
finished_files = fs_finished.ls(p_finished, detail=True)
finished_set = set()
for entry in finished_files:
    file_name = _file_name(entry)
    if not file_name.endswith(".json"):
        continue
    shard_id = file_name.split("_")[0]
    if _is_assigned(shard_id):
        finished_set.add(shard_id)

# Sizes of the source archives, restricted to the shards selected above.
finished_sizes = {
    shard_id: size
    for shard_id, size in _tar_sizes(finished_files).items()
    if shard_id in finished_set
}

# Destination side: sizes of the archives that are already present.
fs_already, p_already = fsspec.core.url_to_fs(path_juwels)
already_sizes = _tar_sizes(fs_already.ls(p_already, detail=True))

# A shard is (re)copied when it is missing at the destination or when its
# size there differs from the source size.
to_copy = sorted(
    name_finished
    for name_finished, size_finished in finished_sizes.items()
    if name_finished not in already_sizes
    or already_sizes[name_finished] != size_finished
)
def f(e):
    """Copy the archive of shard `e` from mapper `a` into mapper `b`.

    `e` is the shard name without extension. The complete value stored under
    "<e>.tar" is read from `a` and then written under the same key in `b`.
    """
    # Copying the companion files is switched off; only the tar is moved.
    # b[e+".parquet"]=a[e+".parquet"]
    # b[e+"_stats.json"]=a[e+"_stats.json"]
    tar_key = e + ".tar"
    payload = a[tar_key]
    b[tar_key] = payload
# Show the pending shard names, then copy them on 8 worker threads while tqdm
# reports how many copies have completed (in completion order).
print(to_copy)
with ThreadPool(8) as workers:
    completed = workers.imap_unordered(f, to_copy)
    progress = tqdm(completed, total=len(to_copy))
    # The results themselves are not needed; draining the iterator is what
    # drives the progress bar.
    for _finished in progress:
        continue
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment