Last active
December 12, 2023 13:55
-
-
Save astoeckel/601c66f2475e7516102b0f5a67b53a1c to your computer and use it in GitHub Desktop.
Python sandbox
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
This module provides an `eval_in_sandbox` function that, on Linux systems, | |
executes arbitrary Python code in a secure and lightweight sandbox with | |
memory, I/O, and time limits. | |
---- | |
Copyright (c) 2023 Andreas Stöckel | |
Permission is hereby granted, free of charge, to any person obtaining a copy | |
of this software and associated documentation files (the "Software"), to deal | |
in the Software without restriction, including without limitation the rights | |
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
copies of the Software, and to permit persons to whom the Software is | |
furnished to do so, subject to the following conditions: | |
The above copyright notice and this permission notice shall be included in all | |
copies or substantial portions of the Software. | |
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
SOFTWARE. | |
""" | |
import io | |
import os | |
import shutil | |
import subprocess | |
import tempfile | |
import time | |
import sys | |
import dataclasses | |
import select | |
@dataclasses.dataclass
class ProcessOutput:
    """
    Outcome of a finished process: its return code together with the text
    collected from its stdout and stderr streams.

    An instance is truthy exactly when the return code equals zero.
    """

    # Return code reported for the process
    returncode: int = 0
    # Text collected from the standard output stream
    stdout: str = ""
    # Text collected from the standard error stream
    stderr: str = ""

    def __bool__(self) -> bool:
        succeeded = self.returncode == 0
        return succeeded
class SandboxError(Exception):
    """Error type raised by this module when sandboxed execution fails."""
def eval_in_sandbox(
    python_code: str,
    mem_max_mb: int = 100,
    cpu_max_time_sec: int = 1,
    timeout_max_time_sec: int = 10,
    io_max_kb: int = 128,
) -> ProcessOutput:
    """
    Executes the given script in an isolated sandbox with memory, CPU, and I/O
    limits. Returns the captured stdout and stderr.

    :param python_code: is a string containing the Python code that should be
        safely executed.
    :param mem_max_mb: is the maximum size of the address space used by the
        child process in mebibytes. Note that this value must be substantially
        larger than the actual amount of memory used by the process, since it
        includes the memory needed by the Python interpreter executable itself.
    :param cpu_max_time_sec: is the maximum CPU time that can be used by the
        child process. CPU time is the time during which the process is
        actually active; this doesn't include sleeping, or waiting for I/O.
    :param timeout_max_time_sec: is the maximum wall-clock time for which the
        process may be alive.
    :param io_max_kb: maximum number of kibibytes that may be written by the
        process to stdout and to stderr (the limit is checked per stream).
    :return: an instance of the `ProcessOutput` structure, containing the
        process return code and the stripped stdout/stderr text.
    :raises SandboxError: if `bwrap`, `prlimit`, or a Python interpreter
        below `/usr/` cannot be found, if the wall-clock timeout or the I/O
        limit is exceeded, or if the output is not valid UTF-8.
    """
    # Convert the given memory size limits to bytes
    mem_max_bytes = 1024 * 1024 * mem_max_mb
    io_max_bytes = 1024 * io_max_kb

    # Search for the bubblewrap executable
    bwrap_exe = shutil.which("bwrap")
    if bwrap_exe is None:
        raise SandboxError("Cannot find the `bwrap` executable")

    # Search for the prlimit executable
    prlimit_exe = shutil.which("prlimit")
    if prlimit_exe is None:
        raise SandboxError("Cannot find the `prlimit` executable")

    # Determine the path to the Python interpreter; sometimes `sys.executable`
    # returns wrong results (it may even be empty or None), so we use
    # `shutil.which` as a fallback
    python_exe = sys.executable
    if not python_exe or not os.access(python_exe, os.X_OK):
        python_exe = shutil.which("python3")
    if python_exe is None:
        raise SandboxError("Cannot find the Python executable")
    # Only `/usr` (plus `/lib` and `/lib64`) is mounted inside the sandbox;
    # compare against "/usr/" so that e.g. "/usrlocal/..." is rejected too
    if not python_exe.startswith("/usr/"):
        raise SandboxError("Python interpreter not located in `/usr`")

    # Securely create a temporary directory; this directory will automatically
    # be deleted
    with tempfile.TemporaryDirectory() as tmp_dir:
        # Write the given code into the temporary directory
        code_filename = os.path.join(tmp_dir, "code.py")
        with open(code_filename, "w", encoding="utf-8") as f:
            f.write(python_code)
            f.write("\n")

        # Assemble the arguments that we need to pass to `bwrap`
        args = [
            # Use `prlimit` to limit the CPU time and the size of the address
            # space used by the child commands
            prlimit_exe,
            # Set the soft and hard cpu time limit
            f"--cpu={cpu_max_time_sec}:{cpu_max_time_sec}",
            # Set the soft and hard address space limit
            f"--as={mem_max_bytes}:{mem_max_bytes}",
            # Execute bwrap
            bwrap_exe,
            # Use an isolated terminal session for the subprocess
            "--new-session",
            # Make sure the sandboxed processes are killed when `bwrap` itself
            # dies; otherwise killing `bwrap` on a timeout or an I/O limit
            # violation could leave the sandboxed interpreter running
            "--die-with-parent",
            # Isolate all possible Linux namespaces.
            "--unshare-all",
            # Clear all environment variables
            "--clearenv",
            # Execute the script inside the "home" directory
            "--chdir", "/home",
            # Mount the temporary directory as home
            "--ro-bind", tmp_dir, "/home",
            # Make the directory containing the Python interpreter available
            "--ro-bind", "/usr", "/usr",
            "--ro-bind", "/lib", "/lib",
            "--ro-bind", "/lib64", "/lib64",
            # Lastly, make the virtual root directory read-only
            "--remount-ro", "/",
            # Execute the Python interpreter
            python_exe,
            # Disable buffering
            "-u",
            # Execute the script
            "/home/code.py",
        ]

        # Create the subprocess; close stdin
        proc = subprocess.Popen(
            args,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            stdin=subprocess.DEVNULL,
        )

        # Stays `None` until the process has been reaped; the `finally` block
        # below uses this to decide whether the process must be killed
        returncode = None

        # Execute the subprocess; read from stdout/stderr while handling
        # timeouts and I/O limits.
        try:
            # Mark stdout and stderr as non-blocking; this is needed because we
            # always try to read 1024-byte chunks from these streams.
            os.set_blocking(proc.stdout.fileno(), False)
            os.set_blocking(proc.stderr.fileno(), False)

            # Get a start timestamp from a monotonic time source
            t_start = time.monotonic()

            # Maps every pipe that is still open to the buffer collecting it
            buf_stdout, buf_stderr = io.BytesIO(), io.BytesIO()
            open_streams = {proc.stdout: buf_stdout, proc.stderr: buf_stderr}

            while returncode is None:
                # Abort if we get a timeout
                if time.monotonic() - t_start > timeout_max_time_sec:
                    raise SandboxError("Process timed out")

                # Wait (for at most 100ms) for data being available on
                # stdout/stderr, or for an error condition on either pipe
                watched = list(open_streams)
                r_fds, _, x_fds = select.select(watched, [], watched, 0.1)

                # Read data from stdout/stderr; abort if the I/O buffers are
                # too large
                for src_fd in r_fds:
                    buf = src_fd.read(1024)
                    if buf is None:
                        # Non-blocking read without data (spurious wake-up);
                        # this is not end-of-file, so keep watching the pipe
                        continue
                    if not buf:
                        # End-of-file: do not bother asking select for this
                        # pipe anymore
                        del open_streams[src_fd]
                        continue

                    # Append the data we read to the target buffer
                    tar_buf = open_streams[src_fd]
                    tar_buf.write(buf)
                    if tar_buf.tell() > io_max_bytes:
                        raise SandboxError("I/O buffer size exceeded")

                # Discard stdout/stderr if select reported an exceptional
                # condition for them
                for src_fd in x_fds:
                    open_streams.pop(src_fd, None)

                # Wait for the process to exit, if stdout/stderr were closed
                if not open_streams:
                    try:
                        returncode = proc.wait(0.1)
                    except subprocess.TimeoutExpired:
                        # `Popen.wait` signals a timeout with
                        # `subprocess.TimeoutExpired` (not the builtin
                        # `TimeoutError`). Continue in the loop; our own
                        # high-level timeout code will trigger at some point.
                        pass

            # Try to decode the stdout/stderr buffer
            try:
                stdout = buf_stdout.getvalue().decode("utf-8")
                stderr = buf_stderr.getvalue().decode("utf-8")
            except UnicodeDecodeError as err:
                raise SandboxError(
                    "Subprocess did not produce valid UTF-8"
                ) from err

            # Return the output with surrounding whitespace stripped away
            return ProcessOutput(
                returncode=returncode,
                stderr=stderr.strip(),
                stdout=stdout.strip(),
            )
        finally:
            # Whatever went wrong (sandbox violation, interrupt, unexpected
            # error): never leave the subprocess running behind us
            if returncode is None:
                proc.kill()
                proc.wait()
            # Release the pipe file descriptors
            proc.stdout.close()
            proc.stderr.close()
if __name__ == "__main__":

    def main():
        """
        Small demo: runs a short script through `eval_in_sandbox`, prints the
        captured stdout, stderr and return code, then exits with that return
        code.
        """
        # The demo script reports its identity and environment, tries to
        # write and delete files in its working directory, writes to stderr
        # and exits with status 42.
        script = """
print('Hello World from stdout!')
import os
import json
print("My UID is", os.getuid())
print("My GID is", os.getgid())
print("My PID is", os.getpid())
print("My CWD is", os.getcwd())
print("My environment is", json.dumps(dict(**os.environ), indent=4))
try:
    with open("./foo.txt", "w") as f:
        f.write("EVIL\\n")
except BaseException as e:
    print(f"Writing to a file failed with: {e}")
try:
    os.unlink("code.py")
except BaseException as e:
    print(f"Deleting a file failed with: {e}")
import sys
sys.stderr.write('Hello World from stderr!')
sys.exit(42)
"""
        res = eval_in_sandbox(script)

        # Show everything that was captured from the sandboxed process
        print(f"Stdout: {res.stdout}")
        print(f"Stderr: {res.stderr}")
        print(f"Returncode: {res.returncode}")

        # Propagate the script's return code as our own exit status
        sys.exit(res.returncode)

    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment