Skip to content

Instantly share code, notes, and snippets.

@rogerhub
Last active October 29, 2015 01:29
Show Gist options
  • Save rogerhub/48cf58ee6dd55fc4d51f to your computer and use it in GitHub Desktop.
Save rogerhub/48cf58ee6dd55fc4d51f to your computer and use it in GitHub Desktop.
A simple sync manager (depends on rsync, fswatch, python)
#!/usr/bin/env python2.7
import json
import logging
import multiprocessing
import os
import signal
import socket
import struct
import subprocess
import sys
import threading
def send_payload(connection, payload):
payload_bytes = json.dumps(payload)
length = len(payload_bytes)
connection.send(struct.pack("!L", length))
connection.send(payload_bytes)
def receive_payload(connection):
length_bytes = connection.recv(4)
assert len(length_bytes) == 4
length, = struct.unpack("!L", length_bytes)
payload_bytes = ''
while len(payload_bytes) < length:
payload_bytes_read = connection.recv(length - len(payload_bytes))
if not payload_bytes_read:
return
payload_bytes += payload_bytes_read
return json.loads(payload_bytes)
class SmanDaemon(object):
def __init__(self):
self.socket = socket.socket(socket.AF_UNIX)
daemon_path = self.get_daemon_path()
if os.path.exists(daemon_path):
os.unlink(daemon_path)
self.socket.bind(self.get_daemon_path())
self.socket.listen(128)
self.jobs = {}
self.threads = []
self.lock = threading.RLock()
def _get_next_job_number(self):
with self.lock:
for n in range(1, 256):
if n not in self.jobs:
return n
@staticmethod
def get_daemon_path():
return os.path.expanduser("~/.smand.sock")
@staticmethod
def get_log_path():
return os.path.expanduser("~/.smand.log")
def handle_connection(self, connection):
payload = receive_payload(connection)
assert isinstance(payload, dict)
assert "command" in payload
if payload["command"] == "list":
with self.lock:
list_bytes = ""
if not self.jobs:
list_bytes += "No jobs\n"
for job_number in sorted(self.jobs.keys()):
job = self.jobs[job_number]
list_bytes += "%d %s -> %s (%s)\n" % (job_number, job.source, job.destination, job.cwd)
send_payload(connection, {"result": list_bytes})
elif payload["command"] == "remove":
assert "job_number" in payload
job_number = payload["job_number"]
info_bytes = ""
with self.lock:
assert job_number in self.jobs
job = self.jobs[job_number]
job.kill()
info_bytes += "%s -> %s\n" % (job.source, job.destination)
del self.jobs[job_number]
send_payload(connection, {"result": "Removed %s" % info_bytes})
elif payload["command"] == "kill":
logging.warn("Exiting daemon because of kill message")
for job in self.jobs.values():
job.kill()
for thread in self.threads:
thread.join(2)
send_payload(connection, {"result": ""})
os._exit(0)
elif payload["command"] == "add":
assert "source" in payload
assert "destination" in payload
assert "cwd" in payload
with self.lock:
job = self.SyncLoopWorker(payload["source"], payload["destination"], payload["cwd"])
job_number = self._get_next_job_number()
self.jobs[job_number] = job
sync_thread = threading.Thread(target=job.sync_loop)
sync_thread.start()
self.threads.append(sync_thread)
send_payload(connection, {"result": ""})
else:
send_payload(connection, {"result": "No such command.\n"})
class SyncLoopWorker(object):
def __init__(self, source, destination, cwd):
self.source = source
self.destination = destination
self.cwd = cwd
self.killing = False
self.process = None
def kill(self):
self.killing = True
if self.process:
try:
self.process.kill()
except OSError:
pass
self.process = None
def get_full_path(self):
return os.path.join(self.cwd, self.source)
def sync_loop(self):
while True:
if self.killing or not os.path.exists(self.get_full_path()):
break
self.process = subprocess.Popen(["fswatch", "-m", "fsevents_monitor", "-1", "-r", "-v",
self.source], cwd=self.cwd, close_fds=True)
self.process.wait()
if self.killing or not os.path.exists(self.get_full_path()):
break
self.process = subprocess.Popen(["rsync", "-vazh", "--checksum", "--progress",
"--exclude=.git", "--exclude=.gitignore",
"--exclude=.DS_Store", self.source,
self.destination], cwd=self.cwd, close_fds=True)
self.process.wait()
logging.warn("Exiting sync loop for %s, because source directory %s disappeared" %
(repr(self.destination), repr(self.get_full_path())))
def run(self):
log_file = open(self.get_log_path(), "a+", 0)
os.close(0)
os.dup2(log_file.fileno(), 1)
os.dup2(log_file.fileno(), 2)
log_file.close()
while True:
connection, _ = self.socket.accept()
handler_thread = threading.Thread(target=self.handle_connection, args=[connection])
handler_thread.start()
class SmanClient(object):
def _try_connect(self):
connection = socket.socket(socket.AF_UNIX)
connection.connect(SmanDaemon.get_daemon_path())
return connection
def _spawn(self, mark_when_done=None):
daemon = SmanDaemon()
if mark_when_done:
mark_when_done.set()
try:
daemon.run()
except Exception:
logging.error("Error while running SmanDaemon")
finally:
os._exit(0)
def connect(self):
try:
return self._try_connect()
except socket.error:
done_event = multiprocessing.Event()
pid = os.fork()
if pid:
done_event.wait()
return self._try_connect()
else:
self._spawn(done_event)
@staticmethod
def log(message):
sys.stdout.write(message)
if __name__ == "__main__":
client = SmanClient()
if len(sys.argv) < 2:
client.log("Usage: sman [list|remove|kill]\n" +
" sman SOURCE DESTINATION\n")
sys.exit(1)
command = sys.argv[1]
command_aliases = {"ls": "list",
"jobs": "list",
"rm": "remove",
"delete": "remove",
"log": "logs"}
if command in command_aliases:
command = command_aliases[command]
if command == "logs":
args = ["tail", "-n", "80", "-f", SmanDaemon.get_log_path()]
os.execvp(args[0], args)
elif command in ("list", "kill"):
# No-argument commands
payload = {"command": command}
elif command == "remove":
if len(sys.argv) != 3:
client.log("Invalid usage")
sys.exit(1)
payload = {"command": command,
"job_number": int(sys.argv[2])}
else:
if len(sys.argv) == 3:
source = sys.argv[1]
destination = sys.argv[2]
else:
client.log("Invalid usage\n")
sys.exit(1)
if not os.path.exists(source):
client.log("No such file or directory: %s\n" % source)
sys.exit(1)
payload = {"command": "add",
"source": source,
"destination": destination,
"cwd": os.getcwd()}
connection = client.connect()
send_payload(connection, payload)
response = receive_payload(connection)
assert isinstance(response, dict)
assert "result" in response
client.log(response["result"])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment