Created
October 2, 2022 22:58
-
-
Save marco-brandizi/a3b6c04f6fc4c358ba222226b41304f6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from __future__ import print_function, absolute_import, division | |
import logging | |
import os | |
from errno import EACCES | |
from os.path import realpath | |
from threading import Lock | |
from fuse import FUSE, FuseOSError, Operations, LoggingMixIn | |
# Source: https://gist.github.com/ChuckFoo/9438ef663bad0d2d020b
#
def getCaseInsensitivePath(path, RET_FOUND=False):
    '''
    Get a case insensitive path on a case sensitive system.

    When `path` does not exist as spelled, its last component is looked up
    in the parent directory by comparing lower-cased names; the first entry
    of os.listdir() that matches is used. A missing parent directory is
    resolved the same way, recursively. A trailing slash on `path` is kept
    on the result.

    path: the path to resolve. '' and paths that already exist are returned
      unchanged.
    RET_FOUND: for internal use only, to avoid too many calls to
      os.path.exists. When true, a (path, found) tuple is returned instead
      of the bare path.

    Returns the path with the on-disk spelling, or `path` as is when no
    match can be found.

    # Example usage
    getCaseInsensitivePath('/hOmE/mE/sOmEpAtH.tXt')
    '''
    def result(resolved, found):
        # Single place that honours the RET_FOUND return convention.
        return (resolved, found) if RET_FOUND else resolved

    if path == '' or os.path.exists(path):
        return result(path, True)

    name = os.path.basename(path)  # may be a directory or a file
    parent = os.path.dirname(path)
    suffix = ''
    if not name:  # path ends with a slash
        if len(parent) < len(path):
            # Keep the trailing separator(s), i.e. what dirname() dropped
            # from the END of the path.
            suffix = path[len(parent):]
        name = os.path.basename(parent)
        parent = os.path.dirname(parent)

    if not os.path.exists(parent):
        parent, found = getCaseInsensitivePath(parent, True)
        if not found:
            return result(path, False)

    # At this point the parent exists (or is '', meaning the current
    # directory), but the last component does not exist as spelled.
    try:
        # 'parent' is expected to be a directory, but it could be a file.
        entries = os.listdir(parent or os.curdir)
    except OSError:
        return result(path, False)

    wanted = name.lower()
    match = next((entry for entry in entries if entry.lower() == wanted), None)
    if match is None:
        # Can't find the right one, just return the path as is.
        return result(path, False)
    return result(os.path.join(parent, match) + suffix, True)
# Adapted from:
# https://gist.github.com/ChuckFoo/9438ef663bad0d2d020b
# https://raw.githubusercontent.com/fusepy/fusepy/master/examples/loopback.py
class CIWrapper(LoggingMixIn, Operations):
    '''
    FUSE operations that forward to the directory `root`, passing each
    incoming path through getCaseInsensitivePath() first.

    __call__ prefixes the first path argument of every operation with
    `self.root`, so the handlers below receive paths that already point into
    the backing directory.
    '''

    def __init__(self, root):
        '''root: the backing directory; stored as its realpath().'''
        self.root = realpath(root)
        # Held around each lseek+read / lseek+write pair on a file handle.
        self.rwlock = Lock()

    def __call__(self, op, path, *args):
        '''Dispatch operation `op` with `path` prefixed by the backing root.'''
        # Must name this class: the original example's class name
        # (Loopback) is not defined in this module.
        return super(CIWrapper, self).__call__(op, self.root + path, *args)

    def access(self, path, mode):
        '''Raise FuseOSError(EACCES) unless os.access() grants `mode`.'''
        if not os.access(getCaseInsensitivePath(path), mode):
            raise FuseOSError(EACCES)

    def chmod(self, path, mode):
        return os.chmod(getCaseInsensitivePath(path), mode)

    def chown(self, path, uid, gid):
        return os.chown(getCaseInsensitivePath(path), uid, gid)

    # TODO: continue from here

    def create(self, path, mode):
        '''Create (or truncate) the file write-only and return its descriptor.'''
        return os.open(
            getCaseInsensitivePath(path),
            os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
            mode)

    def flush(self, path, fh):
        return os.fsync(fh)

    def fsync(self, path, datasync, fh):
        '''Sync data only when `datasync` is non-zero, otherwise data and metadata.'''
        if datasync != 0:
            return os.fdatasync(fh)
        else:
            return os.fsync(fh)

    def getattr(self, path, fh=None):
        '''Return selected os.lstat() fields of the resolved path as a dict.'''
        st = os.lstat(getCaseInsensitivePath(path))
        # 'getattr' below is the builtin, not this method.
        return dict((key, getattr(st, key)) for key in (
            'st_atime', 'st_ctime', 'st_gid', 'st_mode', 'st_mtime',
            'st_nlink', 'st_size', 'st_uid'))

    # NOTE(review): presumably None marks the operation as unsupported for
    # fusepy - confirm against the fusepy version in use.
    getxattr = None

    def link(self, target, source):
        # __call__ only prefixes the first path argument (target), so the
        # root is added to `source` here.
        return os.link(
            getCaseInsensitivePath(self.root + source),
            getCaseInsensitivePath(target))

    listxattr = None

    def mkdir(self, path, mode):
        return os.mkdir(getCaseInsensitivePath(path), mode)

    def mknod(self, path, mode, dev):
        return os.mknod(getCaseInsensitivePath(path), mode, dev)

    def open(self, path, flags):
        '''Open the resolved path and return the OS-level file descriptor.'''
        return os.open(getCaseInsensitivePath(path), flags)

    def read(self, path, size, offset, fh):
        '''Read up to `size` bytes at `offset` from descriptor `fh`.'''
        with self.rwlock:
            os.lseek(fh, offset, os.SEEK_SET)
            return os.read(fh, size)

    def readdir(self, path, fh):
        '''List the resolved directory, with '.' and '..' prepended.'''
        return ['.', '..'] + os.listdir(getCaseInsensitivePath(path))

    def readlink(self, path):
        return os.readlink(getCaseInsensitivePath(path))

    def release(self, path, fh):
        return os.close(fh)

    def rename(self, old, new):
        # As in link(): only `old` was prefixed by __call__.
        return os.rename(
            getCaseInsensitivePath(old),
            getCaseInsensitivePath(self.root + new))

    def rmdir(self, path):
        return os.rmdir(getCaseInsensitivePath(path))

    def statfs(self, path):
        '''Return selected os.statvfs() fields of the resolved path as a dict.'''
        stv = os.statvfs(getCaseInsensitivePath(path))
        return dict((key, getattr(stv, key)) for key in (
            'f_bavail', 'f_bfree', 'f_blocks', 'f_bsize', 'f_favail',
            'f_ffree', 'f_files', 'f_flag', 'f_frsize', 'f_namemax'))

    def symlink(self, target, source):
        '''Create the link `target` pointing at the resolved `source`.'''
        return os.symlink(getCaseInsensitivePath(source), target)

    def truncate(self, path, length, fh=None):
        '''Truncate the resolved file to `length` bytes; `fh` is not used.'''
        # 'open' below is the builtin, not this class's open() method.
        with open(getCaseInsensitivePath(path), 'r+') as f:
            f.truncate(length)

    def unlink(self, path):
        return os.unlink(getCaseInsensitivePath(path))

    def utimens(self, path, times=None):
        return os.utime(getCaseInsensitivePath(path), times)

    def write(self, path, data, offset, fh):
        '''Write `data` at `offset` to descriptor `fh`; return bytes written.'''
        with self.rwlock:
            os.lseek(fh, offset, os.SEEK_SET)
            return os.write(fh, data)
if __name__ == '__main__':
    # Command line: <root> <mount>
    #   root  - directory handed to CIWrapper
    #   mount - mount point handed to FUSE
    import argparse

    cli = argparse.ArgumentParser()
    for positional in ('root', 'mount'):
        cli.add_argument(positional)
    options = cli.parse_args()

    logging.basicConfig(level=logging.INFO)

    operations = CIWrapper(options.root)
    fuse = FUSE(
        operations,
        options.mount,
        foreground=True,
        allow_other=True,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Adding refs.