Skip to content

Instantly share code, notes, and snippets.

@davidselassie
Created March 4, 2022 22:26
Show Gist options
  • Save davidselassie/e2e58497e8e6485ffcda1cbe85d9dad1 to your computer and use it in GitHub Desktop.
Save davidselassie/e2e58497e8e6485ffcda1cbe85d9dad1 to your computer and use it in GitHub Desktop.
Wikistream example that computes a running top 10 of Wikimedia servers by number of recent-change events
import collections
import heapq
import json
import operator
from datetime import timedelta

import sseclient
import urllib3
from bytewax import Dataflow, inputs, parse, spawn_cluster
def open_stream():
    """Yield the ``data`` payload of each event on Wikimedia's recent-change SSE stream.

    The HTTP body is streamed (``preload_content=False``) and does not end on
    its own. The response is therefore closed in ``finally`` when the generator
    is closed or garbage-collected; previously the socket was left open.

    Yields:
        The raw ``data`` string of each server-sent event. Presumably one JSON
        document per event (the flow parses it with ``json.loads``).
    """
    pool = urllib3.PoolManager()
    resp = pool.request(
        "GET",
        "https://stream.wikimedia.org/v2/stream/recentchange/",
        preload_content=False,
        headers={"Accept": "text/event-stream"},
    )
    try:
        client = sseclient.SSEClient(resp)
        for event in client.events():
            yield event.data
    finally:
        # Use close() rather than release_conn(). The body is never fully
        # consumed, so this connection must not go back into the pool for reuse.
        resp.close()
def input_builder(worker_index, worker_count):
    """Build the input for one worker of the cluster.

    Only worker 0 opens the Wikimedia stream. Every other worker gets an
    empty list, so the stream is read by a single worker. ``worker_count``
    is accepted for the builder signature but not used.

    Returns:
        For worker 0, the stream wrapped by ``inputs.tumbling_epoch`` with a
        2-second ``timedelta`` (presumably grouping items into 2-second
        epochs; confirm against bytewax docs). Otherwise ``[]``.
    """
    if worker_index != 0:
        # Secondary workers contribute no input of their own.
        return []
    epoch_length = timedelta(seconds=2)
    return inputs.tumbling_epoch(open_stream(), epoch_length)
def output_builder(worker_index, worker_count):
    """Return the output callable for a worker: the builtin ``print``.

    Both arguments are ignored; every worker gets the same callable.
    Presumably bytewax calls it with each captured item (confirm).
    """
    emit = print
    return emit
def initial_count(data_dict):
    """Turn one parsed event into a keyed, single-entry count.

    Every event is keyed under the constant ``"all"`` so that all counts are
    combined into one group downstream.

    Args:
        data_dict: A parsed event; must contain ``"server_name"``
            (a ``KeyError`` propagates otherwise).

    Returns:
        ``("all", {server_name: 1})``.
    """
    server = data_dict["server_name"]
    per_server = {server: 1}
    return ("all", per_server)
def addi_counts(lhs, rhs):
    """Add the counts in *rhs* into *lhs*, mutating *lhs*; returns ``None``.

    Keys missing from *lhs* start at zero. *rhs* is not modified.

    >>> x = {"a": 1}
    >>> y = {"a": 2, "b": 1}
    >>> addi_counts(x, y)
    >>> x
    {'a': 3, 'b': 1}
    """
    for key, amount in rhs.items():
        lhs[key] = lhs.get(key, 0) + amount
def sum_counts(this_epoch_server_to_counts, server_to_count):
    """Reducer used with ``reduce_epoch``: fold one count dict into the accumulator.

    The accumulator is updated in place via ``addi_counts`` and then returned.
    The reducer presumably must return the new accumulator value.
    """
    accumulator = this_epoch_server_to_counts
    addi_counts(accumulator, server_to_count)
    return accumulator
def calc_top_n(n):
    """Return a ``stateful_map`` mapper that tracks the running top *n* servers.

    The returned ``calc_top(running_server_to_counts, this_epoch_server_to_counts)``
    works in three steps:

    * It folds one epoch's counts into the running totals, in place via
      ``addi_counts``.
    * It returns ``(running_server_to_counts, top_servers)``.
    * ``top_servers`` is a list of up to *n* ``(server, count)`` pairs in
      descending count order. Ties keep the running dict's insertion order.

    Args:
        n: How many servers to report. ``n <= 0`` yields an empty list.
            Previously a negative *n* sliced from the end of the full ranking.
    """
    by_count = operator.itemgetter(1)

    def calc_top(running_server_to_counts, this_epoch_server_to_counts):
        addi_counts(running_server_to_counts, this_epoch_server_to_counts)
        # heapq.nlargest is documented as equivalent to
        # sorted(iterable, key=key, reverse=True)[:n], including tie order.
        # It avoids fully sorting every server each epoch (O(m log n) vs
        # O(m log m)) and returns [] for n <= 0.
        top_servers = heapq.nlargest(
            n, running_server_to_counts.items(), key=by_count
        )
        return running_server_to_counts, top_servers

    return calc_top
def flatten(all_top):
    """Drop the grouping key from a ``(key, top)`` pair and return ``top``.

    The input must unpack into exactly two items; a ``ValueError`` is raised
    otherwise.
    """
    _group_key, top_servers = all_top
    return top_servers
# Build the dataflow. The comment after each step shows the shape of the
# items that step emits; the order of the calls defines the pipeline.
flow = Dataflow()
# Input items: raw event strings from open_stream ("event_json").
flow.map(json.loads)
# {"server_name": "server.name", ...}
flow.map(initial_count)
# ("all", {"server_name": 1}) -- every event is keyed under "all" so all
# counts end up in a single group.
flow.reduce_epoch(sum_counts)
# ("all", {"server_name": count, ...}) -- presumably one summed dict per key
# per epoch (bytewax reduce_epoch semantics; confirm).
flow.stateful_map(dict, calc_top_n(10))
# ("all", [("top_server", count), ...]) -- state starts as an empty dict
# (built by `dict`) and accumulates counts across epochs; the output is the
# current top 10.
flow.map(flatten)
# [("top_server", count), ...]
flow.capture()
if __name__ == "__main__":
    # Run the flow with this file's input/output builders; cluster settings
    # come from parse.cluster_args() (presumably command-line flags).
    spawn_cluster(flow, input_builder, output_builder, **parse.cluster_args())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment