Fetch the public email addresses of all stargazers of a GitHub user's public repositories, given the username
# Was written in Python 3.11
# 1 dep: pip install aiohttp
# required env vars: GITHUB_PAT, GITHUB_USERNAME
# due to strict API limits it uses a simple cache file to save state
# you will probably need several runs of the script to fetch all emails
import asyncio
import os
import re
from typing import Callable, Optional, Tuple

import aiohttp
GITHUB_PAT = os.environ['GITHUB_PAT']
USERNAME = os.environ['GITHUB_USERNAME']

GITHUB_API_LIMIT_PER_HOUR = 5_000
COUNT_OF_RUNNING_WORKFLOW_AT_SAME_TIME = 5  # just an arbitrary number

API_BASE_URL = 'https://api.github.com'
REQUEST_KWARGS = {
    'headers': {
        'Authorization': f'token {GITHUB_PAT}'
    }
}

GATHER_BATCH_SIZE = 10
CACHE_FILENAME = 'db.csv'
OUTPUT_FILENAME = 'emails.txt'
async def send_req_until_success(make_request: Callable) -> dict:
    delay_sec = 5
    count_of_retries = int(GITHUB_API_LIMIT_PER_HOUR / COUNT_OF_RUNNING_WORKFLOW_AT_SAME_TIME / delay_sec)
    retry_number = 0
    while retry_number <= count_of_retries:
        if retry_number > 0:
            print(f'Retry number for #{id(make_request)} is {retry_number}')
        retry_number += 1

        try:
            # create a fresh request on every attempt; a coroutine can only be awaited once
            async with make_request() as res:
                if res.status != 200:
                    await asyncio.sleep(delay_sec)
                    continue
                return await res.json()
        except aiohttp.ClientError:
            await asyncio.sleep(delay_sec)
            continue

    raise RuntimeError('Surprise. Time is over')
async def get_last_page_number(url: str) -> int:
    # GitHub reports the last page number in the Link response header;
    # when the header is missing there is only a single page
    last_page_number = 1
    params = {'per_page': '100'}
    async with aiohttp.request('GET', url, params=params, **REQUEST_KWARGS) as r:
        pagination_data = r.headers.get('Link', '')
        matches = re.findall(r'page=(\d+)>; rel="last"', pagination_data)
        if matches:
            last_page_number = int(matches[0])
    return last_page_number
async def get_stargazers(url: str, page: int) -> list[str]:
    params = {
        'per_page': '100',
        'page': str(page),
    }
    resp = await send_req_until_success(lambda: aiohttp.request('GET', url, params=params, **REQUEST_KWARGS))
    return [u['login'] for u in resp]
async def get_all_stargazers(owner: str, repo: str) -> list[str]:
    url = f'{API_BASE_URL}/repos/{owner}/{repo}/stargazers'
    res = []
    last_page_number = await get_last_page_number(url)
    # pages are 1-based and the last page must be included
    for page in range(1, last_page_number + 1):
        res.extend(await get_stargazers(url, page))
    return res
async def get_user_email(username: str) -> Tuple[str, Optional[str]]:
    url = f'{API_BASE_URL}/users/{username}'
    resp = await send_req_until_success(lambda: aiohttp.request('GET', url, **REQUEST_KWARGS))
    return username, resp.get('email')
async def get_user_repos(username: str) -> list[str]:
    # this method fetches at most 100 repos; a paginated sketch follows below
    url = f'{API_BASE_URL}/users/{username}/repos'
    params = {
        'per_page': '100',
        'sort': 'updated',
    }
    resp = await send_req_until_success(lambda: aiohttp.request('GET', url, params=params, **REQUEST_KWARGS))
    return [r['name'] for r in resp]
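# Rough sketch of a paginated variant for users with more than 100 repositories.
# It assumes the /users/{username}/repos endpoint paginates via the same Link header
# that get_last_page_number parses; main() below still calls get_user_repos.
async def get_all_user_repos(username: str) -> list[str]:
    url = f'{API_BASE_URL}/users/{username}/repos'
    names = []
    last_page_number = await get_last_page_number(url)
    for page in range(1, last_page_number + 1):
        params = {
            'per_page': '100',
            'sort': 'updated',
            'page': str(page),
        }
        resp = await send_req_until_success(lambda: aiohttp.request('GET', url, params=params, **REQUEST_KWARGS))
        names.extend(r['name'] for r in resp)
    return names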
def read_from_db() -> dict[str, str]:
    # the cache file stores one '<username>\t<email>' pair per line
    db = {}
    if not os.path.exists(CACHE_FILENAME):
        return db

    with open(CACHE_FILENAME, 'r', encoding='UTF-8') as f:
        lines = f.read().split('\n')

    for line in lines:
        if not line:
            continue
        username, email = line.split('\t')
        db[username] = email

    return db
def write_to_db(username: str, email: str):
    # could be made async; could also batch several rows per file open
    with open(CACHE_FILENAME, 'a+', encoding='UTF-8') as f:
        f.write(f'{username}\t{email}\n')
def write_plain_emails(db: dict[str, str]):
    with open(OUTPUT_FILENAME, 'w+', encoding='UTF-8') as f:
        for email in db.values():
            if email != 'None':
                f.write(f'{email}\n')
async def main():
    db = read_from_db()

    repository_names = await get_user_repos(USERNAME)
    tasks = [get_all_stargazers(USERNAME, repository_name) for repository_name in repository_names]
    task_results = await asyncio.gather(*tasks)

    stargazers = set()
    for task_result in task_results:
        for result in task_result:
            stargazers.add(result)

    print('Stargazers count:', len(stargazers))

    tasks = [get_user_email(username) for username in stargazers if username not in db]
    print('Not cached stargazers:', len(tasks))

    for i, limit in enumerate(range(GATHER_BATCH_SIZE, len(tasks) + GATHER_BATCH_SIZE, GATHER_BATCH_SIZE)):
        print(f'[{i}] Fetch emails from {limit - GATHER_BATCH_SIZE} to {limit}/{len(tasks)}')

        task_results = await asyncio.gather(*tasks[limit - GATHER_BATCH_SIZE: limit])
        for task_result in task_results:
            username, email = task_result
            write_to_db(username, email)

    write_plain_emails(read_from_db())


if __name__ == '__main__':
    asyncio.run(main())
This script doesn't make much sense because there is a GraphQL interface: https://docs.github.com/en/graphql/overview/explorer
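For reference, here is a minimal sketch of that GraphQL approach, assuming the same GITHUB_PAT and GITHUB_USERNAME environment variables. It nests repositories and their stargazers into a single query so one request replaces many REST calls; pagination over the nested connections (more than 100 repositories, or more than 100 stargazers per repository) is left out for brevity.

import asyncio
import os

import aiohttp

QUERY = """
query ($login: String!) {
  user(login: $login) {
    repositories(first: 100, privacy: PUBLIC) {
      nodes {
        stargazers(first: 100) {
          nodes { login email }
        }
      }
    }
  }
}
"""


async def main():
    payload = {'query': QUERY, 'variables': {'login': os.environ['GITHUB_USERNAME']}}
    headers = {'Authorization': f'bearer {os.environ["GITHUB_PAT"]}'}
    async with aiohttp.request('POST', 'https://api.github.com/graphql', json=payload, headers=headers) as r:
        data = await r.json()

    # email comes back empty when the user hides it, so the truthiness check filters those out
    for repo in data['data']['user']['repositories']['nodes']:
        for user in repo['stargazers']['nodes']:
            if user['email']:
                print(user['login'], user['email'])


if __name__ == '__main__':
    asyncio.run(main())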