Last active
February 4, 2024 18:27
-
-
Save st1vms/1e7c0fc573807402926e7f5b77a5d034 to your computer and use it in GitHub Desktop.
Git API utility wrappers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Look for the latest commit containing a file with a specific string in it""" | |
from base64 import b64decode
from typing import Dict, List, Optional

from requests import get as http_get
def get_commit_history(
    owner: str, repo: str, access_token: str, quiet: bool = False
) -> List[Dict]:
    """Collect the commits of a repository from the GitHub commits endpoint.

    Pages of 100 commits are requested, starting at page 1, until the API
    answers with an empty page. Unless `quiet` is set, a running total of the
    commits fetched so far is printed after every page.

    Returns the commit objects in the order the API delivered them.

    Raises `RuntimeError` as soon as a page request does not answer with
    HTTP 200.
    """
    endpoint = f"https://api.github.com/repos/{owner}/{repo}/commits"
    auth_headers = {"Authorization": f"Bearer {access_token}"}
    page_size = 100  # Adjust as needed
    collected = []
    page_number = 1

    while True:
        reply = http_get(
            endpoint,
            headers=auth_headers,
            params={"per_page": page_size, "page": page_number},
            timeout=10,
        )
        if reply.status_code != 200:
            raise RuntimeError(
                f"Failed to retrieve commit history. Status code: {reply.status_code}"
            )

        batch = reply.json()
        if not batch:
            # An empty page marks the end of the history.
            return collected

        collected.extend(batch)
        if not quiet:
            print(f"Retrieved {len(collected)} commits...", end="\r" * 30)
        page_number += 1
def find_file_in_commit(
    commit_sha: str, owner: str, repo: str, file_path: str, access_token: str
) -> Optional[str]:
    """Return the contents URL of `file_path` if the commit changed it.

    Fetches the details of commit `commit_sha` and scans its list of changed
    files for an entry whose filename equals `file_path`.

    Returns the entry's `contents_url`, or `None` when the commit did not
    touch the file or removed it.

    Raises `RuntimeError` when the commit details request does not answer
    with HTTP 200, so that a failed request (for example a rate-limited one)
    is not mistaken for "file not changed in this commit".
    """
    commit_url = f"https://api.github.com/repos/{owner}/{repo}/commits/{commit_sha}"
    headers = {"Authorization": f"Bearer {access_token}"}

    response = http_get(commit_url, headers=headers, timeout=10)
    if response.status_code != 200:
        raise RuntimeError(
            f"Failed to retrieve commit details. Status code: {response.status_code}"
        )

    # The 'files' key may be absent from the commit details.
    for file_changed in response.json().get("files", []):
        if file_changed["filename"] != file_path:
            continue
        if file_changed.get("status") == "removed":
            # The commit deleted the file: there is no content to search
            # at this commit.
            return None
        return file_changed["contents_url"]
    return None
def get_file_content(file_content_url: str, access_token: str) -> str:
    """Download a file through a GitHub contents URL and return its text.

    The JSON answer's "content" field is base64-decoded and then decoded
    as UTF-8.

    Raises `RuntimeError` when the request does not answer with HTTP 200.
    """
    reply = http_get(
        file_content_url,
        headers={
            "Accept": "application/vnd.github+json",
            "Authorization": f"Bearer {access_token}",
        },
        timeout=10,
    )

    # Anything but a plain success is reported to the caller.
    if reply.status_code != 200:
        raise RuntimeError(
            f"Failed to retrieve file content. Status code: {reply.status_code}"
        )

    encoded_text = reply.json()["content"]
    return b64decode(encoded_text).decode("utf-8")
def git_find_commit(
    repo_owner: str,
    repo_name: str,
    rel_file_path: str,
    match_str: str,
    access_token: str,
    n_results: int = 1,
    quiet: bool = False,
) -> List[str]:
    """Find commits that changed `rel_file_path` while it contained `match_str`.

    The commit history is walked in the order returned by
    `get_commit_history`. For every commit that changed `rel_file_path`,
    the file content is downloaded and searched for `match_str`. The search
    stops once `n_results` matching commits have been collected.

    Returns the list of matching commit SHAs (at most `n_results` of them).
    A non-positive `n_results` yields an empty list without any request.

    Progress messages are printed unless `quiet` is set.
    """

    def _print(*args, **kwargs) -> None:
        if not quiet:
            print(*args, **kwargs)

    results: List[str] = []
    if n_results <= 0:
        # Nothing was asked for: skip downloading the whole history.
        return results

    _print(
        f"\nSearching for the latest commit in repository '{repo_owner}/{repo_name}'"
        f"\nLooking for latest commits with a file named '{rel_file_path}' "
        f"having the string '{match_str}' in it...\n"
    )

    commits = get_commit_history(repo_owner, repo_name, access_token, quiet=quiet)

    for commit in commits:
        commit_sha = commit["sha"]
        _print(f"Checking commit: {commit_sha}", end="\r" * 50)

        blob_url = find_file_in_commit(
            commit_sha, repo_owner, repo_name, rel_file_path, access_token
        )
        if blob_url is None:
            # This commit did not change the file.
            continue

        if match_str not in get_file_content(blob_url, access_token):
            continue

        results.append(commit_sha)
        _print(f"\nField '{match_str}' found in commit {commit_sha}")
        if len(results) >= n_results:
            break

    return results
if __name__ == "__main__":
    # Fill in the values below before running the script.

    # Git authentication token
    API_TOKEN = ""

    # Repository to search, as <REPO_OWNER>/<REPO_NAME>
    REPO_OWNER = ""
    REPO_NAME = ""

    # Path of the file, relative to the repository tree
    REL_FILE_PATH = ""

    # Text that must appear inside the file content
    MATCH_STR = ""

    # How many of the latest matching commits to collect
    N_RESULTS = 2

    matching_commits = git_find_commit(
        REPO_OWNER,
        REPO_NAME,
        REL_FILE_PATH,
        MATCH_STR,
        API_TOKEN,
        n_results=N_RESULTS,
    )

    print(f"\nFound {len(matching_commits)} latest commits...")

    # One link per match, pointing at the file as of that commit.
    for commit_sha in matching_commits:
        print(
            f"\nhttps://github.com/{REPO_OWNER}/{REPO_NAME}/blob/{commit_sha}/{REL_FILE_PATH}"
        )
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Github API utility module""" | |
from requests import get as http_get | |
def latest_git_repo_tag(owner: str, repo_name: str, timeout: float = 3) -> str | None:
    """Retrieve the tag name of the latest release of a repository on Github.

    Queries the `releases/latest` endpoint without authentication and reads
    the `tag_name` field of the answer (the release's `name` field is its
    display title, which is not the tag).

    Returns the tag name, or `None` when the answer carries no tag name.

    Raises `RuntimeError` when the request does not answer with HTTP 200.
    """
    response = http_get(
        f"https://api.github.com/repos/{owner}/{repo_name}/releases/latest",
        timeout=timeout,
    )

    if response.status_code != 200:
        raise RuntimeError(
            f"Got HTTP error when retrieving release tag: {response.status_code}"
        )

    tag_name = response.json().get("tag_name")
    return None if tag_name is None else str(tag_name)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
A collection of Git API utility wrappers written in Python
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment