Created
April 9, 2020 10:17
-
-
Save XVilka/f124936a336a5a43f54915369719e626 to your computer and use it in GitHub Desktop.
Dotfiles Synchronization Script
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# pylint: disable=missing-docstring | |
# pylint: disable=invalid-name | |
# pylint: disable=wrong-import-position | |
# pylint: disable=import-outside-toplevel | |
# pylint: disable=logging-format-interpolation | |
"""Dotfiles Synchronization Script

This script lets you synchronize your dotfiles among different computers with
various environments and installed programs. It can work locally or remotely
through an SSH connection or an SSH tunnel.
It has two main modes - local and remote. The files to be synchronized are described
in a JSON file with the following structure:
.. code-block:: json | |
{ | |
"configs": { | |
"single entry": { | |
"config": "repository/local/path", | |
"location": "target/location"
}, | |
"category": { | |
"single entry": { | |
"config": "repository/local/path", | |
"location": "target/location", | |
"ssh": false, | |
"msys": true | |
}, | |
... | |
}, | |
... | |
} | |
The structure of the config allows to have a different dotfiles structure in the | |
local storage from the real config files location. By default it reads "./configfiles.json" | |
but the location of it can be specified with ``-l`` option. | |
The usual workflow is to have a "~/dotfiles" directory which is a git repository
with your files that can be synchronized through GitHub or any other service, and then
to run ``sync.py`` to update your real configuration files.
The default base path for copying files is ``$HOME``, but a different one can be specified | |
with ``-p`` option. The configuration entry can contain the platform arguments, | |
like ``"ssh": false`` and ``"msys": true``. This example means that the file will not | |
be copied during the remote mode, and will be installed for the MSYS2 configuration | |
on Windows platform. The supported platforms are: "msys", "docker", "ssh". | |
It can copy the files to the location (``-c`` option) or just create symlinks instead (the default | |
mode). If the target file is already in place it's possible to force the replace (or fix the | |
symbolic link) with the ``-r`` option. | |
Remote synchronization allows you to upload the files to the remote machine through either a direct
SSH connection or an SSH tunnel. It reads the current SSH config file (e.g. "~/.ssh/config")
and loads the settings from there. For example, if you have these lines in your SSH config:
.. code-block:: txt | |
Host myhost | |
HostName cute.domain.address.io | |
Port 9999 | |
User myuser | |
IdentityFile ~/.ssh/myhost_rsa | |
it will allow you to specify just the name of the host with command ``sync.py -m remotessh myhost``. | |
In case you want to use the SSH tunnel, the corresponding mode is ``-m sshtunnel``. It also reads | |
the settings from your SSH config: | |
.. code-block:: txt | |
Host server1 | |
HostName cute.address.io | |
IdentityFile ~/.ssh/myhost_rsa | |
Host server2_behind_server1 | |
HostName ugly.address.io | |
IdentityFile ~/.ssh/myhost_rsa | |
ProxyCommand ssh server1 -W %h:%p | |
The script requires only two external Python packages: | |
* paramiko | |
* sshtunnel | |
Neither is required if only the local mode is used.
""" | |
import os | |
import os.path as path | |
import sys | |
import argparse | |
import shutil | |
import json | |
import configparser | |
import getpass | |
import logging | |
# Compare the whole version tuple: checking major and minor separately would
# wrongly reject a future major release whose minor number is below 7.
if sys.version_info < (3, 7):
    raise Exception("Only Python >= 3.7 supported!")

# Imported after the version check on purpose (dataclasses needs Python 3.7+).
from enum import Enum
from dataclasses import dataclass

# Plain messages without level/timestamp prefixes; DEBUG so everything is shown.
logging.basicConfig(format="%(message)s", level=logging.DEBUG)
# -------------------------------------------------------------------- | |
# 1. Check if all needed executables and libraries are installed | |
# Error text raised when the optional SSH dependencies cannot be found.
# Worded as "and/or" because either package alone being absent is enough
# to make the remote modes unusable.
missing_modules_message = """
paramiko and/or sshtunnel are not found!
Please install them using the following command:
    pip3 install -U --user paramiko sshtunnel
"""
def check_module(package_name):
    """Tell whether ``package_name`` can be located by the import system.

    The package is only looked up, not imported.

    Returns:
        True when a module spec is found, False otherwise.
    """
    from importlib import util as import_util

    return import_util.find_spec(package_name) is not None
# ----------------------------------------------------------------------------- | |
# Various helpers | |
def ignore_platform(item, current, target):
    """Decide whether a config entry must be skipped for a platform.

    An entry is skipped only when it carries the key ``target`` with a falsy
    value AND ``current`` equals ``target``.

    Args:
        item: the config entry (a mapping that may contain platform flags).
        current: the first platform name of the comparison.
        target: the key looked up in ``item`` and compared against ``current``.

    Returns:
        True if the entry should be ignored, False otherwise.
    """
    if target not in item:
        return False
    disabled = not item[target]
    return bool(disabled and current == target)
# Check the symlink and recreate if broken | |
def recreate_symlink(dest, source):
    """Make the existing symlink ``dest`` point at ``source``.

    The current link target is read (a relative target is resolved against the
    directory of ``dest``). If it already equals ``source`` only a warning is
    logged; otherwise the link is removed and created again.

    Args:
        dest: path of an existing symbolic link.
        source: path the link has to point to.
    """
    current = os.readlink(dest)
    if not path.isabs(current):
        current = path.join(path.dirname(dest), current)

    # Nothing to do when the link is already correct
    if current == source:
        logging.warning("Symlink is already valid {0} -> {1}".format(dest, current))
        return

    logging.info("Removing broken link {0} -> {1}".format(dest, current))
    os.remove(dest)
    logging.info("Creating symlink {0} -> {1}".format(dest, source))
    os.symlink(source, dest)
# Replace file or symlink if exists already | |
def replace_files(dest, source):
    """Create the symlink ``dest`` -> ``source``, replacing what is there.

    * ``dest`` is a symlink: it is checked and re-pointed by
      ``recreate_symlink``.
    * ``dest`` is a regular file: it is removed and replaced by the symlink.
    * ``dest`` exists but is something else (e.g. a directory): an error is
      logged and nothing is changed, instead of letting ``os.symlink`` fail
      with ``FileExistsError``.
    * ``dest`` does not exist: the symlink is created if the parent directory
      exists, otherwise an error is logged.

    Args:
        dest: path where the symlink has to be placed.
        source: path the symlink has to point to.
    """
    if path.islink(dest):
        recreate_symlink(dest, source)
        return

    if path.isfile(dest):
        logging.warning("Removing regular file {0}".format(dest))
        os.remove(dest)
    elif path.lexists(dest):
        # Directories and special files are never removed automatically
        logging.error("Can't replace {0}: it is not a regular file or a symlink".format(dest))
        return
    else:
        destdir = path.dirname(dest)
        if not path.exists(destdir):
            logging.error("Directory {0} doesn't exist".format(destdir))
            return

    logging.info("Creating symlink {0} -> {1}".format(dest, source))
    os.symlink(source, dest)
# Plain symlink without replace | |
def symlink_files(dest, source):
    """Create the symlink ``dest`` -> ``source`` without replacing anything.

    If anything already exists at ``dest`` (file, symlink - even a broken one -
    or directory) only a warning is logged. ``lexists`` is used so an existing
    directory is reported too instead of making ``os.symlink`` raise
    ``FileExistsError``. If the parent directory of ``dest`` is missing an
    error is logged.

    Args:
        dest: path where the symlink has to be placed.
        source: path the symlink has to point to.
    """
    if path.lexists(dest):
        logging.warning("File {0} exists".format(dest))
        return

    dest_dir = path.dirname(dest)
    if not path.exists(dest_dir):
        logging.error("Directory {0} doesn't exist".format(dest_dir))
        return

    logging.info("Creating symlink {0} -> {1}".format(dest, source))
    os.symlink(source, dest)
# ----------------------------------------------------------------------------- | |
class TransferMode(Enum):
    """The way the dotfiles are delivered to the target."""

    localcopy = 1
    remotessh = 2
    sshtunnel = 3

    def __str__(self):
        # Show the bare member name, e.g. in the argparse help output
        return self.name

    def __repr__(self):
        return str(self)

    @staticmethod
    def argparse(s):
        """Convert a command line string into a ``TransferMode`` member.

        Meant to be used as the ``type=`` callable of an argparse argument.

        Args:
            s: the member name, e.g. ``"remotessh"``.

        Returns:
            The matching ``TransferMode`` member.

        Raises:
            argparse.ArgumentTypeError: if ``s`` is not a member name.
        """
        # Local import under another name: inside the class namespace
        # "argparse" is this very method.
        from argparse import ArgumentTypeError

        try:
            return TransferMode[s]
        except KeyError:
            valid = ", ".join(mode.name for mode in TransferMode)
            # Raising the plain string (as before) is itself a TypeError
            raise ArgumentTypeError(
                "unknown transfer mode '{0}' (choose from {1})".format(s, valid)) from None
# ----------------------------------------------------------------------------- | |
# A wrapper for paramiko and sshtunnel to not require these modules | |
# if the remotessh/tunnel modes are not activated | |
class SSHFunctions():
    """Lazy wrapper around the optional ``paramiko`` and ``sshtunnel`` packages.

    The packages are imported only when an instance is created, so the local
    mode keeps working without them.
    """

    # Port used when neither the SSH config nor the custom config gives one
    DEFAULT_PORT = 22

    def __init__(self):
        """Import the SSH packages.

        Raises:
            Exception: if ``paramiko`` or ``sshtunnel`` is not installed.
        """
        # Both packages are required, so a single missing one is already fatal
        if not (check_module("paramiko") and check_module("sshtunnel")):
            raise Exception(missing_modules_message)
        import paramiko
        # TODO: Switch to the vanilla paramiko?
        import sshtunnel
        # A workaround for static analyzers
        self.paramiko = paramiko
        self.sshtunnel = sshtunnel

    def config_parser(self, target, configpath):
        """Build the keyword arguments for ``SSHClient.connect``.

        The host ``target.machine`` is looked up in "~/.ssh/config" first; if
        the lookup yields nothing, the ``[SSH]`` section of ``configpath`` is
        used instead.

        Args:
            target: object whose ``machine`` attribute names the host.
            configpath: path of the fallback INI-style configuration file.

        Returns:
            A dict with at least ``hostname``, ``port`` (int), ``username``
            and ``password``, or None if no configuration was found.
        """
        # Initialize the configuration
        cfg = {'hostname': target.machine, 'port': self.DEFAULT_PORT,
               'username': None, 'password': None}
        # Parse the system SSH configuration file
        ssh_config = self.paramiko.SSHConfig()
        # TODO: Allow different location?
        user_config_file = os.path.expanduser("~/.ssh/config")
        if os.path.exists(user_config_file):
            with open(user_config_file) as f:
                ssh_config.parse(f)
        # Search for the specified machine name (hostname) in the SSH config
        user_config = ssh_config.lookup(cfg['hostname'])
        # If the entry exists - use the SSH configuration
        # NOTE(review): paramiko's lookup() appears to always return at least
        # the 'hostname' key, so the fallback branch below may never be taken
        # - confirm against the paramiko documentation.
        if user_config:
            if 'hostname' in user_config:
                cfg['hostname'] = user_config['hostname']
            # The "User" directive is exposed by paramiko under the key 'user'
            if 'user' in user_config:
                cfg['username'] = user_config['user']
            # The port is read from the file as a string
            if 'port' in user_config:
                cfg['port'] = int(user_config['port'])
            # Load also the key file if present
            if 'identityfile' in user_config:
                cfg['key_filename'] = user_config['identityfile'][0]
                cfg['look_for_keys'] = False
            # TODO: Add support for `ProxyJump` directive
            if 'proxycommand' in user_config:
                cfg['sock'] = self.paramiko.ProxyCommand(user_config['proxycommand'])
            # If the username not in the SSH config - use the current one
            if cfg['username'] is None:
                cfg['username'] = getpass.getuser()
            # TODO: Maybe ask the user/password interactively instead?
            logging.debug("Loaded SSH connection configuration from {0}".format(user_config_file))
        # If it doesn't - try to load the custom configuration file
        else:
            # TODO: Allow the keys auth in the configuration options
            # Read the settings from configuration file
            conf = configparser.ConfigParser()
            conf.read([os.path.expanduser(configpath)])
            sect = "SSH"
            if sect not in conf.sections():
                return None
            cfg['hostname'] = conf.get(sect, "REMOTEHOST")
            cfg['port'] = int(conf.get(sect, "REMOTEPORT"))
            cfg['username'] = conf.get(sect, "USERNAME")
            # TODO: Allow to specify the key!
            cfg['password'] = conf.get(sect, "PASSWORD")
            logging.debug("Loaded SSH connection configuration from {0}".format(configpath))
        return cfg

    # Separate connection for copying the files over
    def createSSHClient(self, cfg):
        """Open an SSH connection described by ``cfg``.

        Args:
            cfg: keyword arguments for ``SSHClient.connect`` as produced by
                ``config_parser``.

        Returns:
            The connected ``paramiko.SSHClient``.
        """
        self.paramiko.util.log_to_file("sync_ssh_upload.log")
        client = self.paramiko.SSHClient()
        client.load_system_host_keys()
        # Unknown hosts are accepted and added automatically
        client.set_missing_host_key_policy(self.paramiko.AutoAddPolicy())
        logging.debug("Connecting to {0}@{1}:{2}..."
                      .format(cfg['username'], cfg['hostname'], cfg['port']))
        client.connect(**cfg)
        return client

    # Forwarder for anything other rather than just copying
    def createSSHForwarder(self, sshtunconf):
        """Create (but do not start) a tunnel forwarder.

        Args:
            sshtunconf: object providing ``host``, ``port``, ``username``,
                ``password`` and ``remote_port`` attributes.

        Returns:
            The ``sshtunnel.SSHTunnelForwarder`` instance.
        """
        server = self.sshtunnel.SSHTunnelForwarder(
            (sshtunconf.host, int(sshtunconf.port)),
            ssh_username=sshtunconf.username,
            ssh_password=sshtunconf.password,
            remote_bind_address=('127.0.0.1', int(sshtunconf.remote_port)))
        return server
# ----------------------------------------------------------------------------- | |
class SSHConfig():
    """Holder of the SSH connection settings for the chosen target.

    After construction ``self.ssh`` is either None or the dict returned by
    ``functions.config_parser``.
    """

    def __init__(self, functions, target, mode=TransferMode.localcopy, configpath="sync.conf"):
        """Resolve the connection settings.

        Args:
            functions: object providing ``config_parser(target, configpath)``.
            target: object with a ``machine`` attribute (may be None/empty).
            mode: the transfer mode; anything but ``localcopy`` is remote.
            configpath: fallback configuration file passed to the parser.

        Raises:
            ValueError: remote mode without a target machine.
            Exception: remote mode and no configuration could be found.
        """
        self.ssh = None
        remote = mode is not TransferMode.localcopy

        # TODO: Parse the URLs like "user:[email protected]:port"
        if not (target and target.machine):
            if remote:
                raise ValueError("Error - target can't be empty!")
            return

        self.ssh = functions.config_parser(target, configpath)
        if remote and self.ssh is None:
            raise Exception("Error - can't find suitable configuration for remote installation!")
# ----------------------------------------------------------------------------- | |
# ----------------------------------------------------------------------------- | |
# For accessing everything through SSH tunnel | |
# TODO: Move inside the SSHFunctions maybe? | |
# TODO: Load ssh tunnels also from the .ssh/config file (like in SSHConfig class) | |
class DotfilesSSHTunnel():
    """Bookkeeping of SSH port forwards opened through ``sshtunnel``.

    Every forward is stored in ``self.tunnels`` as a dict with the keys
    ``remote_port``, ``local_port`` and ``tunnel``.
    """

    def __init__(self, config, functions):
        self.tunnels = []
        self.functions = functions
        self.config = config

    def find_forward(self, remote_port):
        """Return the most recently added forward for ``remote_port`` or None."""
        # Walk backwards so the latest matching record wins
        for record in reversed(self.tunnels):
            if record['remote_port'] == remote_port:
                return record
        return None

    def add_forward(self, remote_port):
        """Open a forward to ``remote_port`` and return its record.

        The tunnel is started, registered in ``self.tunnels`` and the stored
        record (dict with ``remote_port``, ``local_port``, ``tunnel``) is
        returned; ``local_port`` holds the local bind port as a string.
        """
        tunnel_lib = self.functions.sshtunnel
        gateway = (self.config.host, int(self.config.port))
        tunnel = tunnel_lib.SSHTunnelForwarder(
            gateway,
            ssh_username=self.config.username,
            ssh_password=self.config.password,
            remote_bind_address=('localhost', int(remote_port)),
            logger=tunnel_lib.create_logger(loglevel="ERROR")
        )
        if tunnel:
            tunnel.start()
            forwarder = {
                'remote_port': remote_port,
                'local_port': str(tunnel.local_bind_port),
                'tunnel': tunnel,
            }
            self.tunnels.append(forwarder)
            logging.debug("Forward for {0} port successful!".format(remote_port))
            logging.debug("Added:\n{0}".format(forwarder))
        return self.find_forward(remote_port)

    def del_forward(self, remote_port):
        """Stop and forget the forward for ``remote_port`` (no-op if unknown)."""
        record = self.find_forward(remote_port)
        if not record:
            return
        record['tunnel'].stop()
        self.tunnels.remove(record)
# ----------------------------------------------------------------------------- | |
class _SSH():
    """Plain container for the live SSH objects of a transfer."""

    def __init__(self, sshfunctions):
        # The connected SSHClient; kept so the connection has an owner for
        # the whole transfer and can be closed explicitly afterwards
        self.client = None
        self.sftpsession = None
        self.tunnel = None
        self.functions = sshfunctions


# High-level access for files transfer
class DotfilesTransferSSH():
    """Upload files to a remote machine over SFTP."""

    def __init__(self, mode, config, sshfunctions):
        """Store the settings; no connection is made until ``start``.

        Args:
            mode: the transfer mode (remotessh or sshtunnel).
            config: object whose ``ssh`` attribute holds the connect settings.
            sshfunctions: provider of ``createSSHClient`` and ``paramiko``.
        """
        self.mode = mode
        self.config = config
        # can contain ssh.tunnel, ssh.sftpsession, etc
        self.ssh = _SSH(sshfunctions)

    def ssh_start(self):
        """Connect to the remote machine and open the SFTP session.

        Logs an error (and opens nothing) if the configuration is missing.
        """
        if not (self.config and self.config.ssh):
            logging.error("Error initializing SSH tranfer - missing configuration!")
            return
        logging.info("Connecting to the remote machine...")
        client = self.ssh.functions.createSSHClient(self.config.ssh)
        # Keep the client referenced: it owns the transport the SFTP session
        # runs on and has to be closed in ssh_close()
        self.ssh.client = client
        sftpclient = self.ssh.functions.paramiko.sftp_client.SFTPClient
        self.ssh.sftpsession = sftpclient.from_transport(client.get_transport())

    def start(self):
        """Open the connection required by the transfer mode.

        Raises:
            Exception: if the mode is not a remote one.
        """
        if self.mode == TransferMode.sshtunnel:
            # Initialize tunnel first
            logging.info("Initializing SSH tunnel...")
            # Without a session every upload would fail. NOTE(review): this
            # assumes the hop is described by the connect settings (a 'sock'
            # entry built from ProxyCommand) - confirm.
            self.ssh_start()
        elif self.mode == TransferMode.remotessh:
            # Simple SFTP copy
            self.ssh_start()
        else:
            raise Exception("Wrong mode!")

    # ------------------------------------------------------------------------------
    def ssh_close(self):
        """Close the SFTP session, the SSH client and the tunnel, if any."""
        if not self.ssh:
            return
        if self.ssh.sftpsession:
            self.ssh.sftpsession.close()
            self.ssh.sftpsession = None
        if self.ssh.client:
            self.ssh.client.close()
            self.ssh.client = None
        if self.ssh.tunnel:
            self.ssh.tunnel.stop()
            self.ssh.tunnel = None

    def close(self):
        """Close all remote connections.

        Raises:
            Exception: if the mode is not a remote one.
        """
        if self.mode == TransferMode.remotessh or self.mode == TransferMode.sshtunnel:
            logging.info("Closing all remote SSH connections...")
            self.ssh_close()
        else:
            raise Exception("Wrong mode!")

    # ------------------------------------------------------------------------------
    # Copy files over SSH
    def ssh_copy(self, dest, source):
        """Upload ``source`` to the remote path ``dest``.

        Returns:
            True on success, False if the upload failed (the error is logged).
        """
        try:
            self.ssh.sftpsession.put(source, dest, confirm=True)
        except Exception as e:
            # Best effort on purpose: one failed file must not stop the rest
            # TODO: Check if file exists (check size)
            logging.error("SFTP: {0} error during {1} uploading".format(e, source))
            return False
        logging.info("SFTP: {0} successfully uploaded".format(source))
        return True

    def copy(self, dest, source):
        """Upload ``source`` to ``dest`` and return whether it succeeded.

        Raises:
            Exception: if the mode is not a remote one.
        """
        if self.mode == TransferMode.sshtunnel or self.mode == TransferMode.remotessh:
            # SFTP copy
            logging.info("SFTP: copying {0} to {1}...".format(source, dest))
            return self.ssh_copy(dest, source)
        raise Exception("Wrong mode!")
# High-level access for files transfer | |
class DotfilesTransferLocal():
    """Copy files on the local machine."""

    def __init__(self, mode):
        self.mode = mode

    def start(self):
        """Announce the start of the local transfer (nothing to open)."""
        logging.info("Starting to copy the local files...")

    # ------------------------------------------------------------------------------
    def close(self):
        """Nothing to release for the local transfer."""

    # Copy files and create directories if required
    def copy(self, dest, source):
        """Copy ``source`` to ``dest``, creating the parent directories.

        Args:
            dest: path of the copy to create.
            source: path of the existing file.
        """
        logging.info('Copying {0} to {1}'.format(source, dest))
        dest_dir = os.path.dirname(dest)
        # A bare file name has no directory part and makedirs('') would raise
        if dest_dir:
            os.makedirs(dest_dir, exist_ok=True)
        shutil.copyfile(source, dest)
# ----------------------------------------------------------------------------- | |
def is_category(entry):
    """Tell a category apart from a single file entry.

    A single entry carries both the "config" and the "location" keys;
    everything else is treated as a category of entries.
    """
    is_single_entry = "config" in entry and "location" in entry
    return not is_single_entry
# TODO: Support the "after" to run the command after the copy was done | |
class Dotfiles():
    """Install (or back up) the dotfiles locally or on a remote machine."""

    def __init__(self, target, mode=TransferMode.localcopy):
        """Create the transfer object for ``mode`` and start it.

        Args:
            target: object with ``platform``, ``path`` and ``machine``.
            mode: the transfer mode; remote modes load the SSH helpers.
        """
        self.target = target
        if mode is not TransferMode.localcopy:
            # Load SSH dependencies and helpers
            sshfunctions = SSHFunctions()
            # Load the SSH configuration and keys; SSHConfig raises on its own
            # when no usable configuration is found
            sshconfig = SSHConfig(sshfunctions, target, mode)
            self.transfer = DotfilesTransferSSH(mode, sshconfig, sshfunctions)
        else:
            self.transfer = DotfilesTransferLocal(mode)
        self.transfer.start()

    # Create backups
    def local_backup(self, source, dest):
        """Copy the existing file ``source`` to ``dest``."""
        logging.info('Copying {0} to {1}'.format(source, dest))
        err_msg = 'Config is not at path: {0}'.format(path.abspath(source))
        assert path.isfile(source), err_msg
        shutil.copyfile(source, dest)

    # TODO: Add SSH backups
    def ssh_backup(self, source, dest):
        """Placeholder: remote backups are not implemented yet."""

    def backup(self, source, dest):
        """Back up ``source`` to ``dest`` using the active transfer mode."""
        if self.transfer and self.transfer.mode is TransferMode.localcopy:
            self.local_backup(source, dest)
        else:
            self.ssh_backup(source, dest)

    def local_copy_file(self, options, entry):
        """Process one entry locally: back it up, copy it or symlink it.

        Args:
            options: object with the ``backup``, ``copy`` and ``replace`` flags.
            entry: mapping with the "config" (repository path) and "location"
                (path relative to the target base path) keys.
        """
        if ignore_platform(entry, "msys", self.target.platform):
            return
        if ignore_platform(entry, "docker", self.target.platform):
            return
        # Use the target of this instance, not a module-level global
        loc_path = os.path.join(self.target.path, entry['location'])
        conf_full_path = path.abspath(path.expanduser(entry['config']))
        loc_full_path = path.abspath(path.expanduser(loc_path))
        if options.backup:
            self.backup(loc_full_path, conf_full_path)
            return
        err_msg = 'Config is not at path: {0}'.format(path.abspath(conf_full_path))
        assert path.isfile(conf_full_path), err_msg
        if options.copy:
            self.transfer.copy(loc_full_path, conf_full_path)
        elif options.replace:
            # Force rewriting the files, replace them
            replace_files(loc_full_path, conf_full_path)
        else:
            # Do not replace existing files
            symlink_files(loc_full_path, conf_full_path)

    def ssh_copy_file(self, options, entry):
        """Upload one entry to the remote machine.

        Entries flagged with ``"ssh": false`` are skipped.
        """
        # Process only whitelisted files!
        if ignore_platform(entry, "ssh", "ssh"):
            return
        loc_path = entry['location']
        conf_full_path = path.abspath(path.expanduser(entry['config']))
        # NOTE(review): unlike the local mode, the upload below still happens
        # when a backup was requested - confirm this is intended.
        if options.backup:
            self.backup(loc_path, conf_full_path)
        self.transfer.copy(loc_path, conf_full_path)

    def _process_filelist(self, options, filelist, handler):
        """Call ``handler(options, entry)`` for every entry of ``filelist``."""
        # Entry paths are relative to the directory of this script
        os.chdir(path.dirname(path.abspath(__file__)))
        for name, value in filelist.items():
            # Filelist entries can be pure entry or a category
            if is_category(value):
                logging.info("Processing category \"{0}\"".format(name))
                for entry in value.values():
                    handler(options, entry)
            else:
                handler(options, value)

    # -----------------------------------------------------------------------------
    # TODO: handle also the diffs between existing and repository files
    # TODO: Handle exceptions in case of missing files, directories, etc
    def local_copy(self, options, filelist):
        """Install every entry of ``filelist`` on the local machine."""
        self._process_filelist(options, filelist, self.local_copy_file)

    # -----------------------------------------------------------------------------
    # TODO: handle also the diffs between existing and repository files
    # TODO: Handle exceptions in case of missing files, directories, etc
    def ssh_copy(self, options, filelist):
        """Upload every entry of ``filelist`` to the remote machine."""
        self._process_filelist(options, filelist, self.ssh_copy_file)

    def copy(self, options, filelist):
        """Install the dotfiles using the active transfer mode."""
        if self.transfer and self.transfer.mode is TransferMode.localcopy:
            logging.info("Installing the dotfiles locally...")
            self.local_copy(options, filelist)
        else:
            logging.info("Installing the dotfiles remotely...")
            self.ssh_copy(options, filelist)

    def close(self):
        """Close the transfer (and its connections, if any)."""
        if self.transfer:
            self.transfer.close()
# ----------------------------------------------------------------------------- | |
@dataclass
class Target:
    """Where the dotfiles are installed and which list describes them."""

    # Platform name; defaults to the platform the script runs on
    platform: str = sys.platform
    # Base path for the installed files
    path: str = "~"
    # Remote machine name; None when no machine was given
    machine: str = None
    # JSON file with the list of config files
    files: str = "configfiles.json"
@dataclass
class Options:
    """Switches selecting what is done with every config entry."""

    # Back up the files instead of installing them
    backup: bool = False
    # Copy the files instead of creating symlinks
    copy: bool = False
    # Replace files/symlinks that are already in place
    replace: bool = False
# ----------------------------------------------------------------------------- | |
if __name__ == '__main__':
    # Command line entry point: parse the arguments, load the list of config
    # files and install (or back up) them with the selected transfer mode.
    parser = argparse.ArgumentParser()
    parser.add_argument("-b", "--backup", action='store_true',
                        help="Backup the files from your $HOME")
    parser.add_argument("-m", "--mode", type=TransferMode.argparse,
                        choices=list(TransferMode), default=TransferMode.localcopy,
                        help="Specify the transfer mode")
    parser.add_argument("-c", "--copy", action='store_true',
                        help="Copy the files instead of creating symlinks")
    parser.add_argument("-r", "--replace", action='store_true',
                        help="Force rewriting the files, replace them")
    parser.add_argument("-l", "--list", action="store",
                        help="Specify the config files list")
    parser.add_argument("-p", "--path", action="store",
                        help="Specify target path if different from $HOME")
    parser.add_argument("-t", "--target", action="store",
                        help="Specify the target platform if different from current")
    # And one more argument - the hostname, if the remote modes are chosen
    parser.add_argument("machine", nargs="?",
                        help="Specify the machine name in case of remote installation")
    args = parser.parse_args()

    # Some defaults
    target = Target()
    # Non-default target options
    if args.list is not None:
        target.files = args.list
    if args.target is not None:
        target.platform = args.target
    if args.path is not None:
        target.path = args.path
    if args.machine is not None:
        target.machine = args.machine

    # Set the operation options
    options = Options()
    options.backup = args.backup
    options.copy = args.copy  # Makes sense only for the local installations!
    options.replace = args.replace

    # Load categories from JSON file before anything is connected, so a
    # missing or broken list fails early and leaves no connection open
    # TODO: Support also YAML
    with open(target.files, "r") as json_file:
        configs = json.load(json_file)
    filelist = configs["configs"]

    # Initialize the configuration, and a transfer, e.g. for SSH tunnel
    dotfiles = Dotfiles(target, mode=args.mode)
    try:
        logging.info("Processing the {0} dotfiles list...".format(target.files))
        dotfiles.copy(options, filelist)
    finally:
        # Close all connections gracefully if there were any,
        # even when the installation failed half-way
        dotfiles.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
That's my kinda script :-) neatly written... enjoyed browsing thru it - you definitely put some thought into it from the start, but kept it really readable 👍