Last active
October 28, 2015 17:45
-
-
Save korc/36ebfddf4e60a55a083a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import fuse | |
import os | |
import stat | |
import errno | |
import re | |
import time | |
import warnings | |
# Set the fuse module's API version marker to (0, 2) before any Fuse subclass is defined.
fuse.fuse_python_api=(0, 2)
class NoMatchException(ValueError):
    """Raised when an input item matches none of the patterns of the candidate states."""
class PatternIterSM(object):
    """Regex-driven state machine that iterates over a sequence of strings.

    ``states`` maps a state name to a tuple ``(next_states, patterns)``:

    * ``next_states`` -- list of state names that are tried for the item that
      follows; ``None`` means "keep the current candidate list".
    * ``patterns`` -- list of compiled regexes; an item enters the state when
      any of them matches at the start of the item.

    Iterating over the instance consumes ``data_source`` item by item and
    yields the match object of every item that matched.  While iterating, the
    instance exposes ``data`` (the raw item), ``match``, ``match_index``
    (index of the matching pattern inside the state's pattern list),
    ``state`` and ``state_history``.
    """

    # Characters stripped from the right end of every item before matching;
    # None disables stripping.
    rstrip = None
    # Maximum number of previous state names kept in ``state_history``;
    # 0 disables history tracking.
    history_size = 0

    def __init__(self, data_source, state="start", end_state=None, states=None):
        """Prepare the machine.

        data_source -- iterable producing the strings to match.
        state       -- name of the initial state; only its ``next_states``
                       entry is read here.
        end_state   -- optional state name; iteration stops after the match
                       that enters it has been yielded.
        states      -- optional state table overriding the class attribute.

        Raises NotImplementedError when no state table is available.
        """
        if states is not None:
            self.states = states
        if not hasattr(self, "states"):
            raise NotImplementedError("Need to define some states")
        self.data_iter = iter(data_source)
        self.state_history = []
        self.state = state
        self.end_state = end_state
        self.next_states = self.states[state][0]

    def on_nomatch(self, data):
        """Hook called for an item no candidate state accepts; raises by default.

        A subclass may override this to return instead, in which case the
        item is skipped and iteration continues.
        """
        raise NoMatchException("No match (%s>%s -> %s): %r"%(">".join(self.state_history), self.state, ",".join(self.next_states), data))

    def on_excess_data(self, excess_data):
        """Hook called with the tail of an item left over after the match; warns by default."""
        warnings.warn("Excess data after processing %r[%d] state: %r"%(self.state, self.match_index, excess_data))

    def _find_match(self, data):
        """Return (state_name, pattern_index, match) for the first candidate accepting data.

        Returns (None, None, None) when nothing matches.
        """
        for candidate in self.next_states:
            for pat_nr, pattern in enumerate(self.states[candidate][1]):
                match = pattern.match(data)
                if match is not None:
                    return candidate, pat_nr, match
        return None, None, None

    def __iter__(self):
        """Yield the match object of each item accepted by one of the candidate states."""
        # Looping over the iterator (instead of calling its .next() method)
        # works with both Python 2 and Python 3 iterators.
        for raw in self.data_iter:
            self.data = raw
            data = raw if self.rstrip is None else raw.rstrip(self.rstrip)
            self.match_index = None
            new_state, pat_nr, match = self._find_match(data)
            if match is None:
                self.on_nomatch(data)
                continue
            self.match = match
            self.match_index = pat_nr
            if self.history_size:
                self.state_history.append(self.state)
                # Keep at most history_size entries, dropping the oldest first.
                excess = len(self.state_history) - self.history_size
                if excess > 0:
                    del self.state_history[:excess]
            self.state = new_state
            following = self.states[new_state][0]
            if following is not None:
                self.next_states = following
            if match.end() < len(data):
                self.on_excess_data(data[match.end():])
            yield self.match
            if self.end_state is not None and self.state == self.end_state:
                break
# TODO: support output of: find -ls, unsquashfs -ll | |
class ListFileSM(PatternIterSM):
    """State machine recognising the lines of a file listing.

    Supported formats (with env LANG=C.UTF-8):
        tar tvf <tarfile>   (--numeric-owner recommended)
        (adb shell) ls -Rasiln
        (linux) ls -Rasiln --time-style=long-iso
    """

    # Matches an empty line (line endings are removed through ``rstrip``).
    crnl_re = re.compile(r'^$')
    history_size = 10
    rstrip = "\r\n"

    # state name -> (next states, patterns); "start" only needs its next states.
    states = {
        "start": (["tart_line", "ls_start", "crnl"], ),
        # One entry line: optional inode/blocks, type and mode, optional link
        # count, owner, size or device numbers, mtime, file name.
        "tart_line": (None, [re.compile(r'^(?: *(?P<inode>\d+) +(?:(?P<blocks>\d+) +)?)?'
                                        r'(?P<type>[hdbcpls-])(?P<mode>[TStsrwx-]{9}) +'
                                        r'(?:(?P<links>\d+) +)?'
                                        r'(?P<user>[^/ ]+)(?:/| +)(?P<group>\S+) +'
                                        r'(?:(?P<node>\d+, *\d+)|(?P<size>\d+))? +'
                                        r'(?P<mtime>\d+-\d+-\d+ \d+:\d+) +'
                                        r'(?:\./)?(?P<fname>.*)$')]),
        "crnl": (None, [crnl_re]),
        # "<directory>:" header introducing one directory of a recursive ls.
        "ls_start": (["ls_total", "tart_line", "end_of_ls", "ls_noperm", "ls_nostat"],
                     [re.compile(r'^(?:\./)?(?P<prefix>.+):$')]),
        "ls_total": (["tart_line", "end_of_ls"], [re.compile(r'total (?P<total_count>\d+)$')]),
        # Empty line closing one directory section.
        "end_of_ls": (["ls_start"], [crnl_re]),
        "ls_noperm": (["end_of_ls", "tart_noperm"], [re.compile(r'^opendir failed, Permission denied$')]),
        "ls_nostat": (["tart_line", "end_of_ls", "tart_noperm"],
                      [re.compile(r'^stat failed on .*: Permission denied$')]),
        "tart_noperm": (None, [re.compile(r'.*: Permission denied')]),
    }
class ListFS(fuse.Fuse):
    """FUSE filesystem serving the metadata tree described by a file listing.

    The listing is read from ``self.listfile`` (which the caller must set
    before the tree is first needed) and parsed with ListFileSM.  The class
    implements getattr, readdir and readlink on top of the parsed tree.
    """

    # Type character of a listing line -> file-type bits for st_mode.
    stat_types={"d": stat.S_IFDIR, "b": stat.S_IFBLK, "c": stat.S_IFCHR, "p": stat.S_IFIFO, "l": stat.S_IFLNK,
                "-": stat.S_IFREG, "s": stat.S_IFSOCK}
    # Every st_* attribute name found on an os.lstat() result, mapped to 0;
    # used as the defaults of the objects built by inf2stat().
    def_stats=dict((name, 0) for name in dir(os.lstat("/")) if name.startswith("st_"))

    @property
    def dirtree(self):
        """Parsed tree, built by get_dirtree() on first access and cached in ``_dirtree``."""
        try:
            return self._dirtree
        except AttributeError:
            pass
        ret = self._dirtree = self.get_dirtree()
        return ret

    @staticmethod
    def _add_child(tree, seen, parent, name):
        """Record ``name`` once as a child of directory ``parent``, creating the directory if needed.

        ``seen`` maps a directory path to the set of child names already
        recorded, so the duplicate check does not scan the children list.
        """
        children = tree.setdefault(parent, {"type": "d"}).setdefault("children", [])
        names = seen.setdefault(parent, set())
        if name not in names:
            names.add(name)
            children.append(name)

    @classmethod
    def _register_dirs(cls, tree, seen, prefix):
        """Make sure the root, ``prefix`` and every ancestor of it exist as linked directories."""
        parts = prefix.split("/")
        if prefix:
            # Extra empty component so the loop also visits ``prefix`` itself.
            parts.append("")
        for depth, part in enumerate(parts):
            parent = "/".join(parts[:depth])
            tree.setdefault(parent, {"type": "d"}).setdefault("children", [])
            if part:
                cls._add_child(tree, seen, parent, part)

    def get_dirtree(self):
        """Parse ``self.listfile`` and return the tree as a dict.

        Keys are paths relative to the root without leading slash (the root
        itself is ``""``); values are dicts holding the named groups of the
        matching listing line, plus ``children`` (list of entry names) for
        directories and ``target`` for symlinks whose line contains `` -> ``.

        ``.`` entries of a directory section are merged into the directory
        itself and ``..`` entries are ignored, so neither shows up as a child.
        """
        tree = {}
        seen = {}
        with open(self.listfile) as listing:
            fname_prefix = ""
            for match in ListFileSM(listing):
                inf = match.groupdict()
                if "prefix" in inf:
                    # Directory header of a recursive ls: following entries live below it.
                    fname_prefix = inf["prefix"].strip("/")
                    if fname_prefix == ".":
                        fname_prefix = ""
                    self._register_dirs(tree, seen, fname_prefix)
                if "fname" not in inf:
                    continue
                name = inf.pop("fname")
                if inf["type"] == "d" and name in (".", ".."):
                    # "ls -a" lists the directory itself and its parent; readdir()
                    # emits both on its own, so they must not become children.
                    if name == ".":
                        tree.setdefault(fname_prefix, {}).update(inf)
                        tree[fname_prefix].setdefault("children", [])
                    continue
                fname = "%s%s%s"%(fname_prefix, "/" if fname_prefix else "", name)
                if inf["type"] == "d":
                    fname = fname.rstrip("/")
                elif inf["type"] == "l":
                    try:
                        fname, inf["target"] = fname.split(" -> ", 1)
                    except ValueError:
                        # No target shown on the line: keep the whole text as the name.
                        pass
                elif inf["type"] == "h":
                    fname, linkto = fname.split(" link to ", 1)
                    # Entry names are stored without a leading "./" (the line
                    # pattern drops it), so look the link target up the same way.
                    if linkto.startswith("./"):
                        linkto = linkto[2:]
                    # A hard link takes over all attributes of its target.
                    inf = tree[linkto]
                tree.setdefault(fname, {}).update(inf)
                if inf["type"] == "d":
                    tree[fname].setdefault("children", [])
                if fname:
                    # rpartition yields an empty parent for names without "/".
                    parent, _, base = fname.rpartition("/")
                    self._add_child(tree, seen, parent, base)
        return tree

    @staticmethod
    def mode2int(mode):
        """Convert a nine-character permission string (e.g. ``rwsr-xr-x``) to mode bits.

        ``s``/``S`` in the owner or group execute slot and ``t``/``T`` in the
        others execute slot set the setuid, setgid and sticky bit
        respectively; the lowercase form additionally implies the execute bit.
        """
        ret = 0
        for idx, xbit in enumerate((stat.S_ISUID, stat.S_ISGID, stat.S_ISVTX)):
            char_pos = idx*3+2
            flag = mode[char_pos]
            if flag.lower() in ("s", "t"):
                ret |= xbit
                # Put the plain execute flag back so the string is pure rwx- below.
                mode = "%s%s%s"%(mode[:char_pos], "x" if flag.islower() else "-", mode[char_pos+1:])
        # Each position is now one bit: "-" is 0, any of r/w/x is 1.
        ret |= int(mode.replace("-", "0").replace("r", "1").replace("w", "1").replace("x", "1"), 2)
        return ret

    def inf2stat(self, inf):
        """Build a stat-like object (attributes ``st_*``) from one tree entry.

        Attributes without a source in ``inf`` keep the 0 default from
        ``def_stats``.  Non-numeric user or group names leave st_uid / st_gid
        at 0.
        """
        st = type("InfStat", (object,), self.def_stats.copy())()
        st.st_mode = self.stat_types[inf["type"]]
        if "mode" in inf:
            st.st_mode |= self.mode2int(inf["mode"])
        if "size" in inf and inf["size"] is not None:
            st.st_size = int(inf["size"])
        if "mtime" in inf:
            st.st_mtime = time.mktime(time.strptime(inf["mtime"], "%Y-%m-%d %H:%M"))
        if inf.get("user"):
            uid = inf["user"]
            if uid.isdigit():
                st.st_uid = int(uid)
        if inf.get("group"):
            gid = inf["group"]
            if gid.isdigit():
                st.st_gid = int(gid)
        if inf.get("links"):
            st.st_nlink = int(inf["links"])
        if inf.get("inode"):
            st.st_ino = int(inf["inode"])
        if inf.get("blocks"):
            # NOTE(review): presumably converts 1 KiB listing blocks to 512-byte units -- confirm.
            st.st_blocks = int(inf["blocks"])*2
        return st

    def getattr(self, path):
        """Return the stat object for ``path``; raises OSError(ENOENT) for unknown paths."""
        try:
            inf = self.dirtree[path.lstrip("/")]
        except KeyError:
            raise OSError(errno.ENOENT, "No such file or directory: %r"%(path,))
        return self.inf2stat(inf)

    def readdir(self, path, offset):
        """Yield a fuse.Direntry for ".", ".." and every child of ``path``; ``offset`` is ignored."""
        dir_entry = self.dirtree[path.lstrip("/")]
        yield fuse.Direntry(".")
        yield fuse.Direntry("..")
        for child in dir_entry["children"]:
            yield fuse.Direntry(child)

    def readlink(self, path):
        """Return the symlink target recorded for ``path``."""
        return self.dirtree[path.lstrip("/")]["target"]
if __name__ == '__main__':
    # Command-line entry point: register --listfile, parse the arguments,
    # then hand control to fuse.
    srv = ListFS()
    srv.parser.add_option("--listfile", help="file containing output of 'env LANG=C.UTF-8 tar tv'")
    srv.parse()
    help_requested = srv.parser.fuse_args.modifiers["showhelp"]
    if not help_requested:
        # The listing is mandatory unless only help output was asked for.
        listfile = srv.parser.values.listfile
        if not listfile:
            raise RuntimeError("--listfile option required, see -h for help")
        srv.listfile = os.path.realpath(listfile)
    # NOTE(review): the original indentation is lost; main() is assumed to run
    # in both cases (help requested or not) -- confirm.
    srv.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment