Skip to content

Instantly share code, notes, and snippets.

@malandro-sv
Last active March 18, 2026 21:53
Show Gist options
  • Select an option

  • Save malandro-sv/49374696157ab43c2af557e8c2f2401e to your computer and use it in GitHub Desktop.

Select an option

Save malandro-sv/49374696157ab43c2af557e8c2f2401e to your computer and use it in GitHub Desktop.
lonetwin's with some minor tweaks
#! /usr/bin/env python3
try:
import os
import sys
import time
import readline
import rlcompleter
from importlib import reload
except ImportError:
raise ImportError
# useful aliases:
ls = lambda: print(os.listdir("."))
readline.parse_and_bind("tab: complete")
# .pythonrc by lonetwin; added on 1.16.26:
# #!/usr/bin/env python
# -*- coding: utf-8 -*-
# The MIT License (MIT)
#
# Copyright (c) 2015-2021 Steven Fernandez
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# Keep a copy of the initial namespace, we'll need it later
CLEAN_NS = globals().copy()
"""pymp - lonetwin's pimped-up pythonrc
This file will be executed when the Python interactive shell is started, if
$PYTHONSTARTUP is in your environment and points to this file. You could
also make this file executable and call it directly.
This file creates an InteractiveConsole instance, which provides:
* execution history
* colored prompts and pretty printing
* auto-indentation
* intelligent tab completion:¹
* source code listing for objects
* session history editing using your $EDITOR, as well as editing of
source files for objects or regular files
* temporary escape to $SHELL or ability to execute a shell command and
capturing the result into the '_' variable
* convenient printing of doc stings and search for entries in online docs
* auto-execution of a virtual env specific (`.venv_rc.py`) file at startup
If you have any other good ideas please feel free to submit issues/pull requests.
¹ Since python 3.4 the default interpreter also has tab completion
enabled however it does not do pathname completion
"""
# Fix for Issue #5
# - Exit if being called from within ipython
try:
import sys
__IPYTHON__ and sys.exit(0) # type: ignore
except NameError:
pass
import ast
import asyncio
import atexit
import concurrent
import glob
import importlib
import inspect
import keyword
import os
import pkgutil
import pprint
import re
import readline
import rlcompleter
import shlex
import signal
import subprocess
import threading
import warnings
import webbrowser
from code import InteractiveConsole
from functools import cached_property, lru_cache, partial
from itertools import chain
from operator import attrgetter
from tempfile import NamedTemporaryFile
from types import FunctionType, SimpleNamespace
__version__ = "0.9.0"
config = SimpleNamespace(
ONE_INDENT=" ", # what should we use for indentation ?
HISTFILE=os.path.expanduser("~/.python_history"),
HISTSIZE=-1,
EDITOR=os.getenv("EDITOR", "vi"),
SHELL=os.getenv("SHELL", "/bin/bash"),
EDIT_CMD=r"\e",
SH_EXEC="!",
DOC_CMD="?",
DOC_URL="https://docs.python.org/{sys.version_info.major}/search.html?q={term}",
HELP_CMD=r"\h",
LIST_CMD=r"\l",
AUTO_INDENT=True, # - Should we auto-indent by default
VENV_RC=os.getenv("VENV_RC", ".venv_rc.py"),
# - option to pass to the editor to open a file at a specific
# `line_no`. This is used when the EDIT_CMD is invoked with a python
# object to open the source file for the object.
LINE_NUM_OPT="+{line_no}",
# - Run-time toggle for auto-indent command (eg: when pasting code)
TOGGLE_AUTO_INDENT_CMD=r"\\",
# - should path completion expand ~ using os.path.expanduser()
COMPLETION_EXPANDS_TILDE=True,
# - when executing edited history, should we also print comments
POST_EDIT_PRINT_COMMENTS=True,
# - Attempt to auto-import top-level module names on NameError
ENABLE_AUTO_IMPORTS=True,
# - Start/Stop the asyncio loop in the interpreter (similar to `python -m asyncio`)
TOGGLE_ASYNCIO_LOOP_CMD=r"\A",
)
# Color functions. These get initialized in init_color_functions() later
red = green = yellow = blue = purple = cyan = grey = str
class ImprovedCompleter(rlcompleter.Completer):
"""A smarter rlcompleter.Completer"""
def __init__(self, namespace=None):
super().__init__(namespace)
# - remove '/' and '~' from delimiters to help with path completion
completer_delims = readline.get_completer_delims()
completer_delims = completer_delims.replace("/", "")
if config.COMPLETION_EXPANDS_TILDE:
completer_delims = completer_delims.replace("~", "")
readline.set_completer_delims(completer_delims)
self.matches = []
@lru_cache(None)
def pkg_contents(self, pkg):
"""Given a package name, return a list of it's sub-modules."""
spec = importlib.util.find_spec(pkg)
locs = [spec.origin] if not spec.parent else spec.submodule_search_locations
return [
pkg.name
for pkg in pkgutil.walk_packages(locs, f"{pkg}.", onerror=lambda _: None)
]
@cached_property
def pkglist(self):
return frozenset(item.name for item in pkgutil.iter_modules() if item.ispkg)
@cached_property
def modlist(self):
modlist = chain(
sys.builtin_module_names, map(attrgetter("name"), pkgutil.iter_modules())
)
return frozenset(name for name in modlist if not name.startswith("_"))
def exceptions(self, exc_cls=Exception):
exc_names = [exc.__name__ for exc in exc_cls.__subclasses__()]
for sub_cls in exc_cls.__subclasses__():
exc_names.extend(self.exceptions(sub_cls))
return exc_names
def startswith_filter(self, text, names, striptext=None):
filtered = [name for name in names if name.startswith(text)]
if striptext:
return [name.replace(striptext, "") for name in filtered]
return filtered
def get_path_matches(self, text):
return [
f"{item}{os.path.sep}" if os.path.isdir(item) else item
for item in glob.iglob(f"{text}**")
]
def get_import_matches(self, text, words):
if any(
[
(len(words) == 2 and not text),
(len(words) == 3 and text and "import".startswith(text)),
]
):
return ["import "]
if len(words) <= 2:
# import p<tab> / from p<tab>
modname, _, _ = text.partition(".")
if modname in self.pkglist:
return self.startswith_filter(text, self.pkg_contents(modname))
return self.startswith_filter(text, self.modlist)
if len(words) >= 3 and words[2] == "import":
# from pkg.sub import na<tab>
namespace = words[1]
pkg, _, _ = namespace.partition(".")
if pkg in self.pkglist:
# from pkg.sub import na<tab>
match_text = ".".join((namespace, text))
if matches := self.startswith_filter(
match_text, self.pkg_contents(pkg), striptext=f"{namespace}."
):
return matches
# from module import na<ta>
mod = importlib.import_module(namespace)
return self.startswith_filter(text, getattr(mod, "__all__", dir(mod)))
def complete(self, text, state, line=None):
if not line:
line = readline.get_line_buffer()
if line == "" or line.isspace():
return None if state else config.ONE_INDENT
words = line.split()
if state == 0:
# - this is the first completion is being attempted for
# text, we need to populate self.matches, just like
# super().complete()
if line.startswith(("from ", "import ")):
self.matches = self.get_import_matches(text, words)
elif words[0] in ("raise", "except"):
self.matches = self.startswith_filter(
text.lstrip("("), self.exceptions()
)
elif os.path.sep in text:
self.matches = self.get_path_matches(
os.path.expanduser(text)
if config.COMPLETION_EXPANDS_TILDE
else text
)
elif "." in text:
self.matches = self.attr_matches(text)
else:
self.matches = self.global_matches(text)
if len(self.matches) == 1:
match = self.matches[0]
if keyword.iskeyword(match) and match in ("raise", "except"):
self.matches.extend(self.exceptions())
if match and match.endswith(os.path.sep):
self.matches.extend(self.get_path_matches(match))
try:
return self.matches[state]
except IndexError:
return None
def _doc_to_usage(method):
def inner(self, arg):
arg = arg.strip()
if arg.startswith(("-h", "--help")):
return self.writeline(blue(method.__doc__.strip()))
return method(self, arg)
return inner
class ImprovedConsole(InteractiveConsole):
"""
Welcome to lonetwin's pimped up python prompt
You've got color, tab completion, auto-indentation, pretty-printing
and more !
* A tab with preceding text will attempt auto-completion of
keywords, names in the current namespace, attributes and methods.
If the preceding text has a '/', filename completion will be
attempted. Without preceding text four spaces will be inserted.
* History will be saved in {HISTFILE} when you exit.
* If you create a file named {VENV_RC} in the current directory, the
contents will be executed in this session before the prompt is
shown.
* Typing out a defined name followed by a '{DOC_CMD}' will print out
the object's __doc__ attribute if one exists.
(eg: []? / str? / os.getcwd? )
* Typing '{DOC_CMD}{DOC_CMD}' after something will search for the
term at {DOC_URL}
(eg: try webbrowser.open??)
* Open the your editor with current session history, source code of
objects or arbitrary files, using the '{EDIT_CMD}' command.
* List source code for objects using the '{LIST_CMD}' command.
* Execute shell commands using the '{SH_EXEC}' command.
Try `<cmd> -h` for any of the commands to learn more.
The EDITOR, SHELL, command names and more can be changed in the
config declaration at the top of this file. Make this your own !
"""
def __init__(self, *args, **kwargs):
self.session_history = [] # This holds the last executed statements
self.buffer = [] # This holds the statement to be executed
self._indent = ""
self.loop = None
super(ImprovedConsole, self).__init__(*args, **kwargs)
self.init_color_functions()
self.init_readline()
self.init_prompt()
self.init_pprint()
# - dict mapping commands to their handler methods
self.commands = {
config.EDIT_CMD: self.process_edit_cmd,
config.LIST_CMD: self.process_list_cmd,
config.SH_EXEC: self.process_sh_cmd,
config.HELP_CMD: self.process_help_cmd,
config.TOGGLE_AUTO_INDENT_CMD: self.toggle_auto_indent,
config.TOGGLE_ASYNCIO_LOOP_CMD: self.toggle_asyncio,
}
# - regex to identify and extract commands and their arguments
self.commands_re = re.compile(
r"(?P<cmd>{})\s*(?P<args>[^(]*)".format(
"|".join(re.escape(cmd) for cmd in self.commands)
)
)
def init_color_functions(self):
"""Populates globals dict with some helper functions for colorizing text"""
def colorize(color_code, text, bold=True, readline_workaround=False):
reset = "\033[0m"
color = "\033[{0}{1}m".format("1;" if bold else "", color_code)
# - reason for readline_workaround: http://bugs.python.org/issue20359
if readline_workaround:
return f"\001{color}\002{text}\001{reset}\002"
return f"{color}{text}{reset}"
g = globals()
for code, color in enumerate(
["red", "green", "yellow", "blue", "purple", "cyan", "grey"], 31
):
g[color] = partial(colorize, code)
def init_readline(self):
"""Activates history and tab completion"""
# - 1. history stuff
# - mainly borrowed from site.enablerlcompleter() from py3.4+,
# we can't simply call site.enablerlcompleter() because its
# implementation overwrites the history file for each python
# session whereas we prefer appending history from every
# (potentially concurrent) session.
# Reading the initialization (config) file may not be enough to set a
# completion key, so we set one first and then read the file.
readline_doc = getattr(readline, "__doc__", "")
if readline_doc is not None and "libedit" in readline_doc:
readline.parse_and_bind("bind ^I rl_complete")
else:
readline.parse_and_bind("tab: complete")
try:
readline.read_init_file()
except OSError:
# An OSError here could have many causes, but the most likely one
# is that there's no .inputrc file (or .editrc file in the case of
# Mac OS X + libedit) in the expected location. In that case, we
# want to ignore the exception.
pass
def append_history(len_at_start):
current_len = readline.get_current_history_length()
readline.append_history_file(current_len - len_at_start, config.HISTFILE)
if readline.get_current_history_length() == 0:
# If no history was loaded, default to .python_history.
# The guard is necessary to avoid doubling history size at
# each interpreter exit when readline was already configured
# see: http://bugs.python.org/issue5845#msg198636
try:
readline.read_history_file(config.HISTFILE)
except IOError:
pass
len_at_start = readline.get_current_history_length()
atexit.register(append_history, len_at_start)
readline.set_history_length(config.HISTSIZE)
# - 2. enable auto-indenting
if config.AUTO_INDENT:
readline.set_pre_input_hook(self.auto_indent_hook)
# - 3. completion
# - replace default completer
self.completer = ImprovedCompleter(self.locals)
readline.set_completer(self.completer.complete)
def init_prompt(self, nested=False):
"""Activates color on the prompt based on python version.
Also adds the hosts IP if running on a remote host over a
ssh connection.
"""
prompt_color = green if sys.version_info.major == 2 else yellow
sys.ps1 = prompt_color(">=> " if nested else ">>> ", readline_workaround=True)
sys.ps2 = red("... ", readline_workaround=True)
# - if we are over a remote connection, modify the ps1
if os.getenv("SSH_CONNECTION"):
_, _, this_host, _ = os.getenv("SSH_CONNECTION").split()
sys.ps1 = prompt_color(f"[{this_host}]>>> ", readline_workaround=True)
sys.ps2 = red(f"[{this_host}]... ", readline_workaround=True)
def init_pprint(self):
"""Activates pretty-printing of output values."""
keys_re = re.compile(r'([\'\("]+(.*?[\'\)"]: ))+?')
color_dict = partial(keys_re.sub, lambda m: purple(m.group()))
format_func = pprint.pformat
if sys.version_info.major >= 3 and sys.version_info.minor > 3:
format_func = partial(pprint.pformat, compact=True)
def pprint_callback(value):
if value is not None:
try:
rows, cols = os.get_teminal_size()
except AttributeError:
try:
rows, cols = map(
int, subprocess.check_output(["stty", "size"]).split()
)
except Exception:
cols = 80
formatted = format_func(value, width=cols)
print(
color_dict(formatted)
if issubclass(type(value), dict)
else blue(formatted)
)
self.locals["_"] = value
sys.displayhook = pprint_callback
def _init_nested_repl(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT
self.locals["asyncio"] = asyncio
self.locals["repl_future"] = None
self.locals["repl_future_interrupted"] = False
self.runcode = self.runcode_async
def repl_thread():
try:
self.init_prompt(nested=True)
self.interact(
banner=(
"An asyncio loop has been started in the main thread.\n"
"This nested interpreter is now running in a separate thread.\n"
f"Use {config.TOGGLE_ASYNCIO_LOOP_CMD} to stop the asyncio loop "
"and simply exit this nested interpreter to stop this thread\n"
),
exitmsg="now exiting nested REPL...\n",
)
finally:
warnings.filterwarnings(
"ignore",
message=r"^coroutine .* was never awaited$",
category=RuntimeWarning,
)
if self.loop and self.loop.is_running():
self.loop.call_soon_threadsafe(self._stop_asyncio_loop)
self.init_prompt()
self.repl_thread = threading.Thread(target=repl_thread)
self.repl_thread.start()
def _start_asyncio_loop(self):
self.locals["repl_future"] = None
self.locals["repl_future_interrupted"] = False
self.runcode = self.runcode_async
while self.loop is not None:
try:
self.loop.run_forever()
except KeyboardInterrupt:
if (
repl_future := self.locals["repl_future"]
) and not repl_future.done():
repl_future.cancel()
self.locals["repl_future_interrupted"] = True
def _stop_asyncio_loop(self):
self.loop.stop()
del self.locals["repl_future"]
del self.locals["repl_future_interrupted"]
self.runcode = self.runcode_sync
self.loop = None
self.writeline(
grey(
"Stopped the asyncio loop. "
f"Use {config.TOGGLE_ASYNCIO_LOOP_CMD} to restart it."
)
)
@_doc_to_usage
def toggle_asyncio(self, _):
"""{config.TOGGLE_ASYNCIO_LOOP_CMD} - Starts/stops the asyncio loop
Configures the interpreter in a similar manner to `python -m asyncio`
"""
if self.loop is None:
self._init_nested_repl()
self._start_asyncio_loop()
elif not self.loop.is_running():
self.writeline(grey("Restarting previously stopped asyncio loop"))
self._start_asyncio_loop()
else:
if (
repl_future := self.locals.get("repl_future", None)
) and not repl_future.done():
repl_future.cancel()
self.loop.call_soon_threadsafe(self._stop_asyncio_loop)
def auto_indent_hook(self):
"""Hook called by readline between printing the prompt and
starting to read input.
"""
readline.insert_text(self._indent)
readline.redisplay()
@_doc_to_usage
def toggle_auto_indent(self, _):
"""{config.TOGGLE_AUTO_INDENT_CMD} - Toggles the auto-indentation behavior"""
hook = None if config.AUTO_INDENT else self.auto_indent_hook
msg = "# Auto-Indent has been {}abled\n".format("en" if hook else "dis")
config.AUTO_INDENT = bool(hook)
if hook is None:
msg += (
"# End of blocks will be detected after 3 empty lines\n"
f"# Re-type {config.TOGGLE_AUTO_INDENT_CMD} on a line by itself to enable"
)
readline.set_pre_input_hook(hook)
print(grey(msg, bold=False))
return ""
def raw_input(self, prompt=""):
"""Read the input and delegate if necessary."""
line = super(ImprovedConsole, self).raw_input(prompt)
empty_lines = 3 if line else 1
while not config.AUTO_INDENT and empty_lines < 3:
line = super(ImprovedConsole, self).raw_input(prompt)
empty_lines += 1 if not line else 3
return self._cmd_handler(line)
def _cmd_handler(self, line):
if matches := self.commands_re.match(line):
command, args = matches.groups()
line = self.commands[command](args)
elif line.endswith(config.DOC_CMD):
if line.endswith(config.DOC_CMD * 2):
# search for line in online docs
# - strip off the '??' and the possible tab-completed
# '(' or '.' and replace inner '.' with '+' to create the
# search query string
line = line.rstrip(f"{config.DOC_CMD}.(").replace(".", "+")
webbrowser.open(config.DOC_URL.format(sys=sys, term=line))
line = ""
else:
line = line.rstrip(f"{config.DOC_CMD}.(")
if not line:
line = "dir()"
elif keyword.iskeyword(line):
line = f'help("{line}")'
else:
line = f"print({line}.__doc__)"
elif config.AUTO_INDENT and (
line.startswith(config.ONE_INDENT) or self._indent
):
if line.strip():
# if non empty line with an indent, check if the indent
# level has been changed
leading_space = line[: line.index(line.lstrip()[0])]
if self._indent != leading_space:
# indent level changed, update self._indent
self._indent = leading_space
else:
# - empty line, decrease indent
self._indent = self._indent[: -len(config.ONE_INDENT)]
line = self._indent
elif line.startswith("%"):
self.writeline("Y U NO LIKE ME?")
return line
return line or ""
def push(self, line):
"""Wrapper around InteractiveConsole's push method for adding an
indent on start of a block.
"""
if more := super(ImprovedConsole, self).push(line):
if line.endswith((":", "[", "{", "(")):
self._indent += config.ONE_INDENT
else:
self._indent = ""
return more
def runcode_async(self, code):
future = concurrent.futures.Future()
def callback():
self.locals["repl_future"] = None
self.locals["repl_future_interrupted"] = False
func = FunctionType(code, self.locals)
try:
coro = func()
except SystemExit:
raise
except BaseException as ex:
if isinstance(ex, KeyboardInterrupt):
self.locals["repl_future_interrupted"] = True
future.set_exception(ex)
return
if not inspect.iscoroutine(coro):
future.set_result(coro)
return
try:
self.locals["repl_future"] = self.loop.create_task(coro)
asyncio.futures._chain_future(self.locals["repl_future"], future)
except BaseException as exc:
future.set_exception(exc)
self.loop.call_soon_threadsafe(callback)
try:
return future.result()
except SystemExit:
raise
except BaseException:
if self.locals["repl_future_interrupted"]:
self.write("\nKeyboardInterrupt\n")
else:
self.showtraceback()
def runcode_sync(self, code):
"""Wrapper around super().runcode() to enable auto-importing"""
if not config.ENABLE_AUTO_IMPORTS:
return super().runcode(code)
try:
exec(code, self.locals)
except NameError as err:
if match := re.search(r"'(\w+)' is not defined", err.args[0]):
name = match.group(1)
if name in self.completer.modlist:
mod = importlib.import_module(name)
print(grey(f"# imported undefined module: {name}", bold=False))
self.locals[name] = mod
return self.runcode(code)
self.showtraceback()
except SystemExit:
raise
except Exception:
self.showtraceback()
runcode = runcode_sync
def write(self, data):
"""Write out data to stderr"""
sys.stderr.write(data if data.startswith("\033[") else red(data))
def writeline(self, data):
"""Same as write but adds a newline to the end"""
return self.write(f"{data}\n")
def resetbuffer(self):
self._indent = previous = ""
for line in self.buffer:
# - replace multiple empty lines with one before writing to session history
stripped = line.strip()
if stripped or stripped != previous:
self.session_history.append(line)
previous = stripped
return super(ImprovedConsole, self).resetbuffer()
def _mktemp_buffer(self, lines):
"""Writes lines to a temp file and returns the filename."""
with NamedTemporaryFile(mode="w+", suffix=".py", delete=False) as tempbuf:
tempbuf.write("\n".join(lines))
return tempbuf.name
def showtraceback(self, *args):
"""Wrapper around super(..).showtraceback()
We do this to detect whether any subsequent statements after a
traceback occurs should be skipped. This is relevant when
executing multiple statements from an edited buffer.
"""
self._skip_subsequent = True
return super(ImprovedConsole, self).showtraceback(*args)
def _exec_from_file(
self,
open_fd,
quiet=False,
skip_history=False,
print_comments=config.POST_EDIT_PRINT_COMMENTS,
):
self._skip_subsequent = False
previous = ""
for stmt in open_fd:
# - skip over multiple empty lines
stripped = stmt.strip()
if stripped == previous == "":
continue
# - if line is a comment, print (if required) and move to
# next line
if stripped.startswith("#"):
if print_comments and not quiet:
self.write(grey(f"... {stmt}", bold=False))
continue
# - process line only if we haven't encountered an error yet
if not self._skip_subsequent:
line = stmt.strip("\n")
if line and not line[0].isspace():
# - end of previous statement, submit buffer for
# execution
source = "\n".join(self.buffer)
more = self.runsource(source, self.filename)
if not more:
self.resetbuffer()
if not quiet:
self.write(cyan(f"... {stmt}", bold=(not self._skip_subsequent)))
if self._skip_subsequent:
self.session_history.append(stmt)
else:
self.buffer.append(line)
if not skip_history:
readline.add_history(line)
previous = stripped
self.push("")
def lookup(self, name, namespace=None):
"""Lookup the (dotted) object specified with the string `name`
in the specified namespace or in the current namespace if
unspecified.
"""
name, _, components = name.partition(".")
obj = (
getattr(namespace, name, namespace) if namespace else self.locals.get(name)
)
return self.lookup(components, obj) if components else obj
@_doc_to_usage
def process_edit_cmd(self, arg=""):
"""{config.EDIT_CMD} [object|filename]
Open {config.EDITOR} with session history, provided filename or
object's source file.
- without arguments, a temporary file containing session history is
created and opened in {config.EDITOR}. On quitting the editor, all
the non commented lines in the file are executed, if the
editor exits with a 0 return code (eg: if editor is `vim`, and
you exit using `:cq`, nothing from the buffer is executed and
you are returned to the prompt).
- with a filename argument, the file is opened in the editor. On
close, you are returned bay to the interpreter.
- with an object name argument, an attempt is made to lookup the
source file of the object and it is opened if found. Else the
argument is treated as a filename.
"""
line_num_opt = ""
if arg:
try:
if obj := self.lookup(arg):
filename = inspect.getsourcefile(obj)
_, line_no = inspect.getsourcelines(obj)
line_num_opt = config.LINE_NUM_OPT.format(line_no=line_no)
else:
filename = arg
except (IOError, TypeError, NameError) as e:
return self.writeline(e)
else:
# - make a list of all lines in history, commenting any non-blank lines.
if not (history := self.session_history):
history = open(config.HISTFILE).readlines()
filename = self._mktemp_buffer(
f"# {line}" if line.strip() else ""
for line in (line.strip("\n") for line in history)
)
line_num_opt = config.LINE_NUM_OPT.format(line_no=len(history))
# - shell out to the editor
rc = os.system(f"{config.EDITOR} {line_num_opt} {filename}")
# - if arg was not provided (ie: we edited history), execute
# un-commented lines in the current namespace
if not arg:
if rc == 0:
# - if HISTFILE contents were edited (ie: EDIT_CMD in a
# brand new session), don't print commented out lines
print_comments = (
False
if history != self.session_history
else config.POST_EDIT_PRINT_COMMENTS
)
with open(filename) as edits:
self._exec_from_file(edits, print_comments=print_comments)
else:
self.writeline(
f"{config.EDITOR} exited with an error code. Skipping execution."
)
os.unlink(filename)
@_doc_to_usage
def process_sh_cmd(self, cmd):
"""{config.SH_EXEC} [cmd [args ...] | {{fmt string}}]
Escape to {config.SHELL} or execute `cmd` in {config.SHELL}
- without arguments, the current interpreter will be suspended
and you will be dropped in a {config.SHELL} prompt. Use fg to return.
- with arguments, the text will be executed in {config.SHELL} and the
output/error will be displayed. Additionally '_' will contain
a named tuple with the (<stdout>, <stderror>, <return_code>)
for the execution of the command.
You may pass strings from the global namespace to the command
line using the `.format()` syntax. for example:
>>> filename = '/does/not/exist'
>>> !ls {{filename}}
ls: cannot access /does/not/exist: No such file or directory
>>> _
CompletedProcess(arg=['ls'], returncode=0, stdout=b'', stderr=b'ls:
cannot access /does/not/exist: No such file or directory\n')
"""
if cmd:
try:
cmd = cmd.format(**self.locals)
cmd = shlex.split(cmd)
if cmd[0] == "cd":
os.chdir(
os.path.expanduser(
os.path.expandvars(" ".join(cmd[1:]) or "${HOME}")
)
)
else:
completed = subprocess.run(
cmd, capture_output=True, env=os.environ, text=True
)
out, rc = completed.stdout, completed.returncode
print(red(out) if rc else green(out, bold=False))
self.locals["_"] = completed
except Exception:
self.showtraceback()
else:
if os.getenv("SSH_CONNECTION"):
# I use the bash function similar to the one below in my
# .bashrc to directly open a python prompt on remote
# systems I log on to.
# function rpython { ssh -t $1 -- "python" }
# Unfortunately, suspending this ssh session, does not place me
# in a shell, so I need to create one:
os.system(config.SHELL)
else:
os.kill(os.getpgrp(), signal.SIGSTOP)
@_doc_to_usage
def process_list_cmd(self, arg):
"""{config.LIST_CMD} <object> - List source code for object, if possible."""
if not arg:
return self.writeline(
"source list command requires an "
f"argument (eg: {config.LIST_CMD} foo)"
)
try:
src_lines, offset = inspect.getsourcelines(self.lookup(arg))
except (IOError, TypeError, NameError) as e:
self.writeline(e)
else:
for line_no, line in enumerate(src_lines, offset + 1):
self.write(cyan(f"{line_no:03d}: {line}"))
def process_help_cmd(self, arg):
if arg:
if keyword.iskeyword(arg):
self.push(f'help("{arg}")')
elif arg in self.commands:
self.commands[arg]("-h")
else:
self.push(f"help({arg})")
else:
print(cyan(self.__doc__).format(**config.__dict__))
def interact(self, banner=None, exitmsg=None):
"""A forgiving wrapper around InteractiveConsole.interact()"""
venv_rc_done = cyan("(no venv rc found)")
try:
with open(config.VENV_RC) as venv_rc:
self._exec_from_file(venv_rc, quiet=True, skip_history=True)
# - clear out session_history for venv_rc commands
self.session_history = []
venv_rc_done = green("Successfully executed venv rc !")
except IOError:
pass
if banner is None:
banner = (
f"Welcome to the ImprovedConsole (version {__version__})\n"
f"Type in {config.HELP_CMD} for list of features.\n"
f"{venv_rc_done}"
)
retries = 2
while retries:
try:
super(ImprovedConsole, self).interact(banner=banner, exitmsg=exitmsg)
except SystemExit:
# Fixes #2: exit when 'quit()' invoked
break
except Exception:
import traceback
retries -= 1
print(
red(
"I'm sorry, ImprovedConsole could not handle that !\n"
"Please report an error with this traceback, "
"I would really appreciate that !"
)
)
traceback.print_exc()
print(
red(
"I shall try to restore the crashed session.\n"
"If the crash occurs again, please exit the session"
)
)
banner = blue("Your crashed session has been restored")
else:
# exit with a Ctrl-D
break
# Exit the Python shell on exiting the InteractiveConsole
if threading.current_thread() == threading.main_thread():
sys.exit()
if not os.getenv("SKIP_PYMP"):
# - create our pimped out console and fire it up !
pymp = ImprovedConsole(locals=CLEAN_NS)
CLEAN_NS["__pymp__"] = pymp
pymp.interact()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment