# Job to load data from the platform events DB into Parquet.
# Based on Ky's script.
#
# Parameters:
# --MONTHS: number of months of data to overwrite. If the value is "ALL", load all data.
import os
import sys
import math
from datetime import datetime
from awsglue.job import Job
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import month, dayofmonth, year
import he_glue_libs.utils.dynamic_frame_util as dfu
import he_glue_libs.settings as settings
import he_glue_libs.utils.date_util as du
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
job = Job(glueContext)
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
job.init(args['JOB_NAME'], args)
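
# A minimal sketch (an assumption, not part of the original job) of how the --MONTHS
# parameter described in the header could be resolved alongside JOB_NAME, assuming the
# job is launched with a --MONTHS argument:
#
#     args = getResolvedOptions(sys.argv, ['JOB_NAME', 'MONTHS'])
#     months = args['MONTHS']  # e.g. '3', or 'ALL' to load everything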

# Name of the output table.
TABLENAME = 'normalized_events_full'
# Pushdown query to fetch the historic event data.
PUSHDOWN_QUERY = '(select * from normalized_events where "eventKey" in (\'WINKOUT_PRODUCT_VIEW\',\'LINKOUT_PRODUCT_VIEW\',\'WINKOUT_PRODUCT_CONVERSION\',\'LINKOUT_PRODUCT_CONVERSION\')) events'
UPPERBOUND_QUERY = '(select max(id) from normalized_events) events'
LOWERBOUND_QUERY = '(select min(id) from normalized_events) events'


class DataFrameHelper:
    def minus(self, first_df, second_df):
        # Rows present in first_df but not in second_df (requires matching schemas).
        return first_df.subtract(second_df).alias('new_only_df')

    def get_max_id(self, df):
        return df.agg({'id': 'max'}).collect()[0][0]

    def add_dates(self, fact_df):
        # Derive the partition columns from the event creation timestamp.
        return fact_df \
            .withColumn('month', month('createdat')) \
            .withColumn('day', dayofmonth('createdat')) \
            .withColumn('year', year('createdat'))
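
# For example, add_dates() turns a 'createdat' of 2019-03-05 into year=2019, month=3,
# day=5, matching the partition keys used for the Parquet output below.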


class PartitionOperator:
    # Target number of records fetched per JDBC partition query.
    RECORDS_PER_QUERY = 10000

    def calculate_num_of_partitions(self, upperbound, lowerbound):
        return math.ceil((upperbound - lowerbound) / self.RECORDS_PER_QUERY)
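
# Worked example: with source ids ranging from 1 to 250,000,
# calculate_num_of_partitions(250000, 1) = ceil(249999 / 10000) = 25 partitions.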


class JDBCOperator:
    def get_bound(self, jdbcUrl, query):
        # Run a single-value query (min/max id) and return the scalar result.
        return glueContext.read.jdbc(
            url=jdbcUrl,
            table=query
        ).head()[0]

    def get_jdbc_url(self):
        # TODO: replace these hard-coded credentials with AWS Secrets Manager.
        username = 'app'
        password = 'i.9yxwksn7Wf,xN8'
        jdbcHostname = "172.31.60.227"
        jdbcDatabase = "platform_events"
        jdbcPort = 5432
        return "jdbc:postgresql://{0}:{1}/{2}?user={3}&password={4}".format(
            jdbcHostname, jdbcPort, jdbcDatabase, username, password)


def get_destination():
    return os.path.join(settings.TRANSFORMED_DATA_BUCKET, 'PlatformEvents', TABLENAME)


def main():
    operator = JDBCOperator()
    helper = DataFrameHelper()
    jdbcUrl = operator.get_jdbc_url()

    # Id range of the source table, used to split the JDBC read into partitions.
    upperbound = operator.get_bound(jdbcUrl, UPPERBOUND_QUERY)
    lowerbound = operator.get_bound(jdbcUrl, LOWERBOUND_QUERY)

    # Data already loaded into the Parquet table, read through the Glue Data Catalog.
    existing_df = glueContext.create_dynamic_frame.from_catalog(
        database="conversion-ledger-parquet",
        table_name="normalized_events_full",
        transformation_ctx="datasource0"
    ).toDF()

    # Parallel JDBC read of the filtered source rows, partitioned on the 'id' column.
    fact_df = glueContext.read.jdbc(
        url=jdbcUrl,
        table=PUSHDOWN_QUERY,
        column='id',
        upperBound=upperbound,
        lowerBound=lowerbound,
        numPartitions=PartitionOperator().calculate_num_of_partitions(upperbound, lowerbound)
    )
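
    # Spark turns this into numPartitions range queries on 'id' (roughly
    # id >= x AND id < x + stride), so each partition fetches about
    # RECORDS_PER_QUERY rows from Postgres.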

    # Add the year/month/day partition columns, then drop rows that already exist
    # in the destination table.
    fact_df = helper.add_dates(fact_df)
    fact_df = helper.minus(fact_df, existing_df)

    dfu.write_df_to_parq(
        df=fact_df,
        destination_path=get_destination(),
        partition_keys=['year', 'month', 'day'],
        mode='append',
        num_files=1
    )


main()
job.commit()