Skip to content

Instantly share code, notes, and snippets.

@frodo821
Last active January 15, 2021 19:16
Show Gist options
  • Select an option

  • Save frodo821/6f08bbc36944861a08249316d597cc85 to your computer and use it in GitHub Desktop.

Select an option

Save frodo821/6f08bbc36944861a08249316d597cc85 to your computer and use it in GitHub Desktop.
#-*-coding: utf-8;-*-
from sys import argv, exit
from os import chdir
from os.path import (
expanduser as usr,
exists, abspath)
from abc import ABC, abstractmethod as abstract
from atexit import register
from io import StringIO
from unicodedata import east_asian_width as cwidth
from re import sub
from json import dump, dumps, load
from shlex import split
import sqlite3 as sql
# Built-in default settings for the REPL. init() may override them with the
# contents of the rc file, and finalize() writes the live dict back to it.
settings = {
    # PRAGMA name -> value pairs applied to a connection by load_pragmas().
    "pragma": {
        "foreign_keys": 1,
    },
    # SQL script executed once at start-up (skipped when empty).
    "startup_snippets": "",
    # Whether query results are preceded by a row of column names.
    "headers": False,
    # Name of the registered Formatter used to print result rows.
    "output_mode": "tabs",
    # Primary and continuation prompts.
    "ps1": ">>> ",
    "ps2": "... ",
    # Leading text that marks an input line as a REPL command.
    "cmd_prefix": ".",
    # Directory the process changes into at start-up ("~" is expanded).
    "root": "~",
}

# Location of the JSON settings file in the user's home directory.
s_file = usr("~/.sqlite_rc")
def load_pragmas(cursor, pragmas):
    """Apply each ``name: value`` pair of *pragmas* as a ``PRAGMA`` statement.

    Args:
        cursor: Object with an ``execute(sql)`` method (a sqlite3 cursor).
        pragmas: Mapping of pragma name to the value to assign.

    A pragma that fails is reported on stdout and the remaining pragmas are
    still applied.
    """
    for name in pragmas:
        try:
            # PRAGMA arguments are interpolated into the statement text here,
            # exactly as the mapping provides them.
            cursor.execute(f"PRAGMA {name}={pragmas[name]};")
        except Exception as err:
            print(f"Error:{name}:{err}")
def init():
    """Open the database, load the user settings and prepare the session.

    Steps, in order:

    1. Connect to the file named by the first command-line argument, or to an
       in-memory database when no argument is given. A failed connection is
       reported and the process exits with status 1.
    2. Replace the default ``settings`` with the JSON object stored in
       ``s_file`` when that file exists and holds a JSON object. An unreadable
       or malformed file is reported and the defaults are kept.
    3. Create the global cursor, apply the configured pragmas and run the
       start-up snippet.
    4. Change into the configured root directory.
    5. Select the configured output formatter, falling back to the first
       registered formatter when the configured one is unknown.

    Sets the module globals ``db``, ``cur`` and possibly ``settings``.
    """
    global db, settings, cur
    if not argv[1:]:
        db = sql.connect(":memory:")
        print("Warning: THIS IS IN-MEMORY MODE. THE DATABASE WILL BE ELIMINATED FOREVER.")
    else:
        try:
            db = sql.connect(argv[1])
        except Exception as ex:
            print(f"Failed to open file '{argv[1]}':{type(ex).__name__}:{ex}")
            # Non-zero status so callers can tell that start-up failed.
            exit(1)

    if exists(s_file):
        try:
            with open(s_file) as s:
                loaded = load(s)
        except (OSError, ValueError) as ex:
            # ValueError covers both invalid JSON and undecodable bytes.
            print(f"Warning: could not load settings from '{s_file}': {ex}")
        else:
            if isinstance(loaded, dict):
                settings = loaded
            else:
                # Everything else in the program calls settings.get(...).
                print(f"Warning: ignoring settings in '{s_file}': not a JSON object.")

    cur = db.cursor()
    load_pragmas(cur, settings.get("pragma", {}))

    try:
        snp = settings.get("startup_snippets", "")
        if snp:
            cur.executescript(snp)
    except Exception as e:
        print(f"Errored during executing startup snippet: {e}")

    try:
        chdir(usr(settings.get("root", '~')))
    except Exception as e:
        print(f"Error:{type(e).__name__}:{e}")

    try:
        Formatter.change(settings.get("output_mode", 'tabs'))
    except Exception as e:
        print(*e.args)
        print('Failed to set default formatter. Using fallback strategy.')
        fallback = next(iter(Formatter.formatters))
        settings["output_mode"] = fallback
        Formatter.change(fallback)
def finalize():
    """Write the current ``settings`` dict to ``s_file`` as JSON."""
    with open(s_file, 'w') as rc_file:
        dump(settings, rc_file)
def length(s: str):
    """Return the display width of *s* in columns.

    Characters whose East Asian Width class is Fullwidth (``F``), Wide (``W``)
    or Ambiguous (``A``) count as two columns; all others count as one.
    """
    return sum(2 if cwidth(ch) in ('F', 'W', 'A') else 1 for ch in s)
def limstr(s):
    """Convert a cell value to a single-line string, truncated for display.

    ``None`` is rendered using the ``null_printing`` setting (``'None'`` when
    unset). Newlines and carriage returns become spaces. When the
    ``max_length`` setting is non-zero, the text is cut to the longest prefix
    whose display width (see ``length``) does not exceed it.

    Args:
        s: Any value; converted with ``str``.

    Returns:
        The display string.
    """
    if s is None:
        # str() keeps a non-string null marker from breaking .replace() below.
        s = str(settings.get('null_printing', None))
    else:
        s = str(s)
    s = s.replace('\n', ' ').replace('\r', ' ')
    mlen = settings.get('max_length', 0)
    if not mlen:
        return s
    # Accumulate the width incrementally rather than re-measuring a growing
    # prefix for every character.
    width = 0
    for end, c in enumerate(s):
        width += length(c)
        if width > mlen:
            return s[:end]
    return s
def reading():
    """Read one unit of input from the user.

    Returns:
        * a list of shell-style tokens when the line starts with the
          configured command prefix,
        * ``''`` when the line is an SQL comment (``--``) or a command line
          that cannot be tokenised,
        * otherwise the SQL text, extended with continuation lines until
          ``sqlite3.complete_statement`` accepts it.

    ``EOFError`` / ``KeyboardInterrupt`` raised by ``input`` propagate to the
    caller.
    """
    t = ""
    while not t:
        t = input(settings.get("ps1", '>>> '))
    prefix = settings.get("cmd_prefix", '.')
    if t.startswith(prefix):
        try:
            # Strip the whole prefix, which may be longer than one character.
            return split(t[len(prefix):])
        except ValueError as err:
            # shlex raises ValueError for e.g. an unbalanced quote.
            print(f"Error: {err}")
            return ''
    if t.startswith('--'):
        return ''
    while not sql.complete_statement(t):
        t += ' ' + input(settings.get("ps2", '... '))
    return t
class Command(ABC):
    """Base class and registry for REPL commands.

    Defining a subclass registers a single instance of it in ``commands``
    under ``cmd_name`` or, when that is omitted, under the class name
    converted from CamelCase to snake_case.
    """

    # Command name -> command instance.
    commands = {}

    def __init_subclass__(cls, cmd_name=None, **kwargs):
        """Register an instance of the new subclass.

        Raises:
            NameError: If a command with the same name is already registered.
        """
        super().__init_subclass__(**kwargs)
        name = cmd_name or sub(r"(?!^)([A-Z])", r"_\1", cls.__name__).lower()
        if name in Command.commands:
            raise NameError(f"Command '{name}' is already exists.")
        Command.commands[name] = cls()

    @staticmethod
    def default(cmd_name):
        """Return a fallback handler that reports *cmd_name* as unknown.

        The returned object is callable and also exposes itself as ``.exec``,
        so it can stand in for a command instance in
        ``commands.get(name, Command.default(name)).exec(...)``.
        """
        def _cmd(*a):
            print(f"Error: command '{cmd_name}' could not be found.")
        # A bare function has no ``exec`` attribute; provide one so that
        # callers treating the fallback like a Command do not fail.
        _cmd.exec = _cmd
        return _cmd

    @abstract
    def exec(self, *args):
        """Run the command with the tokens that followed its name."""
        pass

    @abstract
    def help(self, is_long):
        """Return help text; the detailed form when *is_long* is true."""
        pass
class Formatter(ABC):
    """Base class and registry for result formatters.

    Defining a subclass registers a single instance of it in ``formatters``
    under ``formatter_name`` or, when that is omitted, under the class name
    converted from CamelCase to snake_case. The first formatter registered
    becomes the current one.
    """

    # Formatter name -> formatter instance.
    formatters = {}
    # Name of the active formatter; always stored on Formatter itself.
    __current = None

    def __init_subclass__(cls, formatter_name=None, **kwargs):
        """Register an instance of the new subclass.

        Raises:
            NameError: If a formatter with the same name is already registered.
        """
        super().__init_subclass__(**kwargs)
        name = formatter_name or sub(r"(?!^)([A-Z])", r"_\1", cls.__name__).lower()
        if name in Formatter.formatters:
            raise NameError(f"Formatter '{name}' is already exists.")
        if Formatter.__current is None:
            Formatter.__current = name
        Formatter.formatters[name] = cls()

    @abstract
    def format(self, rows):
        """Return *rows* rendered as a single string."""
        pass

    @classmethod
    def change(cls, name):
        """Make the formatter registered as *name* the current one.

        Raises:
            NameError: If no formatter is registered under *name*.
        """
        if name not in Formatter.formatters:
            raise NameError(f"Unknown output mode: {name}")
        # Assign on Formatter rather than ``cls`` so that a call made through
        # a subclass does not create a shadow attribute that format_current
        # would ignore.
        Formatter.__current = name

    @classmethod
    def format_current(cls, rows):
        """Render *rows* with the current formatter.

        Raises:
            ValueError: If no formatter has been registered yet.
        """
        if Formatter.__current is None:
            raise ValueError("Formatter not initialized.")
        return Formatter.formatters[Formatter.__current].format(rows)
class Help(Command):
    """``help`` command: list all commands or show help for one of them."""

    def exec(self, *args):
        """Print the command overview, or the long help of ``args[0]``."""
        if not args:
            print(self.help(True))
            return
        target = args[0]
        if target not in Command.commands:
            print('help: command could not be found.')
            return
        print(target, Command.commands[target].help(True), sep=': ')

    def help(self, is_long):
        """Return a one-line summary, or the full command table when *is_long*."""
        if not is_long:
            return "show this help message or detailed help of specified command."
        # Pad every name to the longest one plus a single space.
        pad = max(map(len, Command.commands)) + 1
        lines = [f"Usage: {settings.get('cmd_prefix', '.')}help [command_name]"]
        for name, command in Command.commands.items():
            lines.append(
                f"{settings.get('cmd_prefix', '.')}{name.ljust(pad)}: {command.help(False)}")
        return "\n".join(lines)
class Open(Command):
    """``open`` command: switch the session to another database file."""

    def exec(self, *args):
        """Connect to ``args[0]`` and make it the current database.

        The new connection is opened first; only when that succeeds is the
        previous connection committed and closed. The configured pragmas are
        then applied to the new connection. Exactly one argument is expected.
        """
        global db, cur
        if not args:
            print(self.help(True))
            return
        if args[1:]:
            print("too many arguments.")
            return
        try:
            dat = sql.connect(args[0])
        except Exception as ex:
            # Exception rather than BaseException: Ctrl-C and SystemExit must
            # not be reported as a failure to open the file.
            print(f"Failed to open file '{args[0]}':{type(ex).__name__}:{ex}")
            return
        db.commit()
        cur.close()
        db.close()
        db = dat
        cur = dat.cursor()
        load_pragmas(cur, settings.get("pragma", {}))

    def help(self, is_long):
        """Return the summary, preceded by a usage line when *is_long*."""
        if is_long:
            return (
                "usage: .open database\n"
                "close current connection and open specified database.")
        return "close current connection and open specified database."
class SetVariable(Command, cmd_name="setv"):
    """``setv`` command: assign a value to a (possibly nested) setting."""

    def exec(self, *args):
        """Set the setting named by ``args[0]`` to the value in ``args[1]``.

        The name is a dot-separated path of identifiers; missing intermediate
        groups are created. The value text is evaluated as a Python
        expression; when evaluation fails, or the result cannot be stored as
        JSON, the raw text is stored instead.
        """
        if not args:
            print("Variable name is required.")
            return
        if not args[1:]:
            print("Variable value is required.")
            return
        names = args[0].split(".")
        if not all(n.isidentifier() for n in names):
            print("Invalid variable name.")
            return
        # NOTE(security): eval() runs arbitrary Python typed at the prompt.
        # That is the operator's own input here; do not feed this command
        # text from an untrusted source.
        try:
            val = eval(args[1], {}, {})
            # Settings are written out as JSON on exit, so refuse values that
            # would make that write fail.
            dumps(val)
        except Exception:
            val = args[1]
        ns = settings
        for n in names[:-1]:
            child = ns.setdefault(n, {})
            if not isinstance(child, dict):
                # Cannot descend into a scalar setting such as "headers".
                print(f"Cannot set '{args[0]}': '{n}' is not a settings group.")
                return
            ns = child
        ns[names[-1]] = val
        print(f"{args[0]} = {val}")

    def help(self, is_long):
        """Return the summary, or usage plus details when *is_long*."""
        if is_long:
            return (
                "usage: .setv variable value\n"
                "sets value to a variable.\n"
                "This command will force to override your environmental settings.")
        return "sets value to a variable."
class Vars(Command):
    """``vars`` command: print settings."""

    def exec(self, *args):
        """Print all settings as JSON, or each setting named in *args*.

        Names are dot-separated paths into the settings dict. A path that
        does not resolve, including one that passes through a non-dict value,
        is printed as ``None``.
        """
        if not args:
            print(dumps(settings, ensure_ascii=False, sort_keys=True, indent=4, separators=(',', ': ')))
            return
        for a in args:
            val = settings
            for n in a.split('.'):
                # Only dicts can be descended into; anything else ends the
                # lookup with "not found".
                val = val.get(n, None) if isinstance(val, dict) else None
                if val is None:
                    break
            if isinstance(val, dict):
                val = dumps(val, ensure_ascii=False, indent=4, separators=(",", ": "))
            print(f"{a} = {val}")

    def help(self, is_long):
        """Return the one-line description (identical for both forms)."""
        return "dump all environmental settings."
class Exit(Command):
    """``exit`` command: close the database and leave the REPL."""

    def exec(self, *args):
        """Close the cursor, commit, close the connection, then call ``exit``."""
        cur.close()
        db.commit()
        db.close()
        exit()

    def help(self, is_long):
        """Return the one-line description (identical for both forms)."""
        return "exit this repl session."
class Commit(Command):
    """``commit`` command: commit the current transaction."""

    def exec(self, *args):
        """Commit on the current connection; arguments are ignored."""
        db.commit()

    def help(self, is_long):
        """Return the one-line description (identical for both forms)."""
        return "commit all changes to a database."
class Headers(Command):
    """``headers`` command: show or toggle the column-header row."""

    def exec(self, *args):
        """Report the current state, or switch it according to ``args[0]``."""
        if not args:
            state = 'enabled' if settings.get('headers', False) else 'disabled'
            print(f"Headers output is currently {state}.")
            return
        if len(args) > 1:
            print("Too many arguments.")
            return
        choice = args[0]
        if choice in ('enable', 'enabled', 'on'):
            settings["headers"] = True
        elif choice in ('disable', 'disabled', 'off'):
            settings["headers"] = False
        else:
            print("Granted options are only these: enable, enabled, on, disable, disabled, off")

    def help(self, is_long):
        """Return the one-line description (identical for both forms)."""
        return "sets flag to determine to show headers or not."
def get_tables():
    """Return the names of all tables and views listed in ``sqlite_master``."""
    query = (
        "SELECT"
        " name"
        " FROM sqlite_master"
        " WHERE type='table' OR type='view';")
    return [row[0] for row in cur.execute(query).fetchall()]
class Tables(Command):
    """``tables`` command: list tables and views."""

    def exec(self, *args):
        """Print one table or view name per line; arguments are ignored."""
        for table_name in get_tables():
            print(table_name)

    def help(self, is_long):
        """Return the one-line description (identical for both forms)."""
        return "show existing table names."
class Scheme(Command):
    """``scheme`` command: print the SQL that defines tables or views."""

    def exec(self, *tbls):
        """Print the stored definition of each named table or view.

        With no names, every table and view returned by ``get_tables`` is
        shown. Unknown names are reported individually.
        """
        query = (
            "SELECT sql"
            " FROM sqlite_master"
            " WHERE"
            " name=? and"
            " (type='table' or type='view')")
        for table_name in tbls or get_tables():
            row = cur.execute(query, (table_name,)).fetchone()
            if row:
                print(row[0])
            else:
                print(f"No such table: '{table_name}'")

    def help(self, is_long):
        """Return the one-line description (identical for both forms)."""
        return "show table schemes."
class Mode(Command):
    """``mode`` command: list or select the output formatter."""

    def exec(self, *args):
        """List the available modes, or switch to the mode named ``args[0]``.

        The ``output_mode`` setting is updated only after the formatter has
        accepted the name, so an unknown mode is never stored.
        """
        if not args:
            print('You can use those output modes:')
            print(', '.join(Formatter.formatters.keys()))
            return
        if args[1:]:
            print('Too many arguments.')
            return
        try:
            Formatter.change(args[0])
        except NameError as err:
            print(*err.args)
            return
        settings["output_mode"] = args[0]

    def help(self, is_long):
        """Return the one-line description (identical for both forms)."""
        return "change output mode."
class Limit(Command):
    """``limit`` command: show or change the maximum printed cell width."""

    def exec(self, *args):
        """Handle ``show``, ``disable`` or a non-negative number.

        Exactly one argument is expected; otherwise the long help is printed.
        The limit is stored in the ``max_length`` setting, where 0 means
        "no limit".
        """
        if not args or args[1:]:
            print(self.help(True))
            return
        arg = args[0].lower()
        if arg == 'show':
            lim = settings.get('max_length', 0)
            lim = f'{lim} chars' if lim else 'disabled'
            print(f'Current output limit is {lim}.')
            return
        if arg == 'disable':
            settings['max_length'] = 0
            return
        try:
            limit = int(arg)
        except ValueError:
            print('invalid argument.')
            return
        if limit < 0:
            # A negative limit would truncate every value to an empty string.
            print('invalid argument.')
            return
        settings['max_length'] = limit

    def help(self, is_long):
        """Return the summary, or the full usage text when *is_long*."""
        if is_long:
            return (
                'Usage: limit [show | disable | (num)]\n'
                'If show is specified, output currently length limit.\n'
                'If disable is specified, disable length limit.\n'
                'num means a number matching following format [0-9]+ (ex. 1, 01, 351)\n'
                'If a number is specified, set length limit to the number.')
        return "change or show max output length."
class Null(Command):
    """``null`` command: show or set the text printed for NULL values."""

    def exec(self, *args):
        """Print the current NULL text, or store ``args[0]`` as the new one."""
        if not args:
            print(f"Current null column string is '{settings.get('null_printing', 'None')}'")
            return
        if len(args) > 1:
            print('Too many arguments.')
            return
        settings['null_printing'] = args[0]

    def help(self, is_long):
        """Return the one-line description (identical for both forms)."""
        return "change or show whether null to be printed."
class TabsFormatter(Formatter, formatter_name="tabs"):
    """Formatter ``tabs``: tab-separated cells, one row per line."""

    def format(self, rows):
        """Join the string cells of each row with tabs and rows with newlines."""
        if not rows:
            return ""
        return '\n'.join('\t'.join(row) for row in rows)
class CSVFormatter(Formatter, formatter_name="csv"):
    """Formatter ``csv``: every cell double-quoted, comma-separated."""

    def format(self, rows):
        """Return *rows* as CSV text with every field quoted.

        A double quote inside a value is escaped by doubling it, so that a
        CSV reader sees the field boundaries correctly.
        """
        if not rows:
            return ""
        return '\n'.join(
            ','.join('"' + str(cell).replace('"', '""') + '"' for cell in row)
            for row in rows)
class ColumnsFormatter(Formatter, formatter_name="column"):
    """Formatter ``column``: fixed-width columns separated by ``|``."""

    def format(self, rows):
        """Return *rows* as an aligned text table.

        Each column is as wide as its widest cell (display width, see
        ``length``) plus two, with a minimum of five. When the ``headers``
        setting is on, the first row is treated as the header and the table
        is framed with ``+---+`` rules above the header, below it and after
        the last row. Without headers, only the data lines are produced, each
        terminated by a newline.

        *rows* is not modified.
        """
        if not rows:
            return ""
        widths = [
            max(max(length(cell) + 2, 5) for cell in column)
            for column in zip(*rows)
        ]

        def render(row):
            # One table line without a trailing newline: |cell  |cell  |
            cells = (
                f"{cell}{' ' * (width - length(cell))}|"
                for width, cell in zip(widths, row))
            return '|' + ''.join(cells)

        if not settings.get("headers", False):
            return ''.join(render(row) + '\n' for row in rows)

        # Slice rather than pop(0) so that the caller's list stays intact.
        header, body = rows[0], rows[1:]
        rule = '+' + ''.join('-' * width + '+' for width in widths)
        return '\n'.join([rule, render(header), rule, *map(render, body), rule])
def main():
    """Run the read-evaluate-print loop until the user leaves.

    Each iteration reads one input via ``reading``:

    * a token list is dispatched to the matching registered command; an
      unknown name is reported, and an exception raised by a command is
      printed without ending the loop;
    * SQL text is executed on the global cursor and a non-empty result is
      printed through the current formatter, preceded by the column names
      when the ``headers`` setting is on. Statements that return no rows
      print nothing.

    Ctrl-C must be pressed twice in a row to exit; end-of-file exits
    immediately.
    """
    will_exit = False
    while True:
        try:
            cmd = reading()
            if not cmd:
                continue
            will_exit = False
            if isinstance(cmd, list):
                name, *cmd_args = cmd
                handler = Command.commands.get(name)
                if handler is None:
                    # The fallback is a plain callable that prints the
                    # "could not be found" message.
                    Command.default(name)()
                    continue
                try:
                    handler.exec(*cmd_args)
                except Exception as e:
                    # SystemExit (raised by the exit command) is not an
                    # Exception and still propagates.
                    print(f"Error: {e}")
                continue
            try:
                rows = cur.execute(cmd).fetchall()
                desc = cur.description
            except Exception as e:
                print(f"Error: {e}")
                continue
            if not rows:
                continue
            if settings.get("headers", False):
                rows.insert(0, tuple(col[0] for col in desc))
            rows = [[limstr(value) for value in row] for row in rows]
            print(Formatter.format_current(rows))
        except KeyboardInterrupt:
            if will_exit:
                raise SystemExit
            will_exit = True
            print("\nEnter ^C again to exit.")
        except EOFError:
            raise SystemExit
if __name__ == '__main__':
    # finalize is registered with atexit before init() runs, then the
    # session is initialised and the REPL loop started.
    register(finalize)
    init()
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment