Skip to content

Instantly share code, notes, and snippets.

@xennygrimmato
Last active March 30, 2019 07:20
Show Gist options
  • Save xennygrimmato/6727bd1848f9c2343468ae5427294be5 to your computer and use it in GitHub Desktop.
Save xennygrimmato/6727bd1848f9c2343468ae5427294be5 to your computer and use it in GitHub Desktop.
Trying to convert a list of sentences containing space-separated words into a list of lists of words.
from __future__ import absolute_import
from __future__ import print_function
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
class TestTokenizerDoFn(beam.DoFn):
    """Split one sentence into its whitespace-separated words.

    Emits exactly one output element per input sentence: the list of words
    produced by ``str.split()``.
    """

    def process(self, element):
        # Yield the list instead of returning it: Beam iterates whatever
        # process() returns, so `return element.split()` would emit every
        # word as a separate element and lose the per-sentence grouping.
        yield element.split()


class ConvertFn(beam.DoFn):
    """Emit each incoming iterable (e.g. a sentence's words) as a plain list.

    Exactly one output element is produced per input element.
    """

    def process(self, elem):
        # Keep the items together in a single list. Wrapping each item in
        # its own one-element list would give [['Exception:'], ['Index'], ...]
        # rather than a flat list of words.
        yield list(elem)


def test():
    """Run a small pipeline that turns each sentence into a list of its words.

    Each input sentence yields exactly one output element, the list of its
    whitespace-separated words. The elements are printed to stdout.
    """
    with beam.Pipeline(options=PipelineOptions()) as p:
        lines = p | 'Create' >> beam.Create([
            'Exception: Index out of bounds',
            'ValueError: int cannot be typecasted to str',
            'KeyError: Key 1 does not exist in dictionary',
        ])
        word_lists = (
            lines
            | 'Split' >> beam.ParDo(TestTokenizerDoFn())
            | 'Process' >> beam.ParDo(ConvertFn())
        )
        # Expected output, one line per sentence (element order is not
        # guaranteed by Beam):
        #   ['Exception:', 'Index', 'out', 'of', 'bounds']
        #   ['ValueError:', 'int', 'cannot', 'be', 'typecasted', 'to', 'str']
        #   ['KeyError:', 'Key', '1', 'does', 'not', 'exist', 'in', 'dictionary']
        word_lists | 'Print' >> beam.Map(print)
# Usage: python beam_pipeline.py
if __name__ == '__main__':
    # Set the root logger's threshold to INFO before building the pipeline.
    # NOTE(review): no handler is configured here, so whether INFO records are
    # actually displayed depends on handlers installed elsewhere — confirm.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment