|
#!/usr/bin/env python3 |
|
|
|
# SPDX-FileCopyrightText: ANNO DOMINI 2024 Jan Chren (rindeal) <dev.rindeal(a)gmail.com> |
|
# SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only |
|
|
|
import sys |
|
|
|
assert sys.version_info >= (3, 11) |
|
|
|
import argparse |
|
import copy |
|
import enum |
|
from typing import Generator |
|
|
|
|
|
from gi.repository import Gio, GLib |
|
|
|
|
|
PROG = "gsettings-array" |
|
AUTHOR = "Jan Chren (rindeal)" |
|
URL = "https://gist.github.com/rindeal/c5786254410028f760ee2351d884a744" |
|
LICENSE = "GPL-2.0-only OR GPL-3.0-only" |
|
COPYRIGHT = f"(C) ANNO DOMINI 2024 {AUTHOR}" |
|
|
|
|
|
class args_auto: |
|
pass |
|
|
|
|
|
class ArgsMeta(type): |
|
def __new__(metacls, cls, bases, classdict): |
|
classdict.update({k: k.lower() for k, v in classdict.items() if isinstance(v, args_auto)}) |
|
return super().__new__(metacls, cls, bases, classdict) |
|
|
|
|
|
class ArgsBase: |
|
def _arg_dir(self) -> Generator[str, None, None]: |
|
return (k for k in dir(self) if k[0] != '_' and k.replace('_', '').islower()) |
|
|
|
def __init__(self, args_ns: argparse.Namespace): |
|
for key in self._arg_dir(): |
|
attr_val = getattr(self, key) |
|
if hasattr(args_ns, key): |
|
attr_t = type(attr_val) |
|
new_val = attr_t(getattr(args_ns, key)) |
|
else: |
|
new_val = copy.copy(attr_val) |
|
setattr(self, key, new_val) |
|
|
|
def __repr__(self) -> str: |
|
attributes = ", ".join(f"{k}={getattr(self, k)!r}" for k in self._arg_dir()) |
|
return f"{self.__class__.__name__}({attributes})" |
|
|
|
|
|
class ArgCmdName(enum.StrEnum): |
|
_ = '' |
|
INSERT = enum.auto() |
|
LS = enum.auto() |
|
SORT = enum.auto() |
|
DEDUP = enum.auto() |
|
POP = enum.auto() |
|
RM = enum.auto() |
|
CLEAR = enum.auto() |
|
UPDATER = enum.auto() |
|
|
|
|
|
class _UpdaterCmdName(enum.StrEnum): |
|
_ = '' |
|
CHECK = enum.auto() |
|
DIFF = enum.auto() |
|
UPDATE = enum.auto() |
|
|
|
|
|
class Args(ArgsBase, metaclass=ArgsMeta): |
|
CMD = args_auto(); cmd: ArgCmdName = ArgCmdName('') |
|
SCHEMA = args_auto(); schema: str = str() |
|
KEY = args_auto(); key: str = str() |
|
INDEX = args_auto(); index: int = int(sys.maxsize) |
|
ITEMS = args_auto(); items: list[str] = list() |
|
UPDATER = args_auto(); updater: _UpdaterCmdName = _UpdaterCmdName('') |
|
|
|
OPT_SORT = args_auto(); opt_sort: bool = bool(False) |
|
OPT_REVERSE = args_auto(); opt_reverse: bool = bool(False) |
|
OPT_DEDUP = args_auto(); opt_dedup: bool = bool(False) |
|
OPT_CLEAR = args_auto(); opt_clear: bool = bool(False) |
|
# OPT_CHECK_UPDATE = args_auto(); opt_check_update: bool = bool(False) |
|
|
|
|
|
class Updater: |
|
URL = "https://gist.github.com/rindeal/c5786254410028f760ee2351d884a744/raw/gsettings-array.py" |
|
|
|
CmdName = _UpdaterCmdName |
|
|
|
@classmethod |
|
def handle_args(cls, args: Args) -> int | str: |
|
if args.updater in {cls.CmdName.CHECK, cls.CmdName.DIFF}: |
|
print_diff = cls.CmdName.DIFF == args.updater |
|
is_update_available, err = cls.check_for_update(print_diff=print_diff) |
|
if print_diff: |
|
return 0 |
|
if is_update_available: |
|
print("Update is available!") |
|
print("Download it from", Updater.URL) |
|
return 0 |
|
else: |
|
if err: |
|
return f"Error occurred when checking for an update: '{err}'" |
|
print("No update available.") |
|
return 61 # ENODATA 61 No data available |
|
elif cls.CmdName.UPDATE == args.updater: |
|
# TODO |
|
return 0 |
|
else: |
|
assert False |
|
|
|
@classmethod |
|
def check_for_update(cls, local_path: str = __file__, remote_url: str = '', print_diff: bool = False) -> tuple[bool, str]: |
|
""" |
|
Returns: |
|
- (True, '') if there are updates |
|
- (False, '') if there are no updates |
|
- (False, error_message) if there was and error |
|
""" |
|
import os |
|
import urllib.request |
|
|
|
if not remote_url: |
|
remote_url = cls.URL |
|
|
|
try: |
|
response = urllib.request.urlopen(remote_url) |
|
if response.getcode() != 200: |
|
return False, "Failed to get a successful response from the server." |
|
|
|
remote_info = response.info() |
|
remote_size: str | int | None = remote_info.get('Content-Length') |
|
if remote_size is None: |
|
return False, "Could not retrieve 'Content-Length' of the remote file." |
|
remote_size = int(remote_size) |
|
if remote_size < 1e3: |
|
return False, "Remote file size is too small" |
|
if remote_size > 1e6: |
|
return False, "Remote file size is too big" |
|
local_size = os.path.getsize(local_path) |
|
if not print_diff and local_size != remote_size: |
|
return True, '' |
|
|
|
remote_content = response.read().decode('utf-8') |
|
with open(local_path, 'r') as f: |
|
local_content = f.read() |
|
|
|
if print_diff: |
|
import difflib |
|
|
|
def generate_unified_diff(old_content: str, new_content: str, file_name: str) -> str: |
|
"""Generate a unified diff between old_content and new_content.""" |
|
difflines = difflib.unified_diff( |
|
old_content.splitlines(keepends=True), |
|
new_content.splitlines(keepends=True), |
|
fromfile=f"Old '{file_name}'", |
|
tofile=f"New '{file_name}'", |
|
) |
|
return ''.join(list(difflines)).strip() |
|
diff = generate_unified_diff(local_content, remote_content, PROG) |
|
print(diff) |
|
|
|
return (True, '') if remote_content != local_content else (False, '') |
|
|
|
except urllib.error.URLError as e: |
|
return False, f"An error occurred while trying to access the URL: {e}" |
|
except OSError as e: |
|
return False, f"An error occurred while trying to access the local file: {e}" |
|
|
|
|
|
class App: |
|
@staticmethod |
|
def _parse_args(raw_arg_list: list[str] | None = None) -> Args: |
|
indent = " " * 4 |
|
main_parser = argparse.ArgumentParser( |
|
PROG, |
|
description='Manipulate GSettings arrays.', |
|
epilog="\n".join([ |
|
indent*0 + "additional information:", |
|
indent*1 + f'''Docs: {URL}''', |
|
indent*1 + f'''Copyright: {COPYRIGHT}''', |
|
indent*1 + f'''License: {LICENSE}''', |
|
]), |
|
formatter_class=argparse.RawDescriptionHelpFormatter |
|
) |
|
|
|
# main_parser.add_argument('--check-for-update', dest=Args.OPT_CHECK_UPDATE, help="Check Gist GitHub for an update to this utility.", action='store_true') |
|
|
|
cmdpar = main_parser.add_subparsers(metavar='COMMAND', dest=Args.CMD, help="Task to perform on the array. Available commands are:", required=True) |
|
|
|
C = ArgCmdName |
|
P = { |
|
C.INSERT: cmdpar.add_parser(C.INSERT, help="Insert one or more items starting at a specified index."), |
|
C.LS: cmdpar.add_parser(C.LS, help="List all items in the array, each on a new line."), |
|
C.SORT: cmdpar.add_parser(C.SORT, help="Sort all items in the array."), |
|
C.DEDUP: cmdpar.add_parser(C.DEDUP, help="Remove duplicated items from the array."), |
|
C.POP: cmdpar.add_parser(C.POP, help="Print and remove the item at a specified index."), |
|
C.RM: cmdpar.add_parser(C.RM, help="Remove one or more items from the array."), |
|
C.CLEAR: cmdpar.add_parser(C.CLEAR, help="Clear all items from the array."), |
|
} |
|
|
|
updater_parser = cmdpar.add_parser(C.UPDATER, help="Check updates for this tool and possibly update itself.") |
|
upcmdpar = updater_parser.add_subparsers(metavar='COMMAND', dest=Args.UPDATER, required=True) |
|
UC = Updater.CmdName |
|
upcmdpar.add_parser(UC.CHECK, help="Check for updates.") |
|
upcmdpar.add_parser(UC.DIFF, help="Display unified diff with new changes.") |
|
upcmdpar.add_parser(UC.UPDATE, help="Perform self-update.") |
|
|
|
for p in P.values(): |
|
p.add_argument(metavar='SCHEMA', dest=Args.SCHEMA, help="GSettings schema, eg. `org.gnome.desktop.input-sources`") |
|
p.add_argument(metavar='KEY', dest=Args.KEY, help="GSettings key, eg. `sources`") |
|
for p in (P[C.INSERT], P[C.POP]): |
|
p.add_argument(metavar='INDEX', dest=Args.INDEX, help="Array index, 0 = first, ..., -1 = last", type=int) |
|
for p in (P[C.INSERT], P[C.RM]): |
|
p.add_argument(metavar='ITEM', dest=Args.ITEMS, help="Value formatted according to the array's inner type. Use `gsettings range` command to inspect array type.", nargs=argparse.ONE_OR_MORE) |
|
for p in (P[C.INSERT],): |
|
p.add_argument('--clear', dest=Args.OPT_CLEAR, help="Run `clear` before main task", action='store_true') |
|
for p in (P[C.INSERT], P[C.POP], P[C.RM]): |
|
p.add_argument('--sort', dest=Args.OPT_SORT, help="Run `sort` after main task", action='store_true') |
|
for p in (P[C.INSERT], P[C.POP], P[C.RM], P[C.SORT]): |
|
p.add_argument('--reverse', dest=Args.OPT_REVERSE, help="Reverse orientation of the sort", action='store_true') |
|
p.add_argument('--dedup', dest=Args.OPT_DEDUP, help="Run `dedup` after main task", action='store_true') |
|
|
|
args_ns = main_parser.parse_args(raw_arg_list) |
|
args = Args(args_ns) |
|
|
|
if args.opt_reverse and not (args.opt_sort or ArgCmdName.SORT == args.cmd): |
|
main_parser.error("--reverse requires --sort or sort command") |
|
|
|
return args |
|
|
|
@staticmethod |
|
def _maybe_get_schema(schema_str: str) -> Gio.SettingsSchema | None: |
|
default_source = Gio.SettingsSchemaSource.get_default() |
|
schema = Gio.SettingsSchemaSource.lookup(default_source, schema_str, True) |
|
return schema |
|
|
|
@staticmethod |
|
def _quote_strings(items: list): |
|
checkers = { |
|
'is_bool': lambda s: s.lower() in {'true', 'false'}, |
|
'is_array': lambda s: s[0] == '[' and s[-1] == ']', |
|
'is_tuple': lambda s: s[0] == '(' and s[-1] == ')', |
|
'is_string': lambda s: (s[0], s[-1]) in {('"',)*2, ("'",)*2}, |
|
'is_number': lambda s: s.replace('.', '', 1).isdigit() |
|
} |
|
quoted = [] |
|
for item in items: |
|
if item and not any(checker(item) for checker in checkers.values()): |
|
quote = '"' if "'" in item else "'" |
|
item = quote + item.replace(quote, "\\" + quote) + quote |
|
quoted.append(item) |
|
return quoted |
|
|
|
@classmethod |
|
def _main(cls, args: Args) -> int | str: |
|
if ArgCmdName.UPDATER == args.cmd: |
|
return Updater.handle_args(args) |
|
|
|
schema = cls._maybe_get_schema(args.schema) |
|
if not schema: |
|
return f"Error: Schema not found: schema=`{args.schema}`\n{args}" |
|
if not schema.has_key(args.key): |
|
return f"Error: Key not found: key=`{args.key}`\n{args}" |
|
|
|
array_type = schema.get_key(args.key).get_value_type() |
|
if not array_type.is_array(): |
|
return f"Error: Key not an array: type=`{array_type.dup_string()}`\n{args}" |
|
|
|
gsettings = Gio.Settings.new(args.schema) |
|
old_array = gsettings[args.key] |
|
|
|
should_print_diff = args.cmd not in (ArgCmdName.LS, ) |
|
if should_print_diff: |
|
print("Old value:", old_array, file=sys.stderr) |
|
|
|
input_array_str = "[%s]" % ",".join(cls._quote_strings(args.items)) if args.items else "[]" |
|
parsed_array = GLib.Variant.parse(array_type, input_array_str) |
|
|
|
if ArgCmdName.LS == args.cmd: |
|
for item in cls._quote_strings(old_array): |
|
print(item) |
|
|
|
if ArgCmdName.CLEAR == args.cmd or args.opt_clear: |
|
old_array = gsettings[args.key] = [] |
|
|
|
if ArgCmdName.INSERT == args.cmd: |
|
i = args.index |
|
if i < 0: |
|
i += len(old_array) + 1 |
|
gsettings[args.key] = old_array[:i] + list(parsed_array) + old_array[i:] |
|
|
|
if ArgCmdName.POP == args.cmd and len(old_array): |
|
i = args.index |
|
mn, mx = -len(old_array), len(old_array) |
|
if mn <= i < mx: |
|
if i < 0: |
|
i += len(old_array) |
|
print(old_array[i]) |
|
gsettings[args.key] = old_array[:i] + old_array[i + 1:] |
|
else: |
|
return f"Error: Index out of bounds, must be: {mn} <= INDEX < {mx}" |
|
|
|
if ArgCmdName.RM == args.cmd: |
|
gsettings[args.key] = [x for x in old_array if x not in parsed_array] |
|
|
|
if ArgCmdName.DEDUP == args.cmd or args.opt_dedup: |
|
# use dict.fromkeys() since set() doesn't preserve order |
|
gsettings[args.key] = list(dict.fromkeys(old_array).keys()) |
|
|
|
if ArgCmdName.SORT == args.cmd or args.opt_sort: |
|
new_arr = sorted(old_array) |
|
if args.opt_reverse: |
|
new_arr = reversed(new_arr) |
|
gsettings[args.key] = new_arr |
|
|
|
# Writes made to a GSettings are handled asynchronously. |
|
# Without sync(), new changes won't take effect at all! |
|
gsettings.sync() |
|
|
|
if should_print_diff: |
|
print("New value:", gsettings[args.key], file=sys.stderr) |
|
|
|
return 0 |
|
|
|
@classmethod |
|
def run(cls, args: list[str] | None = None) -> int | str: |
|
return cls._main(cls._parse_args(args)) |
|
|
|
|
|
if __name__ == '__main__': |
|
app = App() |
|
sys.exit(app.run()) |