Skip to content

Instantly share code, notes, and snippets.

@ottomata
Last active September 24, 2024 05:13
Show Gist options
  • Save ottomata/ed5dfb0640a7059be67d02ef650677c5 to your computer and use it in GitHub Desktop.
Save ottomata/ed5dfb0640a7059be67d02ef650677c5 to your computer and use it in GitHub Desktop.
fsspec GitLabFileSystem
from fsspec_gitlab import *
import fsspec
# Build a filesystem object backed by a GitLab repository.
# Importing fsspec_gitlab (above) is what makes the "gitlab" protocol known.
gitlab_repo = fsspec.filesystem(
    "gitlab",
    base_uri="https://gitlab.wikimedia.org",
    project_path="repos/data-engineering/workflow_utils",
)

# List the contents of a directory, with per-entry details.
gitlab_repo.ls("workflow_utils", detail=True)

# Fetch a file; content comes back as bytes, so decode before printing.
with gitlab_repo.open("README.md") as readme:
    readme_bytes = readme.read()
    print(readme_bytes.decode('utf-8'))

# Recursively download the workflow_utils/ directory to the local filesystem.
gitlab_repo.get("workflow_utils", "/tmp/workflow_utils", recursive=True)
# copied and modified from
# https://github.com/fsspec/filesystem_spec/blob/master/fsspec/implementations/github.py
import requests
import fsspec
from fsspec import AbstractFileSystem
from fsspec.implementations.memory import MemoryFile
def urlencode_path(path):
    """Percent-encode *path* for use as a single URL path segment.

    Every reserved character is encoded, including ``/`` (``safe=''``),
    which is what the GitLab API expects for namespaced project paths and
    repository file paths embedded in a URL.

    Parameters
    ----------
    path: str
        Raw project path or repository file path, e.g. ``"group/project"``.

    Returns
    -------
    str
        The encoded path, e.g. ``"group%2Fproject"``.
    """
    # Use the standard library directly rather than the undocumented
    # re-export of the same function in ``requests.utils``.
    from urllib.parse import quote

    return quote(path, safe='')
class GitLabFileSystem(AbstractFileSystem):
    """Read-only interface to the files of a GitLab repository.

    An instance of this class provides read only access to files residing
    within a remote gitlab repository.

    You may specify a point in the repo's history, by SHA, branch
    or tag (default is the project's default branch).

    Given that code files tend to be small, and that gitlab does not support
    retrieving partial content, we always fetch whole files.

    When using fsspec.open, allows URIs of the form:

    - "gitlab://path/file", in which case you must specify base_uri,
      project_path and may specify sha in the extra kwargs.

    You might want to configure some default GitLabFileSystem params, like for
    base_uri, via
    https://filesystem-spec.readthedocs.io/en/latest/features.html#configuration

    ``sha`` can be the full or abbreviated hex of the commit you want to fetch
    from, or a branch or tag name. It is sent as a URL query parameter and is
    encoded automatically, so names containing characters such as "/" work.
    """

    protocol = "gitlab"
    timeout_default = (60, 60)  # connect, read timeouts
    branch_name_default = "main"

    def __init__(self, base_uri, project_path, sha=None, timeout=None, **kwargs):
        """
        Parameters
        ----------
        base_uri: str
            Root URL of the GitLab instance, e.g. "https://gitlab.example.org"
        project_path: str
            Namespaced path of the project, e.g. "group/subgroup/project"
        sha: str (optional)
            Commit SHA, branch or tag name to read from. When not given, the
            project's default branch is looked up via the API (one request).
        timeout: float or (float, float) (optional)
            Timeout passed to ``requests``; defaults to ``timeout_default``.
        **kwargs:
            Passed on to ``AbstractFileSystem``.
        """
        super().__init__(**kwargs)
        self.base_uri = base_uri
        # Tolerate a trailing slash on base_uri so API urls never contain "//".
        self.api_uri = f"{self.base_uri.rstrip('/')}/api/v4"
        self.project_path = project_path
        # url escape project path for use in GitLab API urls
        self.project_path_escaped = urlencode_path(self.project_path)
        self.timeout = timeout if timeout is not None else self.timeout_default
        # Must come last: the lookup needs api_uri, project path and timeout.
        self.sha = sha or self._get_default_branch()

    @property
    def kw(self):
        """Extra keyword arguments added to every ``requests`` call."""
        # TODO: support authentication, e.g. {"auth": (username, token)}
        return {}

    def _repository_url(self, endpoint):
        """Return the API URL of ``repository/<endpoint>`` for this project."""
        return (
            f"{self.api_uri}/projects/{self.project_path_escaped}"
            f"/repository/{endpoint}"
        )

    def _list_refs(self, kind):
        """Return the decoded JSON listing of ``kind`` ("tags" or "branches").

        NOTE(review): only the first page of results is read; GitLab list
        endpoints are paginated, so projects with many refs may be truncated
        -- confirm against the GitLab pagination docs.
        """
        r = requests.get(
            self._repository_url(kind),
            timeout=self.timeout,
            **self.kw,
        )
        r.raise_for_status()
        return r.json()

    @property
    def tags(self):
        """Names of tags in the repo"""
        return [t["name"] for t in self._list_refs("tags")]

    @property
    def branches(self):
        """Names of branches in the repo"""
        return [b["name"] for b in self._list_refs("branches")]

    def _get_default_branch(self):
        """
        The configured default branch name for the project.

        Falls back to ``branch_name_default`` when no listed branch is flagged
        as default.
        """
        return next(
            (b["name"] for b in self._list_refs("branches") if b.get("default")),
            # The fallback must be a plain string, like every other ref name.
            self.branch_name_default,
        )

    @property
    def refs(self):
        """Named references, tags and branches"""
        return {"tags": self.tags, "branches": self.branches}

    # TODO: implement refresh param?
    # https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.ls
    def ls(self, path, detail=False, sha=None, **kwargs):
        """List files at given path

        Parameters
        ----------
        path: str
            Location to list, relative to repo root
        detail: bool
            If True, returns list of dicts, one per file; if False, returns
            list of full filenames only
        sha: str (optional)
            List at the given point in the repo history, branch or tag name or
            commit SHA

        Returns
        -------
        list
            Dicts with "name", "mode", "type" and "size" (plus "sha" for
            files) when ``detail`` is True, otherwise the sorted names.

        Raises
        ------
        FileNotFoundError
            If GitLab answers 404 for the path/ref.
        """
        sha = sha or self.sha
        # The dircache only holds listings for the instance's configured sha;
        # it must be neither read nor written when another ref is requested.
        use_cache = sha == self.sha

        if use_cache and path in self.dircache:
            out = self.dircache[path]
        else:
            out = self._fetch_listing(path, sha)
            if use_cache:
                self.dircache[path] = out

        if detail:
            return out
        return sorted(f["name"] for f in out)

    def _fetch_listing(self, path, sha):
        """Request the tree at ``path``/``sha`` and build fsspec detail dicts."""
        r = requests.get(
            self._repository_url("tree"),
            # Let requests encode the values so that paths and refs containing
            # "/", "&", "#", spaces, ... are transmitted intact.
            # NOTE(review): GitLab documents a maximum page size well below
            # 10000, so very large directories may be truncated -- TODO paginate.
            params={"path": path, "ref": sha, "per_page": 10000},
            timeout=self.timeout,
            **self.kw,
        )
        if r.status_code == 404:
            raise FileNotFoundError(path)
        r.raise_for_status()

        types = {"blob": "file", "tree": "directory"}
        out = []
        for f in r.json():
            # Anything that is neither a blob nor a tree (presumably submodule
            # entries -- confirm) is reported as "other" instead of raising
            # KeyError and failing the whole listing.
            file_type = types.get(f["type"], "other")
            file_detail = {
                "name": f["path"],
                "mode": f["mode"],
                "type": file_type,
                # fsspec expects every entry to carry a size.
                "size": 0,
            }
            if file_type == "file":
                # fsspec wants us to include size and sha in the
                # ls detail output for files. Unfortunately this means
                # another API call for each file.
                # https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.ls
                metadata = self._gitlab_file_metadata(f["path"], sha)
                # Header values are strings; expose the size as an int.
                size = metadata.get("size")
                file_detail["size"] = int(size) if size is not None else None
                # NOTE: this is fsspec content sha, not git ref sha.
                file_detail["sha"] = metadata.get("content_sha256")
            out.append(file_detail)
        return out

    def _gitlab_file_metadata(self, path, sha=None):
        """
        Returns GitLab file metadata for path.

        This uses HEAD instead of GET to avoid fetching the file content.
        https://docs.gitlab.com/ee/api/repository_files.html#get-file-metadata-only

        This method only works on files. Directories will result in 404.

        Returns
        -------
        dict
            Maps short keys ("size", "content_sha256", "blob_id", ...) to the
            raw string value of the matching ``X-Gitlab-*`` response header;
            headers absent from the response are omitted.

        Raises
        ------
        FileNotFoundError
            If GitLab answers 404.
        """
        sha = sha or self.sha
        r = requests.head(
            self._repository_url(f"files/{urlencode_path(path)}"),
            params={"ref": sha},
            timeout=self.timeout,
            **self.kw,
        )
        if r.status_code == 404:
            raise FileNotFoundError(path)
        r.raise_for_status()

        header_to_key = {
            "X-Gitlab-Blob-Id": "blob_id",
            "X-Gitlab-Commit-Id": "commit_id",
            "X-Gitlab-Content-Sha256": "content_sha256",
            "X-Gitlab-Encoding": "encoding",
            "X-Gitlab-File-Name": "file_name",
            "X-Gitlab-File-Path": "file_path",
            "X-Gitlab-Last-Commit-Id": "last_commit_id",
            "X-Gitlab-Ref": "ref",
            "X-Gitlab-Size": "size",
            "X-Gitlab-Execute-Filemode": "execute_filemode",
        }
        return {
            key: r.headers[header]
            for header, key in header_to_key.items()
            if header in r.headers
        }

    def invalidate_cache(self, path=None):
        """Drop every cached directory listing (``path`` is ignored)."""
        self.dircache.clear()

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        cache_options=None,
        sha=None,
        **kwargs,
    ):
        """Fetch the whole file at ``path`` and return it as an in-memory file.

        Only ``mode="rb"`` is supported; any other mode raises
        NotImplementedError. ``sha`` overrides the instance's ref for this
        call. A 404 from GitLab raises FileNotFoundError.
        """
        if mode != "rb":
            raise NotImplementedError
        sha = sha or self.sha
        r = requests.get(
            self._repository_url(f"files/{urlencode_path(path)}/raw"),
            params={"ref": sha},
            timeout=self.timeout,
            **self.kw,
        )
        if r.status_code == 404:
            raise FileNotFoundError(path)
        r.raise_for_status()
        return MemoryFile(None, None, r.content)
# Importing this module is enough to make the "gitlab" protocol available to
# fsspec.filesystem() / fsspec.open(): the class registers itself here.
# https://filesystem-spec.readthedocs.io/en/latest/developer.html#implementing-a-backend
fsspec.register_implementation(
    name=GitLabFileSystem.protocol,
    cls=GitLabFileSystem,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment