Created
April 17, 2019 16:10
-
-
Save quark-zju/11d4678051cc64420bfd57b81dda5d0f to your computer and use it in GitHub Desktop.
A naive incremental sync script via sftp
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
"""A naive incremental sync script via sftp | |
When do you want to use this? | |
- want a non-lazy local file system (not fuse/sshfs) | |
- sftp is set up in a way that makes it easier to use than ssh (rsync or git)
This script is very simple and does not handle complex cases. Use rsync to: | |
- set up an initial mirror | |
- re-sync after network errors | |
- remove or mirror empty directories | |
""" | |
from contextlib import contextmanager | |
import errno | |
import os | |
import paramiko | |
import socket | |
import subprocess | |
import sys | |
import time | |
join = os.path.join | |
def connect(sshargs):
    """Build the SFTP client used by the sync loop.

    ``sshargs`` is the list of command-line arguments handed to ``ssh``.
    The result is a RetryClient wrapping those arguments.
    """
    client = RetryClient(sshargs)
    return client
class Channel(object):
    """Minimal channel-like adapter around a plain socket.

    SFTPClient requires its "Channel" to have a ``get_name`` method, which a
    raw socket lacks. Everything else is delegated to the wrapped socket.
    """

    def __init__(self, sock):
        # The wrapped socket; all I/O goes straight through it.
        self.sock = sock

    def get_name(self):
        """Return the channel name (always empty; there is no real channel)."""
        return ""

    def send(self, *args, **kwargs):
        """Forward to ``socket.send`` and return its result."""
        return self.sock.send(*args, **kwargs)

    def recv(self, *args, **kwargs):
        """Forward to ``socket.recv`` and return its result."""
        return self.sock.recv(*args, **kwargs)

    def close(self):
        """Close the wrapped socket.

        Closing an SFTP session closes its underlying channel, so without
        this method that call would fail with AttributeError.
        """
        return self.sock.close()
class RetryClient(object):
    """Lazy SFTP client proxy that reconnects when the pipe to ssh breaks.

    The connection is an ``ssh ... -s sftp`` subprocess talking over one end
    of a socket pair; the other end is handed to paramiko's SFTPClient.
    Any public attribute not defined here is looked up on the SFTPClient and
    wrapped so that an EPIPE socket error triggers a reconnect and a retry.
    """

    def __init__(self, sshargs):
        self.sshargs = sshargs
        self._client = None
        # Socket and ssh process backing the current connection, kept so
        # they can be released when the connection is replaced.
        self._sock = None
        self._proc = None

    def _disconnect(self):
        """Release the socket and ssh process of the current connection, if any."""
        sock, proc = self._sock, self._proc
        self._client = self._sock = self._proc = None
        if sock is not None:
            sock.close()
        if proc is not None:
            if proc.poll() is None:
                proc.terminate()
            # Reap the child so it does not linger as a zombie.
            proc.wait()

    def _reconnect(self):
        """Start a fresh ssh sftp subprocess and build a new SFTPClient on it."""
        self._disconnect()
        ours, theirs = socket.socketpair()
        with scopedeprint("Connecting %r" % self.sshargs):
            try:
                self._proc = subprocess.Popen(
                    ["ssh"] + self.sshargs + ["-x", "-a", "-s", "sftp"],
                    stdin=theirs.fileno(),
                    stdout=theirs.fileno(),
                )
            except Exception:
                ours.close()
                raise
            finally:
                # The child holds its own copies; keeping ours open would
                # prevent EOF from ever being seen if ssh exits.
                theirs.close()
            self._sock = ours
            self._client = paramiko.sftp_client.SFTPClient(Channel(ours))

    @property
    def client(self):
        """The underlying SFTPClient, connecting first if necessary."""
        if self._client is None:
            self._reconnect()
        return self._client

    def _retrywrapper(self, funcname):
        """Return a callable invoking ``funcname`` on the client with retry.

        On a socket error with errno EPIPE the connection is re-established
        and the call is repeated; any other error propagates unchanged.
        """

        def wrapper(*args, **kwargs):
            # Loop instead of recursing so repeated failures cannot exhaust
            # the stack. The method is re-resolved on every attempt because
            # a reconnect replaces the client object.
            while True:
                func = getattr(self.client, funcname)
                try:
                    return func(*args, **kwargs)
                except socket.error as ex:
                    if ex.errno != errno.EPIPE:
                        raise
                self._reconnect()

        return wrapper

    def mkdirp(self, path):
        """Create ``path`` remotely, creating missing parents as needed.

        Raises the original IOError when the failure is not ENOENT or when
        there is no further parent to try.
        """
        mkdir = self._retrywrapper("mkdir")
        try:
            return mkdir(path)
        except IOError as ex:
            parent = os.path.dirname(path)
            if ex.errno == errno.ENOENT and path != parent:
                self.mkdirp(parent)
                return mkdir(path)
            raise

    def put(self, localpath, remotepath, confirm=False):
        """Upload ``localpath`` to ``remotepath``.

        If the upload fails with ENOENT and ``remotepath`` has a directory
        part, that directory is created and the upload is tried once more.
        """
        put = self._retrywrapper("put")
        try:
            return put(localpath=localpath, remotepath=remotepath, confirm=confirm)
        except IOError as ex:
            if ex.errno == errno.ENOENT and "/" in remotepath:
                self.mkdirp(os.path.dirname(remotepath))
                return put(localpath=localpath, remotepath=remotepath, confirm=confirm)
            raise

    def __getattr__(self, name):
        """Proxy public attribute access to the client, with retry.

        Private and dunder names are never proxied, so ``hasattr`` checks
        and protocol lookups do not get a bogus callable back.
        """
        if name.startswith("_"):
            raise AttributeError(name)
        return self._retrywrapper(name)
def eprint(msg):
    """Write ``msg`` followed by a newline to standard error."""
    line = "%s\n" % msg
    sys.stderr.write(line)
# Nesting depth of active top-level scopedeprint() scopes. A one-element
# list so the generator below can mutate it without a ``global`` statement.
_scopelevel = [0]


@contextmanager
def scopedeprint(msg):
    """Print "msg ..." to stderr, then the outcome when the scope exits.

    At the top level the line is finished with " done" on success, or with
    the exception text when the body raises (the exception is re-raised).
    When already inside another scope, only " msg ..." is appended to the
    current line and the enclosing scope reports the outcome.
    """
    if _scopelevel[0] > 0:
        sys.stderr.write(" %s ..." % msg)
        sys.stderr.flush()
        yield
        return
    sys.stderr.write("%s ..." % msg)
    sys.stderr.flush()
    _scopelevel[0] += 1
    try:
        yield
    except Exception as ex:
        sys.stderr.write(" %s\n" % ex)
        raise
    except BaseException:
        # KeyboardInterrupt / SystemExit: still terminate the pending line.
        sys.stderr.write(" aborted\n")
        raise
    else:
        sys.stderr.write(" done\n")
    finally:
        # Always restore the depth, otherwise every later scope would be
        # treated as nested and never report its outcome.
        _scopelevel[0] -= 1
        sys.stderr.flush()
def find(path):
    """Yield the path of every file found by walking ``path`` recursively.

    Each yielded path is the walked directory joined with the file name.
    Directories themselves are not yielded.
    """
    for entry in os.walk(path):
        dirpath, filenames = entry[0], entry[2]
        for filename in filenames:
            yield join(dirpath, filename)
def stat(path):
    """Return the modification time of ``path`` without following symlinks.

    Returns 0 when ``os.lstat`` fails with OSError (for example when the
    file does not exist).
    """
    try:
        info = os.lstat(path)
    except OSError:
        return 0
    return info.st_mtime
def _remote_for(localpath, remotepath, path):
    """Map ``path`` (a file under ``localpath``) to its path under ``remotepath``."""
    # relpath copes with a trailing slash on localpath and with "/" itself,
    # which plain prefix slicing does not.
    return join(remotepath, os.path.relpath(path, localpath))


def main(args):
    """Mirror local file changes to a remote directory over sftp.

    ``args`` is ``SSH_ARGS... REMOTE_PATH LOCAL_PATH``: the last item is the
    local directory to watch, the one before it is the remote directory, and
    everything else is passed to ``ssh``.

    Returns 1 after printing usage when the arguments are insufficient or
    help was requested. Otherwise polls the local tree twice a second
    forever, uploading files whose mtime changed and unlinking the remote
    counterpart of files that disappeared.
    """
    # At least one ssh argument (the host) is needed besides the two paths.
    if len(args) < 3 or any(s in args for s in ("--help", "-h")):
        eprint("Usage: %s SSH_ARGS REMOTE_PATH LOCAL_PATH\n" % sys.argv[0])
        eprint("Mirror local file changes to remote via the sftp protocol.")
        eprint("This tool does not do the initial mirroring. Use 'rsync' for that.")
        return 1
    localpath = args[-1]
    remotepath = args[-2]
    sshargs = args[:-2]
    client = connect(sshargs)
    # The initial scan is only a baseline; nothing is uploaded for it.
    with scopedeprint("Scanning %s" % localpath):
        state = {p: stat(p) for p in find(localpath)}
    eprint("Waiting for changes on %s" % localpath)
    while True:
        newstate = {p: stat(p) for p in find(localpath)}
        for path, mtime in newstate.items():
            # New files have no previous entry, so they compare unequal too.
            if mtime != state.get(path):
                remote = _remote_for(localpath, remotepath, path)
                with scopedeprint("Uploading %s" % path):
                    client.put(localpath=path, remotepath=remote, confirm=False)
        for path in set(state) - set(newstate):
            remote = _remote_for(localpath, remotepath, path)
            with scopedeprint("Removing %s" % path):
                client.unlink(remote)
        state = newstate
        time.sleep(0.5)
if __name__ == "__main__":
    # ``or 0`` turns a falsy return from main() (such as None) into exit
    # status 0.
    status = main(sys.argv[1:]) or 0
    sys.exit(status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment