Last active
December 8, 2023 10:03
-
-
Save bnorick/44af7676f8e1fb853e50afab0aa5f292 to your computer and use it in GitHub Desktop.
Pythoni: a Python REPL you can pipe data into for processing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import code | |
import functools | |
import inspect | |
import logging | |
import os | |
import pathlib | |
import shlex | |
import stat | |
import sys | |
import traceback | |
logger = logging.getLogger(__name__) | |
class ParserExited(Exception): | |
pass | |
class CommandError(Exception): | |
pass | |
class ArgumentParser(argparse.ArgumentParser): | |
def __init__(self, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
def print_help(self, *args, **kwargs): | |
logger.debug(f"print_help prog=%s args=%s kwargs=%s", self.prog, args, kwargs) | |
super().print_help(file=sys.stderr) | |
def exit(self, *args, **kwargs): | |
logger.debug("exit prog=%s", self.prog) | |
raise ParserExited | |
def _argparse_export(pythoni, args): | |
logger.debug("%s", args) | |
if args.indices is None: | |
args.indices = [] | |
pythoni.export(args.path, *args.indices, as_python=args.as_python, input_command=args.input_command, overwrite=args.overwrite, all=args.all) | |
def _argparse_help(pythoni, args): | |
pythoni.command_parser.print_help(file=sys.stderr) | |
def _int_or_range(str): | |
try: | |
start, end = str.split("-") | |
return range(int(start), int(end) + 1) | |
except ValueError: | |
try: | |
val = int(str) | |
return range(val, val + 1) | |
except ValueError: | |
raise argparse.ArgumentError(f"indices must be integers or (inclusive) integer ranges, e.g., N-M") | |
class Pythoni(code.InteractiveConsole): | |
def __init__(self, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
self._history = [] | |
# NOTE: requires python 3.9 | |
self.command_parser = ArgumentParser( | |
usage="%%command ...", | |
description="Use commands prefixed with % to perform magic!\nFor help with a specific command, use %command --help.", | |
exit_on_error=False, | |
add_help=False, | |
formatter_class=argparse.RawDescriptionHelpFormatter, | |
) | |
subparsers = self.command_parser.add_subparsers(title="command", help=argparse.SUPPRESS) | |
export = subparsers.add_parser("export", prog="%export", exit_on_error=False) | |
export.add_argument("path", type=pathlib.Path) | |
export.add_argument("indices", type=_int_or_range, nargs="*", help="indices from history to export, integers or (inclusive) integer ranges, e.g., N-M") | |
export.add_argument("-p", "--as-python", action="store_true") | |
export.add_argument("-i", "--input-command", type=str, required=False, help="command to run which is piped into script") | |
export.add_argument("-o", "--overwrite", action="store_true") | |
export.add_argument("-a", "--all", action="store_true") | |
export.set_defaults(func=functools.partial(_argparse_export, self)) | |
help = subparsers.add_parser("help", prog="%help", exit_on_error=False) | |
help.set_defaults(func=functools.partial(_argparse_help, self)) | |
self.command_parser.epilog = "Available commands:\n " + "\n ".join(subparsers.choices.keys()) | |
sys.ps1 = "[0] >>> " | |
def runsource(self, source, filename="<input>", symbol="single"): | |
logger.debug("runsource %s", source) | |
if source.startswith("%"): | |
try: | |
# self.command_parser._reset() | |
args = self.command_parser.parse_args(shlex.split(source[1:].strip())) | |
args.func(args) | |
except ParserExited: | |
pass | |
except (CommandError, argparse.ArgumentError) as e: | |
self.error(str(e)) | |
except Exception as e: | |
self.error(str(e)) | |
traceback.print_exc(file=sys.stderr) | |
return False | |
else: | |
result = super().runsource(source, filename=filename, symbol=symbol) | |
if not result: | |
self._history.append("\n".join(self.buffer)) | |
sys.ps1 = f"[{len(self._history)}] >>> " | |
sys.ps2 = " " * sys.ps1.index(">") + "... " | |
return result | |
def _get_export_code(self, *indices, all=False): | |
if indices and all: | |
self.error(f"Both indices and all=True passed, only use one or the other.") | |
return | |
code = [] | |
if all: | |
code.extend(self._history) | |
else: | |
try: | |
for index_range in indices: | |
for index in index_range: | |
code.append(self._history[index]) | |
except IndexError as e: | |
raise CommandError(f"Nonexistent history index requested, {index=}") | |
return "\n\n".join(code) | |
def export(self, path, *indices, as_python=False, input_command=None, overwrite=False, all=False): | |
path = pathlib.Path(path) | |
if path.exists() and not overwrite: | |
self.error(f"path exists, pass overwrite=True to overwrite: {path}") | |
return | |
code = self._get_export_code(*indices, all=all) | |
if as_python: | |
with path.open("w", encoding="utf8") as f: | |
f.write(code) | |
print(f"Exported python script to {path}") | |
else: | |
if 'stdin' in code: | |
code = "import sys\nstdin = sys.stdin.readlines()\n\n" + code | |
if input_command: | |
script = f'#!/usr/bin/env bash\nset -Eeuo pipefail\ntrap exit SIGINT SIGTERM ERR EXIT\nCODE={shlex.quote(code)}\n{input_command} | python -c "$CODE"' | |
else: | |
script = f'#!/usr/bin/env bash\nset -Eeuo pipefail\ntrap exit SIGINT SIGTERM ERR EXIT\nCODE={shlex.quote(code)}\npython -c "$CODE"' | |
with path.open("w", encoding="utf8") as f: | |
f.write(script) | |
path.chmod(mode=path.stat().st_mode|stat.S_IRWXU) | |
print(f"Exported executable bash script to {path}") | |
def error(self, message): | |
print(f"ERROR: {message}", file=sys.stderr) | |
parser = argparse.ArgumentParser("pythoni") | |
group = parser.add_mutually_exclusive_group() | |
group.add_argument("-c", "--code", help="code to execute, may use \"stdin\" local which is a list of lines read from stdin") | |
group.add_argument("-p", "--print", action="append", nargs="+", help="-p '[CODE]' is a shortcut to execute print([CODE])") | |
group.add_argument("-pf", "--print-fstring", action="append", nargs="+", help="-pf '[INPUT]' is a shortcut to execute print(f\'[INPUT]\'), no automatic escaping") | |
group.add_argument("-l", "--lambda", help="-l 'lambda line: line.replace(\"foobar\", \"\")' is a shortcut to execute the lambda func for each line and print results which evaluate to True, i.e., returning None will hide a line", dest="lambda_") | |
parser.add_argument("-d", "--debug", action="store_true", help="print debug informat useful when developing pythoni") | |
args = parser.parse_args() | |
if args.debug: | |
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(funcName)s [%(lineno)d] - %(message)s") | |
redirected_out = not sys.stdout.isatty() | |
redirected_in = not sys.stdin.isatty() | |
if args.print: | |
args.print = [item for items in args.print for item in items] | |
args.code = "; ".join(f"print({p})" for p in args.print) | |
elif args.print_fstring: | |
args.print_fstring = [item for items in args.print_fstring for item in items] | |
args.code = "; ".join(f"print(f\'{p}\')" for p in args.print_fstring) | |
elif args.lambda_: | |
try: | |
if not args.lambda_.strip().startswith("lambda"): | |
raise SyntaxError | |
result = eval(args.lambda_) | |
if not callable(result): | |
raise SyntaxError | |
sig = inspect.signature(result) | |
if len(sig.parameters) != 1: | |
raise RuntimeError(f"invalid --lambda, function must accept a single argument \"{args.lambda_}\" does not") | |
except Exception as e: | |
if isinstance(e, SyntaxError): | |
message = f"invalid --lambda, must be a valid lambda function but \"{args.lambda_}\" is not" | |
else: | |
message = str(e) | |
print(f"ERROR: {message}", file=sys.stderr) | |
sys.exit(1) | |
args.code = f"""\ | |
fn = {args.lambda_} | |
for line in stdin: | |
result = fn(line) | |
if result: | |
print(result, end='') | |
""" | |
if redirected_in: | |
stdin = sys.stdin.readlines() | |
if not args.code: | |
print('stdin read to "stdin" variable\n') | |
if args.code: | |
exec(args.code) | |
elif not redirected_out: | |
# ref: https://github.com/python/cpython/issues/36029#issuecomment-1093968541 | |
_stdin = os.dup(0) | |
os.close(0) | |
tty = os.open("/dev/tty", os.O_RDONLY) | |
assert tty == 0 | |
import readline | |
import rlcompleter | |
variables = globals().copy() | |
variables.update(locals()) | |
readline.set_completer(rlcompleter.Completer(variables).complete) | |
readline.parse_and_bind("tab: complete") | |
pythoni = Pythoni(variables) | |
variables.update(pythoni=pythoni) | |
pythoni.interact() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
For this version, I added some magic (like IPython) to enable an export capability,
%export
.Say you start with
your | long | piped | sequence | pythoni
and then interactively process the piped input. Now, you can export your interactive session to either a python script or a fully contained bash script.You can do the following:
%export process.py --all
to enable
your | long | piped | sequence | python process.py
Even better, especially for repetitive tasks, is the shell script export, e.g.,
%export go.sh --command 'your | long | piped | sequence' --all
and then simply running with
./go.sh
.As a contrived example,