Last active
January 15, 2021 19:16
-
-
Save frodo821/6f08bbc36944861a08249316d597cc85 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #-*-coding: utf-8;-*- | |
| from sys import argv, exit | |
| from os import chdir | |
| from os.path import ( | |
| expanduser as usr, | |
| exists, abspath) | |
| from abc import ABC, abstractmethod as abstract | |
| from atexit import register | |
| from io import StringIO | |
| from unicodedata import east_asian_width as cwidth | |
| from re import sub | |
| from json import dump, dumps, load | |
| from shlex import split | |
| import sqlite3 as sql | |
# Built-in default configuration. init() replaces this dict with the contents
# of the rc file when that file exists and parses as JSON.
settings = {
    # PRAGMA name -> value pairs applied to each opened connection.
    "pragma": {
        "foreign_keys": 1,
    },
    # SQL script executed once at startup; empty means nothing to run.
    "startup_snippets": "",
    # Whether a row of column names is prepended to query output.
    "headers": False,
    # Name of the registered formatter used to render result rows.
    "output_mode": "tabs",
    # Primary and continuation prompts.
    "ps1": ">>> ",
    "ps2": "... ",
    # Leading marker that distinguishes REPL commands from SQL.
    "cmd_prefix": ".",
    # Directory the process changes into at startup.
    "root": "~",
}

# Location of the JSON settings file, inside the user's home directory.
s_file = usr("~/.sqlite_rc")
def load_pragmas(cursor, pragmas):
    """Run one ``PRAGMA name=value;`` statement per entry of *pragmas*.

    A statement that fails is reported on stdout as ``Error:<name>:<reason>``
    and the remaining entries are still processed.

    Args:
        cursor: object exposing ``execute(sql)`` (a sqlite3 cursor).
        pragmas: mapping of pragma name to the value to assign.
    """
    for name in pragmas:
        try:
            statement = f"PRAGMA {name}={pragmas[name]};"
            cursor.execute(statement)
        except Exception as err:
            print(f"Error:{name}:{err}")
def init():
    """Open the database, load the settings file and apply the settings.

    Steps, in order:
      1. Connect to ``argv[1]``, or to an in-memory database when no
         argument was given. A failed connection prints a message and exits
         with status 1.
      2. If the settings file exists, parse it as JSON and use it as the
         new ``settings``. An unreadable file, invalid JSON, or a JSON
         document that is not an object is reported and ignored, leaving the
         current settings in place.
      3. Create the global cursor and apply the configured pragmas.
      4. Run the startup snippet, change into the configured root directory
         and select the configured output formatter. Each of these steps
         reports its own failure and does not abort startup.
    """
    global db, settings, cur
    if not argv[1:]:
        db = sql.connect(":memory:")
        print("Warning: THIS IS IN-MEMORY MODE. THE DATABASE WILL BE ELIMINATED FOREVER.")
    else:
        try:
            db = sql.connect(argv[1])
        except Exception as ex:
            print(f"Failed to open file '{argv[1]}':{type(ex).__name__}:{ex}")
            # Non-zero status so callers can detect the failure.
            exit(1)
    if exists(s_file):
        try:
            with open(s_file) as s:
                stgs = load(s)
        except (OSError, ValueError) as ex:
            # ValueError covers JSON syntax errors and undecodable bytes.
            print(f"Warning: ignoring settings file '{s_file}':{type(ex).__name__}:{ex}")
        else:
            if isinstance(stgs, dict):
                settings = stgs
            else:
                # Every consumer calls settings.get(); anything but a JSON
                # object would break them later.
                print(f"Warning: ignoring settings file '{s_file}': top-level value must be an object.")
    cur = db.cursor()
    load_pragmas(cur, settings.get("pragma", {}))
    try:
        snp = settings.get("startup_snippets", "")
        if snp:
            cur.executescript(snp)
    except Exception as e:
        print(f"Errored during executing startup snippet: {e}")
    try:
        chdir(usr(settings.get("root", '~')))
    except Exception as e:
        print(f"Error:{type(e).__name__}:{e}")
    try:
        Formatter.change(settings.get("output_mode", 'tabs'))
    except Exception as e:
        print(*e.args)
        print('Failed to set default formatter. Using fallback strategy.')
        # Fall back to the first formatter that was registered.
        fallback = next(iter(Formatter.formatters))
        settings["output_mode"] = fallback
        Formatter.change(fallback)
def finalize():
    """Write the current settings to the settings file as JSON.

    The settings are serialized to a string *before* the file is opened for
    writing. Opening with ``'w'`` truncates the file, so serializing straight
    into it would leave a partial or empty file if a value turns out not to
    be JSON-serializable. Failures are reported on stdout, not raised.
    """
    try:
        payload = dumps(settings)
    except (TypeError, ValueError) as ex:
        print(f"Error: settings were not saved:{type(ex).__name__}:{ex}")
        return
    try:
        with open(s_file, 'w') as s:
            s.write(payload)
    except OSError as ex:
        print(f"Error: settings were not saved:{type(ex).__name__}:{ex}")
def length(s: str):
    """Return the number of display columns *s* occupies.

    Characters whose East Asian Width class is Fullwidth, Wide or Ambiguous
    count as two columns; every other character counts as one.
    """
    double_width = ('F', 'W', 'A')
    return sum(2 if cwidth(ch) in double_width else 1 for ch in s)
def limstr(s):
    """Convert a cell value to a single-line string, truncated if configured.

    ``None`` is rendered as the ``null_printing`` setting (default
    ``'None'``); anything else goes through ``str``. Line feeds and carriage
    returns become spaces. When the ``max_length`` setting is truthy, the
    result is the longest prefix whose display width, as measured by
    ``length``, does not exceed it.

    Args:
        s: any value taken from a result row.

    Returns:
        The printable string.
    """
    if s is None:
        # str() guards against a non-string value stored in the setting,
        # which would otherwise fail on .replace() below.
        s = str(settings.get('null_printing', 'None'))
    else:
        s = str(s)
    s = s.replace('\n', ' ').replace('\r', ' ')
    mlen = settings.get('max_length', 0)
    if not mlen:
        return s
    # Keep a running width instead of re-measuring the whole prefix for
    # every character.
    width = 0
    for i, c in enumerate(s):
        width += length(c)
        if width > mlen:
            return s[:i]
    return s
def reading():
    """Read one unit of input from the user.

    Prompts with ``ps1`` until a non-empty line is entered, then:
      * a line starting with the command prefix is split shell-style and
        returned as a list of words (prefix removed);
      * a line starting with ``--`` is treated as a comment and ``''`` is
        returned;
      * anything else is SQL: further lines are read with ``ps2`` and
        appended until ``sqlite3.complete_statement`` accepts the text.

    Returns:
        list[str] for a command, str for SQL, ``''`` for a comment or for a
        command line that could not be tokenized.
    """
    t = ""
    while not t:
        t = input(settings.get("ps1", '>>> '))
    prefix = settings.get("cmd_prefix", '.')
    if t.startswith(prefix):
        try:
            # Strip the whole prefix, which may be longer than one char.
            return split(t[len(prefix):])
        except ValueError as ex:
            # shlex raises ValueError on e.g. an unterminated quote.
            print(f"Error: {ex}")
            return ''
    if t.startswith('--'):
        return ''
    while not sql.complete_statement(t):
        t += ' ' + input(settings.get("ps2", '... '))
    return t
class Command(ABC):
    """Base class and registry for REPL commands.

    Defining a subclass instantiates it and stores the instance in
    ``Command.commands`` under ``cmd_name`` or, when that is omitted, under
    the class name converted from CamelCase to snake_case.
    """

    # Registered command name -> command instance.
    commands = {}

    def __init_subclass__(cls, cmd_name=None, **kwargs):
        """Register an instance of the new subclass.

        Raises:
            NameError: if the resulting name is already registered.
        """
        super().__init_subclass__(**kwargs)
        name = cmd_name or sub(r"(?!^)([A-Z])", r"_\1", cls.__name__).lower()
        if name in Command.commands:
            raise NameError(f"Command '{name}' is already exists.")
        Command.commands[name] = cls()

    @staticmethod
    def default(cmd_name):
        """Return a stand-in handler for an unknown command.

        The returned callable prints a "could not be found" message and
        ignores its arguments. It also exposes itself as ``.exec`` so it can
        be invoked the same way as a registered command instance.
        """
        def _cmd(*a):
            print(f"Error: command '{cmd_name}' could not be found.")
        _cmd.exec = _cmd
        return _cmd

    @abstract
    def exec(self, *args):
        """Run the command with the words that followed its name."""
        pass

    @abstract
    def help(self, is_long):
        """Return help text; *is_long* selects the detailed form."""
        pass
class Formatter(ABC):
    """Base class and registry for output formatters.

    Defining a subclass instantiates it and stores the instance in
    ``Formatter.formatters`` under ``formatter_name`` or, when that is
    omitted, under the class name converted to snake_case. The first
    formatter registered becomes the current one.

    The current selection is always stored on ``Formatter`` itself, so
    ``change`` and ``format_current`` behave the same whether they are
    called on ``Formatter`` or on a subclass.
    """

    # Registered formatter name -> formatter instance.
    formatters = {}
    # Name of the active formatter; None until one is registered.
    __current = None

    def __init_subclass__(cls, formatter_name=None, **kwargs):
        """Register an instance of the new subclass.

        Raises:
            NameError: if the resulting name is already registered.
        """
        super().__init_subclass__(**kwargs)
        name = formatter_name or sub(r"(?!^)([A-Z])", r"_\1", cls.__name__).lower()
        if name in Formatter.formatters:
            raise NameError(f"Formatter '{name}' is already exists.")
        if Formatter.__current is None:
            Formatter.__current = name
        Formatter.formatters[name] = cls()

    @abstract
    def format(self, rows):
        """Render *rows* (a list of rows of strings) as one string."""
        pass

    @classmethod
    def change(cls, name):
        """Make the formatter registered as *name* the current one.

        Raises:
            NameError: if no formatter is registered under *name*.
        """
        if name not in Formatter.formatters:
            raise NameError(f"Unknown output mode: {name}")
        # Assign on Formatter, not cls: writing through a subclass would
        # only shadow the attribute there and leave the real selection as is.
        Formatter.__current = name

    @classmethod
    def format_current(cls, rows):
        """Render *rows* with the current formatter.

        Raises:
            ValueError: if no formatter has been registered yet.
        """
        if Formatter.__current is None:
            raise ValueError("Formatter not initialized.")
        return Formatter.formatters[Formatter.__current].format(rows)
class Help(Command):
    """``help`` command: list all commands or describe a single one."""

    def exec(self, *args):
        """Print the overview, or the long help of the command ``args[0]``."""
        if args:
            target = args[0]
            if target in Command.commands:
                print(target, Command.commands[target].help(True), sep=': ')
            else:
                print('help: command could not be found.')
        else:
            print(self.help(True))

    def help(self, is_long):
        """Return the one-line description or, if *is_long*, the listing.

        The listing is a usage line followed by one line per registered
        command with its short help, names padded to a common width.
        """
        if not is_long:
            return "show this help message or detailed help of specified command."
        prefix = settings.get('cmd_prefix', '.')
        pad = max(len(name) for name in Command.commands) + 1
        lines = [f"Usage: {prefix}help [command_name]"]
        for name, command in Command.commands.items():
            lines.append(f"{prefix}{name.ljust(pad)}: {command.help(False)}")
        return "\n".join(lines)
class Open(Command):
    """``open`` command: switch the session to another database file."""

    def exec(self, *args):
        """Connect to ``args[0]`` and make it the active database.

        The new connection is opened first. Only when that succeeds is the
        old connection committed and closed, the global ``db``/``cur`` pair
        replaced, and the configured pragmas applied to the new cursor.
        Exactly one argument is required.
        """
        global db, cur
        if not args:
            print(self.help(True))
            return
        if args[1:]:
            print("too many arguments.")
            return
        try:
            dat = sql.connect(args[0])
        except Exception as ex:
            # Exception rather than BaseException, so that Ctrl-C and
            # interpreter shutdown are not swallowed here.
            print(f"Failed to open file '{args[0]}':{type(ex).__name__}:{ex}")
            return
        db.commit()
        cur.close()
        db.close()
        db = dat
        cur = dat.cursor()
        load_pragmas(cur, settings.get("pragma", {}))

    def help(self, is_long):
        """Return the short description, preceded by usage when *is_long*."""
        if is_long:
            return (
                "usage: .open database\n"
                "close current connection and open specified database.")
        return "close current connection and open specified database."
class SetVariable(Command, cmd_name="setv"):
    """``setv`` command: assign a value to a (possibly nested) setting."""

    def exec(self, *args):
        """Set the setting named ``args[0]`` to the value in ``args[1]``.

        The name is a dot-separated path of identifiers; missing
        intermediate namespaces are created as empty dicts. The value text
        is evaluated as a Python expression, and kept as the raw string if
        evaluation fails. The assignment is refused when an intermediate
        path element already holds a non-dict value, or when the value
        cannot be represented as JSON (it could not be saved on exit).
        """
        if not args:
            print("Variable name is required.")
            return
        if not args[1:]:
            print("Variable value is required.")
            return
        names = args[0].split(".")
        if any(not n.isidentifier() for n in names):
            print("Invalid variable name.")
            return
        # NOTE(security): eval() executes arbitrary expressions typed at the
        # prompt (builtins remain reachable even with empty namespaces).
        # Acceptable only because the input comes from the local user;
        # consider ast.literal_eval if that ever changes.
        try:
            val = eval(args[1], {}, {})
        except Exception:
            val = args[1]
        try:
            dumps(val)
        except (TypeError, ValueError):
            print("Invalid variable value: not JSON-serializable.")
            return
        ns = settings
        for n in names[:-1]:
            if n not in ns:
                ns[n] = {}
            elif not isinstance(ns[n], dict):
                # Descending into a scalar would fail on the assignment.
                print(f"Invalid variable name: '{n}' is not a namespace.")
                return
            ns = ns[n]
        ns[names[-1]] = val
        print(f"{args[0]} = {val}")

    def help(self, is_long):
        """Return the short description, or usage details when *is_long*."""
        if is_long:
            return (
                "usage: .setv variable value\n"
                "sets value to a variable.\n"
                "This command will force to override your environmental settings.")
        return "sets value to a variable."
class Vars(Command):
    """``vars`` command: display settings."""

    def exec(self, *args):
        """Print all settings as JSON, or the value of each named setting.

        Each argument is a dot-separated path into ``settings``. A path that
        does not resolve, including one that tries to descend into a
        non-dict value, is shown as ``None``. Dict values are printed as
        indented JSON.
        """
        if not args:
            print(dumps(settings, ensure_ascii=False, sort_keys=True, indent=4, separators=(',', ': ')))
            return
        for a in args:
            val = settings
            for n in a.split('.'):
                # The isinstance check stops the walk at scalars, which
                # have neither keys nor .get().
                if not isinstance(val, dict) or n not in val:
                    val = None
                    break
                val = val[n]
            if isinstance(val, dict):
                val = dumps(val, ensure_ascii=False, indent=4, separators=(",", ": "))
            print(f"{a} = {val}")

    def help(self, is_long):
        """Return the help text (identical for short and long form)."""
        return "dump all environmental settings."
class Exit(Command):
    """``exit`` command: shut down the database and leave the REPL."""

    def exec(self, *args):
        """Close the cursor, commit, close the connection, then exit."""
        cur.close()
        db.commit()
        db.close()
        exit()

    def help(self, is_long):
        """Return the help text (identical for short and long form)."""
        return "exit this repl session."
class Commit(Command):
    """``commit`` command: commit the open transaction."""

    def exec(self, *args):
        """Commit on the global connection; arguments are ignored."""
        db.commit()

    def help(self, is_long):
        """Return the help text (identical for short and long form)."""
        return "commit all changes to a database."
class Headers(Command):
    """``headers`` command: show or toggle printing of column names."""

    def exec(self, *args):
        """Report the flag with no argument; otherwise set it.

        ``enable``/``enabled``/``on`` turn headers on and
        ``disable``/``disabled``/``off`` turn them off. Any other word, or
        more than one argument, only prints a message.
        """
        if not args:
            state = 'enabled' if settings.get('headers', False) else 'disabled'
            print(f"Headers output is currently {state}.")
            return
        if len(args) > 1:
            print("Too many arguments.")
            return
        choice = args[0]
        if choice in ('enable', 'enabled', 'on'):
            settings["headers"] = True
        elif choice in ('disable', 'disabled', 'off'):
            settings["headers"] = False
        else:
            print("Granted options are only these: enable, enabled, on, disable, disabled, off")

    def help(self, is_long):
        """Return the help text (identical for short and long form)."""
        return "sets flag to determine to show headers or not."
def get_tables():
    """Return the names of all tables and views listed in ``sqlite_master``.

    The query runs on the global cursor ``cur``.
    """
    query = (
        "SELECT"
        " name"
        " FROM sqlite_master"
        " WHERE type='table' OR type='view';")
    return [record[0] for record in cur.execute(query).fetchall()]
class Tables(Command):
    """``tables`` command: list table and view names."""

    def exec(self, *args):
        """Print each name returned by ``get_tables`` on its own line."""
        for table_name in get_tables():
            print(table_name)

    def help(self, is_long):
        """Return the help text (identical for short and long form)."""
        return "show existing table names."
class Scheme(Command):
    """``scheme`` command: print the SQL stored for tables and views."""

    def exec(self, *tbls):
        """Print the ``sql`` column of ``sqlite_master`` for each name.

        With no arguments every name from ``get_tables`` is shown. A name
        with no matching table or view produces a "No such table" message.
        """
        query = (
            "SELECT sql"
            " FROM sqlite_master"
            " WHERE"
            " name=? and"
            " (type='table' or type='view')")
        targets = tbls if tbls else get_tables()
        for target in targets:
            found = cur.execute(query, (target,)).fetchone()
            if found:
                print(found[0])
            else:
                print(f"No such table: '{target}'")

    def help(self, is_long):
        """Return the help text (identical for short and long form)."""
        return "show table schemes."
class Mode(Command):
    """``mode`` command: list or select the output formatter."""

    def exec(self, *args):
        """List the formatter names, or switch to the one named ``args[0]``.

        The ``output_mode`` setting is updated only after the formatter has
        been selected successfully, so an unknown name is never stored in
        the settings.
        """
        if not args:
            print('You can use those output modes:')
            print(', '.join(Formatter.formatters))
            return
        if args[1:]:
            print('Too many arguments.')
            return
        try:
            Formatter.change(args[0])
        except NameError as err:
            print(*err.args)
            return
        settings["output_mode"] = args[0]

    def help(self, is_long):
        """Return the help text (identical for short and long form)."""
        return "change output mode."
class Limit(Command):
    """``limit`` command: show or change the ``max_length`` setting."""

    def exec(self, *args):
        """Handle ``show``, ``disable`` or a non-negative integer.

        Exactly one argument is required; otherwise the long help is
        printed. ``disable`` and ``0`` both store 0, which turns the limit
        off. Negative numbers and non-numeric text are rejected with
        'invalid argument.'.
        """
        if not args or args[1:]:
            print(self.help(True))
            return
        arg = args[0].lower()
        if arg == 'show':
            lim = settings.get('max_length', 0)
            lim = f'{lim} chars' if lim else 'disabled'
            print(f'Current output limit is {lim}.')
            return
        if arg == 'disable':
            settings['max_length'] = 0
            return
        try:
            limit = int(arg)
        except ValueError:
            print('invalid argument.')
            return
        if limit < 0:
            # A negative limit would truncate every cell to an empty string.
            print('invalid argument.')
            return
        settings['max_length'] = limit

    def help(self, is_long):
        """Return the short description, or usage details when *is_long*."""
        if is_long:
            return (
                'Usage: limit [show | disable | (num)]\n'
                'If show is specified, output currently length limit.\n'
                'If disable is specified, disable length limit.\n'
                'num means a number matching following format [0-9]+ (ex. 1, 01, 351)\n'
                'If a number is specified, set length limit to the number.')
        return "change or show max output length."
class Null(Command):
    """``null`` command: show or set the ``null_printing`` setting."""

    def exec(self, *args):
        """Print the current value with no argument; otherwise store ``args[0]``."""
        if len(args) > 1:
            print('Too many arguments.')
        elif args:
            settings['null_printing'] = args[0]
        else:
            print(f"Current null column string is '{settings.get('null_printing', 'None')}'")

    def help(self, is_long):
        """Return the help text (identical for short and long form)."""
        return "change or show whether null to be printed."
class TabsFormatter(Formatter, formatter_name="tabs"):
    """Formatter registered as ``tabs``: tab-separated cells, one row per line."""

    def format(self, rows):
        """Join the cells of each row with tabs and the rows with newlines.

        Returns an empty string when *rows* is empty.
        """
        if not rows:
            return ""
        lines = ['\t'.join(cells) for cells in rows]
        return '\n'.join(lines)
class CSVFormatter(Formatter, formatter_name="csv"):
    """Formatter registered as ``csv``: every cell quoted, comma-separated."""

    def format(self, rows):
        """Render *rows* as CSV text with all fields double-quoted.

        A double quote inside a cell is escaped by doubling it, as the CSV
        format requires. Returns an empty string when *rows* is empty.
        """
        if not rows:
            return ""
        return '\n'.join(
            ','.join('"' + str(cell).replace('"', '""') + '"' for cell in row)
            for row in rows)
class ColumnsFormatter(Formatter, formatter_name="column"):
    """Formatter registered as ``column``: pipe-delimited, padded columns."""

    def format(self, rows):
        """Render *rows* as left-aligned columns of equal width.

        Each column is as wide as its widest cell plus two, with a minimum
        of five; widths are measured with ``length``. When the ``headers``
        setting is on, the first row is treated as the header and the table
        is framed by ``+---+`` separator lines above the header, below it
        and after the last row. Without headers only the data lines are
        produced, each ending in a newline.

        *rows* is not modified. Returns an empty string when it is empty.
        """
        if not rows:
            return ""
        widths = [
            max(max(length(cell) + 2, 5) for cell in column)
            for column in zip(*rows)]

        def render(row):
            # One table line: every cell padded to its column width.
            cells = (
                f"{cell}{' ' * (width - length(cell))}|"
                for width, cell in zip(widths, row))
            return '|' + ''.join(cells) + '\n'

        if settings.get("headers", False):
            # Unpack instead of pop(0) so the caller's list stays intact.
            header, *body = rows
            sep = '+' + ''.join('-' * width + '+' for width in widths) + '\n'
            head = render(header) + sep
        else:
            body = rows
            sep = ''
            head = ''
        # The closing separator drops its trailing newline.
        return sep + head + ''.join(render(row) for row in body) + sep[:-1]
def main():
    """Run the read-eval-print loop until the user exits.

    Each input from ``reading`` is either a command (a list of words),
    dispatched through ``Command.commands``, or SQL text executed on the
    global cursor. Result rows are passed through ``limstr``, optionally
    preceded by a row of column names, and printed with the current
    formatter. Errors raised by SQL or by a command are printed and the loop
    continues. Ctrl-C must be pressed twice in a row to exit; end-of-input
    exits immediately.

    Raises:
        SystemExit: on a second consecutive Ctrl-C, on EOF, or when a
            command requests exit.
    """
    will_exit = False
    while True:
        try:
            cmd = reading()
            if not cmd:
                continue
            will_exit = False
            if isinstance(cmd, list):
                name, *cmd_args = cmd
                handler = Command.commands.get(name)
                try:
                    if handler is None:
                        # Command.default returns a callable that reports
                        # the unknown command; call it directly.
                        Command.default(name)(*cmd_args)
                    else:
                        handler.exec(*cmd_args)
                except Exception as e:
                    # A failing command must not terminate the session.
                    # SystemExit is not an Exception and still propagates.
                    print(f"Error: {e}")
                continue
            try:
                rows = cur.execute(cmd).fetchall()
                desc = cur.description
            except Exception as e:
                print(f"Error: {e}")
                continue
            if rows:
                if settings.get("headers", False):
                    rows.insert(0, tuple(col[0] for col in desc))
                rows = [[limstr(cell) for cell in row] for row in rows]
                print(Formatter.format_current(rows))
        except KeyboardInterrupt:
            if will_exit:
                raise SystemExit
            will_exit = True
            print("\nEnter ^C again to exit.")
        except EOFError:
            raise SystemExit
if __name__ == '__main__':
    # Register the settings writer only after init() has completed. init()
    # exits when the database cannot be opened, which happens before the
    # settings file is read; an exit handler registered earlier would then
    # overwrite the user's settings file with the built-in defaults.
    init()
    register(finalize)
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment