Skip to content

Instantly share code, notes, and snippets.

@dat-boris
Last active February 1, 2017 02:10
Show Gist options
  • Select an option

  • Save dat-boris/cab29b158297cc491e6afebac9f5eb36 to your computer and use it in GitHub Desktop.

Select an option

Save dat-boris/cab29b158297cc491e6afebac9f5eb36 to your computer and use it in GitHub Desktop.
import re
from collections import Counter
# Matches one "word" character (regex class \w). Written as a raw string so
# the backslash reaches the regex engine as-is instead of relying on Python
# passing through an unrecognised string escape.
RE_CHAR = re.compile(r'\w')
def functional_counts(stream):
    """
    Word count expressed as a chain of small stages.

    Motivating question: how would we scale and distribute a wordcount
    operation?

    The stages emit_words, filter_empty_word and count_words are handed, in
    that order, to pipeline.Pipeline, which is then applied to an iterator
    over ``stream`` that reads one character per step.

    :param stream: object with a ``read(size)`` method; reading stops when
        ``read(1)`` returns ``''``.
    :return: whatever ``Pipeline.apply`` returns -- presumably the dict built
        by count_words; ``pipeline`` is not defined in this file, so confirm
        against its implementation.
    """
    word_counter = pipeline.Pipeline([emit_words, filter_empty_word, count_words])

    def read_one_char():
        return stream.read(1)

    # Two-argument iter(): keep calling read_one_char until it returns ''.
    single_chars = iter(read_one_char, '')
    return word_counter.apply(single_chars)
# Example expectations for emit_words.
# NOTE(review): each `== [...]` sits on its own line with no parentheses or
# backslash, so it is not parsed as a continuation of the assert above it --
# confirm whether these lines were meant to be joined.
# NOTE(review): emit_words is a generator function, so its result would need
# list(...) before it could compare equal to a list literal.
assert emit_words("Hello, world🌏")
== ["Hello", "world"]
assert emit_words("世界你好")
== ["世界", "你好"]
# This test will fail with our simple example, but it is used to make a point
# in the presentation.
# NOTE(review): presumably refers to the 世界你好 case directly above -- confirm.
def emit_words(stream):
    """
    Split an iterable of characters into words, lazily.

    Characters that match RE_CHAR are collected into the current word. Any
    other character ends the current word, which is yielded even when it is
    empty (e.g. for two separators in a row); dropping empty words is left to
    the caller. A ``None`` item stops reading early. Whatever has been
    collected when the input ends is always yielded last, so the final item
    may also be an empty string.

    :param stream: iterable yielding one-character strings (or ``None`` to
        signal the end).
    :return: generator of word strings, possibly empty ones.
    """
    letters = []
    for symbol in stream:
        if symbol is None:
            break
        if RE_CHAR.match(symbol):
            letters.append(symbol)
            continue
        # Separator: flush the word collected so far and start a new one.
        yield ''.join(letters)
        letters = []
    # Flush the trailing word (empty if the input ended on a separator).
    yield ''.join(letters)
def filter_empty_word(word_stream):
    """
    Pass through only the non-empty words of ``word_stream``.

    - principle one: an edge case is handled by its own separate function
    - implemented with the built-in filter, as the original note suggested

    :param word_stream: iterable of words.
    :return: generator yielding every truthy item, in the original order.
    """
    # filter(None, ...) keeps exactly the items that are truthy.
    for token in filter(None, word_stream):
        yield token
def count_words(word_stream):
    """
    Count how many times each word occurs in ``word_stream``.

    :param word_stream: iterable of hashable words; it is consumed fully.
    :return: plain ``dict`` mapping each distinct word to its number of
        occurrences, keyed in order of first appearance.
    """
    # Counter tallies an iterable directly; convert so callers get a plain dict.
    return dict(Counter(word_stream))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment