from __future__ import absolute_import

import argparse
import logging
import re

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from past.builtins import unicode


class UserOptions(PipelineOptions):
    """Options supplied at template launch time as ValueProviders."""

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument("--input", help="Input file to process.")
        parser.add_value_provider_argument("--output", help="Output file to write results to.")


class WordExtractingDoFn(beam.DoFn):
    """Parse each line of input text into words."""

    def __init__(self):
        super(WordExtractingDoFn, self).__init__()
        self.words_counter = Metrics.counter(self.__class__, "words")
        self.word_lengths_counter = Metrics.counter(self.__class__, "word_lengths")
        self.word_lengths_dist = Metrics.distribution(self.__class__, "word_len_dist")
        self.empty_line_counter = Metrics.counter(self.__class__, "empty_lines")

    def process(self, element):
        """Returns an iterator over the words of this element.

        The element is a line of text. If the line is blank, that is noted, too.

        Args:
          element: the element being processed.

        Returns:
          An iterable of the words found in this line.
        """
        text_line = element.strip()
        if not text_line:
            self.empty_line_counter.inc(1)
        words = re.findall(r"[\w\']+", text_line, re.UNICODE)
        for w in words:
            self.words_counter.inc()
            self.word_lengths_counter.inc(len(w))
            self.word_lengths_dist.update(len(w))
        return words
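

# As a rough illustration (not run as part of the pipeline): on a line such as
# "King Lear's fool", the regex above yields ['King', "Lear's", 'fool']; each
# match increments the "words" counter and feeds the word-length distribution.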


def count_ones(word_ones):
    """Sums the ones emitted for a single word after GroupByKey."""
    (word, ones) = word_ones
    return (word, sum(ones))


def format_result(word_count):
    """Formats a (word, count) pair as a printable line."""
    (word, count) = word_count
    return "%s: %d" % (word, count)
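

# Illustrative shapes of the elements these helpers see (hypothetical values,
# shown here only as a comment):
#   count_ones(("beam", [1, 1, 1]))  ->  ("beam", 3)
#   format_result(("beam", 3))       ->  "beam: 3"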


def main():
    pipeline_options = PipelineOptions()
    # We use the save_main_session option because one or more DoFns in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options.view_as(SetupOptions).save_main_session = True
    # user_options exposes the runtime-configurable options for the template:
    # https://cloud.google.com/dataflow/docs/templates/creating-templates
    user_options = pipeline_options.view_as(UserOptions)
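
    # A typical way to stage this pipeline as a Dataflow template (illustrative
    # command; the file name, project, and bucket below are placeholders):
    #
    #   python wordcount.py \
    #     --runner DataflowRunner \
    #     --project YOUR_PROJECT \
    #     --staging_location gs://YOUR_BUCKET/staging \
    #     --temp_location gs://YOUR_BUCKET/temp \
    #     --template_location gs://YOUR_BUCKET/templates/wordcount
    #
    # Because --input and --output are ValueProviders, they are supplied when
    # the template is launched, not when it is created.
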
    p = beam.Pipeline(options=pipeline_options)

    # Read the text file[pattern] into a PCollection.
    lines = p | "read" >> ReadFromText(user_options.input)

    # Create the counts.
    counts = (
        lines
        | "split" >> (beam.ParDo(WordExtractingDoFn()).with_output_types(unicode))
        | "pair_with_one" >> beam.Map(lambda x: (x, 1))
        | "group" >> beam.GroupByKey()
        | "count" >> beam.Map(count_ones)
    )

    # Format the counts.
    output = counts | "format" >> beam.Map(format_result)

    # Write the output using a "Write" transform that has side effects.
    # pylint: disable=expression-not-assigned
    output | "write" >> WriteToText(user_options.output)

    # Run and wait until the pipeline finishes.
    result = p.run()
    result.wait_until_finish()


if __name__ == "__main__":
    main()
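
# For a quick local check (illustrative command; the file name and paths are
# placeholders), the same script should also run on the DirectRunner:
#
#   python wordcount.py \
#     --runner DirectRunner \
#     --input /path/to/input.txt \
#     --output /tmp/wordcount/results
#
# WriteToText writes sharded files under the given prefix, one "word: count"
# line per distinct word.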