This script fetches all GitHub notifications, checks if their associated issues or pull requests are closed/merged, and marks them as done by deleting them via the GitHub API.
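To run it, export a personal access token as GITHUB_TOKEN (for classic tokens, the notifications or repo scope covers the notifications API) and execute the script with Python 3; requests is the only third-party dependency.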
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests

# Get GitHub token from environment variable
GITHUB_TOKEN = os.environ.get('GITHUB_TOKEN')
if not GITHUB_TOKEN:
    raise ValueError("Missing GITHUB_TOKEN environment variable")

# GitHub API Base URL
GITHUB_API_URL = 'https://api.github.com'
# Headers for API requests (updated to include API version)
HEADERS = {
    'Authorization': f'Bearer {GITHUB_TOKEN}',
    'Accept': 'application/vnd.github+json',
    'X-GitHub-Api-Version': '2022-11-28'
}
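# Note: the X-GitHub-Api-Version header pins the REST API version this script
# targets, and Bearer auth works for personal access tokens.
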
# Fetch **ALL** notifications with pagination
def get_all_notifications():
    notifications = []
    url = f'{GITHUB_API_URL}/notifications?all=true&per_page=100'  # Start with first page
    while url:
        response = requests.get(url, headers=HEADERS)
        response.raise_for_status()
        notifications.extend(response.json())
        # Get next page URL from headers (pagination)
        url = None  # Default to stopping
        if 'Link' in response.headers:
            links = response.headers['Link'].split(', ')
            for link in links:
                if 'rel="next"' in link:
                    url = link[link.find('<') + 1:link.find('>')]  # Extract next page URL
                    break  # Stop checking once next page is found
    return notifications
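# Note: requests already parses the Link header into response.links, so
# response.links.get('next', {}).get('url') could replace the manual parsing above.
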
# Check if a notification's associated issue/PR is closed or merged
def is_closed_or_merged(notification):
    subject = notification.get("subject", {})
    subject_type = subject.get("type")
    api_url = subject.get("url")  # API URL for issue/PR
    if not api_url:
        return False
    response = requests.get(api_url, headers=HEADERS)
    if response.status_code != 200:
        return False
    data = response.json()
    if subject_type == "PullRequest":
        return data.get("merged_at") is not None or data.get("state") == "closed"
    elif subject_type == "Issue":
        return data.get("state") == "closed"
    return False
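# Note: the REST API reports merged pull requests with state == "closed", so the
# state check alone would suffice; the merged_at check just makes the intent explicit.
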
# **Delete a notification (marks it as done)**
def delete_notification(thread_id):
    url = f'{GITHUB_API_URL}/notifications/threads/{thread_id}'
    response = requests.delete(url, headers=HEADERS)  # βœ… Correct DELETE request
    if response.status_code in [200, 204]:
        print(f"βœ… Marked notification {thread_id} as done (deleted).")
        return True
    else:
        print(f"❌ Failed to delete {thread_id} (HTTP {response.status_code})")
        return False
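# Note: DELETE /notifications/threads/{thread_id} is GitHub's documented
# "Mark a thread as done" endpoint; a successful call returns 204 No Content.
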
# **Process a single notification (check if closed/merged, then delete)**
def process_notification(notification):
    thread_id = notification['id']
    title = notification['subject']['title']
    print(f"πŸ” Checking notification {thread_id}: {title}...")
    if is_closed_or_merged(notification):
        return delete_notification(thread_id)  # βœ… Parallelized DELETE call
    return False
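# Note: each notification can cost two API calls here (a GET to check state and a
# DELETE), so a large backlog draws down the hourly rate limit twice as fast as
# the notification count suggests; keep max_workers below modest.
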
# Main script logic with parallel execution
def main():
    notifications = get_all_notifications()
    print(f"πŸ“© Found {len(notifications)} total notifications (across all pages).")
    processed_count = 0
    max_workers = 5  # Adjust this number based on performance needs (avoid API rate limiting)
    # **Use ThreadPoolExecutor to process notifications in parallel**
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_notification = {executor.submit(process_notification, notif): notif for notif in notifications}
        for future in as_completed(future_to_notification):
            if future.result():
                processed_count += 1
    print(f"βœ… Finished processing. {processed_count} notifications marked as done.")

if __name__ == '__main__':
    main()
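
The max_workers comment above nods at rate limiting, but the script never backs off when GitHub throttles it. Below is a minimal, illustrative sketch of one way to add that (request_with_backoff is a hypothetical helper, not part of the original gist); the script's requests.get and requests.delete calls could be routed through it. GitHub signals throttling with 403 or 429 responses, often with a Retry-After header:

import time
import requests

def request_with_backoff(method, url, headers, max_retries=3):
    """Retry a request when GitHub throttles it (hypothetical helper)."""
    response = None
    for attempt in range(max_retries):
        response = requests.request(method, url, headers=headers)
        if response.status_code not in (403, 429):
            return response  # Not throttled; hand the response back as-is
        # Prefer the server-suggested delay, falling back to exponential backoff
        delay = int(response.headers.get('Retry-After', 2 ** attempt))
        time.sleep(delay)
    return response  # Still throttled after max_retries; let the caller decide

Wiring it in would be a one-line change at each call site, e.g. response = request_with_backoff('GET', url, HEADERS).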