Last active
September 29, 2015 13:49
-
-
Save ThiefMaster/11269741 to your computer and use it in GitHub Desktop.
Small FUSE filesystem that maps TV series into subfolders so that e.g. XBMC picks them up properly. Any file matching one of the EPISODE_REGEXES patterns will show up as a symlink in a folder named after the series. If there's a .srt file with the same base name, another symlink is "created" for it.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import os | |
import re | |
import sys | |
from errno import ENOENT, EINVAL | |
from stat import S_IFDIR, S_IFLNK | |
from fuse import FUSE, Operations, LoggingMixIn, FuseOSError | |
# Filename patterns that identify an episode; each captures the series name in
# the ``series`` group. Order matters: index 0 is the spaced
# "Name - S01E02 - Title.mkv" form, the later ones are separator-delimited
# forms ("name.S01E02.xyz.mkv", "name.1x02.xyz.mkv").
EPISODE_REGEXES = (
    re.compile(r'^(?P<series>.+?)\.? - S0*\d+E0*\d+ - .+\.mkv$'),
    re.compile(r'^(?P<series>.+?)\.+S0*\d+E0*\d+.+\.mkv$'),
    re.compile(r'^(?P<series>.+?)\.+0*\d+x0*\d+.+\.mkv$'),
)

# Overrides applied to the detected series name before it is used as the
# folder name.
NAME_MAP = {
    'Doctor Who': 'Doctor Who (2005)',
    'The Flash': 'The Flash (2014)',
    'Atlantis': 'Atlantis (2013)',
}
def _iter_series(path):
    """Yield ``(full_path, filename, series)`` for each episode file in ``path``.

    Only the direct entries of ``path`` are inspected. A filename is tested
    against ``EPISODE_REGEXES`` in order and the first matching pattern
    decides the series name, so each file is yielded at most once.
    """
    for name in os.listdir(path):
        for i, regex in enumerate(EPISODE_REGEXES):
            m = regex.match(name)
            if not m:
                continue
            series = m.group('series')
            if i > 0:
                # Every pattern but the first captures a separator-delimited
                # name ("the.flash"); turn it into a display name.
                series = series.replace('.', ' ').replace('_', ' ').title()
            series = NAME_MAP.get(series, series)
            yield os.path.join(path, name), name, series
            # First match wins; trying the remaining patterns could yield the
            # same file again, possibly under a different series name.
            break
def _get_series_names(path):
    """Return the set of series names found among the episode files in ``path``."""
    return {series for _path, _name, series in _iter_series(path)}
def _get_episodes_by_series(root, series):
    """Return the ``(full_path, filename, series)`` tuples in ``root`` for ``series``.

    The result is an empty list when no episode of that series is present.
    """
    return [episode for episode in _iter_series(root) if episode[2] == series]
def _get_episode_by_filename(root, series, filename):
    """Return the episode tuple of ``series`` whose filename is ``filename``.

    Returns the ``(full_path, filename, series)`` tuple, or ``None`` when the
    series has no episode with exactly that filename.
    """
    for episode in _get_episodes_by_series(root, series):
        if episode[1] == filename:
            return episode
    return None
class SeriesFuse(Operations):
    """Filesystem operations exposing the episodes in ``root`` grouped by series.

    Layout of the mounted tree:

    * ``/`` contains one directory per detected series.
    * ``/<series>`` contains one symlink per episode file and one per
      same-named ``.srt`` file that exists next to an episode.
    * ``/<series>/<file>`` is a symlink to the real file inside ``root``.
    """

    def __init__(self, root):
        # Directory whose direct entries are scanned for episode files.
        self.root = root

    def _link_target(self, series, filename):
        """Return the real path behind ``/<series>/<filename>`` or ``None``.

        A ``.srt`` name resolves only if the matching ``.mkv`` episode belongs
        to ``series`` and the subtitle file exists, which mirrors what
        ``readdir`` lists.
        """
        basename, ext = os.path.splitext(filename)
        if ext == '.srt':
            episode = _get_episode_by_filename(self.root, series, basename + '.mkv')
            if episode is None:
                return None
            target = os.path.splitext(episode[0])[0] + '.srt'
            return target if os.path.exists(target) else None
        episode = _get_episode_by_filename(self.root, series, filename)
        return episode[0] if episode is not None else None

    def getattr(self, path, fh=None):
        """Return the stat dict for ``path``.

        Raises ``FuseOSError(ENOENT)`` for anything that is not the root, a
        known series directory or an existing link inside one.
        """
        # / is always a folder containing only folders
        if path == '/':
            return dict(st_mode=(S_IFDIR | 0o755), st_nlink=len(self.readdir(path, fh)))
        segments = path[1:].split('/')
        if len(segments) == 1:
            # /foo is a folder containing only symlinks, if the series exists.
            # Scan once: the same list answers "does it exist" and feeds max().
            episodes = _get_episodes_by_series(self.root, segments[0])
            if episodes:
                mtime = int(max(os.path.getmtime(item[0]) for item in episodes))
                return dict(st_mode=(S_IFDIR | 0o755), st_nlink=2, st_mtime=mtime)
        elif len(segments) == 2:
            # /foo/bar is a symlink, but only if it actually resolves.
            if self._link_target(segments[0], segments[1]) is not None:
                return dict(st_mode=(S_IFLNK | 0o777), st_nlink=1)
        raise FuseOSError(ENOENT)

    def readdir(self, path, fh):
        """Return the entry names of directory ``path`` (including ``.``/``..``).

        Raises ``FuseOSError(ENOENT)`` when ``path`` is neither the root nor a
        series that currently has episodes.
        """
        default_files = ['.', '..']
        # / contains the list of series
        if path == '/':
            return default_files + sorted(_get_series_names(self.root))
        segments = path[1:].split('/')
        # /foo contains the episodes and possibly subtitles
        if len(segments) == 1:
            episodes = _get_episodes_by_series(self.root, segments[0])
            if episodes:
                names = set()
                for full_path, name, _series in episodes:
                    names.add(name)
                    subpath = os.path.splitext(full_path)[0] + '.srt'
                    if os.path.exists(subpath):
                        names.add(os.path.basename(subpath))
                return default_files + sorted(names)
        raise FuseOSError(ENOENT)

    def readlink(self, path):
        """Return the real file that the link at ``path`` points to.

        Raises ``FuseOSError(EINVAL)`` when ``path`` is not at link depth
        (``/<series>/<file>``) and ``FuseOSError(ENOENT)`` when no such link
        exists.
        """
        segments = path[1:].split('/')
        if len(segments) != 2:
            # '/', '/<series>' and deeper paths are never symlinks.
            raise FuseOSError(EINVAL)
        target = self._link_target(segments[0], segments[1])
        if target is None:
            raise FuseOSError(ENOENT)
        return target
def main():
    """Mount the series view; command line is ``<mountpoint> <root>``.

    Exits with status 1 and a usage message on stderr when either argument is
    missing.
    """
    try:
        mountpoint = sys.argv[1]
        root = sys.argv[2]
    except IndexError:
        # sys.exit() with a string writes it to stderr and exits with status 1.
        sys.exit('Usage: {} <mountpoint> <root>'.format(sys.argv[0]))
    FUSE(SeriesFuse(root), mountpoint, allow_other=True)
# Run the mount only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment