Skip to content

Instantly share code, notes, and snippets.

@peterroelants
Created January 20, 2025 09:13
Show Gist options
  • Save peterroelants/408f85d6a2b89a3cbd4f708f1abac876 to your computer and use it in GitHub Desktop.
Save peterroelants/408f85d6a2b89a3cbd4f708f1abac876 to your computer and use it in GitHub Desktop.
Fsspec toy filesystem
from __future__ import annotations
import io
from typing import Literal, TypedDict, TypeVar, Union
from fsspec.spec import AbstractFileSystem
# Recursive type for the in-memory store: each key is one path component and
# each value is either a file's content (bytes) or a nested directory (StoreDict).
# NOTE(review): T is not referenced anywhere else in the visible file.
T = TypeVar("T")
StoreDict = dict[str, Union[bytes, "StoreDict"]]
class FileInfo(TypedDict):
    """Description of a single entry (file or directory) as reported by ls()."""

    # Path of the entry: its components joined with "/", without a leading slash.
    name: str
    # Whether the entry is a regular file or a directory.
    type: Literal["file", "directory"]
    # Number of content bytes for a file; 0 for a directory.
    size: int
class SimpleMemoryFileSystem(AbstractFileSystem):
    """
    A simple in-memory filesystem for educational purposes.

    Files and directories are kept in a nested dictionary: a ``dict`` value is
    a directory and a ``bytes`` value is the content of a file. Only a small
    set of AbstractFileSystem methods is overridden, enough to be functional.
    """

    # Root directory of the nested store; assigned in __init__.
    store: StoreDict
    # Protocol identifier for this filesystem.
    protocol: str = "toy"
def __init__(self) -> None:
    """Create the filesystem and seed it with a small example tree."""
    super().__init__()
    # Seeded layout (dict insertion order is the order ls() reports):
    #   /
    #   |-- B/
    #   |   `-- b.txt      (contains "I'm B")
    #   |-- C/
    #   |   |-- D/
    #   |   |   `-- d.txt  (contains "I'm D")
    #   |   |-- E/
    #   |   |   `-- e.txt  (contains "I'm E")
    #   |   `-- c.txt      (contains "I'm C")
    #   `-- a.txt          (contains "I'm A")
    dir_b: StoreDict = {"b.txt": b"I'm B"}
    dir_d: StoreDict = {"d.txt": b"I'm D"}
    dir_e: StoreDict = {"e.txt": b"I'm E"}
    dir_c: StoreDict = {"D": dir_d, "E": dir_e, "c.txt": b"I'm C"}
    self.store = {"B": dir_b, "C": dir_c, "a.txt": b"I'm A"}
def ls(
    self,
    path: str,
    detail: bool = True,
    **kwargs,
) -> list[FileInfo] | list[str]:
    """
    List the entries found at ``path``.

    Overrides `AbstractFileSystem.ls` to read from the in-memory store. A
    directory yields one entry per child; a file yields a single entry
    describing the file itself.

    Args:
        path: Path to list.
        detail: If True, return a list of file information dictionaries.
            If False, return only the entry names.
        **kwargs: Accepted for interface compatibility (unused).

    Returns:
        List of file information dictionaries, or list of names.

    Raises:
        FileNotFoundError: If the path does not exist in the store.
        ValueError: If the store holds something that is neither a
            directory (dict) nor a file (bytes) at ``path``.
    """
    target = self._get_object_at_path(path)
    # Normalized path without leading/trailing slashes; "" for the root.
    base = "/".join(self._get_path_parts(path))

    listing: list[FileInfo]
    if isinstance(target, bytes):
        # Listing a file reports just that file.
        listing = [{"name": base, "type": "file", "size": len(target)}]
    elif isinstance(target, dict):
        # Children of the root are reported without any prefix.
        prefix = f"{base}/" if base else ""
        listing = [
            {
                "name": prefix + child,
                "type": "directory" if isinstance(value, dict) else "file",
                "size": len(value) if isinstance(value, bytes) else 0,
            }
            for child, value in target.items()
        ]
    else:
        raise ValueError(f"Unexpected object type: {type(target)!r}")

    return listing if detail else [row["name"] for row in listing]
def cat_file(
    self,
    path: str,
    start: int | None = None,
    end: int | None = None,
    **kwargs,
) -> bytes:
    """
    Get the contents of a file, optionally restricted to a byte range.

    Overrides `AbstractFileSystem.cat_file`. The range follows normal Python
    slice semantics, so negative offsets count from the end of the file.

    Args:
        path: Path to the file.
        start: Start position of the byte range to read (optional).
        end: End position (exclusive) of the byte range to read (optional).
        **kwargs: Accepted for interface compatibility (unused).

    Returns:
        File contents as bytes.

    Raises:
        IsADirectoryError: If the path points to a directory.
        FileNotFoundError: If the path doesn't exist or isn't a file.
    """
    content = self._get_object_at_path(path)
    if isinstance(content, dict):
        raise IsADirectoryError(f"Path is a directory: {path!r}")
    if not isinstance(content, bytes):
        raise FileNotFoundError(f"Path is not a file: {path!r}")
    # Slicing with None bounds returns the whole content, so the
    # "no range requested" case needs no separate branch.
    return content[start:end]
def cp_file(self, path1: str, path2: str, **kwargs) -> None:
    """
    Copy a single file to another location inside this filesystem.

    Overrides `AbstractFileSystem.cp_file`.

    Args:
        path1: Source path.
        path2: Destination path.
        **kwargs: Additional arguments (unused).

    Raises:
        FileNotFoundError: If the source path doesn't exist.
        IsADirectoryError: If the source path is a directory.
    """
    source = self._get_object_at_path(path1)
    if isinstance(source, bytes):
        # bytes are immutable, so the same object can be stored at both paths.
        self._update_store(path2, source)
        return
    if isinstance(source, dict):
        raise IsADirectoryError(f"Source path is a directory: {path1!r}")
    raise FileNotFoundError(f"Source path not found: {path1!r}")
def _open(
self,
path: str,
mode: Literal["rb", "wb"],
block_size: int | None = None,
autocommit: bool = True,
**kwargs,
) -> io.BytesIO:
"""
Open a file and return a BytesIO object.
Overrides `AbstractFileSystem._open` to provide a BytesIO object for the file.
Args:
path: Path to the file.
mode: Mode to open file in ('rb' or 'wb').
block_size: Ignored in this implementation.
autocommit: Ignored in this implementation.
Returns:
BytesIO object for the file.
Raises:
ValueError: If mode is not 'rb' or 'wb'.
IsADirectoryError: If the path points to a directory in read mode.
"""
if mode == "rb":
content = self._get_object_at_path(path)
if isinstance(content, dict):
raise IsADirectoryError(f"Path is a directory: {path!r}")
return io.BytesIO(content)
elif mode == "wb":
# Create a BytesIO object that will update the store when closed
bio = io.BytesIO()
# Write to the store when the BytesIO object is closed
bio.close = lambda: self._update_store(path, bio.getvalue()) # type: ignore
return bio
else:
raise ValueError(f"Unexpected mode: {mode!r}, only 'rb' and 'wb' modes are supported")
def rm_file(self, path: str) -> None:
    """
    Delete one file or one empty directory.

    Overrides `AbstractFileSystem.rm_file`. `rmdir` of this class also
    delegates here once it has verified that the target is a directory.

    Args:
        path: Path to the file or directory to remove.

    Raises:
        FileNotFoundError: If the path is empty/root or does not exist.
        OSError: If attempting to remove a non-empty directory.
    """
    components = self._get_path_parts(path)
    if not components:
        raise FileNotFoundError(f"No path specified: {path!r}")
    *ancestors, leaf = components

    # Walk down to the directory that contains the entry to delete.
    folder: StoreDict = self.store
    for segment in ancestors:
        child = folder.get(segment)
        if not isinstance(child, dict):
            # Missing component, or a file used as if it were a directory.
            raise FileNotFoundError(f"Path not found: {path!r}")
        folder = child

    if leaf not in folder:
        raise FileNotFoundError(f"Path not found: {path!r}")

    victim = folder[leaf]
    if isinstance(victim, dict) and victim:
        raise OSError(f"Directory not empty: {path!r}")
    del folder[leaf]
def rmdir(self, path: str) -> None:
    """
    Remove an empty directory.

    Overrides `AbstractFileSystem.rmdir`. The actual removal (including the
    emptiness check) is delegated to `rm_file`.

    Args:
        path: Path to the directory to remove.

    Raises:
        FileNotFoundError: If the directory doesn't exist.
        NotADirectoryError: If the path points to a file.
        OSError: If the directory is not empty.
    """
    target = self._get_object_at_path(path)
    if isinstance(target, dict):
        self.rm_file(path)
        return
    raise NotADirectoryError(f"Path is not a directory: {path}")
def mkdir(self, path: str, create_parents: bool = True, **kwargs) -> None:
    """
    Create a directory entry at ``path``.

    Overrides `AbstractFileSystem.mkdir` to create directories in the store.

    Args:
        path: Path to the directory to create.
        create_parents: If True, create parent directories if they do not exist.
        **kwargs: Accepted for interface compatibility (unused).

    Raises:
        FileExistsError: If the path already exists, or is the root.
        FileNotFoundError: If a parent is missing and create_parents is False.
        NotADirectoryError: If a parent component of the path exists as a file.
    """
    components = self._get_path_parts(path)
    if not components:
        raise FileExistsError("Cannot create root directory '/'")
    *ancestors, leaf = components

    # Walk (and, if allowed, create) every parent directory.
    folder: StoreDict = self.store
    for depth, segment in enumerate(ancestors, start=1):
        if segment not in folder:
            if not create_parents:
                missing = "/".join(components[:depth])
                raise FileNotFoundError(f"Parent directory does not exist: {missing}")
            folder[segment] = {}
        elif not isinstance(folder[segment], dict):
            blocker = "/".join(components[:depth])
            raise NotADirectoryError(f"Cannot create directory '{path}': '{blocker}' is a file")
        folder = folder[segment]  # type: ignore

    # Finally create the requested directory itself.
    if leaf in folder:
        raise FileExistsError(f"Path already exists: {path!r}")
    folder[leaf] = {}
def makedirs(self, path: str, exist_ok: bool = False) -> None:
    """
    Recursively make directories.

    Overrides `AbstractFileSystem.makedirs` to create a directory and any
    non-existent parent directories. Similar to `mkdir` with
    `create_parents=True`, but `exist_ok` controls what happens when the
    target directory already exists.

    Args:
        path: Path to the directory to create.
        exist_ok: If False, error if the target directory already exists.
            If True, an existing *directory* is accepted silently; an
            existing *file* at the target is still an error.

    Raises:
        FileExistsError: If the directory exists and exist_ok=False, or if
            the target exists as a file.
        NotADirectoryError: If a parent component exists as a file.
    """
    try:
        self.mkdir(path, create_parents=True)
    except FileExistsError:
        if not exist_ok:
            raise
        # exist_ok only forgives a directory that is already there; a file
        # occupying the target means the requested directory does not exist.
        if not isinstance(self._get_object_at_path(path), dict):
            raise
def _update_store(self, path: str, content: bytes) -> None:
"""
Update the store with new file content.
Args:
path: Path to the file.
content: New content for the file.
"""
parts = self._get_path_parts(path)
# Navigate to the parent directory
current: StoreDict = self.store
for part in parts[:-1]:
if part not in current or not isinstance(current[part], dict):
current[part] = {}
current = current[part] # type: ignore
# Update or create the file
current[parts[-1]] = content
def _get_path_parts(self, path: str) -> list[str]:
"""
Split a path into its components, removing empty parts and protocol.
Args:
path: The path to split.
Returns:
List of path components.
"""
path = self._strip_protocol(path)
parts = path.strip("/").split("/")
return [p for p in parts if p]
def _get_object_at_path(self, path: str) -> StoreDict | bytes:
"""
Get the object (file or directory) at the given path.
Args:
path: The path to look up.
Returns:
The object at the path.
Raises:
FileNotFoundError: If the path doesn't exist.
"""
if not path or path == "/": # Root directory
return self.store
current: StoreDict | bytes = self.store
for part in self._get_path_parts(path):
if isinstance(current, dict) and part in current:
current = current[part] # type: ignore
else:
raise FileNotFoundError(f"Path not found: {path!r}")
return current
if __name__ == "__main__":
    # Small demo of the toy filesystem. The `{expr=}` f-strings print the
    # expression text followed by its value.

    # Create a filesystem instance (seeded with the example tree in __init__)
    fs = SimpleMemoryFileSystem()
    # List contents; names are reported without a leading slash
    print(f"{fs.ls('/', detail=False)=}")  # ['B', 'C', 'a.txt']
    print(f"{fs.ls('/C/E', detail=False)=}")  # ['C/E/e.txt']
    # Read a file
    with fs.open("/C/E/e.txt", "rb") as f:
        print(f"{f.read()=!r}")  # b"I'm E"
    # Create an empty directory; the missing parent '/D' is created as well
    # because mkdir defaults to create_parents=True
    fs.mkdir("/D/new_dir")
    print(f"{fs.ls('/D', detail=False)=}")  # ['D/new_dir']
    print(f"{fs.ls('/D/new_dir', detail=False)=}")  # []
    # Write a new file (content reaches the store when the buffer is closed)
    with fs.open("/D/new_dir/newfile.txt", "wb") as f:
        f.write(b"I'm a new file!")
    # Verify the write worked
    with fs.open("/D/new_dir/newfile.txt", "rb") as f:
        print(f"{f.read()=!r}")  # b"I'm a new file!"
    # Remove a directory together with the file inside it
    # NOTE(review): fs.rm is not defined in this file; presumably inherited
    # from AbstractFileSystem and implemented on top of ls/rm_file - confirm.
    fs.rm("/C/D", recursive=True)
    try:
        fs.ls("/C/D", detail=False)
    except FileNotFoundError as e:
        print(f"{e!s}")  # Path not found: '/C/D'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment