# Job to load data from the platform events DB into Parquet
# Based on Ky's script
#
# Parameters:
# --MONTHS: number of months of data to overwrite. If the value is "ALL",
#           load all data. (Documented here but not yet consumed below.)
import os
import sys
import math
from datetime import datetime
from awsglue.job import Job
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import month, dayofmonth, year

import he_glue_libs.utils.dynamic_frame_util as dfu
import he_glue_libs.settings as settings
import he_glue_libs.utils.date_util as du
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
job = Job(glueContext)
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
# Initialize the job before doing any work so job.commit() at the end succeeds.
job.init(args['JOB_NAME'], args)
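# A sketch of how the documented --MONTHS parameter could be read once it is
# wired up; the argument name and the int/"ALL" handling are assumptions based
# on the header comment, so the lines stay commented out:
# args = getResolvedOptions(sys.argv, ['JOB_NAME', 'MONTHS'])
# months = None if args['MONTHS'] == 'ALL' else int(args['MONTHS'])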
# Name of the destination table.
TABLENAME = 'normalized_events_full'

# Pushdown query selecting the historic linkout/winkout events. JDBC subqueries
# must be parenthesized and aliased so Spark's reader can treat them as tables.
PUSHDOWN_QUERY = '(select * from normalized_events where "eventKey" in (\'WINKOUT_PRODUCT_VIEW\',\'LINKOUT_PRODUCT_VIEW\',\'WINKOUT_PRODUCT_CONVERSION\',\'LINKOUT_PRODUCT_CONVERSION\')) events'
UPPERBOUND_QUERY = '(select max(id) from normalized_events) events'
LOWERBOUND_QUERY = '(select min(id) from normalized_events) events'

class DataFrameHelper:
    def minus(self, first_df, second_df):
        # Rows present in first_df but absent from second_df (schemas must match).
        return first_df.subtract(second_df).alias('new_only_df')

    def get_max_id(self, df):
        return df.agg({'id': 'max'}).collect()[0][0]

    def add_dates(self, fact_df):
        # Derive the Parquet partition columns from the createdat timestamp.
        return fact_df \
            .withColumn('month', month('createdat')) \
            .withColumn('day', dayofmonth('createdat')) \
            .withColumn('year', year('createdat'))
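    # Example: a row with createdat = '2019-03-05 12:16:00' gains
    # month=3, day=5 and year=2019, matching the partition keys used
    # by the Parquet writer in main().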

class PartitionOperator:
    # Target number of rows fetched per JDBC partition.
    RECORDS_PER_QUERY = 10000

    def calculate_num_of_partitions(self, upperbound, lowerbound):
        return math.ceil((upperbound - lowerbound) / self.RECORDS_PER_QUERY)
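    # Example: with ids spanning 1..250000 this yields
    # ceil((250000 - 1) / 10000) = 25 parallel JDBC partitions.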

class JDBCOperator:
    def get_bound(self, jdbcUrl, query):
        return glueContext.read.jdbc(
            url=jdbcUrl,
            table=query
        ).head()[0]

    def get_jdbc_url(self):
        # TODO: replace these hard-coded credentials with AWS Secrets Manager.
        username = 'app'
        password = 'i.9yxwksn7Wf,xN8'
        jdbcHostname = "172.31.60.227"
        jdbcDatabase = "platform_events"
        jdbcPort = 5432
        return "jdbc:postgresql://{0}:{1}/{2}?user={3}&password={4}".format(
            jdbcHostname, jdbcPort, jdbcDatabase, username, password)

def get_destination():
    return os.path.join(settings.TRANSFORMED_DATA_BUCKET, 'PlatformEvents', TABLENAME)

def main():
    operator = JDBCOperator()
    helper = DataFrameHelper()
    jdbcUrl = operator.get_jdbc_url()
    upperbound = operator.get_bound(jdbcUrl, UPPERBOUND_QUERY)
    lowerbound = operator.get_bound(jdbcUrl, LOWERBOUND_QUERY)
    # Rows already written to the Parquet table, read back via the Glue catalog.
    existing_df = glueContext.create_dynamic_frame.from_catalog(
        database="conversion-ledger-parquet",
        table_name="normalized_events_full",
        transformation_ctx="datasource0"
    ).toDF()
    # Parallel JDBC read: Spark splits the id range between the bounds into
    # numPartitions slices and fetches them concurrently.
    fact_df = glueContext.read.jdbc(
        url=jdbcUrl,
        table=PUSHDOWN_QUERY,
        column='id',
        upperBound=upperbound,
        lowerBound=lowerbound,
        numPartitions=PartitionOperator().calculate_num_of_partitions(upperbound, lowerbound)
    )
    fact_df = helper.add_dates(fact_df)
    # Keep only rows not already present in the Parquet table.
    fact_df = helper.minus(fact_df, existing_df)
    dfu.write_df_to_parq(
        df=fact_df,
        destination_path=get_destination(),
        partition_keys=['year', 'month', 'day'],
        mode='append',
        num_files=1
    )

main()
job.commit()
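
# A hypothetical invocation once the script is deployed as a Glue job; the
# job name below is an assumption, not taken from the original gist:
#   aws glue start-job-run --job-name platform-events-to-parquet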