@narulkargunjan
Forked from tim-patterson/HivePartitioning.md
Created August 19, 2017 10:34
Hive Partitioning

Hive partitioning scheme for dealing with late arriving data etc.

Over the last few years I've been quite involved with using hive for big data analysis.

I've read many web tutorials and blogs about using hadoop/hive/pig for data analysis, but all of them seem to be oversimplified and targeted at a "my first hive query" kind of audience instead of showing how to structure hive tables and queries for real world use cases, eg years of data, recurring batch jobs to build aggregate/reporting tables, and having to deal with late arriving data.

Most of these tutorials look something like this

Twitter Data -> hdfs/external hive table -> hive query -> results.

This tutorial (http://hortonworks.com/hadoop-tutorial/using-hive-data-analysis/) is better than most as it at least shows the process of loading data into an ORC formatted table from an uncompressed delimited text formatted table. This process of loading data into a hive "native" file format can result in some pretty big gains in terms of storage and query performance.

The example query used in the hortonworks tutorial is an aggregate of number of tweets per profile location.

If we productionized this query as is, we would end up with the scenario where the query has to reprocess all the raw data (potentially years' worth) every batch run!

Let's see if we can improve on this.

We'll start with a couple of aggregates.

  • Daily statistics for each tweeter, eg number of tweets, time of first tweet.
  • Daily Statistics by location, eg number of tweets, number of unique tweeters.

To keep things clean we'll use hive databases to keep things organised, eg

  • staging

    • twitter_staging (This table will be an external table to load data from, it will just contain the "fresh" data for each batch run)
  • raw

    • twitter_tweets (contains all historical data in orc format)
  • aggregates

    • stats_by_tweeter_daily
    • stats_by_location_daily

Tables

First I'll create the databases.

create database staging;
create database raw;
create database aggregates;

And now the tables.

staging.twitter_staging

The staging table is pretty simple, just a table that we expect the "fresh" tab-delimited data to be loaded into prior to each batch run (after deleting the data from the previous run).

CREATE EXTERNAL TABLE staging.twitter_staging (
  tweetId BIGINT,
  username STRING,
  txt STRING,
  CreatedAt STRING,
  profileLocation STRING,
  favc BIGINT,
  retweet STRING,
  retcount BIGINT,
  followerscount BIGINT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;

raw.twitter_tweets

The raw table is partitioned by date so that queries restricted to a date range are efficient. It is also partitioned by batch_id; the purpose of this will become clear once you look at the queries.

CREATE TABLE raw.twitter_tweets (
  tweet_id BIGINT,
  username STRING,
  txt STRING,
  created_at BIGINT,
  profile_location STRING,
  favc BIGINT,
  retweet STRING,
  retcount BIGINT,
  followers_count BIGINT
)
PARTITIONED BY (batch_id STRING, date STRING)
STORED AS ORC tblproperties ("orc.compress"="ZLIB");

aggregates.stats_by_tweeter_daily

The first aggregate table is pretty simple. I've chosen to partition it by date because it could potentially be quite a large table.

CREATE TABLE aggregates.stats_by_tweeter_daily (
  username STRING,
  profile_location STRING,
  total_tweets BIGINT,
  first_tweet_time BIGINT,
  last_tweet_time BIGINT
)
PARTITIONED BY (date string)
STORED AS ORC tblproperties ("orc.compress"="ZLIB");

aggregates.stats_by_location_daily

The final aggregate table, I've chosen to leave this table unpartitioned due to its relatively small size.

CREATE TABLE aggregates.stats_by_location_daily (
  location STRING,
  date STRING,
  unique_tweeter_count BIGINT,
  total_tweets BIGINT,
  avg_first_tweet_time BIGINT,
  avg_last_tweet_time BIGINT
)
STORED AS ORC tblproperties ("orc.compress"="ZLIB");

A note on Batch_id.

One of the partition columns for raw.twitter_tweets is batch_id. I'm expecting the scheduler running these queries to inject some kind of unique identifier/batch_start_time that can be used for this.

The data can then be queried by date (ie when the data was generated) or by batch_id (when the data was loaded into the system) or both.

With both batch_id and date as partition columns, we can represent how the data might look using the following diagram, with the 'X's representing data.

[Figure: Partitions Diagram]

The diagram illustrates the common case of running the queries once a day, but the queries will still work just as well if you decide to run them 10 times a day, once every 10 days, or even on an adhoc schedule.
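
To make the two partition columns concrete, here is a small Python model of the layout (not Hive code, just a sketch). The partition values are made up for illustration; each tuple stands for one (batch_id, date) partition of raw.twitter_tweets, and batch "3" is assumed to carry some late arriving data for 2017-08-01.

```python
# Hypothetical partitions of raw.twitter_tweets as (batch_id, date) pairs.
# These values are invented for illustration only.
partitions = [
    ("1", "2017-08-01"),
    ("2", "2017-08-02"),
    ("3", "2017-08-01"),  # late arriving data for an older date
    ("3", "2017-08-03"),
]


def select_partitions(partitions, batch_id=None, date=None):
    """Return the partitions matching the given batch_id and/or date."""
    return [
        (b, d)
        for (b, d) in partitions
        if (batch_id is None or b == batch_id) and (date is None or d == date)
    ]


# "When was the data generated?" -> filter on date only.
by_date = select_partitions(partitions, date="2017-08-01")
# "When was the data loaded?" -> filter on batch_id only.
by_batch = select_partitions(partitions, batch_id="3")
# Both at once.
by_both = select_partitions(partitions, batch_id="3", date="2017-08-01")
```

In this toy data, filtering on the date alone returns partitions from two different batches, which is exactly the late arriving data case the scheme is meant to handle.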

Queries.

staging.twitter_staging -> raw.twitter_tweets

This query is fairly simple. The only tricky things it does are converting the incoming timestamps to unix epoch style timestamps and using dynamic partitioning for the date partition.

INSERT OVERWRITE TABLE raw.twitter_tweets
PARTITION (batch_id = '${hiveconf:batch_id}', date)
SELECT
  tweetId AS tweet_id,  
  username,
  txt,
  unix_timestamp(CreatedAt,'EEE MMM dd HH:mm:ss z yyyy') AS created_at,
  profileLocation AS profile_location,
  favc,
  retweet,
  retcount,
  followerscount,
  from_unixtime(unix_timestamp(CreatedAt,'EEE MMM dd HH:mm:ss z yyyy'),'yyyy-MM-dd') AS date
FROM
  staging.twitter_staging;
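
For readers who want to sanity check the timestamp handling outside of hive, the conversion can be approximated in Python. This is only a rough analogue: the format string is my approximation of 'EEE MMM dd HH:mm:ss z yyyy', the sample value is invented, and the sketch assumes every timestamp is in UTC, whereas hive's from_unixtime formats using the session time zone.

```python
from datetime import datetime, timezone

# Rough Python analogue of the hive pattern 'EEE MMM dd HH:mm:ss z yyyy'.
# Assumption: Python's %Z only recognises a few zone names such as UTC and GMT.
PY_FORMAT = "%a %b %d %H:%M:%S %Z %Y"


def to_epoch_and_date(created_at):
    """Return (unix epoch seconds, 'yyyy-MM-dd' date string), assuming UTC input."""
    parsed = datetime.strptime(created_at, PY_FORMAT)
    # %Z checks the zone name but does not attach an offset,
    # so the UTC assumption is applied explicitly here.
    as_utc = parsed.replace(tzinfo=timezone.utc)
    return int(as_utc.timestamp()), as_utc.strftime("%Y-%m-%d")


# Invented sample value in the shape the staging table is assumed to hold.
epoch, date = to_epoch_and_date("Sat Aug 19 10:34:00 UTC 2017")
```

The first element corresponds to the created_at column and the second to the value used for the dynamic date partition.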

raw.twitter_tweets -> aggregates.stats_by_tweeter_daily

This query uses a custom udf, partition_prune, to select only the dates which have had new data with this batch_id. Because the arguments to the function are all constants or partition columns, hive will actually execute this at query compile time and will be able to prune out all the partitions not needed by the query. The udf I used is located here: https://github.com/tim-patterson/partitionprune/blob/master/src/main/java/partitionprune/PartitionPrune.java

add jar partitionprune-0.0.1-SNAPSHOT.jar;
create temporary function partition_prune as 'partitionprune.PartitionPrune';
set hive.exec.dynamic.partition.mode=nonstrict;

INSERT OVERWRITE TABLE aggregates.stats_by_tweeter_daily
PARTITION(date)
SELECT
  username,
  profile_location,
  count(1) AS total_tweets,
  min(created_at) AS first_tweet_time,
  max(created_at) AS last_tweet_time,
  date
FROM
  raw.twitter_tweets
WHERE
  partition_prune('raw.twitter_tweets', 'batch_id','${hiveconf:batch_id}', 'date', date)
GROUP BY
  username,
  profile_location,
  date;
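
The effect of the partition_prune filter can be sketched in Python. This is my reading of the behaviour described above, not the actual Java implementation: the real udf takes a table name and presumably looks up the partitions itself, whereas this hypothetical version is handed a list of partitions directly, and the partition values are invented.

```python
def partition_prune(partitions, filter_col, filter_val, target_col, target_val):
    """True if some partition has filter_col == filter_val and target_col == target_val."""
    return any(
        p[filter_col] == filter_val and p[target_col] == target_val
        for p in partitions
    )


# Hypothetical partitions of raw.twitter_tweets.
raw_partitions = [
    {"batch_id": "3", "date": "2017-08-02"},
    {"batch_id": "4", "date": "2017-08-03"},
    {"batch_id": "5", "date": "2017-08-02"},  # late data loaded by batch 5
    {"batch_id": "5", "date": "2017-08-04"},
]

# Partitions the query would read for batch_id = 5: every batch's data for
# any date that batch 5 touched, so each affected day is rebuilt in full.
partitions_read = [
    (p["batch_id"], p["date"])
    for p in raw_partitions
    if partition_prune(raw_partitions, "batch_id", "5", "date", p["date"])
]

# Dates whose aggregate partitions get overwritten.
dates_rebuilt = sorted({d for (_, d) in partitions_read})
```

Note that the partition for batch 3 is read as well, because the whole day has to be re-aggregated, while the 2017-08-03 partition is skipped entirely.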

aggregates.stats_by_tweeter_daily -> aggregates.stats_by_location_daily

This query is similar to the previous one except that the target table isn't partitioned, so we need to merge the newly arriving data with the data already in the table.

add jar partitionprune-0.0.1-SNAPSHOT.jar;
create temporary function partition_prune as 'partitionprune.PartitionPrune';

INSERT OVERWRITE TABLE aggregates.stats_by_location_daily
SELECT
  coalesce(new.location, existing.location) AS location,
  coalesce(new.date, existing.date) AS date,
  coalesce(new.unique_tweeter_count, existing.unique_tweeter_count) AS unique_tweeter_count,
  coalesce(new.total_tweets, existing.total_tweets) AS total_tweets,
  coalesce(new.avg_first_tweet_time, existing.avg_first_tweet_time) AS avg_first_tweet_time,
  coalesce(new.avg_last_tweet_time, existing.avg_last_tweet_time) AS avg_last_tweet_time
FROM
  aggregates.stats_by_location_daily existing
FULL OUTER JOIN (
  SELECT
    profile_location AS location,
    date,
    count(1) AS unique_tweeter_count,
    sum(total_tweets) AS total_tweets,
    avg(first_tweet_time) AS avg_first_tweet_time,
    avg(last_tweet_time) AS avg_last_tweet_time
  FROM
    aggregates.stats_by_tweeter_daily
  WHERE
    partition_prune('raw.twitter_tweets', 'batch_id','${hiveconf:batch_id}', 'date', date)
  GROUP BY
    profile_location,
    date
) new
ON new.location <=> existing.location
AND new.date <=> existing.date;
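
The merge performed by the full outer join can be modelled in a few lines of Python. This is a simplified sketch with invented values: rows are keyed by (location, date), and a recalculated row replaces the existing row as a whole. The hive query coalesces column by column, so it would fall back to the existing value for any column that is NULL in the new row, which this sketch ignores.

```python
def merge_rows(existing, new):
    """Keep every existing row, but let recalculated rows win on matching keys."""
    merged = dict(existing)
    merged.update(new)
    return merged


# Keys are (location, date); values are (unique_tweeter_count, total_tweets).
# All values here are invented for illustration.
existing = {
    ("Auckland", "2017-08-01"): (10, 25),
    ("Auckland", "2017-08-02"): (12, 30),
}
new = {
    ("Auckland", "2017-08-02"): (13, 34),  # recalculated after late data
    ("Wellington", "2017-08-02"): (4, 9),  # location not seen before
}

merged = merge_rows(existing, new)
```

Rows present only in the existing table survive untouched, rows present only in the new data are added, and rows present in both take the recalculated values.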

Below I've attempted to illustrate the end to end dataflow for batch_id=5, with the cells representing hive partitions.

[Figure: Dataflow Diagram]

It should be noted that stats_by_location_daily isn't partitioned, so each time we write to it we have to join onto the data already contained in the table. Depending on the amount of data in the table, it might be of benefit to partition this table as well, or maybe partition on a coarser grain, eg month instead of date, to avoid the overhead of too many small partitions.

There are a few nice properties of structuring your data/queries like this

  • Frequency of batch runs isn't tied to once a day/something else, it's completely flexible.
  • All queries are individually idempotent.
  • No special logic/adhoc queries required for backloading old data etc.
  • You only pay the cost for late arriving data when needed (unlike in the case of always recalculating the last 'x' days of aggregates).
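
The idempotency point can be illustrated with a toy Python model of a partitioned table. This is only a sketch under simplifying assumptions: a table is a dict from partition value to rows, insert_overwrite stands in for INSERT OVERWRITE with dynamic partitions, and insert_into is included purely as a contrast with an appending write. The function names and data are hypothetical.

```python
def insert_overwrite(table, new_partitions):
    """Replace each written partition wholesale; leave other partitions untouched."""
    result = dict(table)
    for key, rows in new_partitions.items():
        result[key] = list(rows)
    return result


def insert_into(table, new_partitions):
    """Append rows to each written partition (shown only for contrast)."""
    result = {key: list(rows) for key, rows in table.items()}
    for key, rows in new_partitions.items():
        result.setdefault(key, []).extend(rows)
    return result


# Invented aggregate table and batch output.
table = {"2017-08-01": ["a"], "2017-08-02": ["b"]}
batch_output = {"2017-08-02": ["b", "c"]}

once = insert_overwrite(table, batch_output)
twice = insert_overwrite(once, batch_output)

appended_once = insert_into(table, batch_output)
appended_twice = insert_into(appended_once, batch_output)
```

Re-running the overwriting write leaves the table unchanged, so a failed batch can simply be run again, whereas re-running the appending write would duplicate rows.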

While I've used hive in these examples I don't see why the same concepts can't be applied to pig/raw mapreduce scripts etc.

If anyone has any feedback or potential improvements to this approach I'd love to hear them!
