Created
January 31, 2020 18:20
-
-
Save lambdalisue/3a0ea8579abbe32ede83f16ec745f5f8 to your computer and use it in GitHub Desktop.
Remove all artifacts from a particular repository
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import re | |
import time | |
from typing import Iterator, Tuple, Optional | |
import requests | |
# ================================================================ | |
TOKEN = "****************************************" | |
USER = "username" | |
REPO = "repository" | |
DRY = True | |
# ================================================================ | |
auth = (USER, TOKEN) | |
base = f"https://api.github.com/repos/{REPO}" | |
def iter_workflows(url: Optional[str] = None) -> Iterator[str]: | |
url = url or f"{base}/actions/workflows" | |
r = requests.get(url, auth=auth) | |
r.raise_for_status() | |
yield from (v["id"] for v in r.json()["workflows"]) | |
next_url = find_next_url(r.headers.get("link", "")) | |
if next_url: | |
yield from iter_workflows(next_url) | |
def iter_workflow_runs(workflow_id: str, url: Optional[str] = None) -> Iterator[str]: | |
url = url or f"{base}/actions/workflows/{workflow_id}/runs" | |
r = requests.get(url, auth=auth) | |
r.raise_for_status() | |
yield from (v["id"] for v in r.json()["workflow_runs"]) | |
next_url = find_next_url(r.headers.get("link", "")) | |
if next_url: | |
yield from iter_workflow_runs(workflow_id, next_url) | |
def iter_artifacts(run_id: str, url: Optional[str] = None) -> Iterator[Tuple[str, int]]: | |
url = url or f"{base}/actions/runs/{run_id}/artifacts" | |
r = requests.get(url, auth=auth) | |
r.raise_for_status() | |
yield from ((v["id"], v["size_in_bytes"]) for v in r.json()["artifacts"]) | |
next_url = find_next_url(r.headers.get("link", "")) | |
if next_url: | |
yield from iter_artifacts(run_id, next_url) | |
def delete_artifact(artifact_id: str) -> None: | |
r = requests.delete(f"{base}/actions/artifacts/{artifact_id}", auth=auth) | |
r.raise_for_status() | |
def find_next_url(link: str) -> Optional[str]: | |
pattern = re.compile('<(.*)>; rel="next"') | |
m = pattern.match(link) | |
if not m: | |
return None | |
return m.group(1) | |
def main(): | |
total_size_in_bytes = 0 | |
for workflow_id in iter_workflows(): | |
print(f"Listing workflow runs in workflow [{workflow_id}] ... ") | |
for workflow_run_id in iter_workflow_runs(workflow_id): | |
print(f" Listing artifacts in workflow run [{workflow_run_id}] ... ") | |
for artifact_id, size_in_bytes in iter_artifacts(workflow_run_id): | |
print( | |
f" Removing artifact [{artifact_id}] {size_in_bytes} bytes ... " | |
) | |
if not DRY: | |
delete_artifact(artifact_id) | |
time.sleep(0.1) | |
total_size_in_bytes += size_in_bytes | |
print(f"Deleted: {total_size_in_bytes} bytes") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment