Created
November 20, 2015 12:54
-
-
Save subak/16294f4b40050ba14ff1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# | |
# unox | |
# | |
# Author: Hannes Landeholm <[email protected]> | |
# | |
# The Unison beta (2.48) comes with file system change monitoring (repeat = watch) | |
# through an abstract "unison-fsmonitor" adapter that integrates with each respective | |
# OS file update watch interface. This allows responsive dropbox like master-master sync | |
# of files over SSH. The Unison beta comes with an adapter for Windows and Linux but | |
# unfortunately lacks one for OS X. | |
# | |
# This script implements the Unison fswatch protocol (see /src/fswatch.ml)
# and is intended to be installed as unison-fsmonitor in the PATH in OS X. This is the
# missing puzzle piece for repeat = watch support for Unison in OS X.
# | |
# Dependencies: pip install macfsevents | |
# | |
# Licence: MPLv2 (https://www.mozilla.org/MPL/2.0/) | |
import sys | |
import os | |
import time | |
import fsevents | |
import urllib.parse | |
import traceback | |
# Prefix prepended to every log line this script writes to stderr.
my_log_prefix = "[unox]"
# Debug logging is on when "--debug" appears anywhere on the command line.
_in_debug = "--debug" in sys.argv
# Gates the extra trigger-state dump done by _debug_triggers(); off by default.
_in_debug_plus = False
# Global MacFSEvents observer.
observer = fsevents.Observer()
observer.start()
# Dict of monitored replicas.
# Replica hash mapped to a dict holding its fsevents.Stream ("stream") and watched path ("fspath").
replicas = {}
# Dict of pending replicas that are being waited on.
# Replica hash mapped to True if replica is pending.
pending_reps = {}
# Dict of triggered replicas.
# Replica hash mapped to recursive dict where keys are path tokens or True for pending leaf.
triggered_reps = {}
def format_exception(e):
    """Render exception ``e`` as a traceback string that includes the caller's stack.

    The result starts with the usual "Traceback (most recent call last):"
    header, followed by the stack frames leading up to this call, the frames
    recorded on the exception's own traceback, and finally the
    "ExceptionType: message" line. The trailing newline is removed.

    The exception is read from ``e`` itself rather than from
    ``sys.exc_info()``, so the function also works when it is called outside
    of an active ``except`` block.
    """
    # Drop the last two stack entries: this function and its direct caller.
    # Presumably the caller's frame is dropped because it reappears at the
    # head of the exception's own traceback.
    lines = traceback.format_stack()[:-2]
    lines.extend(traceback.format_tb(e.__traceback__))
    lines.extend(traceback.format_exception_only(type(e), e))
    text = "Traceback (most recent call last):\n" + "".join(lines)
    # Strip the final newline emitted by format_exception_only().
    return text[:-1]
def _debug_triggers():
    """Dump the triggered (and, if any, pending) replica state to stderr.

    Does nothing unless the module-level ``_in_debug_plus`` flag is set.
    """
    if _in_debug_plus:
        # The wait section is only appended when something is being waited on.
        suffix = (" | wait=" + str(pending_reps)) if len(pending_reps) > 0 else ""
        sys.stderr.write(my_log_prefix + "[DEBUG+]: trig=" + str(triggered_reps) + suffix + "\n")
def _debug(msg):
    """Write ``msg`` (whitespace-stripped) to stderr as a prefixed DEBUG line."""
    sys.stderr.write("%s[DEBUG]: %s\n" % (my_log_prefix, msg.strip()))
def warn(msg):
    """Write ``msg`` (whitespace-stripped) to stderr as a prefixed WARN line."""
    sys.stderr.write("%s[WARN]: %s\n" % (my_log_prefix, msg.strip()))
def sendCmd(cmd, args):
    """Write one protocol line to stdout without flushing.

    The line is ``cmd`` followed by every item of ``args`` percent-encoded
    with ``urllib.parse.quote``, all separated by single spaces and
    terminated by a newline. The line is echoed to the debug log when
    debugging is enabled.
    """
    encoded_args = [urllib.parse.quote(arg) for arg in args]
    raw_cmd = " ".join([cmd] + encoded_args)
    if _in_debug:
        _debug("sendCmd: " + raw_cmd)
    sys.stdout.write(raw_cmd + "\n")
def injectCmd(cmd, args):
    """Send a command from outside the receive loop.

    Unlike a plain ``sendCmd`` call, stdout is flushed right away instead of
    waiting for the next flush performed by the receive path.
    """
    sendCmd(cmd, args)
    sys.stdout.flush()
def sendAck():
    """Send an argument-less "OK" command."""
    no_args = []
    sendCmd("OK", no_args)
def sendError(msg):
    """Send an "ERROR" command carrying ``msg`` and terminate with status 1.

    This function never returns.
    """
    sendCmd("ERROR", [msg])
    # os._exit() terminates without flushing stdio buffers, so the buffered
    # ERROR line must be pushed out explicitly or it may never be delivered.
    sys.stdout.flush()
    os._exit(1)
def recvCmd():
    """Read the next protocol line from stdin and return ``[cmd, args]``.

    ``cmd`` is the first space-separated word of the line; ``args`` is the
    list of remaining words, each decoded with ``urllib.parse.unquote``.
    Exits the process with status 0 (via ``sys.exit``) when stdin yields a
    line without a terminating newline, i.e. at end of stream.
    """
    # Pending output is flushed here, right before blocking on input,
    # instead of after every single write.
    sys.stdout.flush()
    raw_line = sys.stdin.readline()
    if not raw_line.endswith("\n"):
        # No trailing newline: the stream has ended, so we are done.
        if _in_debug:
            _debug("stdin closed, exiting")
        sys.exit(0)
    if _in_debug:
        _debug("recvCmd: " + raw_line)
    # First word is the command, the rest are URL-encoded arguments.
    name, *encoded = raw_line.strip().split(" ")
    return [name, [urllib.parse.unquote(word) for word in encoded]]
def pathTokenize(path):
    """Split ``path`` on "/" and return its non-empty components as a list."""
    return [piece for piece in path.split("/") if len(piece) > 0]
def triggerReplica(replica, local_path_toks):
    """Record a change for ``replica`` at the path given by ``local_path_toks``.

    If the replica is currently being waited on, a "CHANGES" command is
    injected immediately and all pending waits are cleared. The change is
    then stored in ``triggered_reps``: an empty token list marks the whole
    replica as changed (``True``); otherwise the tokens are walked as nested
    dict keys and the final token is marked ``True``. Nothing is recorded
    below a level that is already marked ``True``.
    """
    global pending_reps
    if replica in pending_reps:
        # Got event for pending replica, notify and reset wait.
        injectCmd("CHANGES", [replica])
        pending_reps = {}
    # An event on the replica root marks the entire replica as changed.
    if not local_path_toks:
        triggered_reps[replica] = True
        return
    node = triggered_reps.setdefault(replica, {})
    *branch_toks, leaf_tok = local_path_toks
    # Descend through the intermediate directories, creating levels as needed.
    for tok in branch_toks:
        if node is True:
            # An ancestor is already fully marked; nothing more to record.
            return
        node = node.setdefault(tok, {})
    if node is True:
        return
    node[leaf_tok] = True
    _debug_triggers()
def startReplicaMon(replica, fspath, path):
    """Start monitoring ``replica`` and then serve the replica-start sub-protocol.

    If the replica is not yet registered, an fsevents stream watching
    ``fspath`` (normalized to end with a path separator) is scheduled on the
    global observer and stored in ``replicas``. An "OK" is then sent and
    commands are read until "DONE": "DIR" is acknowledged, while "LINK" and
    any other command are reported through ``sendError``.

    The ``path`` argument is accepted but not read by this function.
    """
    if replica not in replicas:
        # Ensure fspath has trailing slash.
        fspath = os.path.join(fspath, "")
        if _in_debug:
            _debug("start monitoring of replica [" + replica + "] [" + fspath + "]")

        def on_file_event(path, mask):
            # Invoked by fsevents for each event; maps the absolute path to
            # replica-relative tokens and records the trigger.
            try:
                if not path.startswith(fspath):
                    warn("unexpected file event at path [" + path + "] for [" + fspath + "]")
                    return
                relative = path[len(fspath):]
                tokens = pathTokenize(relative)
                if _in_debug:
                    _debug("replica:[" + replica + "] file event @[" + relative + "] (" + path + ")")
                triggerReplica(replica, tokens)
            except Exception as err:
                # An uncaught exception here would not bring the process
                # down, so dump the traceback and exit explicitly.
                sys.stderr.write(format_exception(err))
                sys.stderr.flush()
                os._exit(1)

        try:
            # Only directory-level watching is set up here; finer-grained
            # change detection is left to Unison.
            if _in_debug:
                _debug("replica:[" + replica + "] watching path [" + fspath + "]")
            stream = fsevents.Stream(on_file_event, fspath)
            observer.schedule(stream)
        except (FileNotFoundError, NotADirectoryError) as err:
            sendError(str(err))
        replicas[replica] = {"stream": stream, "fspath": fspath}
    sendAck()
    while True:
        cmd, _unused_args = recvCmd()
        if cmd == "DONE":
            return
        if cmd == "DIR":
            sendAck()
        elif cmd == "LINK":
            sendError("link following is not supported by unison-watchdog, please disable this option (-links)")
        else:
            sendError("unexpected cmd in replica start: " + cmd)
def reportRecursiveChanges(local_path, cur_lvl):
    """Send a "RECURSIVE" command for every ``True`` leaf below ``cur_lvl``.

    ``cur_lvl`` is either ``True`` (report ``local_path`` itself) or a dict
    mapping path tokens to further levels. Paths are built by joining tokens
    onto ``local_path`` and are reported depth-first in dict order.
    """
    todo = [(local_path, cur_lvl)]
    while todo:
        prefix, level = todo.pop()
        if level is True:
            sendCmd("RECURSIVE", [prefix])
            continue
        children = [(os.path.join(prefix, tok), sub) for tok, sub in level.items()]
        # Push in reverse so the first child is popped, and reported, first.
        todo.extend(reversed(children))
def main():
    """Run the protocol loop: version handshake, then command dispatch.

    After exchanging "VERSION" commands the function loops forever over
    incoming commands:

    * DEBUG   - enable debug logging.
    * START   - begin monitoring a replica (args: replica, fspath[, path]).
    * WAIT    - register a wait on a replica, or answer "CHANGES" at once if
                the replica already has recorded changes.
    * CHANGES - report the recorded changes of a replica, then "DONE".
    * RESET   - stop monitoring a replica and drop its recorded changes.

    Any other command is reported through ``sendError``. The loop ends only
    through process exit (end of stdin in ``recvCmd`` or ``sendError``).
    """
    # _in_debug must be declared global; otherwise the DEBUG command would
    # only bind a local variable and never actually enable debug output.
    global replicas, pending_reps, triggered_reps, _in_debug
    # Version handshake.
    sendCmd("VERSION", ["1"])
    [cmd, args] = recvCmd()
    if cmd != "VERSION":
        sendError("unexpected version cmd: " + cmd)
    [v_no] = args
    if v_no != "1":
        warn("unexpected version: " + v_no)
    # Start watch operation.
    _debug_triggers()
    while True:
        [cmd, args] = recvCmd()
        # Cancel pending waits when any other command is received.
        if cmd != "WAIT":
            pending_reps = {}
        # Check command.
        if cmd == "DEBUG":
            _in_debug = True
        elif cmd == "START":
            # Start observing replica.
            if len(args) >= 3:
                [replica, fspath, path] = args
            else:
                # No path, only monitoring fspath.
                [replica, fspath] = args
                path = ""
            startReplicaMon(replica, fspath, path)
        elif cmd == "WAIT":
            # Start waiting for another replica.
            [replica] = args
            if replica not in replicas:
                sendError("unknown replica: " + replica)
            if replica in triggered_reps:
                # Is pre-triggered replica. The argument must be a list:
                # sendCmd iterates over it, and a bare string would be sent
                # one character per argument.
                sendCmd("CHANGES", [replica])
                pending_reps = {}
            else:
                pending_reps[replica] = True
            _debug_triggers()
        elif cmd == "CHANGES":
            # Get pending replicas.
            [replica] = args
            if replica not in replicas:
                sendError("unknown replica: " + replica)
            if replica in triggered_reps:
                reportRecursiveChanges("", triggered_reps[replica])
                del triggered_reps[replica]
            sendCmd("DONE", [])
            _debug_triggers()
        elif cmd == "RESET":
            # Stop observing replica.
            [replica] = args
            if replica not in replicas:
                warn("unknown replica: " + replica)
                continue
            stream = replicas[replica]["stream"]
            if stream is not None:
                observer.unschedule(stream)
            del replicas[replica]
            if replica in triggered_reps:
                del triggered_reps[replica]
            _debug_triggers()
        else:
            sendError("unexpected root cmd: " + cmd)
if __name__ == '__main__':
    try:
        main()
    finally:
        # Whatever ended main(), detach every stream that is still
        # registered and shut the observer thread down.
        for replica in replicas:
            entry = replicas[replica]
            observer.unschedule(entry["stream"])
        observer.stop()
        observer.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment