Created
April 24, 2020 17:54
-
-
Save chinmaychandak/3137d8825d2f34ed7abdf72068bf78a7 to your computer and use it in GitHub Desktop.
custreamz-CCU-state-management
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# cudf and streamz imports\n", | |
"import cudf\n", | |
"from streamz import Stream\n", | |
"from streamz.dataframe import DataFrame" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"def filter_by_event(df):\n", | |
"    '''\n", | |
"    Keep only the CCU-relevant events of a batch and attach a signed CCU delta.\n", | |
"\n", | |
"    An event marked 'Started' indicates that the CCUs for that particular\n", | |
"    Zone/Application/UserPriority have increased (+1). Similarly, an event marked\n", | |
"    'Ended' indicates that they have decreased (-1). All other events are dropped.\n", | |
"\n", | |
"    Parameters\n", | |
"    ----------\n", | |
"    df : DataFrame\n", | |
"        One batch of events; must contain an 'Event' column.\n", | |
"\n", | |
"    Returns\n", | |
"    -------\n", | |
"    DataFrame\n", | |
"        A new frame holding only the 'Started'/'Ended' rows, with an extra int64\n", | |
"        'CCU' column (+1 / -1). The input batch is left untouched.\n", | |
"    '''\n", | |
"    is_relevant = (df[\"Event\"] == \"Started\") | (df[\"Event\"] == \"Ended\")\n", | |
"    # Work on an explicit copy so the new column is written to a frame we own,\n", | |
"    # not to a slice of the caller's batch.\n", | |
"    relevant = df.loc[is_relevant].copy()\n", | |
"    # Build the delta in one vectorised expression instead of chained\n", | |
"    # `df['CCU'].loc[mask] = ...` assignment, which may update a temporary copy\n", | |
"    # and silently leave the column at 0.\n", | |
"    started_delta = (relevant[\"Event\"] == \"Started\").astype(\"int64\")\n", | |
"    ended_delta = (relevant[\"Event\"] == \"Ended\").astype(\"int64\")\n", | |
"    relevant[\"CCU\"] = started_delta - ended_delta\n", | |
"    return relevant\n", | |
"\n", | |
"def ccu_include_aggregations(df):\n", | |
"    '''\n", | |
"    Calculate the CCU results for this batch only.\n", | |
"\n", | |
"    Parameters\n", | |
"    ----------\n", | |
"    df : DataFrame\n", | |
"        A batch with 'ZoneName', 'ApplicationName', 'UserPriority' and 'CCU' columns.\n", | |
"\n", | |
"    Returns\n", | |
"    -------\n", | |
"    DataFrame\n", | |
"        One row per ZoneName/ApplicationName/UserPriority combination present in\n", | |
"        the batch, with 'CCU' holding the sum of that group's CCU values.\n", | |
"    '''\n", | |
"    group_keys = [\"ZoneName\", \"ApplicationName\", \"UserPriority\"]\n", | |
"    batch_ccu = df.groupby(group_keys)[\"CCU\"].sum()\n", | |
"    # reset_index turns the group keys back into ordinary columns.\n", | |
"    return batch_ccu.reset_index()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Create a cu-stream: the entry point that each batch (a cudf DataFrame) is emitted into\n", | |
"stream = Stream()\n", | |
"\n", | |
"# Preprocess each batch of data (keep Started/Ended events, add the +1/-1 CCU column),\n", | |
"# then calculate the local, per-batch CCU sums for every Zone/Application/UserPriority group\n", | |
"source = stream.map(filter_by_event).map(ccu_include_aggregations)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 4, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Empty frame listing the columns that every element of `source` carries\n", | |
"# NOTE(review): the empty lists carry no string/int dtype information - confirm that\n", | |
"# streamz only needs the column names from this example\n", | |
"example = cudf.DataFrame({'ZoneName': [], 'ApplicationName':[], 'UserPriority':[], 'CCU':[]})\n", | |
"\n", | |
"# Create a Streamz Dataframe to maintain CCU state from the start of the stream\n", | |
"sdf = DataFrame(source, example=example)\n", | |
"\n", | |
"# Calculate global (from time t=0) CCUs and print the running totals after every batch\n", | |
"# NOTE(review): gather() presumably only matters for Dask-backed streams - confirm it is needed here\n", | |
"sdf.groupby([\"ZoneName\", \"ApplicationName\", \"UserPriority\"]).CCU.sum().stream.gather().sink(print)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"ZoneName ApplicationName UserPriority\n", | |
"z1 AnotherApp High 1\n", | |
" SomeApp High 1\n", | |
"z2 AnotherApp High 1\n", | |
" SomeApp High 1\n", | |
"z3 RandomApp High 1\n", | |
" SomeApp High 1\n", | |
"Name: CCU, dtype: int64\n" | |
] | |
} | |
], | |
"source": [ | |
"# First batch: six users each start a session in a distinct Zone/Application pair, all with High priority\n", | |
"df1 = cudf.DataFrame({'UserId': [\"Alice\", \"John\", \"Joe\", \"Tim\", \"Newton\", \"Rando\"],\n", | |
" 'ZoneName': [\"z1\", \"z1\", \"z2\", \"z2\", \"z3\", \"z3\"],\n", | |
" 'ApplicationName':[\"SomeApp\", \"AnotherApp\", \"SomeApp\", \"AnotherApp\", \"SomeApp\", \"RandomApp\"],\n", | |
" 'Event':[\"Started\",\"Started\", \"Started\",\"Started\",\"Started\", \"Started\"],\n", | |
" 'UserPriority':[\"High\",\"High\",\"High\",\"High\",\"High\",\"High\"]})\n", | |
"\n", | |
"# Emitting first batch of data; the sink attached to the stream prints the CCU totals (1 per group here)\n", | |
"stream.emit(df1)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 6, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"ZoneName ApplicationName UserPriority\n", | |
"z1 AnotherApp High 0\n", | |
" SomeApp High 1\n", | |
"z2 AnotherApp High 1\n", | |
" SomeApp High 1\n", | |
"z3 RandomApp High 1\n", | |
" SomeApp High 0\n", | |
"Name: CCU, dtype: int64\n" | |
] | |
} | |
], | |
"source": [ | |
"# Second batch: 'Processing' events are dropped by filter_by_event; the two 'Ended' events\n", | |
"# (z1/AnotherApp and z3/SomeApp) bring those groups' running totals down to 0\n", | |
"df2 = cudf.DataFrame({'UserId': [\"Alice\", \"John\", \"Joe\", \"Tim\", \"Newton\", \"Rando\"],\n", | |
" 'ZoneName': [\"z1\", \"z1\", \"z2\", \"z2\", \"z3\", \"z3\"],\n", | |
" 'ApplicationName':[\"SomeApp\", \"AnotherApp\", \"SomeApp\", \"AnotherApp\", \"SomeApp\", \"RandomApp\"],\n", | |
" 'Event':[\"Processing\",\"Ended\", \"Processing\",\"Processing\",\"Ended\", \"Processing\"],\n", | |
" 'UserPriority':[\"High\",\"High\",\"High\",\"High\",\"High\",\"High\"]})\n", | |
"\n", | |
"# Emitting second batch of data\n", | |
"stream.emit(df2)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 7, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"ZoneName ApplicationName UserPriority\n", | |
"z1 AnotherApp High 1\n", | |
" SomeApp High 0\n", | |
"z2 AnotherApp High 0\n", | |
" SomeApp High 1\n", | |
"z3 AnotherApp Low 1\n", | |
" RandomApp High 2\n", | |
" Low 1\n", | |
" SomeApp High 0\n", | |
"Name: CCU, dtype: int64\n" | |
] | |
} | |
], | |
"source": [ | |
"# Third batch: a mix of 'Ended' and 'Started' events, including the first Low-priority rows,\n", | |
"# so new groups appear in the running totals alongside the updated existing ones\n", | |
"df3 = cudf.DataFrame({'UserId': [\"Alice\", \"Jim\", \"Joe\", \"Tim\", \"Einstein\", \"Fluff\", \"Snuff\"],\n", | |
" 'ZoneName': [\"z1\", \"z1\", \"z2\", \"z2\", \"z3\", \"z3\", \"z3\"],\n", | |
" 'ApplicationName':[\"SomeApp\", \"AnotherApp\", \"SomeApp\", \"AnotherApp\", \"AnotherApp\", \"RandomApp\", \"RandomApp\"],\n", | |
" 'Event':[\"Ended\",\"Started\", \"Processing\",\"Ended\",\"Started\", \"Started\", \"Started\"],\n", | |
" 'UserPriority':[\"High\",\"High\",\"High\",\"High\",\"Low\",\"Low\", \"High\"]})\n", | |
"\n", | |
"# Emitting third batch of data\n", | |
"stream.emit(df3)" | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python (kc3)", | |
"language": "python", | |
"name": "kc3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.7.6" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 2 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment