-
-
Save rogerhub/48cf58ee6dd55fc4d51f to your computer and use it in GitHub Desktop.
A simple sync manager (depends on rsync, fswatch, python)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2.7 | |
import json | |
import logging | |
import multiprocessing | |
import os | |
import signal | |
import socket | |
import struct | |
import subprocess | |
import sys | |
import threading | |
def send_payload(connection, payload): | |
payload_bytes = json.dumps(payload) | |
length = len(payload_bytes) | |
connection.send(struct.pack("!L", length)) | |
connection.send(payload_bytes) | |
def receive_payload(connection): | |
length_bytes = connection.recv(4) | |
assert len(length_bytes) == 4 | |
length, = struct.unpack("!L", length_bytes) | |
payload_bytes = '' | |
while len(payload_bytes) < length: | |
payload_bytes_read = connection.recv(length - len(payload_bytes)) | |
if not payload_bytes_read: | |
return | |
payload_bytes += payload_bytes_read | |
return json.loads(payload_bytes) | |
class SmanDaemon(object): | |
def __init__(self): | |
self.socket = socket.socket(socket.AF_UNIX) | |
daemon_path = self.get_daemon_path() | |
if os.path.exists(daemon_path): | |
os.unlink(daemon_path) | |
self.socket.bind(self.get_daemon_path()) | |
self.socket.listen(128) | |
self.jobs = {} | |
self.threads = [] | |
self.lock = threading.RLock() | |
def _get_next_job_number(self): | |
with self.lock: | |
for n in range(1, 256): | |
if n not in self.jobs: | |
return n | |
@staticmethod | |
def get_daemon_path(): | |
return os.path.expanduser("~/.smand.sock") | |
@staticmethod | |
def get_log_path(): | |
return os.path.expanduser("~/.smand.log") | |
def handle_connection(self, connection): | |
payload = receive_payload(connection) | |
assert isinstance(payload, dict) | |
assert "command" in payload | |
if payload["command"] == "list": | |
with self.lock: | |
list_bytes = "" | |
if not self.jobs: | |
list_bytes += "No jobs\n" | |
for job_number in sorted(self.jobs.keys()): | |
job = self.jobs[job_number] | |
list_bytes += "%d %s -> %s (%s)\n" % (job_number, job.source, job.destination, job.cwd) | |
send_payload(connection, {"result": list_bytes}) | |
elif payload["command"] == "remove": | |
assert "job_number" in payload | |
job_number = payload["job_number"] | |
info_bytes = "" | |
with self.lock: | |
assert job_number in self.jobs | |
job = self.jobs[job_number] | |
job.kill() | |
info_bytes += "%s -> %s\n" % (job.source, job.destination) | |
del self.jobs[job_number] | |
send_payload(connection, {"result": "Removed %s" % info_bytes}) | |
elif payload["command"] == "kill": | |
logging.warn("Exiting daemon because of kill message") | |
for job in self.jobs.values(): | |
job.kill() | |
for thread in self.threads: | |
thread.join(2) | |
send_payload(connection, {"result": ""}) | |
os._exit(0) | |
elif payload["command"] == "add": | |
assert "source" in payload | |
assert "destination" in payload | |
assert "cwd" in payload | |
with self.lock: | |
job = self.SyncLoopWorker(payload["source"], payload["destination"], payload["cwd"]) | |
job_number = self._get_next_job_number() | |
self.jobs[job_number] = job | |
sync_thread = threading.Thread(target=job.sync_loop) | |
sync_thread.start() | |
self.threads.append(sync_thread) | |
send_payload(connection, {"result": ""}) | |
else: | |
send_payload(connection, {"result": "No such command.\n"}) | |
class SyncLoopWorker(object): | |
def __init__(self, source, destination, cwd): | |
self.source = source | |
self.destination = destination | |
self.cwd = cwd | |
self.killing = False | |
self.process = None | |
def kill(self): | |
self.killing = True | |
if self.process: | |
try: | |
self.process.kill() | |
except OSError: | |
pass | |
self.process = None | |
def get_full_path(self): | |
return os.path.join(self.cwd, self.source) | |
def sync_loop(self): | |
while True: | |
if self.killing or not os.path.exists(self.get_full_path()): | |
break | |
self.process = subprocess.Popen(["fswatch", "-m", "fsevents_monitor", "-1", "-r", "-v", | |
self.source], cwd=self.cwd, close_fds=True) | |
self.process.wait() | |
if self.killing or not os.path.exists(self.get_full_path()): | |
break | |
self.process = subprocess.Popen(["rsync", "-vazh", "--checksum", "--progress", | |
"--exclude=.git", "--exclude=.gitignore", | |
"--exclude=.DS_Store", self.source, | |
self.destination], cwd=self.cwd, close_fds=True) | |
self.process.wait() | |
logging.warn("Exiting sync loop for %s, because source directory %s disappeared" % | |
(repr(self.destination), repr(self.get_full_path()))) | |
def run(self): | |
log_file = open(self.get_log_path(), "a+", 0) | |
os.close(0) | |
os.dup2(log_file.fileno(), 1) | |
os.dup2(log_file.fileno(), 2) | |
log_file.close() | |
while True: | |
connection, _ = self.socket.accept() | |
handler_thread = threading.Thread(target=self.handle_connection, args=[connection]) | |
handler_thread.start() | |
class SmanClient(object): | |
def _try_connect(self): | |
connection = socket.socket(socket.AF_UNIX) | |
connection.connect(SmanDaemon.get_daemon_path()) | |
return connection | |
def _spawn(self, mark_when_done=None): | |
daemon = SmanDaemon() | |
if mark_when_done: | |
mark_when_done.set() | |
try: | |
daemon.run() | |
except Exception: | |
logging.error("Error while running SmanDaemon") | |
finally: | |
os._exit(0) | |
def connect(self): | |
try: | |
return self._try_connect() | |
except socket.error: | |
done_event = multiprocessing.Event() | |
pid = os.fork() | |
if pid: | |
done_event.wait() | |
return self._try_connect() | |
else: | |
self._spawn(done_event) | |
@staticmethod | |
def log(message): | |
sys.stdout.write(message) | |
if __name__ == "__main__": | |
client = SmanClient() | |
if len(sys.argv) < 2: | |
client.log("Usage: sman [list|remove|kill]\n" + | |
" sman SOURCE DESTINATION\n") | |
sys.exit(1) | |
command = sys.argv[1] | |
command_aliases = {"ls": "list", | |
"jobs": "list", | |
"rm": "remove", | |
"delete": "remove", | |
"log": "logs"} | |
if command in command_aliases: | |
command = command_aliases[command] | |
if command == "logs": | |
args = ["tail", "-n", "80", "-f", SmanDaemon.get_log_path()] | |
os.execvp(args[0], args) | |
elif command in ("list", "kill"): | |
# No-argument commands | |
payload = {"command": command} | |
elif command == "remove": | |
if len(sys.argv) != 3: | |
client.log("Invalid usage") | |
sys.exit(1) | |
payload = {"command": command, | |
"job_number": int(sys.argv[2])} | |
else: | |
if len(sys.argv) == 3: | |
source = sys.argv[1] | |
destination = sys.argv[2] | |
else: | |
client.log("Invalid usage\n") | |
sys.exit(1) | |
if not os.path.exists(source): | |
client.log("No such file or directory: %s\n" % source) | |
sys.exit(1) | |
payload = {"command": "add", | |
"source": source, | |
"destination": destination, | |
"cwd": os.getcwd()} | |
connection = client.connect() | |
send_payload(connection, payload) | |
response = receive_payload(connection) | |
assert isinstance(response, dict) | |
assert "result" in response | |
client.log(response["result"]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment