Skip to content

Instantly share code, notes, and snippets.

@spalladino
Last active February 21, 2020 06:40
Show Gist options
  • Save spalladino/ec04b2482da47392bb3f098f87e280fb to your computer and use it in GitHub Desktop.
Estimate number of repositories that match a code search
from base64 import b64encode
from http.client import HTTPSConnection
from json import loads
from sys import exit, argv, stderr
from time import sleep
from urllib.parse import urlencode

import urllib3
# Settings
PRINT_REPOS = True        # when True, dump the distinct repo names at the end
PAGE_SIZE = 10            # results requested per search-API page
SIZE_STEP = 1000          # width (bytes) of each file-size bucket searched
SIZE_UPPER_BOUND = 20000  # sizes above this are covered by one open-ended query
# Query to search for
QUERY = "foobar"
# Generate a token at https://github.com/settings/tokens
USER = ""
TOKEN = ""
# Query https://developer.github.com/v3/search/#search-code
# One shared HTTPS connection, reused by every request in this script.
connection = HTTPSConnection("api.github.com")
# HTTP Basic auth header: base64("user:token").
userpass = "%s:%s" % (USER, TOKEN)
userpassb64 = b64encode(userpass.encode("ascii")).decode("ascii")
headers = { 'Authorization': 'Basic %s' % userpassb64, 'User-Agent': USER }
def print_unique(repos):
    """Print each distinct repository name on its own line (to stdout)."""
    distinct = set(repos)
    print("\n".join(distinct))
def check_response(res):
    """Validate a GitHub API HTTP response.

    Returns True when the response is usable (HTTP 200). On an
    abuse-detection throttle it prints "A", sleeps 5 minutes and returns
    False so the caller retries. Any other error prints diagnostics to
    stderr and exits the process.
    """
    if res.status != 200:
        body = res.read().decode('utf-8')
        if "abuse detection" in body:
            # GitHub abuse-rate limiting: back off, then let caller retry.
            print("A", end="", file=stderr, flush=True)
            sleep(300)
            return False
        else:
            print("Error accessing github API", file=stderr)
            # BUG FIX: the response body was already consumed into `body`
            # above, so a second res.read() returned b''. Print the saved
            # copy instead.
            print(body, file=stderr)
            print(res.getheaders(), file=stderr)
            exit(1)
    return True
def check_complete_results(data):
    """Return True when GitHub reports the search payload as complete.

    For an incomplete payload, print "I", wait briefly and return False
    so the caller re-requests the same page.
    """
    if not data["incomplete_results"]:
        return True
    print("I", end="", file=stderr, flush=True)
    sleep(5)
    return False
def fetch_for_size(size_from, size_to):
    """Fetch repository names for code matches within one file-size range.

    Pages through the GitHub code-search API for QUERY restricted to
    files of size_from..size_to bytes, retrying on network errors, abuse
    throttling, and incomplete result sets. size_to may be "*" for an
    open-ended upper bound.

    Returns a list of "owner/name" strings (may contain duplicates).
    """
    total_count = None
    page = 0
    page_size = PAGE_SIZE
    query = "%s size:%s..%s" % (QUERY, size_from, size_to)
    repositories = []
    print("\n Size %s..%s: " % (size_from, size_to), file=stderr, end="")
    # Keep paging until every reported result has been fetched.
    while (total_count is None) or (page * page_size) < total_count:
        # BUG FIX: urllib3 v2 removed the urllib3.request.urlencode
        # re-export; use the stdlib urllib.parse.urlencode instead.
        qs = urlencode({ "per_page": str(page_size), "page": str(page + 1), "q": query })
        res = None
        try:
            connection.request('GET', '/search/code?%s' % (qs), headers=headers)
            res = connection.getresponse()
        # BUG FIX: a bare `except:` also swallowed KeyboardInterrupt and
        # SystemExit, making the script impossible to stop mid-retry.
        except Exception:
            # Transient network/HTTP failure: wait, then retry this page.
            print("E", end="", file=stderr, flush=True)
            sleep(150)
            continue
        if not check_response(res):
            continue
        data = loads(res.read().decode('utf-8'))
        if not check_complete_results(data):
            continue
        print(".", end="", file=stderr, flush=True)
        repositories += [item["repository"]["full_name"] for item in data["items"]]
        total_count = data["total_count"]
        sleep(5)  # throttle to stay under the search API rate limit
        page += 1
    return repositories
def fetch_all():
    """Run the search over every file-size bucket and collect results.

    Queries each SIZE_STEP-wide slice from 0 up to SIZE_UPPER_BOUND,
    then one final open-ended slice ("*") for larger files. Returns the
    concatenated repository names (duplicates possible across slices).
    """
    found = []
    for lower_bound in range(0, SIZE_UPPER_BOUND, SIZE_STEP):
        found.extend(fetch_for_size(lower_bound, lower_bound + SIZE_STEP))
    found.extend(fetch_for_size(SIZE_UPPER_BOUND, "*"))
    return found
# Entry point: run the full search, report the distinct repository count,
# and optionally list the names on stdout.
# FIX: the suite under `if PRINT_REPOS:` had lost its indentation
# (a SyntaxError); restored here.
print("Fetching...", file=stderr, end="")
repositories = fetch_all()
unique_repositories = list(set(repositories))
print("\nNumber of repos: %d" % (len(unique_repositories),), file=stderr)
if PRINT_REPOS:
    print_unique(unique_repositories)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment