# Gist: karol-blaszczyk/decd2114c1c049bb836ca766d7eb40cc (last active March 28, 2023 08:55).
# Save karol-blaszczyk/decd2114c1c049bb836ca766d7eb40cc to your computer and use it in GitHub Desktop.
# GitHub notice: this file contains hidden or bidirectional Unicode text that may be
# interpreted or compiled differently than what appears below. To review, open the
# file in an editor that reveals hidden Unicode characters.
# Learn more about bidirectional Unicode characters.
import hashlib
import re
import sys

import pg8000
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import col
# Star import kept as in the original script; it provides the Glue transforms
# used below (SelectFields, ApplyMapping, DropFields, Map).
from awsglue.transforms import *
## @params: [JOB_NAME]
# Resolve the job arguments this script needs: the job name plus the
# connection settings of the target PostgreSQL database.
args = getResolvedOptions(
    sys.argv,
    ['JOB_NAME', 'db_name', 'db_user', 'db_password', 'db_host'],
)

# Create the Spark and Glue contexts and initialise the Glue job.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Open a pg8000 connection to the target database. The cursor is used further
# down to run the upsert and DROP TABLE statements.
conn = pg8000.connect(
    database=args['db_name'],
    user=args['db_user'],
    password=args['db_password'],
    host=args['db_host'],
    port=5432,
)
cur = conn.cursor()
# Glue Data Catalog database and the catalog tables this job reads from.
catalog_database = "reports"
# NOTE(review): "aage_report" is spelled differently from the other catalog
# names -- confirm it matches the actual catalog table before changing it.
age_catalog = "aage_report"
gender_catalog = "gender_report"
performance_catalog = "campaign_report"

# Main database tables that the data is upserted into.
campaigns_table = "campaigns"
adgroups_table = "adgroups"
reports_table = "reports"

# Temporary (staging) tables used for the upserts. Each one is written by
# Spark, merged into a main table, and then dropped.
# ("peformance" in the variable name below is kept as-is because the rest of
# the script refers to it by that name.)
tmp_campaigns_peformance_table = "performance_campaigns_tmp"
tmp_campaigns_age_table = "age_campaigns_tmp"
tmp_campaigns_gender_table = "gender_campaigns_tmp"
tmp_adgroup_age_table = "age_ad_groups_tmp"
tmp_adgroup_gender_table = "gender_ad_groups_tmp"
tmp_performance_report_table = "performance_report_tmp"
tmp_gender_report_table = "gender_report_tmp"
tmp_age_report_table = "age_report_tmp"
def _report_id(row):
    """Return the MD5 hex digest used as the id of a report row.

    Which fields are hashed depends on the keys present in ``row``:

    * ``'age range'`` present: campaign id, ad group id, day, type, device
      and age range.
    * otherwise ``'gender'`` present: campaign id, ad group id, day, type,
      device and gender.
    * otherwise: campaign id and day only.

    The selected values are formatted with ``str.format`` and joined with
    underscores before hashing, so the same row always yields the same id.

    Args:
        row: Mapping-like record supporting ``in`` and item access.

    Returns:
        The 32-character hexadecimal MD5 digest.

    Raises:
        KeyError: If one of the selected fields is missing from ``row``
            (when ``row`` is a plain dict).
    """
    if 'age range' in row:
        key_fields = ("campaign id", "ad group id", "day", "type", "device", "age range")
    elif 'gender' in row:
        key_fields = ("campaign id", "ad group id", "day", "type", "device", "gender")
    else:
        key_fields = ("campaign id", "day")

    raw_key = '_'.join('{}'.format(row[field]) for field in key_fields)
    return hashlib.md5(raw_key.encode()).hexdigest()
def map_function(dynamicRecord):
    """Tag a record with its report type and its report id.

    ``'type'`` is set to ``'AGE'`` when the record has an ``'age range'``
    key, to ``'GENDER'`` when it has a ``'gender'`` key, and to
    ``'PERFORMANCE'`` otherwise. ``'report_id'`` is then set to
    ``_report_id(dynamicRecord)``.

    Args:
        dynamicRecord: The record to tag; it is modified in place.

    Returns:
        The same record, with ``'type'`` and ``'report_id'`` set.
    """
    if 'age range' in dynamicRecord:
        dynamicRecord['type'] = 'AGE'
    elif 'gender' in dynamicRecord:
        dynamicRecord['type'] = 'GENDER'
    else:
        dynamicRecord['type'] = 'PERFORMANCE'
    # 'type' must be assigned first: _report_id hashes it for AGE/GENDER rows.
    dynamicRecord['report_id'] = _report_id(dynamicRecord)
    return dynamicRecord
def _save_to_database(frame, table_name):
    """Write a DynamicFrame to a PostgreSQL table over JDBC.

    The frame is converted to a Spark DataFrame and de-duplicated on the
    ``id`` column before it is written. The write uses ``mode('overwrite')``
    with the ``truncate`` option set to ``"true"``. Host, database name and
    credentials come from the module-level ``args``; the port is fixed at
    5432.

    Args:
        frame: DynamicFrame to persist; it must contain an ``id`` column.
        table_name: Name of the destination table.
    """
    jdbc_url = "jdbc:postgresql://{}:5432/{}".format(args['db_host'], args['db_name'])
    # Keep a single row per id before writing.
    df = frame.toDF().drop_duplicates(subset=['id'])
    (
        df.write.format("jdbc")
        .mode('overwrite')
        .option("url", jdbc_url)
        .option("user", args['db_user'])
        .option("password", args['db_password'])
        .option("truncate", "true")
        .option("dbtable", table_name)
        .save()
    )
def _save_campaings_from_report(frame, table_name):
    """Extract campaign data from a report and save it to a database table.

    Selects the ``'campaign'`` and ``'campaign id'`` fields, maps them to
    ``name`` (string) and ``id`` (long), and writes the result with
    ``_save_to_database``.

    Args:
        frame: Report DynamicFrame containing the two campaign fields.
        table_name: Destination table; also used to build the
            transformation context names.
    """
    campaigns = SelectFields.apply(
        frame=frame,
        paths=['campaign', 'campaign id'],
        transformation_ctx='campaigns_{}'.format(table_name),
    )
    campaigns_mapped = ApplyMapping.apply(
        frame=campaigns,
        mappings=[
            ("campaign id", "long", "id", "long"),
            ("campaign", "string", "name", "string"),
        ],
        transformation_ctx='mapping_campaigns_{}'.format(table_name),
    )
    _save_to_database(campaigns_mapped, table_name)
def _save_ad_groups_from_report(frame, table_name):
    """Extract ad group data from a report and save it to a database table.

    Selects the ``'ad group'`` and ``'ad group id'`` fields, maps them to
    ``name`` (string) and ``id`` (long), and writes the result with
    ``_save_to_database``.

    Args:
        frame: Report DynamicFrame containing the two ad group fields.
        table_name: Destination table; also used to build the
            transformation context names.
    """
    adgroups = SelectFields.apply(
        frame=frame,
        paths=['ad group', 'ad group id'],
        transformation_ctx='adgroup_{}'.format(table_name),
    )
    adgroups_mapped = ApplyMapping.apply(
        frame=adgroups,
        mappings=[
            ("ad group id", "long", "id", "long"),
            ("ad group", "string", "name", "string"),
        ],
        transformation_ctx='mapping_adgroup_{}'.format(table_name),
    )
    _save_to_database(adgroups_mapped, table_name)
def _save_report(frame=None, mappings=None, table_name="", drop_fields=None):
    """Save report data to a database table.

    Drops ``drop_fields`` from ``frame``, applies ``mappings`` to what is
    left, and writes the result with ``_save_to_database``.

    Args:
        frame: Report DynamicFrame to persist. Defaults to an empty list,
            as in the original signature.
        mappings: ApplyMapping tuples
            ``(source_field, source_type, target_field, target_type)``.
            Defaults to an empty list.
        table_name: Destination table; also used to build the
            transformation context names.
        drop_fields: Field paths removed before mapping. Defaults to an
            empty list.
    """
    # Build fresh defaults on every call instead of sharing mutable default
    # objects between calls.
    frame = [] if frame is None else frame
    mappings = [] if mappings is None else mappings
    drop_fields = [] if drop_fields is None else drop_fields

    report = DropFields.apply(frame=frame, paths=drop_fields, transformation_ctx=table_name)
    report = ApplyMapping.apply(
        frame=report,
        mappings=mappings,
        transformation_ctx='ApplyMapping_{}'.format(table_name),
    )
    _save_to_database(report, table_name)
def _get_mapped_dynamic_frame(table_name):
    """Load a catalog table and tag each record with its type and report id.

    Reads ``table_name`` from the ``catalog_database`` catalog database. If
    the frame has no records it is returned as loaded; otherwise
    ``map_function`` is applied to every record.

    Args:
        table_name: Name of the catalog table to read.

    Returns:
        The loaded DynamicFrame, mapped with ``map_function`` when it is
        not empty.
    """
    data = glueContext.create_dynamic_frame.from_catalog(
        database=catalog_database,
        table_name=table_name,
        transformation_ctx=table_name,
    )
    # Nothing to map: hand the empty frame back to the caller.
    if data.count() == 0:
        return data
    # NOTE(review): the Map step reuses the same transformation_ctx as the
    # catalog read above -- confirm this is intended.
    return Map.apply(frame=data, f=map_function, transformation_ctx=table_name)
# Catalog-field -> reports-column mappings shared by every report type.
_COMMON_REPORT_MAPPINGS = [
    ("report_id", "string", "id", "string"),
    ("type", "string", "type", "string"),
    ("campaign id", "long", "campaign_id", "long"),
    ("day", "string", "day", "date"),
    ("clicks", "long", "clicks", "long"),
    ("cost", "long", "cost", "long"),
    ("impressions", "long", "impressions", "long"),
    ("views", "long", "views", "long"),
]

# Extra mappings carried only by the ad-group level reports (AGE and GENDER).
_ADGROUP_REPORT_MAPPINGS = [
    ("ad group id", "long", "adgroup_id", "long"),
    ("device", "string", "device", "string"),
]

# Upsert of (id, name) rows from a staging table into campaigns / adgroups.
_UPSERT_NAMES_SQL = (
    "INSERT INTO {} (id, name) SELECT id, name FROM {} "
    "ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name"
)

# Upsert of report rows. The same column list is used for the INSERT target
# and the SELECT, so the statement does not depend on the staging table's
# column order or column count.
_UPSERT_REPORT_SQL = (
    "INSERT INTO {target} ({columns}) SELECT {columns} FROM {tmp} "
    "ON CONFLICT (id) DO UPDATE SET clicks = EXCLUDED.clicks, cost = EXCLUDED.cost, "
    "impressions = EXCLUDED.impressions, views = EXCLUDED.views"
)


def _upsert_report(catalog_table, tmp_report_table, tmp_campaigns_table,
                   tmp_adgroups_table=None, dimension=None):
    """Stage one report type and merge it into the main tables.

    Loads ``catalog_table`` with ``_get_mapped_dynamic_frame``. When the
    frame is not empty, its campaigns, its ad groups (if a staging table for
    them is given) and its report rows are written to staging tables, merged
    into ``campaigns_table`` / ``adgroups_table`` / ``reports_table`` with
    ``INSERT ... ON CONFLICT (id) DO UPDATE`` through the module-level
    cursor, and the staging tables are dropped. Nothing is committed here.

    Args:
        catalog_table: Catalog table holding the report.
        tmp_report_table: Staging table for the report rows.
        tmp_campaigns_table: Staging table for the campaigns.
        tmp_adgroups_table: Staging table for the ad groups, or ``None``
            when the report has no ad-group level fields.
        dimension: Optional ``(catalog_field, column)`` pair for the extra
            string column of the report, e.g. ``("age range", "age_range")``.

    Returns:
        The DynamicFrame that was loaded (possibly empty).
    """
    report = _get_mapped_dynamic_frame(catalog_table)
    if report.count() == 0:
        return report

    has_adgroups = tmp_adgroups_table is not None
    mappings = list(_COMMON_REPORT_MAPPINGS)
    drop_fields = ['campaign']

    # Stage campaigns, ad groups and the report rows.
    _save_campaings_from_report(report, tmp_campaigns_table)
    if has_adgroups:
        _save_ad_groups_from_report(report, tmp_adgroups_table)
        drop_fields.append('ad group')
        mappings.extend(_ADGROUP_REPORT_MAPPINGS)
    if dimension is not None:
        source_field, target_column = dimension
        mappings.append((source_field, "string", target_column, "string"))
    _save_report(frame=report, table_name=tmp_report_table,
                 drop_fields=drop_fields, mappings=mappings)

    # Merge the staging tables into the main tables.
    cur.execute(_UPSERT_NAMES_SQL.format(campaigns_table, tmp_campaigns_table))
    if has_adgroups:
        cur.execute(_UPSERT_NAMES_SQL.format(adgroups_table, tmp_adgroups_table))
    # Column list is derived from the mappings that produced the staging
    # table, so both sides of the INSERT ... SELECT always agree.
    columns = ", ".join(mapping[2] for mapping in mappings)
    cur.execute(_UPSERT_REPORT_SQL.format(
        target=reports_table, columns=columns, tmp=tmp_report_table))

    # Drop the staging tables.
    cur.execute("DROP TABLE {}".format(tmp_campaigns_table))
    if has_adgroups:
        cur.execute("DROP TABLE {}".format(tmp_adgroups_table))
    cur.execute("DROP TABLE {}".format(tmp_report_table))
    return report


try:
    #### AGE
    age_report = _upsert_report(
        age_catalog,
        tmp_age_report_table,
        tmp_campaigns_age_table,
        tmp_adgroups_table=tmp_adgroup_age_table,
        dimension=("age range", "age_range"),
    )

    ### GENDER
    gender_report = _upsert_report(
        gender_catalog,
        tmp_gender_report_table,
        tmp_campaigns_gender_table,
        tmp_adgroups_table=tmp_adgroup_gender_table,
        dimension=("gender", "gender"),
    )

    # PERFORMANCE
    performance_report = _upsert_report(
        performance_catalog,
        tmp_performance_report_table,
        tmp_campaigns_peformance_table,
    )

    conn.commit()
except Exception:
    # Undo any partial upsert, then let the failure propagate so that
    # job.commit() below is not reached.
    conn.rollback()
    raise
finally:
    # Always release the cursor and the connection, even on failure.
    cur.close()
    conn.close()

job.commit()
# Sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment.