Created
March 4, 2022 22:26
-
-
Save davidselassie/e2e58497e8e6485ffcda1cbe85d9dad1 to your computer and use it in GitHub Desktop.
Wikistream example that returns a running top 10
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import collections | |
import json | |
import operator | |
from datetime import timedelta | |
import sseclient | |
import urllib3 | |
from bytewax import Dataflow, inputs, parse, spawn_cluster | |
def open_stream():
    """Yield the data payload of each event on the Wikimedia recent-change stream.

    Issues a streaming ``GET`` against the Wikimedia EventStreams endpoint,
    wraps the response in an SSE client and yields ``event.data`` for every
    event the client produces. The generator keeps yielding for as long as
    the SSE client keeps producing events.

    Yields:
        The ``data`` attribute of each server-sent event.
    """
    pool = urllib3.PoolManager()
    resp = pool.request(
        "GET",
        "https://stream.wikimedia.org/v2/stream/recentchange/",
        # Do not read the body up front; the SSE client consumes it
        # incrementally.
        preload_content=False,
        headers={"Accept": "text/event-stream"},
    )
    try:
        client = sseclient.SSEClient(resp)
        for event in client.events():
            yield event.data
    finally:
        # With preload_content=False the connection is not handed back
        # automatically; release it when the generator is closed, the
        # stream ends, or an error propagates.
        resp.release_conn()
def input_builder(worker_index, worker_count):
    """Build the input for one worker.

    Only the worker with index 0 reads the stream: it returns
    ``inputs.tumbling_epoch`` applied to ``open_stream()`` with a two-second
    ``timedelta``. Every other worker returns an empty list.

    Args:
        worker_index: Index of the worker asking for its input.
        worker_count: Total number of workers (unused here).
    """
    if worker_index != 0:
        # A single connection to the stream is enough; the remaining
        # workers contribute no input of their own.
        return []
    epoch_length = timedelta(seconds=2)
    return inputs.tumbling_epoch(open_stream(), epoch_length)
def output_builder(worker_index, worker_count):
    """Return the output handler for a worker.

    Every worker gets the built-in ``print``, regardless of its index.

    Args:
        worker_index: Index of the worker asking for its output (unused).
        worker_count: Total number of workers (unused).
    """
    writer = print
    return writer
def initial_count(data_dict):
    """Turn one decoded event into a keyed single-entry count.

    Args:
        data_dict: Mapping that must contain a ``"server_name"`` entry.

    Returns:
        A ``("all", {server_name: 1})`` tuple; the constant ``"all"`` key
        puts every event under the same key.
    """
    server_name = data_dict["server_name"]
    single_count = {server_name: 1}
    return "all", single_count
def addi_counts(lhs, rhs):
    """Add the counts in ``rhs`` onto ``lhs``, mutating ``lhs`` in place.

    Keys missing from ``lhs`` are treated as having a count of zero.
    Nothing is returned.

    >>> x = {"a": 1}
    >>> y = {"a": 2, "b": 1}
    >>> addi_counts(x, y)
    >>> x
    {'a': 3, 'b': 1}
    """
    for key, amount in rhs.items():
        lhs[key] = lhs.get(key, 0) + amount
def sum_counts(this_epoch_server_to_counts, server_to_count):
    """Fold one more count dictionary into the epoch's accumulator.

    The first argument is updated in place via ``addi_counts`` and that same
    object is returned.
    """
    accumulated = this_epoch_server_to_counts
    addi_counts(accumulated, server_to_count)
    return accumulated
def calc_top_n(n):
    """Create a state-updating function that reports the ``n`` largest counts.

    Args:
        n: Maximum number of ``(server, count)`` pairs to report.

    Returns:
        A function ``calc_top(running, this_epoch)`` that adds ``this_epoch``
        into ``running`` in place and returns ``(running, top)``, where
        ``top`` is a list of at most ``n`` ``(server, count)`` pairs ordered
        by descending count.
    """
    count_of = operator.itemgetter(1)

    def calc_top(running_server_to_counts, this_epoch_server_to_counts):
        addi_counts(running_server_to_counts, this_epoch_server_to_counts)
        ranked = sorted(
            running_server_to_counts.items(),
            key=count_of,
            reverse=True,
        )
        top_servers = ranked[:n]
        return running_server_to_counts, top_servers

    return calc_top
def flatten(all_top):
    """Drop the key from a ``(key, top)`` pair and return only ``top``.

    Args:
        all_top: A two-element iterable; the first element is discarded.
    """
    _key, top_servers = all_top
    return top_servers
# Pipeline definition. The comment on each step describes the shape of the
# items leaving that step.
flow = Dataflow()
# In: raw JSON text for one event. Out: the decoded dict,
# e.g. {"server_name": "server.name", ...}
flow.map(json.loads)
# Out: ("all", {"server_name": 1}) -- every event shares the key "all".
flow.map(initial_count)
# Out: ("all", {"server_name": count, ...}) -- counts merged with sum_counts.
flow.reduce_epoch(sum_counts)
# State is created with dict(); calc_top_n(10) folds the new counts into it.
# Out: ("all", [("top_server", count), ...])
flow.stateful_map(dict, calc_top_n(10))
# Out: [("top_server", count), ...] -- the "all" key is dropped.
flow.map(flatten)
flow.capture()


if __name__ == "__main__":
    # Cluster settings come from bytewax's own argument parser.
    cluster_kwargs = parse.cluster_args()
    spawn_cluster(flow, input_builder, output_builder, **cluster_kwargs)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment