Last active
June 22, 2021 22:34
-
-
Save mbarkhau/2b676d3b6d7f02432c620be295e719cf to your computer and use it in GitHub Desktop.
sublime_devbuild.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
"""A sublime-text build script, to do the right thing?. | |
This script takes the currently open file as a | |
reference point and uses it to determine which commands | |
to run. This means that you don't have to select a build | |
commands when switching between projects, the fact that | |
you have a different file open is enough to indicate | |
which build you want to run. | |
---- devbuild.submime-build | |
{ | |
"working_dir": "${project_path}", | |
"shell_cmd" : "/home/mbarkhau/workspace/devbuild.py ${file}", | |
"file_regex" : "^(?:[a-z]+:)?[ ]*File \"(...*?)\", line ([0-9]+):?([0-9]+)?:?(.+)?", | |
"word_wrap" : false, | |
} | |
""" | |
import os | |
import re | |
import sys | |
import time | |
import shlex | |
import queue | |
import typing as typ | |
import pathlib as pl | |
import threading | |
import subprocess as sp | |
from simple_file_cache import SimpleFileCache | |
TIME = False | |
VALID_SOURCE_DIRS = "src|test" | |
def _re_compile(pattern: str) -> typ.Pattern[str]: | |
pattern = pattern.replace("VALID_SOURCE_DIRS", VALID_SOURCE_DIRS) | |
return re.compile(pattern, flags=re.VERBOSE) | |
# examples: | |
# /home/user/project/test/test_cli.py:364: AssertionError | |
# src/pycalver2/version.py:227: error: Unexpected keyword ... | |
GENERIC_PATTERN = r""" | |
^(?P<path>(?:[/\w ]+/)?(?:VALID_SOURCE_DIRS)/[^:\s]+) | |
: | |
(?P<lineno>[0-9]*) | |
: | |
\s | |
(?P<errmsg>.*)$ | |
""" | |
GENERIC_RE = _re_compile(GENERIC_PATTERN) | |
# example: | |
# test/test_cli.py:14:0: E0611: No name 'config' in module 'pycalver2' (no-name-in-module) | |
PYLINT_PATTERN = r""" | |
^(?P<path>(?:VALID_SOURCE_DIRS)/[^:\s]+) | |
: | |
(?P<lineno>[0-9]+) | |
: | |
(?P<colno>[0-9]+) | |
: | |
\s | |
(?P<errmsg>.*)$ | |
""" | |
PYLINT_RE = _re_compile(PYLINT_PATTERN) | |
TOOL_PATTERNS = [("pylint", PYLINT_RE), ("", GENERIC_RE)] | |
def _dedent(cmd): | |
return " ".join(line.strip() for line in cmd.splitlines() if line.strip()) | |
def env_cmd(cmd): | |
return ["bash", "-c", f"source activate;{_dedent(cmd)}"] | |
def bash(cmd): | |
return ["bash", "-c", _dedent(cmd)] | |
BUILD_COMMANDS = { | |
"/home/mbarkhau/foss/litprog/lit_v3/99_scratchpad.md": [ | |
env_cmd("litprog build -vi lit_v3/9*"), | |
], | |
"/home/mbarkhau/foss/litprog/": [ | |
"make fmt", | |
"make lint_flake8", | |
"make mypy", | |
"make lint_pylint", | |
env_cmd("litprog build -vie -n 4 lit_v3/*.md"), | |
# env_cmd("litprog build -vie -n 4 lit_v3/*.md --html doc/"), | |
# "make lint_pylint", | |
# env_cmd("python -m litprog.vcs_timeline"), | |
# env_cmd("litprog build -ve lit_v3/11_overview.md"), | |
# env_cmd("litprog build -vie lit_v3/12_sudoku.md --html doc/"), | |
# env_cmd("litprog build -vie lit_v3/1*.md --html doc/ --pdf doc/"), | |
# env_cmd("sjfmt examples/"), | |
# "make devtest", | |
], | |
"/home/mbarkhau/foss/bumpver/": [ | |
"make fmt", | |
"make lint_flake8", | |
"make mypy", | |
env_cmd("make devtest"), | |
"make lint_pylint_errors", | |
"make lint_fmt", | |
"make lint_pylint", | |
# env_cmd('python -m bumpver update --dry --no-fetch --release beta'), | |
], | |
"/home/mbarkhau/workspace/simple_file_cache.py": [ | |
"python /home/mbarkhau/workspace/simple_file_cache.py", | |
], | |
"/home/mbarkhau/foss/pretty-traceback": [ | |
"make devtest", | |
"make fmt_sjfmt", | |
"make lint_flake8", | |
"make lint_pylint_errors", | |
"make mypy", | |
"make fmt_isort", | |
"make lint_fmt", | |
], | |
"/home/mbarkhau/foss/pylint-ignore": [ | |
"make fmt_sjfmt", | |
"make devtest", | |
"make mypy", | |
"make fmt_isort", | |
"make lint_fmt", | |
"make lint_flake8", | |
env_cmd('python -m pylint_ignore --rcfile=setup.cfg src/pylint_ignore'), | |
], | |
"/home/mbarkhau/foss/reftangle-metric": "python3 reftangle.py", | |
} | |
BUILD_COMMANDS = { | |
_path.rstrip("/"): _cmds | |
for _path, _cmds in BUILD_COMMANDS.items() | |
} | |
DEFAULT_BUILD_COMMANDS = [ | |
"make fmt_sjfmt", | |
"make fmt_isort", | |
"make lint_fmt", | |
"make lint_flake8", | |
"make mypy", | |
"make devtest", | |
] | |
DEFAULT_ERROR_FMT = 'File "{path}", line {lineno}:{colno}:{tool} {errmsg}\n' | |
def line_formatter(cwd, old_cwd): | |
def format_line(line): | |
for tool, pattern in TOOL_PATTERNS: | |
match = pattern.match(line) | |
if match: | |
kwargs = match.groupdict() | |
path = kwargs['path'] | |
if not path.startswith("/"): | |
path = str(cwd / path) | |
if path.startswith(str(old_cwd)): | |
path = str(pl.Path(path).relative_to(old_cwd)) | |
if tool: | |
kwargs['tool'] = " - " + tool | |
else: | |
kwargs['tool'] = "" | |
kwargs['path'] = path | |
kwargs['tool'] = tool | |
kwargs['colno'] = kwargs.get('colno', "") | |
if kwargs['errmsg']: | |
kwargs['errmsg'] = kwargs['errmsg'] | |
return DEFAULT_ERROR_FMT.format(**kwargs) | |
return line | |
return format_line | |
ANSI_ESCAPE_RE = re.compile(rb"(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]") | |
def has_makefile(dirpath: pl.Path) -> bool: | |
candidates = [ | |
dirpath / "Makefile", | |
dirpath / "makefile", | |
] | |
return any(f.exists() for f in candidates) | |
def _get_build_commands(path: pl.Path) -> typ.Tuple[pl.Path, typ.List[str]]: | |
home_dir = pl.Path("~").expanduser() | |
cmd_path = path | |
while str(cmd_path).startswith(str(home_dir) + "/"): | |
build_commands = BUILD_COMMANDS.get(str(cmd_path)) | |
if build_commands is None: | |
cmd_path = cmd_path.parent | |
else: | |
if cmd_path.is_file(): | |
base_dir = cmd_path.parent | |
else: | |
base_dir = cmd_path | |
if isinstance(build_commands, str): | |
build_commands = [build_commands] | |
return (base_dir, build_commands) | |
cmd_path = path | |
while str(cmd_path).startswith(str(home_dir) + "/"): | |
if has_makefile(cmd_path): | |
return (base_dir, DEFAULT_BUILD_COMMANDS) | |
else: | |
cmd_path = cmd_path.parent | |
sys.stderr.write(f"No default command(s) for '{path}'\n") | |
return 1 | |
def main(args=sys.argv[1:]) -> int: | |
path = pl.Path(args[0]) | |
if not path.exists(): | |
sys.stderr.write(f"No such file '{path}'\n") | |
return 1 | |
with SimpleFileCache() as cache: | |
if str(path) == __file__: | |
# use path from previous run (if available) | |
path = pl.Path(cache.obj.get('path', str(path))) | |
cache.obj['path'] = str(path) | |
else: | |
cache.obj['path'] = str(path) | |
base_dir, build_commands = _get_build_commands(path) | |
if not any(build_commands): | |
sys.stderr.write(f"No command(s) for '{path}'\n") | |
return 1 | |
old_cwd = pl.Path(os.getcwd()) | |
os.chdir(str(base_dir)) | |
format_line = line_formatter(base_dir, old_cwd) | |
env = os.environ.copy() | |
env['TIME'] = r'\ncpu : %U + %S mem : %Mkb\ntime: %e files: %O' | |
for cmd in build_commands: | |
cmd_parts = shlex.split(cmd) if isinstance(cmd, str) else list(cmd) | |
assert isinstance(cmd_parts, list) | |
if TIME: | |
cmd_parts.insert(0, "time") | |
t0 = time.time() | |
# https://stackoverflow.com/a/4896288 | |
def enqueue_output(out, queue): | |
for line in iter(out.readline, b''): | |
line, _ = ANSI_ESCAPE_RE.subn(b"", line) | |
line = format_line(line.decode("utf-8")) | |
queue.put(line) | |
out.close() | |
proc = sp.Popen(cmd_parts, stderr=sp.PIPE, stdout=sp.PIPE, env=env) | |
qout = queue.Queue() | |
tout = threading.Thread(target=enqueue_output, args=(proc.stdout, qout)) | |
tout.daemon = True | |
tout.start() | |
qerr = queue.Queue() | |
terr = threading.Thread(target=enqueue_output, args=(proc.stderr, qerr)) | |
terr.daemon = True | |
terr.start() | |
retcode = None | |
while True: | |
try: | |
out_line = qout.get_nowait() | |
except queue.Empty: | |
out_line = None | |
try: | |
err_line = qerr.get_nowait() | |
except queue.Empty: | |
err_line = None | |
if sys.stdout.isatty(): | |
sys.stdout.write("\u001b[1000D") | |
sys.stdout.write(" " * 80) | |
sys.stdout.write("\u001b[1000D") | |
sys.stdout.flush() | |
try: | |
if out_line and out_line.strip(): | |
if not sys.stdout.isatty(): | |
sys.stdout.write("\n") | |
sys.stdout.write(out_line) | |
sys.stdout.flush() | |
except IOError: | |
pass | |
try: | |
if err_line and err_line.strip(): | |
if not sys.stderr.isatty(): | |
sys.stderr.write("\n") | |
sys.stderr.write(err_line) | |
sys.stderr.flush() | |
except IOError: | |
pass | |
if sys.stdout.isatty() or retcode is not None: | |
dur_ms = (time.time() - t0) * 1000 | |
progress_line = f"[{int(dur_ms):>5}ms] $ {cmd}" | |
try: | |
sys.stdout.write(progress_line) | |
sys.stdout.flush() | |
except ioerror: | |
pass | |
if retcode is not None: | |
sys.stdout.write("\n") | |
sys.stdout.flush() | |
if retcode is None: | |
try: | |
retcode = proc.wait(timeout=0.2) | |
except sp.TimeoutExpired: | |
retcode = None | |
else: | |
break | |
if retcode != 0: | |
# propagate error code | |
return retcode | |
return 0 | |
if __name__ == '__main__': | |
sys.exit(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment