Created
January 23, 2021 03:24
-
-
Save nabilm/9acc857ee3e9ec485575efed333d58fd to your computer and use it in GitHub Desktop.
An Apache Beam custom PTransform that performs deduplication based on the passed fields
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import apache_beam as beam | |
class Dedupe(beam.PTransform):
    """Deduplicate dict records on a chosen subset of their fields.

    A substitute for a plain "distinct" that accepts a list of field names:
    two records are treated as duplicates when they hold equal values for
    every field in ``dedupe_keys``, and one record is emitted per distinct
    combination of those values.
    """

    def __init__(self, dedupe_keys=None):
        """Store the field names that define record identity.

        Args:
            dedupe_keys: Iterable of record keys to deduplicate on. When
                omitted (or empty) every record maps to the same empty key,
                so the whole input collapses to a single record.
        """
        super(Dedupe, self).__init__()
        # Copy into a fresh list: a literal ``[]`` default would be one list
        # object shared by every instance created without arguments.
        self._dedupe_keys = list(dedupe_keys) if dedupe_keys is not None else []

    def _extract_list_item(self, record):
        """Pick the record to keep from a group of duplicates.

        Args:
            record: Either a list of duplicate records or a single record.

        Returns:
            The first item when ``record`` is a non-empty list; otherwise
            ``record`` itself, unchanged.

        TODO: extend this to also accept a method that can be applied to
        the data, as the aim is to generalize the process to be
            extract --> transform --> load
        or
            extract --> standardize --> correction --> load
        """
        if isinstance(record, list) and record:
            return record[0]
        return record

    def _hashify(self, record):
        """Pair a record with a tuple key built from its dedupe fields.

        This could be a one-line lambda, but it is written out to handle
        list values, which are converted to tuples, e.g.
            {"name": "Ford", "models": ["Fiesta", "Focus", "Mustang"]}

        Args:
            record: Mapping of field name to value.

        Returns:
            A ``(key, record)`` pair where ``key`` is a tuple holding the
            values of the dedupe fields, in ``dedupe_keys`` order.
        """
        hash_keys = []
        # Walk the configured keys rather than the record so the key layout
        # does not depend on the record's own key order; a missing field
        # contributes ``None`` so that it still occupies its position.
        for key in self._dedupe_keys:
            value = record.get(key)
            if isinstance(value, list):
                value = tuple(value)
            hash_keys.append(value)
        return (tuple(hash_keys), record)

    def expand(self, pcoll):
        """Apply the deduplication to ``pcoll``.

        Records are keyed by their dedupe fields, grouped per key, and one
        record is kept from each group. The order of grouped values is not
        relied upon, so which duplicate survives should be treated as
        arbitrary.

        Args:
            pcoll: PCollection of dict-like records.

        Returns:
            A PCollection with one record per distinct key.
        """
        return (
            pcoll
            | "ToPairs" >> beam.Map(self._hashify)
            | "Group" >> beam.GroupByKey()
            | "GetValues" >> beam.Values()
            | "Extract" >> beam.Map(
                lambda group: self._extract_list_item(list(group))
            )
        )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment