Created
January 20, 2025 09:13
-
-
Save peterroelants/408f85d6a2b89a3cbd4f708f1abac876 to your computer and use it in GitHub Desktop.
Fsspec toy filesystem
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations

import io
from collections.abc import Callable
from typing import Literal, TypedDict, TypeVar, Union

from fsspec.spec import AbstractFileSystem

# Generic type variable; it is not referenced elsewhere in this file as shown,
# but is kept so the module's public names stay unchanged.
T = TypeVar("T")

# Recursive type for the in-memory store: a directory maps each entry name to
# either the raw bytes of a file or to another (nested) directory dictionary.
StoreDict = dict[str, Union[bytes, "StoreDict"]]

class FileInfo(TypedDict):
    """Type for file/directory information returned by ls()."""

    # Path of the entry, joined with "/" (as built by `ls`).
    name: str
    # Whether the entry is a regular file or a directory.
    type: Literal["file", "directory"]
    # Number of bytes for a file; `ls` reports 0 for directories.
    size: int

class _CommitOnCloseBytesIO(io.BytesIO):
    """
    In-memory write buffer that hands its content to a callback when closed.

    The callback runs exactly once, on the first `close()`. The buffer is then
    really closed, so later `close()` calls (including the implicit one made
    when the object is finalized) do not commit a second time.
    """

    def __init__(self, commit: Callable[[bytes], None]) -> None:
        """
        Create an empty buffer.

        Args:
            commit: Called with the full buffer content on the first `close()`.
        """
        super().__init__()
        self._commit = commit

    def close(self) -> None:
        """Commit the buffered bytes (first call only) and close the buffer."""
        if self.closed:
            return
        try:
            self._commit(self.getvalue())
        finally:
            # Always release the buffer, even if the commit failed, so that a
            # failing commit is not retried on every subsequent close().
            super().close()


class SimpleMemoryFileSystem(AbstractFileSystem):
    """
    A simple in-memory filesystem for educational purposes.

    This filesystem stores files and directories in a nested dictionary structure.
    It implements the minimum required methods from AbstractFileSystem to be functional.
    """

    store: StoreDict
    protocol: str = "toy"

    def __init__(self) -> None:
        """Initialize the filesystem with a predefined structure."""
        super().__init__()
        # Initialize the store with a predefined directory structure
        # /
        # |--B
        # |  |--b.txt (contains "I'm B")
        # |
        # |--C
        # |  |--E
        # |  |  |--e.txt (contains "I'm E")
        # |  |
        # |  |--D
        # |  |  |--d.txt (contains "I'm D")
        # |  |
        # |  |--c.txt (contains "I'm C")
        # |
        # |--a.txt (contains "I'm A")
        self.store = {
            "B": {"b.txt": b"I'm B"},
            "C": {
                "D": {"d.txt": b"I'm D"},
                "E": {"e.txt": b"I'm E"},
                "c.txt": b"I'm C",
            },
            "a.txt": b"I'm A",
        }

    def ls(
        self,
        path: str,
        detail: bool = True,
        **kwargs,
    ) -> list[FileInfo] | list[str]:
        """
        List objects at path.

        Overrides `AbstractFileSystem.ls` to list objects in the in-memory store.

        Args:
            path: Path to list.
            detail: If True, return a list of dictionaries with file information.
                If False, return just a list of paths.
            **kwargs: Additional arguments (unused).

        Returns:
            List of paths or file information dictionaries. Listing a file
            returns a single entry describing that file.

        Raises:
            FileNotFoundError: If the path doesn't exist.
            ValueError: If the store holds an object that is neither a
                directory dictionary nor bytes.
        """
        obj = self._get_object_at_path(path)
        # Normalised path without protocol or leading/trailing slashes; it is
        # empty for the root directory.
        listed_path = "/".join(self._get_path_parts(path))

        entries: list[FileInfo]
        if isinstance(obj, dict):  # Directory
            entries = [
                {
                    "name": f"{listed_path}/{name}" if listed_path else name,
                    "type": "directory" if isinstance(item, dict) else "file",
                    "size": len(item) if isinstance(item, bytes) else 0,
                }
                for name, item in obj.items()
            ]
        elif isinstance(obj, bytes):  # File
            entries = [{"name": listed_path, "type": "file", "size": len(obj)}]
        else:
            raise ValueError(f"Unexpected object type: {type(obj)!r}")

        if detail:
            return entries
        # Else, return just the names
        return [entry["name"] for entry in entries]

    def cat_file(
        self,
        path: str,
        start: int | None = None,
        end: int | None = None,
        **kwargs,
    ) -> bytes:
        """
        Get the contents of a file.

        Overrides `AbstractFileSystem.cat_file` to provide byte range reading.

        Args:
            path: Path to the file.
            start: Start position of the byte range to read (optional).
            end: End position of the byte range to read (optional).
            **kwargs: Additional arguments (unused).

        Returns:
            File contents as bytes.

        Raises:
            IsADirectoryError: If the path points to a directory.
            FileNotFoundError: If the path doesn't exist or isn't a file.
        """
        content = self._get_object_at_path(path)
        if isinstance(content, dict):
            raise IsADirectoryError(f"Path is a directory: {path!r}")
        if not isinstance(content, bytes):
            raise FileNotFoundError(f"Path is not a file: {path!r}")
        # Slicing with None bounds returns the whole content.
        return content[start:end]

    def cp_file(self, path1: str, path2: str, **kwargs) -> None:
        """
        Copy a file from one location to another within the filesystem.

        Overrides `AbstractFileSystem.cp_file`.

        Args:
            path1: Source path
            path2: Destination path
            **kwargs: Additional arguments (unused)

        Raises:
            FileNotFoundError: If source path doesn't exist
            IsADirectoryError: If source path is a directory, or the
                destination is an existing directory
            NotADirectoryError: If a parent component of the destination is a file
        """
        content = self._get_object_at_path(path1)
        if isinstance(content, dict):
            raise IsADirectoryError(f"Source path is a directory: {path1!r}")
        if not isinstance(content, bytes):
            raise FileNotFoundError(f"Source path not found: {path1!r}")
        self._update_store(path2, content)

    def _open(
        self,
        path: str,
        mode: Literal["rb", "wb"],
        block_size: int | None = None,
        autocommit: bool = True,
        **kwargs,
    ) -> io.BytesIO:
        """
        Open a file and return a BytesIO object.

        Overrides `AbstractFileSystem._open` to provide a BytesIO object for the file.

        Args:
            path: Path to the file.
            mode: Mode to open file in ('rb' or 'wb').
            block_size: Ignored in this implementation.
            autocommit: Ignored in this implementation.
            **kwargs: Additional arguments (unused).

        Returns:
            BytesIO object for the file. In 'wb' mode the written bytes are
            stored in the filesystem when the object is closed, so errors from
            storing them are raised by `close()`.

        Raises:
            ValueError: If mode is not 'rb' or 'wb'.
            FileNotFoundError: If the path doesn't exist in read mode.
            IsADirectoryError: If the path points to a directory in read mode.
        """
        if mode == "rb":
            content = self._get_object_at_path(path)
            if isinstance(content, dict):
                raise IsADirectoryError(f"Path is a directory: {path!r}")
            return io.BytesIO(content)
        if mode == "wb":
            # The buffer writes its content to the store exactly once, when it
            # is first closed (e.g. on leaving a `with` block).
            return _CommitOnCloseBytesIO(lambda data: self._update_store(path, data))
        raise ValueError(f"Unexpected mode: {mode!r}, only 'rb' and 'wb' modes are supported")

    def rm_file(self, path: str) -> None:
        """
        Delete one file or empty directory.

        Overrides `AbstractFileSystem.rm_file` to remove files and empty
        directories from the store. `rmdir` delegates to this method.

        Args:
            path: Path to the file or directory to remove.

        Raises:
            FileNotFoundError: If the file or directory doesn't exist.
            OSError: If attempting to remove a non-empty directory.
        """
        parts = self._get_path_parts(path)
        if not parts:
            raise FileNotFoundError(f"No path specified: {path!r}")
        *parents, name = parts

        # Navigate to the parent directory
        current: StoreDict = self.store
        for part in parents:
            child = current.get(part)
            if not isinstance(child, dict):
                raise FileNotFoundError(f"Path not found: {path!r}")
            current = child

        # Check if the path exists
        if name not in current:
            raise FileNotFoundError(f"Path not found: {path!r}")

        # A directory may only be removed when it is empty
        target = current[name]
        if isinstance(target, dict) and target:
            raise OSError(f"Directory not empty: {path!r}")

        # Remove the file or directory
        del current[name]

    def rmdir(self, path: str) -> None:
        """
        Remove a directory.

        Overrides `AbstractFileSystem.rmdir` to remove an empty directory.

        Args:
            path: Path to the directory to remove.

        Raises:
            FileNotFoundError: If the directory doesn't exist (this includes
                the root directory, which cannot be removed).
            NotADirectoryError: If the path points to a file.
            OSError: If the directory is not empty.
        """
        obj = self._get_object_at_path(path)
        if not isinstance(obj, dict):
            raise NotADirectoryError(f"Path is not a directory: {path}")
        self.rm_file(path)

    def mkdir(self, path: str, create_parents: bool = True, **kwargs) -> None:
        """
        Create directory entry at path.

        Overrides `AbstractFileSystem.mkdir` to create directories in the store.

        Args:
            path: Path to the directory to create.
            create_parents: If True, create parent directories if they do not exist.
            **kwargs: Additional arguments (unused).

        Raises:
            FileExistsError: If the path already exists (or is the root directory).
            FileNotFoundError: If a parent directory is missing and
                create_parents is False.
            NotADirectoryError: If a parent component of the path exists as a file.
        """
        parts = self._get_path_parts(path)
        if not parts:
            raise FileExistsError("Cannot create root directory '/'")
        *parents, name = parts

        # Walk the parent components, creating missing ones if allowed
        current: StoreDict = self.store
        for depth, part in enumerate(parents, start=1):
            if part not in current:
                if not create_parents:
                    raise FileNotFoundError(f"Parent directory does not exist: {'/'.join(parts[:depth])}")
                current[part] = {}
            elif not isinstance(current[part], dict):
                raise NotADirectoryError(f"Cannot create directory '{path}': '{'/'.join(parts[:depth])}' is a file")
            current = current[part]  # type: ignore

        # Create the directory itself
        if name in current:
            raise FileExistsError(f"Path already exists: {path!r}")
        current[name] = {}

    def makedirs(self, path: str, exist_ok: bool = False) -> None:
        """
        Recursively make directories.

        Overrides `AbstractFileSystem.makedirs` to create a directory and any non-existent
        parent directories. Similar to `mkdir` with `create_parents=True` but with
        `exist_ok` parameter to control behavior when directory exists.

        Args:
            path: Path to the directory to create.
            exist_ok: If False, error if target directory already exists.

        Raises:
            FileExistsError: If the directory exists and exist_ok=False, or if
                the path exists as a file (regardless of exist_ok).
            NotADirectoryError: If a parent component exists as a file.
        """
        try:
            self.mkdir(path, create_parents=True)
        except FileExistsError:
            # `exist_ok` only tolerates an existing *directory*; an existing
            # file at the target path is still an error.
            if not exist_ok or not isinstance(self._get_object_at_path(path), dict):
                raise

    def _update_store(self, path: str, content: bytes) -> None:
        """
        Update the store with new file content.

        Missing parent directories are created. Existing entries are never
        silently replaced by an entry of a different kind.

        Args:
            path: Path to the file.
            content: New content for the file.

        Raises:
            IsADirectoryError: If the path is the root or an existing directory.
            NotADirectoryError: If a parent component of the path is a file.
        """
        parts = self._get_path_parts(path)
        if not parts:
            raise IsADirectoryError(f"Path is a directory: {path!r}")
        *parents, name = parts

        # Navigate to the parent directory, creating missing directories.
        # A file can only be met inside already-existing directories, so no
        # directory has been created yet when NotADirectoryError is raised.
        current: StoreDict = self.store
        for part in parents:
            child = current.setdefault(part, {})
            if not isinstance(child, dict):
                raise NotADirectoryError(f"Parent path is not a directory: {path!r}")
            current = child

        if isinstance(current.get(name), dict):
            raise IsADirectoryError(f"Path is a directory: {path!r}")

        # Update or create the file
        current[name] = content

    def _get_path_parts(self, path: str) -> list[str]:
        """
        Split a path into its components, removing empty parts and protocol.

        Args:
            path: The path to split.

        Returns:
            List of path components (empty for the root directory).
        """
        stripped = self._strip_protocol(path)
        # Dropping empty components also discards leading, trailing and
        # repeated slashes.
        return [part for part in stripped.split("/") if part]

    def _get_object_at_path(self, path: str) -> StoreDict | bytes:
        """
        Get the object (file or directory) at the given path.

        Args:
            path: The path to look up.

        Returns:
            The object at the path: bytes for a file, a dictionary for a
            directory. A path with no components refers to the root store.

        Raises:
            FileNotFoundError: If the path doesn't exist.
        """
        current: StoreDict | bytes = self.store
        for part in self._get_path_parts(path):
            if not isinstance(current, dict) or part not in current:
                raise FileNotFoundError(f"Path not found: {path!r}")
            current = current[part]
        return current

if __name__ == "__main__":
    # Small demonstration of the toy filesystem.

    # Create a filesystem instance
    fs = SimpleMemoryFileSystem()

    # List contents
    print(f"{fs.ls('/', detail=False)=}")  # ['B', 'C', 'a.txt']
    print(f"{fs.ls('/C/E', detail=False)=}")  # ['C/E/e.txt']

    # Read a file
    with fs.open("/C/E/e.txt", "rb") as f:
        print(f"{f.read()=!r}")  # b"I'm E"

    # Create a new, empty directory (the missing parent 'D' is created too)
    fs.mkdir("/D/new_dir")
    print(f"{fs.ls('/D', detail=False)=}")  # ['D/new_dir']
    print(f"{fs.ls('/D/new_dir', detail=False)=}")  # []

    # Write a new file
    with fs.open("/D/new_dir/newfile.txt", "wb") as f:
        f.write(b"I'm a new file!")

    # Verify the write worked
    with fs.open("/D/new_dir/newfile.txt", "rb") as f:
        print(f"{f.read()=!r}")  # b"I'm a new file!"

    # Remove a directory together with the file it contains
    fs.rm("/C/D", recursive=True)
    try:
        fs.ls("/C/D", detail=False)
    except FileNotFoundError as e:
        print(f"{e!s}")  # Path not found: '/C/D'
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment