Skip to content

Instantly share code, notes, and snippets.

@a-mpch
Last active September 15, 2020 22:19
Show Gist options
  • Save a-mpch/38963528656d5efbde062e924a7bb855 to your computer and use it in GitHub Desktop.
Save a-mpch/38963528656d5efbde062e924a7bb855 to your computer and use it in GitHub Desktop.
from apache_beam.pvalue import TaggedOutput
class SplitByRoute(beam.DoFn):
    """DoFn skeleton that declares one output tag name per route.

    The ``OUTPUT_*`` class attributes hold the tag names; the example in
    ``process`` shows how one of them is used with ``TaggedOutput``.
    """

    # Tag names, one per route.
    OUTPUT_PARROT = "PARROT"
    OUTPUT_DOGGO = "DOGGO"
    OUTPUT_CAT = "CAT"
    OUTPUT_HORSE = "HORSE"

    def __init__(self):
        super().__init__()

    def process(self, elem):
        """Placeholder: emits nothing for ``elem`` as written.

        Example of a value a real implementation would yield::

            yield TaggedOutput(self.OUTPUT_PARROT, {**elem})
        """
        # Intentionally not a generator yet; returning None produces no output.
        return None
# Flatten each incoming list into individual elements, then run them through
# SplitByRoute with one declared tagged output per route.
# The tags must be the OUTPUT_* constants defined on SplitByRoute; the class
# has no PARROT/DOGGO/CAT/HORSE attributes, so those names raise
# AttributeError when the pipeline is constructed.
router = (
    entry_events
    | "Flatten lists" >> beam.FlatMap(lambda elements: elements)
    | "FileRouter"
    >> beam.ParDo(SplitByRoute()).with_outputs(
        SplitByRoute.OUTPUT_PARROT,
        SplitByRoute.OUTPUT_DOGGO,
        SplitByRoute.OUTPUT_CAT,
        SplitByRoute.OUTPUT_HORSE,
    )
)

# Select the PARROT-tagged output from the result bound above (``router``)
# and pipe it into SendToPubsub with the configured output topic.
_ = router[SplitByRoute.OUTPUT_PARROT] | SendToPubsub(
    file_processing_options.output_topic
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment