import concurrent.futures
import threading
from concurrent.futures import ALL_COMPLETED
from queue import Empty, Queue
from time import sleep
from timeit import default_timer as timer

import networkx as nx

from networking import Crawler, NetworkDisabledException

graph = nx.DiGraph()
queue = Queue()
processed = set()  # a set gives O(1) membership checks in parse_repository_list()
crawler = Crawler('XXXXXX')  # token redacted in the original gist
graph_lock = threading.Lock()
processed_lock = threading.Lock()


def consumer():
    try:
        url = queue.get(block=True, timeout=5)
    except Empty:
        # No work arrived within the timeout; let this consumer exit quietly.
        return
    try:
        response_repository = crawler.get_url(url)
    except NetworkDisabledException:
        # The URL is not in the cache: count the task as done and skip it.
        queue.task_done()
        return
    if response_repository is None:
        queue.task_done()
        with processed_lock:
            processed.add(url)
        return
    repository_data: dict = response_repository.json()
    repository_language = repository_data.get("language")
    if repository_language is None:
        repository_language = parse_repository_language(repository_data.get('languages_url'))
    # Only non-fork Rust repositories become nodes in the graph.
    if repository_language == "Rust" and repository_data.get('fork') is False:
        with graph_lock:
            graph.add_node(
                repository_data.get('url'),
                type='repository',
                stars=repository_data.get("stargazers_count"),
                fork=repository_data.get("fork"),
                url=repository_data.get("url"),
                label=repository_data.get("name"),
                language=repository_language
            )
        parse_contributors(repository_data.get('url'), repository_data.get('contributors_url'))
    queue.task_done()
    with processed_lock:
        processed.add(url)


def parse_contributors(parent_url, url):
    try:
        response_contributors = crawler.get_url(url)
    except NetworkDisabledException:
        return
    contributors_list: list = response_contributors.json()
    contribution_list = sorted(contributors_list, key=lambda d: d['contributions'])
    # Pop the last item, which has the highest contribution count, so the
    # top contributor is excluded from the loop below.
    if len(contribution_list) > 1:
        contribution_list.pop()
    contributor_data: dict
    for contributor_data in contribution_list:
        contributions_count = contributor_data.get('contributions')
        with graph_lock:
            graph.add_node(
                contributor_data.get("url"),
                type='contributor',
                label=contributor_data.get("login")
            )
            # The edge weight records how many contributions link this
            # contributor to the repository.
            graph.add_edge(
                contributor_data.get("url"),
                parent_url,
                weight=contributions_count
            )
        parse_repository_list(contributor_data.get('repos_url'))


def parse_repository_list(url: str):
    try:
        response_repository_list = crawler.get_url(url)
    except NetworkDisabledException:
        return
    repository_list: list = response_repository_list.json()
    repository_dict: dict
    for repository_dict in repository_list:
        # Enqueue only repositories that have not been processed yet; hold
        # the lock so the membership check is consistent across threads.
        with processed_lock:
            seen = repository_dict.get('url') in processed
        if not seen:
            queue.put_nowait(repository_dict.get('url'))


def parse_repository_language(url: str) -> str:
    try:
        language_list = crawler.get_url(url)
    except NetworkDisabledException:
        return ""
    language_dict = language_list.json()
    # The languages endpoint orders languages by bytes of code, largest
    # first; return the first key, or "" when the repository is empty.
    return next(iter(language_dict.keys()), "")


# Seed the queue with the Rust compiler repository and crawl with a small
# thread pool, snapshotting the graph roughly every 100 seconds.
queue.put('https://api.github.com/repos/{}/{}'.format('rust-lang', 'rust'))
consumers = []
start = timer()
inter_timer = timer()
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # NOTE: the loop exits as soon as the queue is momentarily empty, even
    # if running consumers are still producing new URLs.
    while not queue.empty():
        for _ in range(20):
            consumers.append(executor.submit(consumer))
        sleep(5)
        if timer() - inter_timer > 100:
            with graph_lock:
                print(nx.info(graph))
                nx.write_gpickle(graph, "graph.gpickle")
                nx.write_gexf(graph, "graph.gexf")
            inter_timer = timer()  # reset, so the next snapshot is ~100s away
    done, not_done = concurrent.futures.wait(consumers, timeout=10, return_when=ALL_COMPLETED)

print("")
try:
    # get_nowait() avoids blocking forever when the queue is already drained.
    print("Last item: {}".format(queue.get_nowait()))
except Empty:
    print("Queue drained")
print("Queue Size: {} ".format(queue.qsize()))
print("Elapsed: {:.0f}s".format(timer() - start))
print(nx.info(graph))
nx.write_graphml(graph, "graph.graphml")
nx.write_gexf(graph, "graph.gexf")
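
The snapshots written above can be loaded back for later analysis. A minimal sketch, assuming networkx 2.x (the write_gpickle and nx.info APIs used above were removed in networkx 3.0):

import networkx as nx

# Re-load the periodic snapshot and print basic stats about it.
graph = nx.read_gpickle("graph.gpickle")
print(nx.info(graph))

# The GEXF export is intended for tools such as Gephi, but it can also be
# re-loaded here.
gexf_graph = nx.read_gexf("graph.gexf")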

# --- networking.py: the second file in this gist, imported by the script above ---
import hashlib
import os
import pickle

import requests


class RateLimitException(Exception):
    pass


class NetworkDisabledException(Exception):
    pass


# When True, a cache miss raises NetworkDisabledException instead of
# hitting the network; this matches the behaviour of the published gist,
# which was run entirely from the on-disk cache.
NETWORK_DISABLED = True


def cachedresponse(func):
    """Serve responses from an on-disk cache keyed by the MD5 of the URL."""
    def cached_request_func(*args, **kwargs):
        url: str = args[1]
        url_hash = hashlib.md5(url.encode()).hexdigest()
        url_cache_path = os.path.join('cache', url_hash)
        if os.path.exists(url_cache_path):
            # Cache hit: return the pickled response without a network call.
            with open(url_cache_path, 'rb') as url_cache_file:
                return pickle.load(url_cache_file)
        if NETWORK_DISABLED:
            raise NetworkDisabledException("Network disabled")
        # Cache miss with networking enabled: fetch live and persist
        # successful responses for the next run.
        url_cache_response: requests.models.Response = func(*args, **kwargs)
        # if url_cache_response.headers.get("X-Rate-Limit")
        if url_cache_response.status_code in [200, 201]:
            with open(url_cache_path, 'wb') as url_cache_file:
                pickle.dump(url_cache_response, url_cache_file)
        else:
            raise Exception(url_cache_response)
        return url_cache_response
    return cached_request_func


class Crawler:
    def __init__(self, token: str) -> None:
        self._params = {'per_page': 100}
        self._headers = {
            'Authorization': 'Bearer {}'.format(token),
            'Accept': 'application/json'
        }

    @cachedresponse
    def get_url(self, url: str) -> requests.models.Response:
        response = requests.get(url, params=self._params, headers=self._headers)
        # The body is fully read by now; close the underlying connection so
        # the response object can be pickled cleanly by the decorator.
        response.close()
        return response
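
For reference, a minimal sketch of how the cache directory could be warmed up so the offline run above finds its seed URL. It reuses the gist's MD5-keyed cache layout; YOUR_TOKEN is a placeholder, not part of the original:

import hashlib
import os
import pickle

import requests

# One-off warm-up: fetch the seed repository once with networking enabled
# and store the response where cachedresponse() will look for it.
url = 'https://api.github.com/repos/rust-lang/rust'
response = requests.get(
    url,
    params={'per_page': 100},
    headers={
        'Authorization': 'Bearer YOUR_TOKEN',  # placeholder token
        'Accept': 'application/json',
    },
)
os.makedirs('cache', exist_ok=True)
cache_path = os.path.join('cache', hashlib.md5(url.encode()).hexdigest())
with open(cache_path, 'wb') as cache_file:
    pickle.dump(response, cache_file)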