Last active
July 14, 2017 06:24
-
-
Save ensonic/87e4108a7be64412d1c5a553b7e01f88 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# | |
# sudo apt-get install python-fuse | |
# | |
# python fusefs.py $HOME/temp/mount/ | |
# | |
# fusermount -u $HOME/temp/mount | |
# | |
# to debug issues run in foreground: | |
# python fusefs.py $HOME/temp/mount/ -d | |
import datetime | |
import errno | |
import fuse | |
import os | |
import stat | |
import sys | |
import time | |
import logging | |
logging.basicConfig(filename='fakefs.log', filemode='w', level=logging.INFO) | |
logger = logging.getLogger('fakefs') | |
if not hasattr(fuse, '__version__'): | |
raise RuntimeError("your fuse-py doesn't know of fuse.__version__") | |
fuse.fuse_python_api = (0, 2) | |
fuse.feature_assert('stateful_files', 'has_init') | |
TEST_FILE = ''' | |
First Line | |
Second Line | |
Last Line | |
''' | |
class FsAttr(): | |
read_only = 0x01 | |
directory = 0x10 | |
class Fakefs(fuse.Fuse): | |
def __init__(self, *args, **kw): | |
fuse.Fuse.__init__(self, *args, **kw) | |
self.root = '/' | |
self.uid = os.getuid() | |
self.gid = os.getgid() | |
logger.info('uid= %d, gid=%d', self.uid, self.gid) | |
self.files = {} | |
# add some special files, this is not perfect since they are not | |
# reported by readdir() | |
self.finfo = { | |
# we cannot stats the root of the filesystem, hardcode some values | |
'/': { | |
'attr': FsAttr.directory, | |
'size': 0 | |
}, | |
# a static file | |
'/test.txt': { | |
'attr': FsAttr.read_only, | |
'size': len(TEST_FILE), | |
'content': TEST_FILE | |
} | |
} | |
def getattr(self, path): | |
if path not in self.finfo: | |
return -errno.ENOENT | |
finfo = self.finfo[path] | |
logger.info('got attrs for %s: %s', path, repr(finfo)) | |
attr = finfo['attr'] | |
ts = int(time.time()) | |
st = fuse.Stat() | |
st.st_uid = self.uid | |
st.st_gid = self.gid | |
st.st_atime = ts | |
st.st_mtime = ts | |
st.st_ctime = ts | |
if attr & FsAttr.directory: # directory | |
if attr & FsAttr.read_only: | |
st.st_mode = stat.S_IFDIR | 0555 | |
else: | |
st.st_mode = stat.S_IFDIR | 0755 | |
st.st_nlink = 2 | |
else: # file | |
if attr & FsAttr.read_only: | |
st.st_mode = stat.S_IFREG | 0444 | |
else: | |
st.st_mode = stat.S_IFREG | 0644 | |
st.st_size = finfo['size'] | |
st.st_nlink = 1 | |
#logger.info('getattr(&s) = %s', path, vars(st)) | |
return st | |
def readdir(self, path, offset): | |
logger.info('get file list for %s at %d', path, offset) | |
# we need to take off the leading path | |
plen = len(path) | |
if plen > 1: | |
# for subdirs it is an extra '/', e.g. /dir/ | |
plen += 1 | |
for f in self.finfo: | |
fname = f.encode('ascii')[plen:] | |
if fname: | |
yield fuse.Direntry(fname) | |
# file i/o | |
def open(self, path, flags): | |
logger.info('open file %s with flags 0x%x', path, flags) | |
if path in self.finfo: | |
if 'content' not in self.finfo[path]: | |
return -errno.ENOENT | |
logger.info('open fake file %s', path) | |
self.files[path] = bytearray(self.finfo[path]['content']) | |
else: | |
logger.info('open new file %s', path) | |
self.files[path] = bytearray() | |
return 0 | |
def read(self, path, size, offset): | |
if path not in self.files: | |
logger.error('file %s has not been opened', path) | |
return -errno.EBADF | |
data = self.files[path] | |
logger.info('read from %s, offs %d, size %d, len %d', path, offset, | |
size, len(data)) | |
if offset < len(data): | |
if offset + size > len(data): | |
logger.info('read remainder') | |
buf = data[offset:] | |
else: | |
logger.info('read block') | |
buf = data[offset:offset+size] | |
else: | |
logger.info("eof for %s", path) | |
buf = '' # eof | |
logger.info("read() = %d bytes", len(buf)) | |
return buf | |
def release(self, path, flags): | |
if path not in self.files: | |
logger.error('file %s has not been opened', path) | |
return -errno.EBADF | |
logger.info('released(%s) = 0', path) | |
del self.files[path] | |
return 0 | |
def fsinit(self): | |
logger.info('mounted') | |
def fsdestroy(self): | |
logger.info('unmount') | |
def main(self, *a, **kw): | |
return fuse.Fuse.main(self, *a, **kw) | |
def main(): | |
usage = """ | |
Fake file system. | |
""" + fuse.Fuse.fusage | |
server = Fakefs(version="%prog " + fuse.__version__, | |
usage=usage, dash_s_do='setsingle') | |
server.parse(values=server, errex=1) | |
server.main() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment