Last active
June 30, 2016 15:29
-
-
Save marcocitus/7316d7ec1aa73d1bac5b to your computer and use it in GitHub Desktop.
Scripts for loading GitHub events into Citus
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- GitHub archive events, one row per event. The nested JSON objects from the
-- archive (payload, repo, actor, org) are stored as jsonb; a few scalar
-- fields are pulled out into their own columns.
CREATE TABLE github_events
(
    event_id bigint,
    event_type text,
    event_public boolean,
    repo_id bigint,
    payload jsonb,
    repo jsonb,
    actor jsonb,
    org jsonb,
    created_at timestamp
);

-- Explicit index names (identical to the names PostgreSQL would generate)
-- so they are easy to reference in later migrations.
CREATE INDEX github_events_event_type_idx ON github_events (event_type);
-- jsonb_path_ops GIN indexes serve containment (@>) lookups on the JSON objects.
CREATE INDEX github_events_actor_idx ON github_events USING GIN (actor jsonb_path_ops);
CREATE INDEX github_events_repo_idx ON github_events USING GIN (repo jsonb_path_ops);

-- Append-distribute the table on created_at: shards are created and filled
-- explicitly by the loading scripts rather than by hashing rows.
SELECT master_create_distributed_table('github_events', 'created_at', 'append');
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- get_date_shard(events_date) returns the shardid of the github_events shard
-- whose shardminvalue falls on events_date. If no such shard exists, it
-- creates an empty shard and sets its range to cover that whole day.
CREATE OR REPLACE FUNCTION get_date_shard(events_date date)
RETURNS bigint AS
$BODY$
DECLARE
    date_shard_id bigint;
BEGIN
    -- Serialize concurrent callers for the same date. Without this, two
    -- loaders can both miss the lookup below and each create a shard for the
    -- same day. The lock is held until the calling transaction ends.
    -- NOTE(review): relies on READ COMMITTED so the lookup after the lock
    -- sees a shard committed by the previous lock holder.
    PERFORM pg_advisory_xact_lock(hashtext('get_date_shard'), events_date - DATE '1970-01-01');

    -- ORDER BY makes the choice deterministic should duplicates already exist.
    SELECT shardid INTO date_shard_id
    FROM pg_dist_shard
    WHERE logicalrelid = 'github_events'::regclass
      AND shardminvalue::date = events_date
    ORDER BY shardid
    LIMIT 1;

    IF NOT FOUND THEN
        SELECT master_create_empty_shard('github_events') INTO date_shard_id;

        -- Cover the full day: midnight through the last representable instant
        -- before the next midnight (timestamp resolution is one microsecond).
        UPDATE pg_dist_shard
        SET shardminvalue = events_date::timestamp,
            shardmaxvalue = events_date::timestamp + interval '1 day' - interval '1 microsecond'
        WHERE shardid = date_shard_id;
    END IF;

    RETURN date_shard_id;
END;
$BODY$
LANGUAGE plpgsql;
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/sh
# Load GitHub archive events for one date (a range of hours) into the
# distributed github_events table.
#
# Usage: $0 YYYY-MM-DD start_hour [end_hour]
#   end_hour defaults to start_hour; both are inclusive.
#
# The master connection uses the usual libpq environment variables
# (PGHOST, PGPORT, PGDATABASE, ...). Workers are contacted at the node
# name and port recorded in pg_dist_shard_placement.

# `-e` on the shebang line is ignored when run as `sh script`, so set it here.
set -e

usage() {
    echo "usage: $0 YYYY-MM-DD start_hour [end_hour]" >&2
    exit 1
}

date=$1
start_hour=$2
end_hour=${3:-$2}

# Validate arguments before they are interpolated into SQL strings.
case $date in
    [0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]) ;;
    *) usage ;;
esac
for hour in "$start_hour" "$end_hour"; do
    case $hour in
        ''|*[!0-9]*) usage ;;
    esac
done
if [ "$start_hour" -gt "$end_hour" ] || [ "$end_hour" -gt 23 ]; then
    usage
fi

# Get (or create) the shard for the date
shard_id=$(psql -tA -c "SELECT get_date_shard('$date')")

# Stage from one of the shard placements; fetch its port as well so the
# worker is reached (and reported to the master) on the right port.
placement=$(psql -tA -F' ' -c "SELECT nodename, nodeport FROM pg_dist_shard_placement WHERE shardid = $shard_id LIMIT 1")
if [ -z "$placement" ]; then
    echo "no placement found for shard $shard_id" >&2
    exit 1
fi
worker_name=${placement% *}
worker_port=${placement##* }

# Load the raw data from githubarchive.org into a staging table on the worker
stage_table=$(psql -tA -h "$worker_name" -p "$worker_port" -c "SELECT load_github_events('$date', $start_hour, $end_hour)")

# Append the data to the appropriate shard
psql -c "SELECT master_append_table_to_shard($shard_id, '$stage_table', '$worker_name', $worker_port)" >/dev/null

# Drop the stage table
psql -h "$worker_name" -p "$worker_port" -c "DROP TABLE $stage_table" >/dev/null

echo "loaded $date from $start_hour:00:00 to $end_hour:59:59"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Counter used to generate unique staging-table names.
CREATE SEQUENCE IF NOT EXISTS stage_id;

-- load_github_events(events_date, start_hour, end_hour) downloads the hourly
-- githubarchive.org files for events_date, hours start_hour..end_hour
-- (inclusive). It converts them into a new regular table named stage_<n>.
-- Returns the staging table's name; the caller is responsible for dropping it.
-- Runs curl/zcat/grep via COPY ... FROM PROGRAM on the database server, so
-- those tools must be installed there and the role must be allowed to use it.
CREATE OR REPLACE FUNCTION load_github_events(events_date date, start_hour int, end_hour int) RETURNS text AS
$BODY$
DECLARE
    stage_table text := 'stage_' || nextval('stage_id');
BEGIN
    -- Archive files exist per hour 0..23; reject anything that would build a
    -- malformed or empty download range.
    IF events_date IS NULL OR start_hour IS NULL OR end_hour IS NULL
       OR start_hour < 0 OR end_hour > 23 OR start_hour > end_hour THEN
        RAISE EXCEPTION 'invalid arguments: date %, hours % to % (expected 0 <= start_hour <= end_hour <= 23)',
            events_date, start_hour, end_hour;
    END IF;

    -- Scratch table for the raw JSON lines; dropped again before returning so
    -- the function can be called more than once per session.
    CREATE TEMPORARY TABLE input (data jsonb);

    /* Download, decompress, and filter JSON data.
     * - to_char keeps the date in the URL as YYYY-MM-DD regardless of DateStyle.
     * - [a-b] is curl's own numeric range glob, so the hour range expands no
     *   matter which shell runs the program ({a..b} needs bash brace expansion).
     * - -f stops curl from piping HTTP error pages into zcat.
     * - grep -v drops lines with \u0000 escapes, which jsonb cannot store.
     * - QUOTE/DELIMITER are control characters that do not occur in the data,
     *   so each JSON line lands whole in the single data column. */
    EXECUTE format('COPY input FROM PROGRAM ''curl -sf "http://data.githubarchive.org/%s-[%s-%s].json.gz" | zcat | grep -v "\\u0000"'' '||
                   'CSV QUOTE e''\x01'' DELIMITER e''\x02''',
                   to_char(events_date, 'YYYY-MM-DD'), start_hour, end_hour);

    /* Convert raw JSON to table format */
    EXECUTE format('CREATE TABLE %I AS '||
                   'SELECT (data->>''id'')::bigint AS event_id, '||
                   '(data->>''type'') AS event_type, '||
                   '(data->>''public'')::boolean AS event_public, '||
                   '(data->''repo''->>''id'')::bigint AS repo_id, '||
                   'data->''payload'' AS payload, '||
                   'data->''repo'' AS repo, '||
                   'data->''actor'' AS actor, '||
                   'data->''org'' AS org, '||
                   '(data->>''created_at'')::timestamp AS created_at '||
                   'FROM input', stage_table);

    DROP TABLE input;

    RETURN stage_table;
END;
$BODY$
LANGUAGE plpgsql;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment