@memoryleak
Created October 15, 2021 17:22

# First file of the gist: the crawl driver (imports the networking module below).
import concurrent.futures
import threading
from concurrent.futures import ALL_COMPLETED
from queue import Empty, Queue
from time import sleep
from timeit import default_timer as timer

import networkx as nx

from networking import Crawler, NetworkDisabledException

graph = nx.DiGraph()
queue = Queue()
processed = list()
crawler = Crawler('XXXXXX')  # GitHub API token redacted
graph_lock = threading.Lock()
processed_lock = threading.Lock()


def consumer():
    # Take the next repository URL off the queue; if nothing arrives within
    # five seconds the worker simply exits.
    try:
        url = queue.get(block=True, timeout=5)
    except Empty:
        return
    response_repository = None
    try:
        response_repository = crawler.get_url(url)
    except NetworkDisabledException:
        return
    if response_repository is None:
        # Defensive: no response means nothing to parse, but the URL still
        # counts as handled.
        queue.task_done()
        with processed_lock:
            processed.append(url)
        return
    repository_data: dict = response_repository.json()
    repository_language = repository_data.get("language")
    if repository_language is None:
        repository_language = parse_repository_language(repository_data.get('languages_url'))
    # Only non-fork Rust repositories are added to the graph.
    if repository_language == "Rust" and repository_data.get('fork') is False:
        with graph_lock:
            graph.add_node(
                repository_data.get('url'),
                type='repository',
                stars=repository_data.get("stargazers_count"),
                fork=repository_data.get("fork"),
                url=repository_data.get("url"),
                label=repository_data.get("name"),
                language=repository_language
            )
        parse_contributors(repository_data.get('url'), repository_data.get('contributors_url'))
    queue.task_done()
    with processed_lock:
        processed.append(url)


def parse_contributors(parent_url, url):
    try:
        response_contributors = crawler.get_url(url)
    except NetworkDisabledException:
        return
    contributors_list: list = response_contributors.json()
    # Sort ascending by contribution count and drop the last (highest) entry.
    contribution_list = sorted(contributors_list, key=lambda d: d['contributions'])
    if len(contribution_list) > 1:
        contribution_list.pop()
    contributor_data: dict
    for contributor_data in contribution_list:
        contributions_count = contributor_data.get('contributions')
        with graph_lock:
            graph.add_node(
                contributor_data.get("url"),
                type='contributor',
                label=contributor_data.get("login")
            )
            # The edge weight records how many contributions link the
            # contributor to the repository.
            graph.add_edge(
                contributor_data.get("url"),
                parent_url,
                weight=contributions_count
            )
        parse_repository_list(contributor_data.get('repos_url'))


def parse_repository_list(url: str):
    try:
        response_repository_list = crawler.get_url(url)
    except NetworkDisabledException:
        return
    repository_list: list = response_repository_list.json()
    repository_dict: dict
    for repository_dict in repository_list:
        # Queue every repository that has not been crawled yet.
        if repository_dict.get('url') not in processed:
            queue.put_nowait(repository_dict.get('url'))


def parse_repository_language(url: str) -> str:
    try:
        language_list = crawler.get_url(url)
    except NetworkDisabledException:
        return ""
    language_dict = language_list.json()
    # The languages endpoint maps language names to byte counts; take the first
    # key as the primary language, or return an empty string if there is none.
    return next(iter(language_dict.keys()), "")


# Seed the queue with the rust-lang/rust repository and crawl with a small
# thread pool.
queue.put('https://api.github.com/repos/{}/{}'.format('rust-lang', 'rust'))
consumers = []
start = timer()
inter_timer = timer()
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    while not queue.empty():
        for _ in range(20):
            consumers.append(executor.submit(consumer))
        sleep(5)
        # Write an intermediate snapshot of the graph roughly every 100 seconds.
        if timer() - inter_timer > 100:
            with graph_lock:
                print(nx.info(graph))
                nx.write_gpickle(graph, "graph.gpickle")
                nx.write_gexf(graph, "graph.gexf")
            inter_timer = timer()

# Leaving the with-block already waits for the executor to shut down; the
# explicit wait is a final check that every submitted consumer has finished.
concurrent.futures.wait(consumers, timeout=10, return_when=ALL_COMPLETED)
print("")
if not queue.empty():
    print("Last item: {}".format(queue.get_nowait()))
print("Queue Size: {}".format(queue.qsize()))
print(nx.info(graph))
nx.write_graphml(graph, "graph.graphml")
nx.write_gexf(graph, "graph.gexf")
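
The script leaves its results in graph.gexf and graph.graphml. A minimal sketch of how the exported graph might be loaded back and queried with networkx; the query below is illustrative and not part of the gist:

import networkx as nx

# Load the crawl result written by the script above.
graph = nx.read_gexf("graph.gexf")

# Illustrative query: contributors ranked by the total contribution weight of
# the edges pointing from them to repositories.
contributor_nodes = [n for n, data in graph.nodes(data=True) if data.get("type") == "contributor"]
totals = {
    n: sum(data.get("weight", 0) for _, _, data in graph.out_edges(n, data=True))
    for n in contributor_nodes
}
for node, total in sorted(totals.items(), key=lambda item: item[1], reverse=True)[:10]:
    print(graph.nodes[node].get("label"), total)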


# networking.py -- the gist's second file (module name taken from the import above).
import hashlib
import os
import pickle

import requests


class RateLimitException(Exception):
    """Intended for GitHub rate-limit responses; currently unused."""


class NetworkDisabledException(Exception):
    """Raised on a cache miss while network fetching is switched off."""


def cachedresponse(func):
    """Serve responses from an on-disk pickle cache keyed by the MD5 of the URL."""
    def cached_request_func(*args, **kwargs):
        url: str = args[1]  # args[0] is the Crawler instance (self)
        url_hash = hashlib.md5(url.encode()).hexdigest()
        url_cache_path = os.path.join('cache', url_hash)
        if os.path.exists(url_cache_path):
            with open(url_cache_path, 'rb') as url_cache_file:
                url_cache_response = pickle.load(url_cache_file)
        else:
            # Cache-only mode: a miss raises instead of hitting the network.
            # The fetch-and-store code below is kept but is unreachable while
            # this raise is active.
            raise NetworkDisabledException("Network disabled")
            url_cache_response: requests.models.Response = func(*args, **kwargs)
            # if url_cache_response.headers.get("X-Rate-Limit")
            if url_cache_response.status_code in [200, 201]:
                with open(url_cache_path, 'wb') as url_cache_file:
                    pickle.dump(url_cache_response, url_cache_file)
            else:
                raise Exception(url_cache_response)
        return url_cache_response
    return cached_request_func


class Crawler:
    def __init__(self, token: str) -> None:
        self._params = {'per_page': 100}
        self._headers = {
            'Authorization': 'Bearer {}'.format(token),
            'Accept': 'application/json'
        }

    @cachedresponse
    def get_url(self, url: str) -> requests.models.Response:
        # Responses are cached by the decorator; close the connection once the
        # body has been read.
        response = requests.get(url, params=self._params, headers=self._headers)
        response.close()
        return response
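
A minimal usage sketch for the Crawler, assuming a valid GitHub personal access token and a local cache directory; neither the token value nor this snippet is part of the gist:

import os

from networking import Crawler, NetworkDisabledException

os.makedirs('cache', exist_ok=True)  # the cachedresponse decorator expects a local 'cache' directory
crawler = Crawler('<your-github-token>')  # placeholder token, not a real value
try:
    response = crawler.get_url('https://api.github.com/repos/rust-lang/rust')
    print(response.json().get('full_name'))
except NetworkDisabledException:
    print('URL not cached and network fetching is disabled')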