Skip to content

Instantly share code, notes, and snippets.

@chinmaychandak
Created April 21, 2020 18:15
Show Gist options
  • Save chinmaychandak/a9647eb2794e733b3ee2e0c32075a98e to your computer and use it in GitHub Desktop.
Save chinmaychandak/a9647eb2794e733b3ee2e0c32075a98e to your computer and use it in GitHub Desktop.
custreamz-word-count
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"# cudf and streamz imports\n",
"import cudf\n",
"from streamz import Stream\n",
"from streamz.dataframe import DataFrame"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"def process_batch(messages):\n",
"    \"\"\"Turn a batch of text messages into a one-row-per-word cudf DataFrame.\n",
"\n",
"    This is a helper function to do some data pre-processing.\n",
"    It also prints out the (stateless) word count for the batch.\n",
"\n",
"    Args:\n",
"        messages: iterable of strings; each one is split on whitespace.\n",
"\n",
"    Returns:\n",
"        A cudf.DataFrame with a 'word' column (one row per word occurrence)\n",
"        and a 'count' column that is 1 on every row.\n",
"    \"\"\"\n",
"    # Gather all words. split() with no argument is used instead of split(\" \")\n",
"    # so repeated, leading or trailing spaces do not produce empty-string words.\n",
"    words = []\n",
"    for message in messages:\n",
"        words.extend(message.split())\n",
"\n",
"    # Create a cudf dataframe\n",
"    batch_df = cudf.DataFrame({'word': words, 'count': [1] * len(words)})\n",
"\n",
"    # [Optional] Show local (stateless) word count for this batch\n",
"    local_word_count = batch_df.groupby('word').sum()\n",
"    print(\"\\nWord Count for this batch:\")\n",
"    print(local_word_count)\n",
"\n",
"    return batch_df"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"# Starting a stream \n",
"source = Stream()\n",
"\n",
"# Preprocess each batch\n",
"stream_df = source.map(process_batch)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"# Create a streamz dataframe to get stateful word count\n",
"sdf = DataFrame(stream_df, example=cudf.DataFrame({'word':[], 'count':[]}))\n",
"\n",
"# Formatting the print statements\n",
"def print_format(sdf):\n",
"    \"\"\"Print the global word count header, then return the argument unchanged.\"\"\"\n",
"    print(\"\\nGlobal Word Count:\")\n",
"    return sdf\n",
"\n",
"# Print cumulative word count from the start of the stream, after every batch. \n",
"# One can also sink the output to a list.\n",
"sdf.groupby('word').sum().stream.gather().map(print_format).sink(print)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"Word Count for this batch:\n",
" count\n",
"word \n",
"1 1\n",
"2 1\n",
"is 2\n",
"line 2\n",
"this 2\n",
"\n",
"Global Word Count:\n",
" count\n",
"word \n",
"1 1\n",
"2 1\n",
"is 2\n",
"line 2\n",
"this 2\n"
]
}
],
"source": [
"# Emitting a batch of elements into the stream\n",
"source.emit([\"this is line 1\", \"this is line 2\"])"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"Word Count for this batch:\n",
" count\n",
"word \n",
"3 1\n",
"4 1\n",
"is 2\n",
"line 2\n",
"this 2\n",
"\n",
"Global Word Count:\n",
" count\n",
"word \n",
"1 1\n",
"2 1\n",
"3 1\n",
"4 1\n",
"is 4\n",
"line 4\n",
"this 4\n"
]
}
],
"source": [
"# Emitting another batch of elements into the stream\n",
"source.emit([\"this is line 3\", \"this is line 4\"])"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python (kc3)",
"language": "python",
"name": "kc3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment