Download and process the archives from githubarchive.com between two dates; the results are stored in a MySQL database.
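For example, assuming the script is saved as gharchive2mysql.py (the file name is not part of the gist) and a local MySQL server is available, a typical invocation is:

python gharchive2mysql.py --folder /tmp/gharchive --download --from-date 2018-01-01-00 --to-date 2018-01-02-00 --new-db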
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015-2019 Bitergia
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335, USA.
#
# Authors:
# Valerio Cosentino <[email protected]>
#
import argparse
import datetime
import gzip
import json
import logging
import os
import queue
import re
import requests
import sys
import threading
import urllib3
import mysql.connector
from mysql.connector import errorcode
from grimoirelab_toolkit.datetime import datetime_utcnow
GHARCHIVE_URL = 'http://data.githubarchive.org/{0}.json.gz'
FROM_DATE = '2017-01-01-00'
TO_DATE = '2019-01-01-00'
TARGET_EVENTS = ['IssueEvent', 'PullRequestEvent']
POOL_SIZE = 16
MAX_RETRY = 10
CONNECTION_RETRY = 10
DB_NAME = "github_events"
DB_CONFIG = {
    'user': 'root',
    'password': '',
    'database': 'github_events',
    'host': 'localhost',
    'port': '3306',
    'raise_on_warnings': False,
    'buffered': True
}
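# NOTE: the settings above assume a MySQL server running on localhost and
# reachable as root with an empty password; adapt DB_CONFIG to your environment.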
class ProcessArchive(threading.Thread):
    """Process a single archive and store the obtained results to the database.

    ProcessArchive is a thread which reads from a queue and processes its items (archive paths).
    Each path is used to load the corresponding archive; its content is parsed and analysed to
    look for the events defined in TARGET_EVENTS, and the obtained data is then stored in the
    database.

    The thread ends as soon as it receives a None item: it stops reading from the queue and
    returns, so the caller can wait for the other threads to finish.
    """

    def __init__(self, qarchive):
        threading.Thread.__init__(self)
        self.qarchive = qarchive

    def run(self):
        cnx = mysql.connector.connect(**DB_CONFIG)
        cursor = cnx.cursor()
        use_database = "USE " + DB_NAME
        cursor.execute(use_database)

        while True:
            archive_path = self.qarchive.get()

            if not archive_path:
                logging.debug("Thread %s exiting", self.name)
                self.qarchive.task_done()
                break

            if not os.path.exists(archive_path):
                logging.debug("Archive %s not found", archive_path)
                self.qarchive.task_done()
                continue

            with gzip.open(archive_path, 'r') as content:
                for line in content:
                    decoded = line.decode("utf-8")
                    # a line may contain several concatenated JSON objects;
                    # mark the '}{' boundaries so they can be split and parsed one by one
                    delimited = re.sub(r'}{"(?!\W)', '}JSONDELIMITER{"', decoded)
                    for chunk in delimited.split('JSONDELIMITER'):
                        if len(chunk) == 0:
                            continue

                        try:
                            event = json.loads(chunk)
                        except Exception as e:
                            logging.error("Failed to load JSON %s in archive %s, %s",
                                          chunk, archive_path, str(e))
                            continue

                        event_type = event['type']
                        event_payload = event['payload']

                        if event_type not in TARGET_EVENTS:
                            continue

                        # the action attribute is present in IssueEvents and PullRequestEvents
                        event_action = event_payload['action']

                        if event_action != 'opened':
                            continue

                        event_actor = event['actor']['login']
                        event_repo = event['repo']['url']
                        event_created_at = event['created_at']

                        cursor = cnx.cursor()
                        query = "INSERT IGNORE INTO events " \
                                "VALUES (%s, %s, %s, %s, %s, %s)"
                        arguments = [None, event_repo, event_actor, event_type, event_action, event_created_at]
                        cursor.execute(query, arguments)
                        cnx.commit()
                        cursor.close()

            self.qarchive.task_done()
            logging.debug("Archive %s processed by %s", archive_path, self.name)
def connection():
    """Create a requests.Session obj which includes a retry mechanism on connection faults"""
    con = requests.Session()
    retries = urllib3.util.Retry(total=MAX_RETRY, connect=CONNECTION_RETRY)
    adapter = requests.adapters.HTTPAdapter(max_retries=retries)
    con.mount('http://', adapter)
    con.mount('https://', adapter)

    return con
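# The session returned above transparently retries failed requests (up to MAX_RETRY
# attempts overall, CONNECTION_RETRY on connection errors) before giving up.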
def format_date(date):
    """Format the date to be able to query the archives hosted on githubarchive.com.

    :param date: a date string in the format yyyy-mm-dd-hh
    """
    strp_date = datetime.datetime.strptime(date, "%Y-%m-%d-%H")
    output_format = re.sub(r'-[0-9][0-9]$', '', date)

    if strp_date.hour == 0:
        output_format += '-0'
    else:
        output_format += '-' + str(strp_date.hour).lstrip('0')

    return output_format
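# For instance, format_date('2018-01-01-05') returns '2018-01-01-5' and
# format_date('2018-01-01-00') returns '2018-01-01-0', matching the archive
# names used by githubarchive.com (hours without a leading zero).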
def update_current_date(date):
    """Update a given date by one hour.

    :param date: target date, in the format yyyy-mm-dd-hh
    """
    d = datetime.datetime.strptime(date, "%Y-%m-%d-%H")
    d = d + datetime.timedelta(hours=1)
    new_date = d.strftime("%Y-%m-%d-%H")

    return new_date
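# For instance, update_current_date('2018-01-01-23') returns '2018-01-02-00'.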
def download_archives(folder, from_date, to_date):
    """Download the archives from githubarchive.com generated between `from_date`
    and `to_date`, and store them to `folder`.

    :param folder: the folder where to store the archives
    :param from_date: the starting date to download the archives
    :param to_date: the ending date to download the archives
    """
    con = connection()

    current_date = from_date
    while current_date != to_date:
        formatted_date = format_date(current_date)
        archive_path = os.path.join(folder, formatted_date) + '.gz'

        if os.path.exists(archive_path):
            logging.debug("Archive %s already downloaded", archive_path)
            current_date = update_current_date(current_date)
            continue

        url = GHARCHIVE_URL.format(formatted_date)
        try:
            response = con.get(url, stream=True)
            with open(archive_path, 'wb') as fd:
                fd.write(response.raw.read())
            logging.debug("Archive %s downloaded", archive_path)
        except Exception as e:
            # the current date is not advanced here, so the same archive
            # is retried on the next iteration
            logging.error('Archive %s not collected due to %s', url, str(e))
            continue

        # update current date
        current_date = update_current_date(current_date)
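# After downloading, `folder` contains one gzipped archive per hour, named after
# the formatted date, e.g. 2018-01-01-0.gz, 2018-01-01-1.gz, ..., 2018-01-01-23.gz.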
def process_archives(folder, from_date, to_date):
    """Process the GitHub events stored in the archives in `folder` with names
    between `from_date` and `to_date`.

    :param folder: the folder where the archives are stored
    :param from_date: the starting date to process the archives
    :param to_date: the ending date to process the archives
    """
    qarchives = queue.Queue()

    # init threads
    threads = []
    for i in range(POOL_SIZE):
        t = ProcessArchive(qarchives)
        threads.append(t)

    # start threads
    for t in threads:
        logging.debug("Thread %s created", t.name)
        t.start()

    current_date = from_date
    while current_date != to_date:
        formatted_date = format_date(current_date)
        archive_path = os.path.join(folder, formatted_date) + '.gz'
        qarchives.put(archive_path)
        current_date = update_current_date(current_date)

    # add dead pills
    for _ in threads:
        qarchives.put(None)

    # wait for all threads to finish
    for t in threads:
        t.join()
def init_database(force_init):
    """Initialize the database (if `force_init` is True, the database is recreated), which is
    composed of only the table `events`, defined in the following way:

    - ID - unique ID to identify the event
    - REPO - API URL of the repo
    - ACTOR - GitHub username
    - TYPE - type of the event
    - ACTION - action of the event
    - CREATED_AT - creation time of the event

    An index on actor is defined to speed up queries.

    :param force_init: if True deletes the previous version of the database
    """
    # connect without selecting a database, since `github_events` may not exist yet
    config = DB_CONFIG.copy()
    config.pop('database', None)
    cnx = mysql.connector.connect(**config)
    cursor = cnx.cursor()

    if force_init:
        drop_database_if_exists = "DROP DATABASE IF EXISTS " + DB_NAME
        cursor.execute(drop_database_if_exists)

    try:
        create_database = "CREATE DATABASE " + DB_NAME
        cursor.execute(create_database)
    except mysql.connector.errors.DatabaseError as e:
        logging.debug("Database not created, %s", str(e))
        pass

    cursor.execute("set global innodb_file_format = BARRACUDA")
    cursor.execute("set global innodb_file_format_max = BARRACUDA")
    cursor.execute("set global innodb_large_prefix = ON")
    cursor.execute("set global character_set_server = utf8")
    cursor.execute("set global max_connections = 500")

    use_database = "USE " + DB_NAME
    cursor.execute(use_database)

    create_table = "CREATE TABLE IF NOT EXISTS events( " \
                   "id int(20) AUTO_INCREMENT PRIMARY KEY, " \
                   "repo varchar(255), " \
                   "actor varchar(255), " \
                   "type varchar(32), " \
                   "action varchar(32), " \
                   "created_at timestamp, " \
                   "INDEX actor (actor) " \
                   ") ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;"
    cursor.execute(create_table)
    cursor.close()
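# Once populated, the table can be queried directly, e.g. to count the opened
# issues per GitHub user:
#   SELECT actor, COUNT(*) AS opened_issues FROM events
#   WHERE type = 'IssueEvent' GROUP BY actor ORDER BY opened_issues DESC;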
def parser(*args):
    """Parse the commands of the script."""

    def commands():
        """Define the commands of the script, which are:

        --folder: folder to store/read the archives
        --download: if set, download the archives (default False)
        --from-date: the date to start storing/reading the archives, default FROM_DATE
        --to-date: the date to end storing/reading the archives, default TO_DATE
        --new-db: recreate the database if it exists
        """
        parser = argparse.ArgumentParser()
        parser.add_argument('--folder', dest='folder', help='Folder to store/read the GHArchive data')
        parser.add_argument('--download', action='store_true', help='Download GHArchive data')
        parser.add_argument('--from-date', dest='from_date', default=FROM_DATE, help="Starting date (yyyy-mm-dd-hh)")
        parser.add_argument('--to-date', dest='to_date', default=TO_DATE, help="Ending date (yyyy-mm-dd-hh)")
        parser.add_argument('--new-db', action='store_true', help="Delete the previous version of the DB if exists")

        return parser

    parsed_args = commands().parse_args(*args)
    return parsed_args
def main():
    """This script downloads and processes the archives from githubarchive.com
    between two dates (--from-date and --to-date) to a folder (--folder). It filters them
    according to specific GitHub events and stores the results in a MySQL database
    (`github_events`).
    """
    logging.getLogger().setLevel(logging.DEBUG)

    args = parser(sys.argv[1:])
    folder = args.folder
    download = args.download
    from_date = args.from_date
    to_date = args.to_date
    force_init = args.new_db

    start_time = datetime_utcnow().isoformat()
    logging.debug("script started at: %s", start_time)

    if not os.path.exists(folder):
        os.makedirs(folder)

    if download:
        download_archives(folder, from_date, to_date)

    init_database(force_init)
    process_archives(folder, from_date, to_date)

    end_time = datetime_utcnow().isoformat()
    logging.debug("script ended at: %s", end_time)


if __name__ == '__main__':
    main()