Created
April 17, 2024 03:16
-
-
Save ryaminal/38b31ec03eb994f9a0adfd87848eb4ad to your computer and use it in GitHub Desktop.
Non-generic rsync for fsspec
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Heavily inspired by fsspec's GenericFileSystem implementation of rsync
import logging | |
from typing import Any, Callable | |
from fsspec.implementations.sftp import AbstractFileSystem | |
from gcsfs import GCSFileSystem | |
from rich.logging import RichHandler | |
from sshfs import SSHFileSystem | |
# Message-only record format; the handler below is expected to render the
# time and level columns itself (see the rich logging handler docs).
LOG_FORMAT = "%(message)s"
# ISO-8601-style timestamp used for log records.
LOG_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"
# LOG_DATE_FORMAT was previously defined but never handed to basicConfig, so it
# had no effect; pass it as datefmt so the configured format is actually used.
logging.basicConfig(
    level="INFO",
    format=LOG_FORMAT,
    datefmt=LOG_DATE_FORMAT,
    handlers=[RichHandler()],
)
log = logging.getLogger(__name__)
def copy_file_fs_to_fs_no_temp(
    source_fs: AbstractFileSystem, source_path: str, dest_fs: AbstractFileSystem, dest_path: str
):
    """Stream a single file from one filesystem to another without a temp file.

    The source is read in fixed-size chunks and every chunk is written to the
    destination handle as soon as it has been read. An info-level log line is
    emitted before the transfer starts and after it finishes.

    Args:
        source_fs: Filesystem the file is read from.
        source_path: Path of the file on ``source_fs``.
        dest_fs: Filesystem the file is written to.
        dest_path: Path of the file on ``dest_fs``.
    """
    log.info(f"Copying {source_path} -> {dest_path}")
    chunk_size = 2**20  # bytes requested per read() call (1 MiB)
    # The same options are handed to both open() calls. They appear intended to
    # switch off fsspec-level block buffering/caching -- confirm each backend
    # actually honours them.
    open_options = {"blocksize": 0, "cache_type": "none"}
    with (
        source_fs.open(source_path, "rb", **open_options) as reader,
        dest_fs.open(dest_path, "wb", **open_options) as writer,
    ):
        # An empty read signals end-of-file and ends the loop.
        while chunk := reader.read(chunk_size):
            writer.write(chunk)
    log.info(f"Copied {source_path} -> {dest_path}")
# TODO: make the copy asynchronous
def copy_sftp_to_gcs( | |
source_url: str, | |
dest_url: str, | |
delete_missing: bool = False, | |
source_diff_field: str | Callable[[dict], Any] = "size", | |
dest_diff_field: str | Callable[[dict], Any] = "size", | |
update_cond: str = "different", | |
): | |
""" | |
Copies new and changed files from SFTP server (including nested directories) to Google Cloud Storage. | |
Args: | |
sftp_url (str): URL of the SFTP server (e.g. sftp://user@host:port) | |
gcs_url (str): Name of the Google Cloud Storage bucket (e.g. gs://my-bucket/my-dir/) | |
""" | |
# Initialize file systems | |
_kwargs = SSHFileSystem._get_kwargs_from_urls(source_url) | |
source_fs = SSHFileSystem(**_kwargs) | |
dest_fs = GCSFileSystem() | |
# strip protocols from url leaving only paths e.g. sftp://user@host:port/dir1/dir2/stuff -> /dir1/dir2/stuff | |
source_path = source_fs._strip_protocol(source_url) | |
dest_path = dest_fs._strip_protocol(dest_url) | |
# Find all files in SFTP source directory with details | |
if not source_fs.isdir(source_path): | |
raise ValueError(f"Path {source_path} is not a directory") | |
# get all files and dirs for source and dest | |
source_files_and_dirs: dict[str, dict] = source_fs.find(source_path, detail=True, withdirs=True) | |
dest_files_and_dirs: dict[str, dict] = dest_fs.find(dest_path, detail=True, withdirs=True) | |
# Create directories that don't already exist in the destination | |
source_dirs_to_create = [ | |
_dir | |
for _dir, details in source_files_and_dirs.items() | |
if details["type"] == "directory" and _dir.replace(source_path, dest_path) not in dest_files_and_dirs | |
] | |
for _dir in source_dirs_to_create: | |
# with the current dest of gcs, this is a noop. bucket must exist | |
dest_fs.makedirs(_dir, exist_ok=True) | |
source_files = {_path: details for _path, details in source_files_and_dirs.items() if details["type"] == "file"} | |
for _path, details in source_files.copy().items(): # copy so we can modify the original | |
dest_file = _path.replace(source_path, dest_path) | |
if dest_file in dest_files_and_dirs: | |
match update_cond: | |
case "different": | |
src_field = ( | |
source_diff_field(details) if callable(source_diff_field) else details[source_diff_field] | |
) | |
dest_details = dest_files_and_dirs[dest_file] | |
dest_field = ( | |
dest_diff_field(dest_details) if callable(dest_diff_field) else dest_details[dest_diff_field] | |
) | |
if src_field != dest_field: | |
source_files[_path] = dest_file | |
else: | |
source_files.pop(_path) | |
case "always": | |
source_files[_path] = dest_file | |
case _: | |
# noop, maybe debug log? | |
pass | |
else: | |
source_files[_path] = dest_file | |
if source_files: | |
log.debug(f"Copying {len(source_files)} files from {source_url} to {dest_url}") | |
for src, dst in source_files.items(): | |
copy_file_fs_to_fs_no_temp(source_fs, src, dest_fs, dst) | |
if delete_missing: | |
# TODO: handle deleting files in destination? | |
raise NotImplementedError("delete_missing not yet implemented") | |
if __name__ == "__main__":
    # Example invocation with placeholder endpoints; swap in real URLs before running.
    copy_sftp_to_gcs(
        source_url="sftp://username@host:12345/stuff",
        dest_url="gs://bucket/dir1/dir2",
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment