Last active
May 8, 2024 14:16
-
-
Save tolufakiyesi/b754c3b9eb3e8bbf247400331e790459 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
from awsglue.transforms import * | |
from awsglue.utils import getResolvedOptions | |
from pyspark.context import SparkContext | |
from awsglue.context import GlueContext | |
from awsglue.job import Job | |
from pyspark.sql.functions import * | |
from awsglue.dynamicframe import DynamicFrame | |
# Catalog database and output destination used by this job.
DATABASE = "data-pipeline-lake-staging"
OUTPUT_PATH = "s3://data-store-staging/ordered/"
OUTPUT_FILE_FORMAT = "parquet"

## @params: [JOB_NAME]
# Resolve the job arguments from the command line; only JOB_NAME is requested.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# Create the Spark context, wrap it in a Glue context, and keep a handle on
# the Spark session (used later for the SQL join).
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Initialise the Glue job with the resolved job name and arguments.
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "data-pipeline-lake-staging", table_name = "profiles", transformation_ctx = "profiles_source"]
## @return: profiles_source
## @inputs: []
# Read the "profiles" catalog table into a DynamicFrame. DATABASE is the
# module-level constant holding the catalog database name.
profiles_source = glueContext.create_dynamic_frame.from_catalog(
    database=DATABASE,
    table_name="profiles",
    transformation_ctx="profiles_source",
)
## @type: ApplyMapping
## @args: [mapping = [("user_id", "string", "user_id", "string"), ("gender", "string", "gender", "string"), ("age", "string", "age", "string"), ("year_of_birth", "string", "year_of_birth", "string"), ("address_continent", "string", "address_continent", "string"), ("address_region", "string", "address_region", "string"), ("address_country", "string", "address_country", "string"), ("address_lga", "string", "address_lga", "string"), ("address_state", "string", "address_state", "string"), ("phone_type", "string", "phone_type", "string"), ("profile_value", "string", "profile_value", "string"), ("browser_name", "string", "browser_name", "string"), ("browser_version", "string", "browser_version", "string"), ("phone_manufacturer", "string", "phone_manufacturer", "string"), ("phone_model", "string", "phone_model", "string"), ("os_vendor", "string", "os_vendor", "string"), ("os_name", "string", "os_name", "string"), ("os_version", "string", "os_version", "string"), ("os_sub_version", "string", "os_sub_version", "string"), ("spend_total", "double", "spend_total", "double"), ("channel", "string", "channel", "string"), ("service_name", "string", "service_name", "string"), ("service_cost", "string", "service_cost", "string"), ("interest", "string", "interest", "string"), ("recharge_mode", "string", "recharge_mode", "string"), ("airtime_recharge", "string", "airtime_recharge", "string"), ("prior_recharge", "string", "prior_recharge", "string"), ("post_recharge", "string", "post_recharge", "string"), ("useragent", "string", "useragent", "string")], transformation_ctx = "applymapping_profiles"]
## @return: applymapping_profiles
## @inputs: [frame = profiles_source]
# (column name, type) for every profile column, in mapping order. Each column
# is mapped onto itself with an unchanged name and type.
_PROFILE_SCHEMA = [
    ("user_id", "string"),
    ("gender", "string"),
    ("age", "string"),
    ("year_of_birth", "string"),
    ("address_continent", "string"),
    ("address_region", "string"),
    ("address_country", "string"),
    ("address_lga", "string"),
    ("address_state", "string"),
    ("phone_type", "string"),
    ("profile_value", "string"),
    ("browser_name", "string"),
    ("browser_version", "string"),
    ("phone_manufacturer", "string"),
    ("phone_model", "string"),
    ("os_vendor", "string"),
    ("os_name", "string"),
    ("os_version", "string"),
    ("os_sub_version", "string"),
    ("spend_total", "double"),
    ("channel", "string"),
    ("service_name", "string"),
    ("service_cost", "string"),
    ("interest", "string"),
    ("recharge_mode", "string"),
    ("airtime_recharge", "string"),
    ("prior_recharge", "string"),
    ("post_recharge", "string"),
    ("useragent", "string"),
]

# Expand each pair into the (source, source_type, target, target_type)
# 4-tuple that ApplyMapping expects.
_profile_mappings = [
    (_column, _dtype, _column, _dtype)
    for _column, _dtype in _PROFILE_SCHEMA
]

applymapping_profiles = ApplyMapping.apply(
    frame=profiles_source,
    mappings=_profile_mappings,
    transformation_ctx="applymapping_profiles",
)
## @type: SelectFields
## @args: [paths = ["paths"], transformation_ctx = "profiles_fields"]
## @return: profiles_fields
## @inputs: [frame = applymapping_profiles]
# NOTE(review): of the paths below only age, gender, os_vendor, os_name,
# os_version and spend_total are produced by the ApplyMapping step on
# profiles_source; the remaining names (tap_id, msisdn, location_*, ...) are
# not in that mapping -- confirm whether this list belongs to this job.
# NOTE(review): profiles_fields is consumed only by resolvechoiceprofiles0;
# the frame that reaches the SQL join is built from applymapping_profiles.
_profiles_select_paths = [
    "tap_id",
    "msisdn",
    "age",
    "gender",
    "marital_status",
    "location_address",
    "location_continent",
    "location_region",
    "location_country",
    "location_state",
    "location_lga",
    "location_city",
    "device_manufacturer",
    "device_model",
    "os_vendor",
    "os_name",
    "os_version",
    "occupation",
    "spend_data",
    "spend_total",
    "spend_vas",
    "customer_class",
    "customer_value",
    "@version",
]
profiles_fields = SelectFields.apply(
    frame=applymapping_profiles,
    paths=_profiles_select_paths,
    transformation_ctx="profiles_fields",
)
## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG", database = "data-pipeline-lake-staging", table_name = "terragon_sterling_profiles", transformation_ctx = "resolvechoiceprofiles0"]
## @return: resolvechoiceprofiles0
## @inputs: [frame = profiles_fields]
# Resolve ambiguous column types in profiles_fields against the
# "terragon_sterling_profiles" catalog table. DATABASE is the module-level
# constant holding the catalog database name.
# NOTE(review): nothing later in this script reads resolvechoiceprofiles0
# (the next ResolveChoice step takes applymapping_profiles) -- confirm
# whether this step is still needed.
resolvechoiceprofiles0 = ResolveChoice.apply(
    frame=profiles_fields,
    choice="MATCH_CATALOG",
    database=DATABASE,
    table_name="terragon_sterling_profiles",
    transformation_ctx="resolvechoiceprofiles0",
)
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoiceprofiles1"]
## @return: resolvechoiceprofiles1
## @inputs: [frame = applymapping_profiles]
# Apply the "make_struct" choice resolution directly to the mapped profiles
# frame; this is the frame that is later converted to a Spark DataFrame.
resolvechoiceprofiles1 = ResolveChoice.apply(
    frame=applymapping_profiles,
    choice="make_struct",
    transformation_ctx="resolvechoiceprofiles1",
)
## @type: DataSource
## @args: [database = "data-pipeline-lake-staging", table_name = "selected", transformation_ctx = "selected_source"]
## @return: selected_source
## @inputs: []
# Read the "selected" catalog table into a DynamicFrame. DATABASE is the
# module-level constant holding the catalog database name.
# TODO: check location (carried over from the original "Check location" note).
selected_source = glueContext.create_dynamic_frame.from_catalog(
    database=DATABASE,
    table_name="selected",
    transformation_ctx="selected_source",
)
## @type: ApplyMapping
## @args: [mappings = [("user_id", "string", "user_id", "string"), ("column_count", "int", "column_count", "int")], transformation_ctx = "applymapping_selected"]
## @return: applymapping_selected
## @inputs: [frame = selected_source]
# Keep user_id (string) and column_count (int) with unchanged names and types.
_selected_mappings = [
    ("user_id", "string", "user_id", "string"),
    ("column_count", "int", "column_count", "int"),
]
applymapping_selected = ApplyMapping.apply(
    frame=selected_source,
    mappings=_selected_mappings,
    transformation_ctx="applymapping_selected",
)
## @type: SelectFields
## @args: [paths = ["user_id", "column_count"], transformation_ctx = "selected_fields"]
## @return: selected_fields
## @inputs: [frame = applymapping_selected]
# Project the mapped frame down to the two columns used by the join.
selected_fields = SelectFields.apply(
    frame=applymapping_selected,
    paths=["user_id", "column_count"],
    transformation_ctx="selected_fields",
)
## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG", database = "data-pipeline-lake-staging", table_name = "selected", transformation_ctx = "resolvechoiceselected0"]
## @return: resolvechoiceselected0
## @inputs: [frame = selected_fields]
# Resolve ambiguous column types against the "selected" catalog table.
# DATABASE is the module-level constant holding the catalog database name.
resolvechoiceselected0 = ResolveChoice.apply(
    frame=selected_fields,
    choice="MATCH_CATALOG",
    database=DATABASE,
    table_name="selected",
    transformation_ctx="resolvechoiceselected0",
)
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoiceselected1"]
## @return: resolvechoiceselected1
## @inputs: [frame = resolvechoiceselected0]
# Second resolution pass over the catalog-matched frame, using the
# "make_struct" choice.
resolvechoiceselected1 = ResolveChoice.apply(
    frame=resolvechoiceselected0,
    choice="make_struct",
    transformation_ctx="resolvechoiceselected1",
)
# Convert both resolved DynamicFrames to Spark DataFrames so they can be
# joined with Spark SQL.
profiles_df = resolvechoiceprofiles1.toDF()
selected_df = resolvechoiceselected1.toDF()

# Register each DataFrame under the temp-view name the query refers to.
profiles_df.createOrReplaceTempView("profiles_temp_table")
selected_df.createOrReplaceTempView("selected_temp_table")

# LEFT JOIN keeps every profile row; column_count is NULL for users that have
# no matching row in selected_temp_table.
_consolidation_query = """
SELECT
A.user_id,
A.gender,
A.age,
A.year_of_birth,
A.address_continent,
A.address_region,
A.address_country,
A.address_lga,
A.address_state,
A.phone_type,
A.profile_value,
A.browser_name,
A.browser_version,
A.phone_manufacturer,
A.phone_model,
A.os_vendor,
A.os_name,
A.os_version,
A.os_sub_version,
A.spend_total,
A.channel,
A.service_name,
A.service_cost,
A.interest,
B.column_count
FROM profiles_temp_table A
LEFT JOIN selected_temp_table B
ON A.user_id=B.user_id
"""
consolidated_df = spark.sql(_consolidation_query)
# Produce a single output partition whose rows are ordered by column_count,
# highest first.
#
# The order of operations matters: repartition() performs a shuffle and does
# not preserve the order of an upstream orderBy(), so sorting first and
# repartitioning afterwards can write the rows out of order. Collapsing to one
# partition first and then sorting inside that partition keeps the written
# file ordered.
output_df = consolidated_df.repartition(1).sortWithinPartitions(
    'column_count', ascending=False
)

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": OUTPUT_PATH}, format = OUTPUT_FILE_FORMAT, transformation_ctx = "datasink_output"]
## @return: datasink_output
## @inputs: [frame = consolidated_dynamicframe]
# Wrap the ordered DataFrame back into a DynamicFrame for the Glue writer.
consolidated_dynamicframe = DynamicFrame.fromDF(
    output_df, glueContext, "consolidated_dynamicframe"
)

# Write the result to OUTPUT_PATH on S3 in OUTPUT_FILE_FORMAT.
datasink_output = glueContext.write_dynamic_frame.from_options(
    frame=consolidated_dynamicframe,
    connection_type="s3",
    connection_options={"path": OUTPUT_PATH},
    format=OUTPUT_FILE_FORMAT,
    transformation_ctx="datasink_output",
)

# Mark the job run as complete.
job.commit()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment