Created
August 20, 2024 08:03
-
-
Save spookylukey/c175709610d5466f1bba49f356f505a1 to your computer and use it in GitHub Desktop.
Custom shell plugin for Nikola
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Custom reST_ directive for passing a block through a shell command | |
""" | |
from subprocess import PIPE, Popen | |
from docutils import statemachine | |
from docutils.nodes import literal_block, raw | |
from docutils.parsers.rst import Directive, directives | |
from nikola.plugin_categories import RestExtension | |
# Nikola site object. Assigned by Plugin.set_site() and read by Shell.run()
# to look up the SHELL_ALLOWED_PROGRAMS setting; None until the plugin is set up.
_site = None
class Shell(Directive):
    """reST directive that pipes its content through a configured shell command.

    The single required argument is a key into the ``SHELL_ALLOWED_PROGRAMS``
    mapping of the site configuration; the mapped value is the command line
    that gets executed.  The directive content is joined with newlines,
    encoded as UTF-8 and written to the command's stdin.

    Options:
        format: if given, the command's stdout is emitted as a ``raw`` node
            of that format.  If omitted, stdout is fed back into the reST
            parser as additional input (the way ``include`` does it).
        tab-width: tab expansion width used when stdout is fed back into
            the parser; defaults to the document's ``tab_width`` setting.
    """

    required_arguments = 1
    optional_arguments = 0
    has_content = True
    final_argument_whitespace = False
    option_spec = {
        "format": directives.unchanged,
        # Must be declared here, otherwise the ``tab-width`` lookup in run()
        # can never find a user-supplied value.
        "tab-width": int,
    }

    def _error_node(self, message):
        """Return a docutils error message node quoting this directive's source."""
        return self.state_machine.reporter.error(
            message,
            literal_block(self.block_text, self.block_text),
            line=self.lineno,
        )

    def run(self):
        """Execute the configured command and return the resulting nodes.

        Returns:
            A list holding one ``raw`` node (when ``format`` is set), one
            error message node (when the command could not be run or exited
            with a non-zero status), or an empty list when the output was
            inserted into the parser input instead.

        Raises:
            ValueError: if ``SHELL_ALLOWED_PROGRAMS`` is missing from the
                site configuration, or the requested program is not in it.
        """
        body = "\n".join(self.content)
        program = self.arguments[0]

        allowed_programs = _site.config.get("SHELL_ALLOWED_PROGRAMS", None)
        if allowed_programs is None:
            raise ValueError("SHELL_ALLOWED_PROGRAMS not configured in conf")
        if program not in allowed_programs:
            raise ValueError(f'Program "{program}" not found in SHELL_ALLOWED_PROGRAMS')
        shell_command = allowed_programs[program]
        result_format = self.options.get("format", None)

        try:
            # shell=True is intentional: the command line comes from the site
            # configuration, not from the document; only stdin carries
            # document content.
            process = Popen(
                shell_command,
                shell=True,
                stdin=PIPE,
                stdout=PIPE,
                stderr=PIPE,
            )
            stdout_data, stderr_data = process.communicate(input=body.encode("utf-8"))
        except Exception as exc:
            return [
                self._error_node(f'Failed to run shell command "{shell_command}": {exc!r}')
            ]

        if process.returncode != 0:
            # Decode leniently: a failing tool may emit bytes that are not
            # valid UTF-8, and that must not mask the real error with a
            # UnicodeDecodeError.
            stderr_text = stderr_data.decode("utf-8", errors="replace")
            return [
                self._error_node(
                    f'Error returned from shell command "{shell_command}": {stderr_text}'
                )
            ]

        output_text = stdout_data.decode("utf-8")
        if result_format is not None:
            return [raw("", text=output_text, format=result_format)]

        # Code borrowed from 'include' directive: push the output back into
        # the state machine so it is parsed as reST at this position.
        tab_width = self.options.get("tab-width", self.state.document.settings.tab_width)
        include_lines = statemachine.string2lines(output_text, tab_width, convert_whitespace=True)
        self.state_machine.insert_input(include_lines, shell_command)
        return []
class Plugin(RestExtension):
    """Nikola reST extension that provides the ``shell`` directive."""

    name = "shell"

    def set_site(self, site):
        """Store *site* in the module global and register the directive.

        The site object is kept in ``_site`` so that ``Shell.run`` can read
        its configuration.  Returns whatever the parent ``set_site`` returns.
        """
        global _site
        _site = site
        directives.register_directive("shell", Shell)
        return super().set_site(site)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment