|
#!/usr/bin/env python3 |
|
"""Firejail FS""" |
|
import re |
|
import argparse |
|
import subprocess |
|
import tempfile |
|
import stat |
|
import time |
|
import functools |
|
import logging |
|
import shlex |
|
import os |
|
import shutil |
|
import pathlib |
|
import errno |
|
import pwd |
|
import grp |
|
import fuse |
|
|
|
|
|
def lru_cache_time(seconds, maxsize=128):
    """
    Return a decorator that adds time aware caching to lru_cache.

    Results are memoised with ``functools.lru_cache``. The cache key also
    contains ``round(time.time() / seconds)``, so a cached result stops
    being hit as soon as that value changes.

    adapted from https://stackoverflow.com/a/57300326

    :param seconds: length of one cache period in seconds (must be non-zero)
    :param maxsize: forwarded to ``functools.lru_cache``
    :return: a decorator; the decorated callable keeps the metadata of the
        original function and exposes ``cache_clear`` and ``cache_info``
    """
    def dec(func):
        @functools.lru_cache(maxsize)
        def inner_wrapper(_ttl_bucket, *args, **kwargs):
            """
            Cached call. The first argument (the time bucket) only takes
            part in the cache key and is not passed down, because no
            function should bother to know that this is here.
            """
            return func(*args, **kwargs)

        # Wrap ``func`` (not ``inner_wrapper``) so that the decorated callable
        # reports the name and docstring of the function it replaces.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return inner_wrapper(round(time.time() / seconds), *args, **kwargs)

        # Re-export the cache management helpers of the cached inner function.
        wrapper.cache_clear = inner_wrapper.cache_clear
        wrapper.cache_info = inner_wrapper.cache_info
        return wrapper

    return dec
|
|
|
|
|
def parse_ls_output(data):
    """
    Parse an ``ls -l``-like listing into per-entry stat dictionaries.

    seems to be the best firejail provides...

    Every line is expected to look like
    ``<10 character mode> <owner> <group> <size> <name>``. Lines that do not
    match (e.g. the remainder of file names containing newlines) are skipped.

    :param data: listing text, one entry per line
    :return: dict mapping the entry name to a dict with the keys
        ``st_mode``, ``st_uid``, ``st_gid`` and ``st_size``
    :raises KeyError: if an owner or group is neither known to the local
        user/group database nor purely numeric
    """
    def parse_mode(text_mode):
        """Convert a 10 character mode string such as ``drwxr-xr-x`` to an int."""
        # reverse-copied from
        # https://github.com/coreutils/gnulib/blob/master/lib/filemode.c
        # commented-out lines don't exist in (my?) Python
        mode = {
            '-': stat.S_IFREG,
            'd': stat.S_IFDIR,
            'b': stat.S_IFBLK,
            'c': stat.S_IFCHR,
            'l': stat.S_IFLNK,
            'p': stat.S_IFIFO,
            's': stat.S_IFSOCK,
            # 'C': stat.S_IFCTG,
            'D': stat.S_IFDOOR,
            # 'm': stat.S_IFMPB,
            # 'n': stat.S_IFNWK,
            'P': stat.S_IFPORT,
            'w': stat.S_IFWHT,
        }.get(text_mode[0], 0)
        # The execute slots double as setuid / setgid / sticky markers.
        if text_mode[3] in 'sS':
            mode |= stat.S_ISUID
        if text_mode[6] in 'sS':
            mode |= stat.S_ISGID
        if text_mode[9] in 'tT':
            mode |= stat.S_ISVTX
        # Lower case s/t imply the execute bit is set, upper case that it is
        # not; normalise them to plain 'x' / '-' before reading the rwx bits.
        text_mode = re.sub(
            '(?i)s|t',
            lambda m: '-' if m.group(0).isupper() else 'x',
            text_mode[1:],
        )
        for ent, char in zip((*['USR']*3, *['GRP']*3, *['OTH']*3), text_mode):
            if char != '-':
                mode |= getattr(stat, f'S_I{char.upper()}{ent}')
        return mode

    def resolve_id(name, lookup, field):
        """Map a user/group name to its numeric id, accepting numeric names."""
        try:
            return getattr(lookup(name), field)
        except KeyError:
            # Not in the local database: a purely numeric name is taken to
            # be the id itself, anything else is still an error.
            if name.isascii() and name.isdigit():
                return int(name)
            raise

    # usernames: https://serverfault.com/a/578264
    # Compiled once; both fragments are raw strings so that \w and \d are
    # regex escapes rather than (invalid) string escapes.
    line_re = re.compile(
        r'([-dbclpsDPw][-rwxsStT]{9}) '
        r'([\w.-]+) +([\w.-]+) +(\d+) (.+)')

    r = {}
    for line in data.splitlines():
        m = line_re.match(line)
        if m is None:
            continue  # files with newlines around
        mode, owner, group, size, name = m.groups()
        r[name] = dict(
            st_mode=parse_mode(mode),
            st_uid=resolve_id(owner, pwd.getpwnam, 'pw_uid'),
            st_gid=resolve_id(group, grp.getgrnam, 'gr_gid'),
            st_size=int(size),
        )
    return r
|
|
|
|
|
class FirejailFS(fuse.Operations):
    """
    FUSE operations backed by the ``firejail`` command line tool.

    Directory listings are obtained with ``firejail --ls``. Opening a file
    copies it out of the sandbox with ``firejail --get`` into a local
    scratch file; flushing or releasing a writable handle copies the scratch
    file back with ``firejail --put``.
    """

    def __init__(self, sandbox):
        """
        :param sandbox: sandbox identifier, inserted verbatim into
            ``firejail --<command>=<sandbox>``
        """
        self.sandbox = sandbox
        # file handle number -> open local scratch file object
        self.open_files = {}
        # working directory for ``firejail --get`` downloads
        self.tmpdir = pathlib.Path(tempfile.mkdtemp())
        # directory holding the scratch files behind open handles
        self.datadir = pathlib.Path(tempfile.mkdtemp())
        # last file handle number handed out
        self.fd = 0
        self.invalidate_caches = self.get_dir_info.cache_clear

    def __del__(self):
        """Remove the scratch directories (best effort)."""
        # The attributes may be missing if __init__ failed part-way through.
        for attr in ('tmpdir', 'datadir'):
            directory = getattr(self, attr, None)
            if directory is not None:
                shutil.rmtree(directory, ignore_errors=True)

    def firejail_command(self, command, *args, **kwargs):
        """
        Run ``firejail --<command>=<sandbox> <args...>`` and return its stdout.

        :param command: firejail option name without the leading dashes
        :param args: further command line arguments
        :param kwargs: forwarded to ``subprocess.run``
        :return: stdout decoded to ``str`` (undecodable bytes are replaced)
        :raises fuse.FuseOSError: EACCES or EINVAL when firejail fails with
            one of the recognised messages on stderr
        :raises RuntimeError: when firejail fails for any other reason
        """
        cmdline = ('firejail', f'--{command}={self.sandbox}') + args
        logging.info('calling %s', shlex.join(cmdline))
        r = subprocess.run(
            cmdline,
            capture_output=True,
            **kwargs,
        )
        if r.returncode:
            if b'Error: Cannot access' in r.stderr:
                raise fuse.FuseOSError(errno.EACCES)
            elif b'Warning: cannot open source file' in r.stderr:
                raise fuse.FuseOSError(errno.EACCES)
            elif b'is an invalid filename' in r.stderr:
                raise fuse.FuseOSError(errno.EINVAL)
            else:
                raise RuntimeError('firejail exited with status '
                                   f'{r.returncode} and stderr "{r.stderr}"')
        return r.stdout.decode(errors='replace')

    @lru_cache_time(2)
    def get_dir_info(self, path):
        """Return the parsed ``firejail --ls`` listing of *path* (cached)."""
        return parse_ls_output(self.firejail_command('ls', path))

    def _upload(self, f, path):
        """Copy the scratch file object *f* into the sandbox as *path*."""
        # os.fsync() only covers data already handed to the operating system,
        # so Python's own write buffer has to be flushed first.
        f.flush()
        os.fsync(f.fileno())
        self.firejail_command('put', f.name, path)
        self.invalidate_caches()

    @staticmethod
    def _dispose(f):
        """Close the scratch file object *f* and delete its backing file."""
        try:
            f.close()
        finally:
            try:
                os.unlink(f.name)
            except FileNotFoundError:
                pass

    ### --- Start FUSE operations ---

    def create(self, path, mode):
        """
        Create an empty file at *path* and return a new file handle for it.

        :param path: path of the new file
        :param mode: permission bits applied to the local scratch file
        """
        outer_tmp_fd, outer_path = tempfile.mkstemp(dir=self.datadir)
        os.close(outer_tmp_fd)
        # Open before chmod: a mode without owner write permission would
        # otherwise make this open fail for non-root users.
        scratch = open(outer_path, 'w+b')
        os.chmod(outer_path, mode)
        self.fd += 1
        fh = self.fd
        self.open_files[fh] = scratch
        try:
            self.flush(path, fh)
        except Exception:
            # Do not keep a handle (and scratch file) for a failed create.
            self._dispose(self.open_files.pop(fh))
            raise
        self.invalidate_caches()
        logging.info(f'created {path}')
        return fh

    def flush(self, path, fh):
        """Upload the scratch file behind *fh* if the handle is writable."""
        f = self.open_files[fh]
        if '+' in f.mode or 'w' in f.mode:
            logging.debug(f'flushing {path}')
            self._upload(f, path)

    # fsync not implemented

    def getattr(self, path, fh=None):
        """
        Return a stat dictionary for *path*.

        The root directory reports the attributes of the local ``/``. Other
        entries are looked up in the listing of their parent directory, with
        all three timestamps set to the current time.

        :raises fuse.FuseOSError: ENOENT if the lookup raises ``KeyError``
        """
        logging.debug(f'stat {path}')
        if path == '/':
            stat_result = os.stat('/')
            return {k: getattr(stat_result, k) for k in dir(stat_result)
                    if k.startswith('st_')}
        dirname, basename = os.path.split(path)
        try:
            data = self.get_dir_info(dirname)[basename]
        except KeyError:
            raise fuse.FuseOSError(errno.ENOENT) from None
        # The listing carries no timestamps, so "now" is reported instead.
        now = time.time()
        return {**data, 'st_ctime': now, 'st_mtime': now, 'st_atime': now}

    # getxattr not implemented
    # listxattr not implemented
    # mkdir not implemented

    def open(self, path, flags):
        """
        Fetch *path* into a local scratch file and return a new file handle.

        If *path* is not in the listing of its parent directory, the file is
        created instead.

        :raises fuse.FuseOSError: ENOSYS for ``O_PATH``, EINVAL for an
            unknown access mode
        """
        if flags & os.O_PATH:
            raise fuse.FuseOSError(errno.ENOSYS)
        # Only the access mode is looked at; every other flag (O_APPEND,
        # O_TRUNC, ...) is masked off.
        flags &= (os.O_RDONLY | os.O_RDWR | os.O_WRONLY)
        try:
            mode = {
                os.O_RDONLY: 'rb',
                os.O_RDWR: 'rb+',
                os.O_WRONLY: 'rb+',  # probably closest I can get
            }[flags]
        except KeyError:
            raise fuse.FuseOSError(errno.EINVAL) from None
        directory, base_name = os.path.split(path)
        if base_name not in self.readdir(directory, None):
            return self.create(path, 0o644)  # should probably read umask here...
        self.firejail_command('get', path, cwd=self.tmpdir)
        # Reserve the destination name with mkstemp instead of the
        # deprecated mktemp; the move below replaces the placeholder.
        outer_tmp_fd, outer_path = tempfile.mkstemp(dir=self.datadir)
        os.close(outer_tmp_fd)
        shutil.move(self.tmpdir/base_name, outer_path)
        self.fd += 1
        self.open_files[self.fd] = open(outer_path, mode)
        return self.fd

    def read(self, path, size, offset, fh):
        """Return up to *size* bytes of the scratch file starting at *offset*."""
        f = self.open_files[fh]
        f.seek(offset)
        return f.read(size)

    def readdir(self, path, fh):
        """Return the entry names listed for the directory *path*."""
        return self.get_dir_info(path).keys()

    # readlink not implemented

    def release(self, path, fh):
        """Upload the scratch file behind *fh* if writable, then discard it."""
        logging.debug(f'releasing {path}')
        f = self.open_files.pop(fh)
        try:
            if '+' in f.mode or 'w' in f.mode:
                self._upload(f, path)
        finally:
            # Always close the file object and remove the scratch file, even
            # when the upload failed.
            self._dispose(f)

    # removexattr not implemented
    # rename not implemented
    # rmdir not implemented
    # setxattr not implemented
    # statfs not implemented
    # symlink not implemented

    def truncate(self, path, length, fh=None):
        """
        Resize the file to *length* bytes.

        Without *fh* the file is opened for the duration of the call and
        released (and thereby uploaded) afterwards.
        """
        if fh is not None:
            self.open_files[fh].truncate(length)
            return
        fh = self.open(path, os.O_WRONLY)
        try:
            self.open_files[fh].truncate(length)
        finally:
            self.release(path, fh)

    # unlink not implemented
    # utimens not implemented

    def write(self, path, data, offset, fh):
        """Write *data* at *offset* into the scratch file; return the byte count."""
        # Lazy formatting: the payload is only rendered when DEBUG is enabled.
        logging.debug('write %s to %s', data, path)
        f = self.open_files[fh]
        f.seek(offset)
        return f.write(data)
|
|
|
|
|
def get_args(argv=None):
    """
    Parse the command line.

    :param argv: list of argument strings to parse; ``None`` (the default)
        makes argparse read ``sys.argv[1:]``
    :return: namespace with the attributes ``sandbox``, ``mountpoint`` and
        ``loglevel`` (upper-cased level name, ``'INFO'`` by default)
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        'sandbox',
        help='sandbox identifier passed to firejail as --<command>=<sandbox>',
    )
    parser.add_argument(
        'mountpoint',
        help='directory to mount the file system on',
    )
    parser.add_argument(
        '--loglevel',
        default='INFO',
        type=str.upper,  # accept level names in any case
        choices=('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'),
        help='logging level (default: %(default)s)',
    )
    return parser.parse_args(argv)
|
|
|
if __name__ == '__main__':
    # Script entry point: parse arguments, configure logging, mount.
    args = get_args()
    logging.basicConfig(
        level=getattr(logging, args.loglevel),
        format='%(asctime)s - %(levelname)s: %(message)s',
    )
    # The result is deliberately not bound to a name: assigning it to
    # ``fuse`` would shadow the imported module that FirejailFS relies on.
    fuse.FUSE(FirejailFS(args.sandbox), args.mountpoint, foreground=True)