Skip to content

Instantly share code, notes, and snippets.

@spookylukey
Created August 20, 2024 08:03
Show Gist options
  • Save spookylukey/c175709610d5466f1bba49f356f505a1 to your computer and use it in GitHub Desktop.
Save spookylukey/c175709610d5466f1bba49f356f505a1 to your computer and use it in GitHub Desktop.
Custom shell plugin for Nikola
"""
Custom reST_ directive for passing a block through a shell command
"""
from subprocess import PIPE, Popen
from docutils import statemachine
from docutils.nodes import literal_block, raw
from docutils.parsers.rst import Directive, directives
from nikola.plugin_categories import RestExtension
_site = None
class Shell(Directive):
required_arguments = 1
optional_arguments = 0
has_content = True
final_argument_whitespace = False
option_spec = {
"format": directives.unchanged,
}
def run(self):
nodes = []
body = "\n".join(self.content)
program = self.arguments[0]
allowed_programs = _site.config.get("SHELL_ALLOWED_PROGRAMS", None)
if allowed_programs is None:
raise ValueError("SHELL_ALLOWED_PROGRAMS not configured in conf")
if program not in allowed_programs:
raise ValueError(f'Program "{program}" not found in SHELL_ALLOWED_PROGRAMS')
shell_command = allowed_programs[program]
result_format = self.options.get("format", None)
try:
process = Popen(
shell_command,
shell=True,
stdin=PIPE,
stdout=PIPE,
stderr=PIPE,
)
stdout_data, stderr_data = process.communicate(input=body.encode("utf-8"))
except Exception as exc:
nodes.append(
self.state_machine.reporter.error(
f'Failed to run shell command "{shell_command}": {exc!r}',
literal_block(self.block_text, self.block_text),
line=self.lineno,
)
)
else:
if process.returncode == 0:
output_text = stdout_data.decode("utf-8")
if result_format is not None:
nodes.append(raw("", text=output_text, format=result_format))
else:
# Code borrowed from 'include' directive
tab_width = self.options.get("tab-width", self.state.document.settings.tab_width)
include_lines = statemachine.string2lines(output_text, tab_width, convert_whitespace=True)
self.state_machine.insert_input(include_lines, shell_command)
return []
else:
nodes.append(
self.state_machine.reporter.error(
f'Error returned from shell command "{shell_command}": {stderr_data.decode("utf-8")}',
literal_block(self.block_text, self.block_text),
line=self.lineno,
)
)
return nodes
class Plugin(RestExtension):
name = "shell"
def set_site(self, site):
global _site
directives.register_directive("shell", Shell)
_site = site
return super().set_site(site)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment