Download and process the archives from githubarchive.com between two dates; the results are stored in a MySQL database.
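A typical invocation might look like the following (illustrative only: the script name is whatever you save the gist file as, and a MySQL server matching DB_CONFIG below must be reachable):
python gharchive_events.py --folder archives --download --from-date 2018-01-01-00 --to-date 2018-01-02-00 --new-db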
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015-2019 Bitergia
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335, USA.
#
# Authors:
#     Valerio Cosentino <[email protected]>
#
import argparse
import datetime
import gzip
import json
import logging
import os
import queue
import re
import requests
import sys
import threading
import urllib3

import mysql.connector
from mysql.connector import errorcode

from grimoirelab_toolkit.datetime import datetime_utcnow

GHARCHIVE_URL = 'http://data.githubarchive.org/{0}.json.gz'
FROM_DATE = '2017-01-01-00'
TO_DATE = '2019-01-01-00'
# GitHub Archive uses 'IssuesEvent' (not 'IssueEvent') as the type of issue events
TARGET_EVENTS = ['IssuesEvent', 'PullRequestEvent']
POOL_SIZE = 16

MAX_RETRY = 10
CONNECTION_RETRY = 10

DB_NAME = "github_events"
DB_CONFIG = {
    'user': 'root',
    'password': '',
    'database': 'github_events',
    'host': 'localhost',
    'port': '3306',
    'raise_on_warnings': False,
    'buffered': True
}
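# NOTE: DB_CONFIG assumes a local MySQL server reachable as root with an empty password;
# adjust the credentials, host and port to match your environment before running the script.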


class ProcessArchive(threading.Thread):
    """Process a single archive and store the obtained results to the database.

    ProcessArchive is a thread which reads archive paths from a queue. Each path is used to
    load the corresponding archive; its content is parsed and analysed to look for the
    events defined in TARGET_EVENTS, and the extracted data is stored in the database.
    The thread ends as soon as it receives a None item from the queue.
    """
    def __init__(self, qarchive):
        threading.Thread.__init__(self)
        self.qarchive = qarchive

    def run(self):
        cnx = mysql.connector.connect(**DB_CONFIG)
        cursor = cnx.cursor()
        use_database = "USE " + DB_NAME
        cursor.execute(use_database)

        while True:
            archive_path = self.qarchive.get()

            if not archive_path:
                logging.debug("Thread %s exiting", self.getName())
                self.qarchive.task_done()
                break

            if not os.path.exists(archive_path):
                logging.debug("Archive %s not found", archive_path)
                self.qarchive.task_done()
                continue

            with gzip.open(archive_path, 'r') as content:
                for line in content:
                    decoded = line.decode("utf-8")
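                    # a line may contain several JSON events concatenated together;
                    # mark the '}{"' boundaries so each event can be parsed on its own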
                    delimited = re.sub(r'}{"(?!\W)', '}JSONDELIMITER{"', decoded)
                    for chunk in delimited.split('JSONDELIMITER'):
                        if len(chunk) == 0:
                            continue

                        try:
                            event = json.loads(chunk)
                        except Exception as e:
                            logging.error("Failed to load JSON %s in archive %s, %s",
                                          chunk, archive_path, str(e))
                            continue

                        event_type = event['type']
                        event_payload = event['payload']

                        if event_type not in TARGET_EVENTS:
                            continue

                        # the action attribute is in the payload of IssuesEvent and PullRequestEvent
                        event_action = event_payload['action']

                        if event_action != 'opened':
                            continue

                        event_actor = event['actor']['login']
                        event_repo = event['repo']['url']
                        event_created_at = event['created_at']

                        cursor = cnx.cursor()
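                        # id is AUTO_INCREMENT, so None is passed for the first column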
                        query = "INSERT IGNORE INTO events " \
                                "VALUES (%s, %s, %s, %s, %s, %s)"
                        arguments = [None, event_repo, event_actor, event_type, event_action, event_created_at]
                        cursor.execute(query, arguments)
                        cnx.commit()
                        cursor.close()

            self.qarchive.task_done()
            logging.debug("Archive %s processed by %s", archive_path, self.getName())


def connection():
    """Create a requests.Session obj which includes a retry mechanism on connection faults"""
    con = requests.Session()
    retries = urllib3.util.Retry(total=MAX_RETRY, connect=CONNECTION_RETRY)
    adapter = requests.adapters.HTTPAdapter(max_retries=retries)
    con.mount('http://', adapter)
    con.mount('https://', adapter)

    return con


def format_date(date):
    """Format the date to be able to query the archives hosted on githubarchive.com.

    :param date: a date string in the format yyyy-mm-dd-hh
    """
    strp_date = datetime.datetime.strptime(date, "%Y-%m-%d-%H")
    output_format = re.sub(r'-[0-9][0-9]$', '', date)

    if strp_date.hour == 0:
        output_format += '-0'
    else:
        output_format += '-' + str(strp_date.hour).lstrip('0')

    return output_format
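
# For example, format_date("2018-03-05-00") returns "2018-03-05-0" and
# format_date("2018-03-05-09") returns "2018-03-05-9", matching the archive names
# used by githubarchive.com, where the hour carries no leading zero.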


def update_current_date(date):
    """Update a given date by one hour.

    :param date: a date string in the format yyyy-mm-dd-hh
    """
    d = datetime.datetime.strptime(date, "%Y-%m-%d-%H")
    d = d + datetime.timedelta(hours=1)
    new_date = d.strftime("%Y-%m-%d-%H")

    return new_date


def download_archives(folder, from_date, to_date):
    """Download the archives from githubarchive.com generated between `from_date`
    and `to_date`, and store them in `folder`.

    :param folder: the folder where to store the archives
    :param from_date: the starting date to download the archives
    :param to_date: the ending date to download the archives
    """
    con = connection()
    current_date = from_date

    while current_date != to_date:
        formatted_date = format_date(current_date)
        archive_path = os.path.join(folder, formatted_date) + '.gz'

        if os.path.exists(archive_path):
            logging.debug("Archive %s already downloaded", archive_path)
            current_date = update_current_date(current_date)
            continue

        url = GHARCHIVE_URL.format(formatted_date)
        try:
            response = con.get(url, stream=True)
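            # with stream=True the body is fetched lazily; response.raw.read() returns the
            # bytes as served (the gzip archive itself), which are written to disk unchanged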
            with open(archive_path, 'wb') as fd:
                fd.write(response.raw.read())

            logging.debug("Archive %s downloaded", archive_path)
        except Exception as e:
            logging.error('Archive %s not collected due to %s', url, str(e))
            continue

        # update current date
        current_date = update_current_date(current_date)


def process_archives(folder, from_date, to_date):
    """Process the GitHub events stored in the archives in `folder` with names
    between `from_date` and `to_date`.

    :param folder: the folder where the archives are stored
    :param from_date: the starting date to process the archives
    :param to_date: the ending date to process the archives
    """
    qarchives = queue.Queue()

    # init threads
    threads = []
    for i in range(POOL_SIZE):
        t = ProcessArchive(qarchives)
        threads.append(t)

    # start threads
    for t in threads:
        logging.debug("Thread %s created", t.getName())
        t.start()

    current_date = from_date
    while current_date != to_date:
        formatted_date = format_date(current_date)
        archive_path = os.path.join(folder, formatted_date) + '.gz'
        qarchives.put(archive_path)
        current_date = update_current_date(current_date)

    # add dead pills, one per thread: a worker exits when it dequeues a None
    for _ in threads:
        qarchives.put(None)

    # wait for all threads to finish
    for t in threads:
        t.join()


def init_database(force_init):
    """Initialize the database (if `force_init` is True, the database is recreated). The
    database is composed of a single table, `events`, defined in the following way:
    - ID - unique ID to identify the event
    - REPO - API URL of the repo
    - ACTOR - GitHub username
    - TYPE - type of the event
    - ACTION - action of the event
    - CREATED_AT - creation time of the event

    An index on `actor` is defined to speed up queries.

    :param force_init: if True, deletes the previous version of the database
    """
    cnx = mysql.connector.connect(**DB_CONFIG)
    cursor = cnx.cursor()

    if force_init:
        drop_database_if_exists = "DROP DATABASE IF EXISTS " + DB_NAME
        cursor.execute(drop_database_if_exists)

    try:
        create_database = "CREATE DATABASE " + DB_NAME
        cursor.execute(create_database)
    except mysql.connector.errors.DatabaseError as e:
        logging.debug("Database not created, %s", str(e))
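    # the innodb_* settings target older MySQL releases (before 5.7.7), where the Barracuda
    # file format and innodb_large_prefix are needed for ROW_FORMAT=DYNAMIC tables; the last
    # two set the server character set and raise the connection limit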
cursor.execute("set global innodb_file_format = BARRACUDA") | |
cursor.execute("set global innodb_file_format_max = BARRACUDA") | |
cursor.execute("set global innodb_large_prefix = ON") | |
cursor.execute("set global character_set_server = utf8") | |
cursor.execute("set global max_connections = 500") | |
use_database = "USE " + DB_NAME | |
cursor.execute(use_database) | |
create_table = "CREATE TABLE IF NOT EXISTS events( " \ | |
"id int(20) AUTO_INCREMENT PRIMARY KEY, " \ | |
"repo varchar(255), " \ | |
"actor varchar(255), " \ | |
"type varchar(32), " \ | |
"action varchar(32), " \ | |
"created_at timestamp, " \ | |
"INDEX actor (actor) " \ | |
") ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;" | |
cursor.execute(create_table) | |
cursor.close() | |
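
# Once populated, the `events` table can be queried directly, for instance (illustrative):
#   SELECT actor, COUNT(*) AS opened FROM github_events.events
#   WHERE action = 'opened' GROUP BY actor ORDER BY opened DESC;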


def parser(*args):
    """Parse the commands of the script."""

    def commands():
        """Define the commands of the script, which are:

        --folder: folder to store/read the archives
        --download: if True, download the archives, default False
        --from-date: the date to start storing/reading the archives, default FROM_DATE
        --to-date: the date to end storing/reading the archives, default TO_DATE
        --new-db: recreate the database if it exists
        """
        parser = argparse.ArgumentParser()
        parser.add_argument('--folder', dest='folder', help='Folder to store/read the GHArchive data')
        parser.add_argument('--download', action='store_true', help='Download GHArchive data')
        parser.add_argument('--from-date', dest='from_date', default=FROM_DATE, help="Starting date (yyyy-mm-dd-hh)")
        parser.add_argument('--to-date', dest='to_date', default=TO_DATE, help="Ending date (yyyy-mm-dd-hh)")
        parser.add_argument('--new-db', action='store_true', help="Delete the previous version of the DB if it exists")

        return parser

    parsed_args = commands().parse_args(*args)
    return parsed_args


def main():
    """This script downloads and processes the archives from githubarchive.com
    between two dates (--from-date and --to-date) to a folder (--folder). It filters them
    according to specific GitHub events and stores the results in a MySQL database
    (`github_events`).
    """
    logging.getLogger().setLevel(logging.DEBUG)

    args = parser(sys.argv[1:])
    folder = args.folder
    download = args.download
    from_date = args.from_date
    to_date = args.to_date
    force_init = args.new_db

    start_time = datetime_utcnow().isoformat()
    logging.debug("script started at: %s", start_time)

    if not os.path.exists(folder):
        os.makedirs(folder)

    if download:
        download_archives(folder, from_date, to_date)

    init_database(force_init)
    process_archives(folder, from_date, to_date)

    end_time = datetime_utcnow().isoformat()
    logging.debug("script ended at: %s", end_time)


if __name__ == '__main__':
    main()