Created April 21, 2020 18:15
-
-
Save chinmaychandak/a9647eb2794e733b3ee2e0c32075a98e to your computer and use it in GitHub Desktop.
custreamz-word-count
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# cudf and streamz imports\n", | |
"import cudf\n", | |
"from streamz import Stream\n", | |
"from streamz.dataframe import DataFrame" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"'''\n", | |
"This is a helper function to do some data pre-processing.\n", | |
"This also prints out the word count for each batch.\n", | |
"'''\n", | |
"def process_batch(messages):\n", | |
" words = []\n", | |
" \n", | |
" # Gather all words\n", | |
" for message in messages:\n", | |
" words = words + message.split(\" \")\n", | |
" \n", | |
" # Create a cudf dataframe\n", | |
" batch_df = cudf.DataFrame({'word': words, 'count': [1]*len(words)})\n", | |
" \n", | |
" # [Optional] Show local (stateless) word count for this batch \n", | |
" local_word_count = batch_df.groupby('word').sum()\n", | |
" print(\"\\nWord Count for this batch:\")\n", | |
" print(local_word_count)\n", | |
" \n", | |
" return batch_df" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Start a stream: `source` is the node that batches are emitted into.\n", | |
"source = Stream()\n", | |
"\n", | |
"# Preprocess each batch: every emitted list of messages is passed through\n", | |
"# process_batch, so stream_df carries one cudf DataFrame per batch\n", | |
"# (note: stream_df is a Stream, not a DataFrame, despite its name).\n", | |
"stream_df = source.map(process_batch)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 4, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Create a streamz dataframe to get stateful word count\n", | |
"sdf = DataFrame(stream_df, example=cudf.DataFrame({'word':[], 'count':[]}))\n", | |
"\n", | |
"# Formatting the print statements\n", | |
"def print_format(sdf):\n", | |
" print(\"\\nGlobal Word Count:\")\n", | |
" return sdf\n", | |
"\n", | |
"# Print cumulative word count from the start of the stream, after every batch. \n", | |
"# One can also sink the output to a list.\n", | |
"sdf.groupby('word').sum().stream.gather().map(print_format).sink(print)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"\n", | |
"Word Count for this batch:\n", | |
" count\n", | |
"word \n", | |
"1 1\n", | |
"2 1\n", | |
"is 2\n", | |
"line 2\n", | |
"this 2\n", | |
"\n", | |
"Global Word Count:\n", | |
" count\n", | |
"word \n", | |
"1 1\n", | |
"2 1\n", | |
"is 2\n", | |
"line 2\n", | |
"this 2\n" | |
] | |
} | |
], | |
"source": [ | |
"# Emitting a batch of elements into the stream\n", | |
"source.emit([\"this is line 1\", \"this is line 2\"])" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 6, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"\n", | |
"Word Count for this batch:\n", | |
" count\n", | |
"word \n", | |
"3 1\n", | |
"4 1\n", | |
"is 2\n", | |
"line 2\n", | |
"this 2\n", | |
"\n", | |
"Global Word Count:\n", | |
" count\n", | |
"word \n", | |
"1 1\n", | |
"2 1\n", | |
"3 1\n", | |
"4 1\n", | |
"is 4\n", | |
"line 4\n", | |
"this 4\n" | |
] | |
} | |
], | |
"source": [ | |
"# Emitting another batch of elements into the stream\n", | |
"source.emit([\"this is line 3\", \"this is line 4\"])" | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python (kc3)", | |
"language": "python", | |
"name": "kc3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.7.6" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 2 | |
} |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.