Skip to content

Instantly share code, notes, and snippets.

@x
Created June 19, 2019 19:10
Show Gist options
  • Save x/2231910bd22ccd2abdb9e09470fc78c2 to your computer and use it in GitHub Desktop.
from __future__ import absolute_import
import argparse
import logging
import re
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from past.builtins import unicode
class UserOptions(PipelineOptions):
    """Runtime-configurable options for the Dataflow template.

    Both arguments are registered as ValueProviders so their values can be
    supplied at template execution time rather than at graph construction.
    """

    @classmethod
    def _add_argparse_args(cls, parser):
        for flag, description in (
            ("--input", "Input file to process."),
            ("--output", "Output file to write results to."),
        ):
            parser.add_value_provider_argument(flag, help=description)
class WordExtractingDoFn(beam.DoFn):
    """Parse each line of input text into words, recording metrics."""

    def __init__(self):
        super(WordExtractingDoFn, self).__init__()
        # Counters/distribution surfaced through the Beam metrics API.
        self.words_counter = Metrics.counter(self.__class__, "words")
        self.word_lengths_counter = Metrics.counter(self.__class__, "word_lengths")
        self.word_lengths_dist = Metrics.distribution(self.__class__, "word_len_dist")
        self.empty_line_counter = Metrics.counter(self.__class__, "empty_lines")

    def process(self, element):
        """Return the words found in *element*, one line of text.

        Args:
            element: the element being processed (a line of text).

        Returns:
            The list of words in the line. Blank lines yield an empty list
            and are tallied in the ``empty_lines`` counter.
        """
        line = element.strip()
        if not line:
            self.empty_line_counter.inc(1)
        matches = re.findall(r"[\w\']+", line, re.UNICODE)
        for word in matches:
            self.words_counter.inc()
            self.word_lengths_counter.inc(len(word))
            self.word_lengths_dist.update(len(word))
        return matches
def count_ones(word_ones):
    """Collapse a grouped (word, ones) pair into (word, total_count)."""
    word, occurrences = word_ones
    return (word, sum(occurrences))
def format_result(word_count):
    """Render a (word, count) pair as the line ``word: count``."""
    word, count = word_count
    return "{}: {:d}".format(word, count)
def main():
    """Build and run the word-count pipeline.

    Options are read from the command line via PipelineOptions; --input and
    --output are ValueProviders so the job can be staged as a Dataflow
    template (see UserOptions).
    """
    pipeline_options = PipelineOptions()
    # DoFns in this workflow rely on global context (e.g. the module-level
    # `re` import), so the main session must be pickled and shipped too.
    pipeline_options.view_as(SetupOptions).save_main_session = True
    # Runtime-configurable template options:
    # https://cloud.google.com/dataflow/docs/templates/creating-templates
    user_options = pipeline_options.view_as(UserOptions)

    pipeline = beam.Pipeline(options=pipeline_options)

    # Read the text file[pattern] into a PCollection of lines.
    lines = pipeline | "read" >> ReadFromText(user_options.input)

    # Count occurrences of each word.
    words = lines | "split" >> (
        beam.ParDo(WordExtractingDoFn()).with_output_types(unicode)
    )
    paired = words | "pair_with_one" >> beam.Map(lambda w: (w, 1))
    grouped = paired | "group" >> beam.GroupByKey()
    counts = grouped | "count" >> beam.Map(count_ones)

    # Format the counts and write them out (a side-effecting Write).
    formatted = counts | "format" >> beam.Map(format_result)
    # pylint: disable=expression-not-assigned
    formatted | "write" >> WriteToText(user_options.output)

    # Run and block until the pipeline finishes.
    result = pipeline.run()
    result.wait_until_finish()
if __name__ == "__main__":
    # Surface pipeline progress when run as a script: the module imports
    # `logging` but never configured it, so Beam's INFO-level status
    # messages were suppressed by the default WARNING root level.
    logging.getLogger().setLevel(logging.INFO)
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment