Created
July 23, 2025 14:25
-
-
Save solvingj/612132fdd031c2b9b6c2ff6c93023d48 to your computer and use it in GitHub Desktop.
Failed attempt at a pure-Python, in-memory git clone using dulwich and fsspec
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
import posixpath
import stat
from io import BytesIO

import fsspec
from dulwich.client import get_transport_and_path
from dulwich.errors import NotGitRepository
# NOTE(review): confirm this name is exported by the installed dulwich release.
from dulwich.index import update_working_directory
from dulwich.object_store import BaseObjectStore
from dulwich.objects import ShaFile
from dulwich.refs import DictRefsContainer
from dulwich.repo import BaseRepo
class FsspecObjectStore(BaseObjectStore):
    """Loose-object git store kept on an fsspec filesystem.

    Each object is written to its own file at
    ``<base_path>/objects/<first two hex chars>/<remaining hex chars>`` in the
    compressed form produced by ``ShaFile.as_legacy_object()``.

    NOTE(review): only loose-object operations are implemented. Pack ingestion
    hooks that a network fetch may call on its target store (for example
    ``add_pack``) are not provided here -- confirm against the installed
    dulwich version before using this store as a fetch target.
    """

    def __init__(self, fs, base_path):
        """Bind the store to filesystem *fs*, rooted at *base_path*."""
        super().__init__()
        self.fs = fs
        self.base_path = base_path.rstrip("/")

    def _object_path(self, sha):
        """Return the storage path for the object identified by *sha*.

        *sha* may be an ASCII hex id (``bytes`` or ``str``), a raw binary
        digest, or a hashlib-style object exposing ``hexdigest()``.
        """
        if isinstance(sha, bytes):
            # Raw digests are 20 (SHA-1) or 32 (SHA-256) bytes long; anything
            # else is taken to be the ASCII hex spelling dulwich uses for ids.
            sha_hex = sha.hex() if len(sha) in (20, 32) else sha.decode("ascii")
        elif isinstance(sha, str):
            sha_hex = sha
        else:
            sha_hex = sha.hexdigest()
        return f"{self.base_path}/objects/{sha_hex[:2]}/{sha_hex[2:]}"

    def contains_loose(self, sha):
        """Return True if an object file for *sha* exists in this store."""
        return self.fs.exists(self._object_path(sha))

    def contains_packed(self, sha):
        """Return False: this store never keeps objects in packs."""
        return False

    def add_object(self, obj):
        """Write *obj* to the store and return its id."""
        path = self._object_path(obj.id)
        data = obj.as_legacy_object()
        self.fs.pipe(path, data)
        return obj.id

    def add_objects(self, objects, progress=None):
        """Write every object from an iterable of ``(object, path)`` pairs."""
        for obj, _path in objects:
            self.add_object(obj)

    def get_raw(self, sha):
        """Return ``(type_num, raw_content)`` for the object named *sha*.

        Raises:
            KeyError: if no object file exists for *sha*.
        """
        path = self._object_path(sha)
        try:
            stored = self.fs.cat(path)
        except FileNotFoundError:
            raise KeyError(sha) from None
        # The file holds the compressed loose-object form; parse it back so the
        # caller gets the numeric type and the uncompressed body.
        obj = ShaFile.from_file(BytesIO(stored))
        return obj.type_num, obj.as_raw_string()

    def __iter__(self):
        """Yield the hex id (as ASCII ``bytes``) of every stored object."""
        objdir = f"{self.base_path}/objects"
        if not self.fs.exists(objdir):
            return
        # detail=False makes ls() return plain path strings instead of dicts.
        for fanout_dir in self.fs.ls(objdir, detail=False):
            prefix = os.path.basename(fanout_dir.rstrip("/"))
            if len(prefix) != 2:
                continue
            for obj_file in self.fs.ls(f"{objdir}/{prefix}", detail=False):
                yield f"{prefix}{os.path.basename(obj_file)}".encode("ascii")
class InMemoryRepo(BaseRepo):
    """Git repository whose objects live on an fsspec filesystem.

    Objects are stored through ``FsspecObjectStore``; refs are held in a
    ``DictRefsContainer`` and therefore exist only in this process.
    """

    def __init__(self, fs, base_path="memory://.git"):
        """Create the repository on *fs* with its control dir at *base_path*."""
        self.fs = fs
        self._controldir = base_path.rstrip("/")
        self._object_store = FsspecObjectStore(fs, self._controldir)
        # DictRefsContainer needs the backing dict passed in explicitly.
        self._refs_container = DictRefsContainer({})
        # BaseRepo.__init__ assigns ``self.object_store`` and ``self.refs``;
        # without this call ``repo.refs`` would not exist.
        super().__init__(self._object_store, self._refs_container)

    def get_refs_container(self):
        """Return the refs container backing this repository."""
        return self._refs_container

    def close(self):
        """Do nothing; the repository holds no resources that need releasing."""
        pass
# Mode bits git uses for a submodule link entry inside a tree.
_GITLINK_MODE = 0o160000


def _write_tree_to_fs(fs, object_store, tree_id, worktree_path):
    """Write every blob reachable from *tree_id* below *worktree_path* on *fs*."""
    root = worktree_path.rstrip("/")
    pending = [(tree_id, "")]
    while pending:
        current_id, prefix = pending.pop()
        for entry in object_store[current_id].items():
            rel_path = prefix + entry.path.decode()
            kind = stat.S_IFMT(entry.mode)
            if kind == stat.S_IFDIR:
                # Subtree: descend later, extending the path prefix.
                pending.append((entry.sha, f"{rel_path}/"))
            elif kind == _GITLINK_MODE:
                # Submodule link: its commit is not part of this repository.
                continue
            else:
                # fsspec paths always use "/", so build them without os.path.
                target = f"{root}/{rel_path}"
                fs.makedirs(posixpath.dirname(target), exist_ok=True)
                fs.pipe(target, object_store[entry.sha].data)


# Memory-backed clone and checkout
def clone_into_memory(remote_url, memory_repo_path="memory://.git", worktree_path="memory://worktree"):
    """Fetch *remote_url* into an in-memory repo and check out its HEAD tree.

    Args:
        remote_url: Location of the remote repository, as a ``str``.
        memory_repo_path: fsspec path used as the repository control dir.
        worktree_path: fsspec path under which the files are written.

    Returns:
        A ``(fs, repo)`` tuple: the memory filesystem and the ``InMemoryRepo``.

    Raises:
        RuntimeError: if the remote advertises neither ``HEAD`` nor
            ``refs/heads/master``.

    NOTE(review): the checkout walks the tree directly; to my knowledge
    ``dulwich.index`` has no helper that takes ``(object_store, tree,
    file_writer)`` -- confirm against the installed dulwich version.
    """
    fs = fsspec.filesystem("memory")

    # 1. Create in-memory repo
    repo = InMemoryRepo(fs, memory_repo_path)

    # 2. Fetch objects and refs; the location must stay a str
    client, path = get_transport_and_path(remote_url)
    fetch_result = client.fetch(path, repo)
    # Newer dulwich returns a result object carrying ``.refs``; older
    # releases returned the refs dict itself.
    refs = getattr(fetch_result, "refs", fetch_result)

    # 3. Set HEAD
    head_sha = refs.get(b"HEAD") or refs.get(b"refs/heads/master")
    if not head_sha:
        raise RuntimeError("Could not determine HEAD")
    repo.refs.set_symbolic_ref(b"HEAD", b"refs/heads/master")
    repo.refs[b"refs/heads/master"] = head_sha

    # 4. Materialize working directory into memory
    tree_id = repo[repo.refs[b"HEAD"]].tree
    _write_tree_to_fs(fs, repo.object_store, tree_id, worktree_path)
    return fs, repo
# Demo: clone dulwich itself into memory and peek at the result.
if __name__ == "__main__":
    fs, repo = clone_into_memory("https://github.com/jelmer/dulwich")

    print("Files in memory://worktree:")
    worktree_listing = fs.ls("memory://worktree")
    print(worktree_listing)

    print("\nREADME.md (first line):")
    readme_lines = fs.read_text("memory://worktree/README.md").splitlines()
    print(readme_lines[0])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment