Skip to content

Instantly share code, notes, and snippets.

@nabilm
Created January 23, 2021 03:24
Show Gist options
  • Save nabilm/9acc857ee3e9ec485575efed333d58fd to your computer and use it in GitHub Desktop.
Save nabilm/9acc857ee3e9ec485575efed333d58fd to your computer and use it in GitHub Desktop.
An Apache Beam custom PTransform that performs deduplication based on the passed fields
import apache_beam as beam
class Dedupe(beam.PTransform):
    """
    A substitute for distinct that accepts a list of fields.

    Records (dicts) are considered duplicates of each other when they
    hold equal values for every field named in ``dedupe_keys``; exactly
    one record is emitted per distinct combination of those values.

    Note: with an empty ``dedupe_keys`` every record maps to the same
    (empty) key, so only a single record survives.
    """

    def __init__(self, dedupe_keys=None):
        """
        :param dedupe_keys: iterable of field names whose values identify
            a record. ``None`` (the default) means no fields.
        """
        super(Dedupe, self).__init__()
        # Take a private, immutable copy: a mutable default argument (or
        # a caller-owned list) would otherwise be shared between
        # instances and could change after construction.
        self._dedupe_keys = tuple(dedupe_keys) if dedupe_keys is not None else ()

    @staticmethod
    def _freeze(value):
        """
        Return ``value`` with every list/tuple (at any depth) turned into
        a tuple so that it can be used as part of a grouping key.
        Any other value is returned unchanged.
        """
        if isinstance(value, (list, tuple)):
            return tuple(Dedupe._freeze(item) for item in value)
        return value

    def _extract_list_item(self, record):
        """
        Pick the record to keep out of one group of duplicates.

        ``record`` is the iterable of all records that share a dedupe
        key; the first one yielded is chosen by default. A plain dict is
        treated as an already-extracted record and returned unchanged.

        TODO: extend this to also accept a method that can be applied to
        the data, as the aim is to generalize the process to be
        extract --> transform --> load
        or
        extract --> standardize --> correction --> load
        """
        if isinstance(record, dict):
            # Iterating a dict would yield its keys, not records.
            return record
        return next(iter(record))

    def _hashify(self, record):
        """
        Build the ``(key, record)`` pair used for grouping.

        This could be just a lambda function like
        ``lambda v: (tuple(v[i] for i in self._dedupe_keys), v)``
        but it is written out to handle values that are lists, e.g.
        ``{"name": "Ford", "models": ["Fiesta", "Focus", "Mustang"]}``.

        The key follows the order of the configured dedupe fields (not
        the record's own key order), and a field missing from the record
        contributes ``None`` so that every key has the same length and
        values of different fields can never line up by accident.
        """
        hash_keys = tuple(
            self._freeze(record.get(key)) for key in self._dedupe_keys
        )
        return (hash_keys, record)

    def expand(self, pcoll):
        """
        Key every record by its dedupe fields, group the records per key
        and emit one record for each group.
        """
        return (
            pcoll
            | "ToPairs" >> beam.Map(self._hashify)
            # Group with the public GroupByKey primitive rather than
            # digging into the private state of a combiner accumulator.
            | "Group" >> beam.GroupByKey()
            | "GetValues" >> beam.Values()
            | "Extract" >> beam.Map(self._extract_list_item)
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment