import copy


# write a DataFrame to a path using the Hudi format
def hudi_write(df, schema, table, path, mode, hudi_options):
    # base Hudi options; the recordkey/precombine/partitionpath/operation/
    # table type values below are placeholders to replace per target table
    base_hudi_options = {
        "hoodie.datasource.write.recordkey.field": "recordkey",
        "hoodie.datasource.write.precombine.field": "precombine_field",
        "hoodie.datasource.write.partitionpath.field": "partitionpath_field",
        "hoodie.datasource.write.operation": "write_operation",
        "hoodie.datasource.write.table.type": "table_type",
        "hoodie.table.name": table,
        "hoodie.datasource.write.table.name": table,
        "hoodie.bloom.index.update.partition.path": True,
        "hoodie.index.type": "GLOBAL_BLOOM",
        "hoodie.consistency.check.enabled": True,
        # Set Glue Data Catalog related Hudi configs
        "hoodie.datasource.hive_sync.enable": True,
        "hoodie.datasource.hive_sync.use_jdbc": False,
        "hoodie.datasource.hive_sync.database": schema,
        "hoodie.datasource.hive_sync.table": table,
    }
    # options passed by the caller (e.g. the delete operation) override the defaults
    base_hudi_options.update(hudi_options or {})
    hudi_options = base_hudi_options
    # partitioned table: use the complex key generator and sync partition fields
    if hudi_options.get("hoodie.datasource.write.partitionpath.field"):
        hudi_options.setdefault(
            "hoodie.datasource.write.keygenerator.class",
            "org.apache.hudi.keygen.ComplexKeyGenerator",
        )
        hudi_options.setdefault(
            "hoodie.datasource.hive_sync.partition_extractor_class",
            "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        )
        hudi_options.setdefault(
            "hoodie.datasource.hive_sync.partition_fields",
            hudi_options.get("hoodie.datasource.write.partitionpath.field"),
        )
        hudi_options.setdefault("hoodie.datasource.write.hive_style_partitioning", True)
    # non-partitioned table: use the non-partitioned key generator and extractor
    else:
        hudi_options[
            "hoodie.datasource.write.keygenerator.class"
        ] = "org.apache.hudi.keygen.NonpartitionedKeyGenerator"
        hudi_options.setdefault(
            "hoodie.datasource.hive_sync.partition_extractor_class",
            "org.apache.hudi.hive.NonPartitionedExtractor",
        )
    df.write.format("hudi").options(**hudi_options).mode(mode).save(path)
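

# Usage example (hypothetical DataFrame name; the path and table names mirror
# the placeholders used further below):
# hudi_write(
#     df=some_batch_df,
#     schema="schema_name",
#     table="table_name",
#     path="s3://bucket/target_path/",
#     mode="append",
#     hudi_options={"hoodie.datasource.write.operation": "upsert"},
# )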


# parse the OGG records and write upserts/deletes to S3 by calling the hudi_write function
def write_to_s3(df, path):
    # per-batch Hudi options; merged over the defaults inside hudi_write
    hudi_options = {"hoodie.datasource.write.operation": "upsert"}
    # select the pertinent fields from the df
    flattened_df = df.select(
        "value.*", "key", "partition", "offset", "timestamp", "timestampType"
    )
    # filter for only the inserts and updates and keep the "after" image
    df_w_upserts = flattened_df.filter('op_type in ("I", "U")').select(
        "after.*",
        "key",
        "partition",
        "offset",
        "timestamp",
        "timestampType",
        "op_type",
        "op_ts",
        "current_ts",
        "pos",
    )
    # filter for only the deletes and keep the "before" image
    df_w_deletes = flattened_df.filter('op_type in ("D")').select(
        "before.*",
        "key",
        "partition",
        "offset",
        "timestamp",
        "timestampType",
        "op_type",
        "op_ts",
        "current_ts",
        "pos",
    )
    # invoke hudi_write function for upserts
    if df_w_upserts.count() > 0:
        hudi_write(
            df=df_w_upserts,
            schema=SCHEMA,
            table=TABLE,
            path=path,
            mode="append",
            hudi_options=hudi_options,
        )
    # invoke hudi_write function for deletes
    if df_w_deletes.count() > 0:
        hudi_options_copy = copy.deepcopy(hudi_options)
        hudi_options_copy["hoodie.datasource.write.operation"] = "delete"
        hudi_options_copy["hoodie.bloom.index.update.partition.path"] = False
        hudi_write(
            df=df_w_deletes,
            schema=SCHEMA,
            table=TABLE,
            path=path,
            mode="append",
            hudi_options=hudi_options_copy,
        )


TABLE = "table_name"
SCHEMA = "schema_name"
CHECKPOINT_LOCATION = "s3://bucket/checkpoint_path/"
TARGET_PATH = "s3://bucket/target_path/"
STREAMING = True
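
# `deserialized_df` is not defined in this gist. A minimal sketch of how it
# could be produced, assuming the OGG records arrive on a JSON-encoded Kafka
# topic and that `value_schema` (a StructType matching the OGG payload), the
# broker address, and the topic name are placeholders defined elsewhere:
#
# from pyspark.sql.functions import col, from_json
# raw_df = (
#     spark.readStream.format("kafka")
#     .option("kafka.bootstrap.servers", "broker:9092")
#     .option("subscribe", "ogg_topic")
#     .load()
# )
# deserialized_df = raw_df.withColumn(
#     "value", from_json(col("value").cast("string"), value_schema)
# )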

# instantiate the writeStream object
query = deserialized_df.writeStream

# trigger a single run of the query for one-off batch writes
if not STREAMING:
    query = query.trigger(once=True)

# write each micro-batch to the target path using the Hudi format
write_to_s3_hudi = query.foreachBatch(
    lambda batch_df, batch_id: write_to_s3(df=batch_df, path=TARGET_PATH)
).start(checkpointLocation=CHECKPOINT_LOCATION)

# await termination of the write operation
write_to_s3_hudi.awaitTermination()