-
-
Save thautwarm/2ddb524401b744cdea2cdbdeb6ed817b to your computer and use it in GitHub Desktop.
switching git users with configuration files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from __future__ import annotations | |
from pathlib import Path | |
from colorama import Fore, Style | |
from shutil import make_archive, unpack_archive | |
from contextlib import contextmanager | |
import io | |
import hashlib | |
import sys | |
import json | |
import wisepy2 | |
import typing | |
import base64 | |
import yaml | |
import struct | |
class Resource(typing.TypedDict): | |
is_dir: bool | |
data: str | |
class UserInfo(typing.TypedDict): | |
user: str | |
data: dict[str, Resource] | |
class Config(typing.TypedDict): | |
current_user: str | |
users: dict[str, UserInfo] | |
ssh_data_dir = Path("~").expanduser().joinpath(".dmgr").absolute() | |
ssh_data_dir.mkdir(parents=True, exist_ok=True, mode=0o700) | |
home = Path("~").expanduser().absolute() | |
def zip_directory_to_buf(directory: str): | |
"""Create a zip file in memory from a directory.""" | |
tmp_dir = ssh_data_dir.joinpath("tmp") | |
tmp_dir.mkdir(parents=True, exist_ok=True, mode=0o700) | |
make_archive(tmp_dir.joinpath("tmp"), "zip", directory) | |
with open(tmp_dir.joinpath("tmp.zip"), "rb") as f: | |
data = f.read() | |
tmp_dir.joinpath("tmp.zip").unlink() | |
return data | |
def zip_buf_to_directory(data: bytes, target_directory: str): | |
"""Unpack a zip file from memory to a directory.""" | |
tmp_dir = ssh_data_dir.joinpath("tmp") | |
tmp_dir.mkdir(parents=True, exist_ok=True, mode=0o700) | |
with open(tmp_dir.joinpath("tmp.zip"), "wb") as f: | |
f.write(data) | |
unpack_archive(tmp_dir.joinpath("tmp.zip"), target_directory) | |
tmp_dir.joinpath("tmp.zip").unlink() | |
@contextmanager | |
def with_config(): | |
_datafile = ssh_data_dir.joinpath("config.json") | |
config: Config | |
if not _datafile.exists(): | |
_datafile.write_text("{}", encoding='utf-8') | |
config = {} | |
else: | |
config = json.loads(_datafile.read_text(encoding='utf-8')) | |
try: | |
yield config | |
finally: | |
_datafile.write_text( | |
json.dumps(config, indent=4, ensure_ascii=False), | |
encoding='utf-8' | |
) | |
def visit_field_(c: dict, field: str, default): | |
if field in c: | |
return c[field] | |
c[field] = default | |
return default | |
def to_b64_str(s: bytes): | |
return base64.b64encode(s).decode('utf-8') | |
def from_b64_str(s: str): | |
return base64.b64decode(s.encode('utf-8')) | |
def create_resource(path: Path): | |
if path.is_dir(): | |
data = zip_directory_to_buf(path) | |
return Resource(is_dir=True, data=to_b64_str(data)) | |
data = path.read_bytes() | |
return Resource(is_dir=False, data=to_b64_str(data)) | |
def from_resource(resource: Resource, path: str): | |
data = from_b64_str(resource["data"]) | |
if resource["is_dir"]: | |
zip_buf_to_directory(data, path) | |
else: | |
Path(path).write_bytes(data) | |
class Main: | |
@staticmethod | |
def add(path: str): | |
file = Path(path).absolute() | |
if not file.exists(): | |
print(Fore.YELLOW + f"File {path!r} does not exist." + Style.RESET_ALL) | |
sys.exit(1) | |
if not file.is_relative_to(home): | |
print(Fore.RED + f"File {path!r} is not in your home directory." + Style.RESET_ALL) | |
sys.exit(1) | |
with with_config() as config: | |
# package 'file' into a zip file in memory | |
user = visit_field_(config, "current_user", home.name) | |
user_info = visit_field_(visit_field_(config, "users", {}), user, {}) | |
user_info['user'] = user | |
data = visit_field_(user_info, "data", {}) | |
data[file.relative_to(home).as_posix()] = create_resource(file) | |
print(Fore.GREEN + f"Added file {path!r} to {user}'s config." + Style.RESET_ALL) | |
@staticmethod | |
def sync(user: str = ""): | |
with with_config() as config: | |
user = user or visit_field_(config, "current_user", home.name) | |
users = visit_field_(config, "users", {}) | |
user_info = visit_field_(users, user, {}) | |
user_info = typing.cast('UserInfo', user_info) | |
user_info['user'] = user | |
data = visit_field_( | |
user_info, "data", | |
typing.cast('dict[str, Resource]', {}) | |
) | |
for path in list(data.keys()): | |
data[path] = create_resource(home.joinpath(path)) | |
print(Fore.GREEN + f"Synced data for user {user!r}." + Style.RESET_ALL) | |
@staticmethod | |
def create(user: str): | |
Main.sync() | |
with with_config() as config: | |
users = visit_field_(config, "users", {}) | |
if user in users: | |
print(Fore.YELLOW + f"User {user!r} already exists." + Style.RESET_ALL) | |
else: | |
_ = visit_field_(users, user, UserInfo(user=user, data={})) | |
print(Fore.GREEN + f"Created user {user!r}." + Style.RESET_ALL) | |
@staticmethod | |
def switch(user: str): | |
Main.sync() | |
with with_config() as config: | |
user = user | |
users = visit_field_(config, "users", {}) | |
if user not in users: | |
print(Fore.RED + f"User {user!r} does not exist." + Style.RESET_ALL) | |
sys.exit(1) | |
user_info = visit_field_(users, user, {}) | |
user_info['user'] = user | |
data = visit_field_(user_info, "data", {}) | |
for path, resource in data.items(): | |
from_resource(resource, home.joinpath(path).as_posix()) | |
config["current_user"] = user | |
print(Fore.GREEN + f"Switched to user {user!r}." + Style.RESET_ALL) | |
@staticmethod | |
def show(sync: bool = False): | |
if sync: | |
Main.sync() | |
with with_config() as config: | |
buf = io.StringIO() | |
yaml.safe_dump(_show_config(config), buf) | |
print(buf.getvalue()) | |
def ident(x: str): | |
maxi = 2 ** 32 | |
z = 1283 | |
for c in from_b64_str(x): | |
z = (z * (1 + c) + 7) % maxi | |
return z | |
def _show_resource(r: Resource): | |
if r["is_dir"]: | |
return "/" | |
else: | |
return '' | |
def _show_user_info(user_info: UserInfo): | |
files = {} | |
for k, v in user_info.get("data", {}).items(): | |
files[k + _show_resource(v)] = ident(v["data"]) | |
return {'user': user_info['user'], 'items': files} | |
def _show_config(config: Config): | |
current_user = config.get("current_user", home.name) | |
users = config.get("users", {}) | |
results = {} | |
for k, v in users.items(): | |
results[k] = _show_user_info(v) | |
return {'current_user': current_user, 'users': results} | |
wisepy2.wise(Main)() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment