Skip to content

Instantly share code, notes, and snippets.

@hashbrowncipher
Created August 16, 2024 03:52
Show Gist options
  • Save hashbrowncipher/8f29c2a77f94c12fdebede85c4c4dcb1 to your computer and use it in GitHub Desktop.
Save hashbrowncipher/8f29c2a77f94c12fdebede85c4c4dcb1 to your computer and use it in GitHub Desktop.
isolation of libmagic within Python
import os
import socket
import sys
import Pyro5.api
import Pyro5.server
import pyseccomp as seccomp
import serpent
@Pyro5.server.expose
class MagicServer:
    """Pyro5-exposed object offering libmagic detection on a byte buffer."""

    def from_buffer(self, data):
        """Return whatever ``magic.from_buffer`` reports for ``data``.

        ``data`` is converted with ``serpent.tobytes`` first, so the value
        handed to libmagic is always a bytes object.
        """
        # Imported inside the method rather than at module import time, so the
        # magic module is not loaded until the first request is handled.
        import magic

        payload = serpent.tobytes(data)
        return magic.from_buffer(payload)
def pyro5_server(sock):
    """Serve ``MagicServer`` over the already-connected socket ``sock``.

    Creates a Pyro5 daemon bound to ``sock``, registers ``MagicServer`` under
    the object id ``magic.server`` and runs the daemon's request loop. The
    daemon is used as a context manager, so it is cleaned up when the loop
    returns or raises.
    """
    object_id = "magic.server"
    with Pyro5.api.Daemon(connected_socket=sock) as server:
        server.register(MagicServer, objectId=object_id)
        server.requestLoop()
def drop_privileges(sock):
    """Load a seccomp filter that restricts the calling process's syscalls.

    Any syscall without a matching rule fails with ``EPERM``. The rules are:

    * ``sendto`` / ``recvfrom`` -- only when argument 0 is ``sock``'s file
      descriptor;
    * a fixed list of syscalls (``read``, ``write``, ``mmap``, ...) -- always;
    * ``openat`` -- only when argument 0 equals ``2**32 - 100`` and argument 2
      has no bit set outside
      ``O_RDONLY | O_NONBLOCK | O_CLOEXEC | O_DIRECTORY``.
    """
    # I would also use namespaces, but most of the interesting ones require
    # CAP_SYS_ADMIN
    policy = seccomp.SyscallFilter(seccomp.ERRNO(seccomp.errno.EPERM))

    # Socket traffic is tied to the one descriptor we were handed.
    sock_fd = sock.fileno()
    for syscall in ("sendto", "recvfrom"):
        policy.add_rule(
            seccomp.ALLOW, syscall, seccomp.Arg(0, seccomp.EQ, sock_fd)
        )

    always_allowed = (
        "read",
        "write",
        "close",
        "lseek",
        "getpeername",
        "getsockname",
        "getrandom",
        "mmap",
        "munmap",
        "shutdown",
        "mprotect",
        "uname",
        # A bpf filter to restrict to sys.path would be nice
        "getdents64",
        "newfstatat",
    )
    for syscall in always_allowed:
        policy.add_rule(seccomp.ALLOW, syscall)

    # Every bit of the low 32 that is NOT one of the permitted open flags.
    permitted_flags = (
        os.O_RDONLY | os.O_NONBLOCK | os.O_CLOEXEC | os.O_DIRECTORY
    )
    forbidden_flags = 0xFFFFFFFF & ~permitted_flags

    # 4294967196 == 2**32 - 100; presumably AT_FDCWD (-100) as it appears when
    # read as an unsigned 32-bit value -- confirm against the target ABI.
    expected_dirfd = (1 << 32) - 100
    policy.add_rule(
        seccomp.ALLOW,
        "openat",
        seccomp.Arg(0, seccomp.EQ, expected_dirfd),
        seccomp.Arg(2, seccomp.MASKED_EQ, forbidden_flags, 0),
    )

    policy.load()
def parent_process(sock):
    """Ask the server behind ``sock`` to identify the file in ``sys.argv[1]``.

    Reads at most the first 2048 bytes of the file, sends them to the
    ``magic.server`` object through a Pyro5 proxy bound to ``sock``, and
    prints the answer prefixed with ``Magic type:``.

    Raises:
        IndexError: if no file path was given on the command line.
        OSError: if the file cannot be opened or read.
    """
    # Use a context manager so the file handle is closed as soon as the
    # sample has been read, instead of being left for the garbage collector.
    with open(sys.argv[1], "rb") as source:
        buf = source.read(2048)

    with Pyro5.api.Proxy("magic.server", connected_socket=sock) as magic_proxy:
        result = magic_proxy.from_buffer(buf)
        print("Magic type:", result)
def main():
    """Fork into a sandboxed server child and a client parent.

    A connected socket pair is created before forking. The child closes the
    parent's end, applies ``drop_privileges`` and then serves requests with
    ``pyro5_server``. The parent closes the child's end and runs
    ``parent_process``. Returns ``None`` in both processes.
    """
    parent_sock, child_sock = socket.socketpair()

    if os.fork() != 0:
        # Parent: keep only our end of the pair and act as the client.
        child_sock.close()
        parent_process(parent_sock)
        return

    # Child: keep only our end, install the syscall filter, then serve.
    parent_sock.close()
    drop_privileges(child_sock)
    pyro5_server(child_sock)
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment