-
-
Save tacaswell/82fc49b682b90327813fd14dc960369d to your computer and use it in GitHub Desktop.
Streaming Arrow
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
import random
import time
from collections import deque
from threading import Thread

import numpy as np
import pandas as pd
import pyarrow as pa
import zmq
def send_arrow(socket, df, flags=0, copy=True, track=False):
    """Serialize a DataFrame with Arrow and send it as one ZeroMQ message.

    Parameters
    ----------
    socket : zmq.Socket
        Socket to send on.
    df : pandas.DataFrame
        Frame to serialize via ``pa.serialize_pandas``.
    flags, copy, track
        Forwarded unchanged to ``socket.send``.

    Returns
    -------
    Whatever ``socket.send`` returns (a message tracker when *track* is set).
    """
    payload = pa.serialize_pandas(df)
    outcome = socket.send(payload, flags=flags, copy=copy, track=track)
    return outcome
def recv_arrow(socket, flags=0, copy=True, track=False):
    """Receive one ZeroMQ message and decode it back into a DataFrame.

    Counterpart of ``send_arrow``: the message body is handed to
    ``pa.deserialize_pandas``. *flags*, *copy* and *track* are forwarded
    unchanged to ``socket.recv``.
    """
    return pa.deserialize_pandas(
        socket.recv(flags=flags, copy=copy, track=track)
    )
def produce(url, ident, generate_data):
    """Push DataFrames produced by *generate_data* to *url* forever.

    Connects a PUSH socket to *url*. In an endless loop it sends one
    Arrow-serialized DataFrame and then sleeps for ``random.random()`` seconds.

    Parameters
    ----------
    url : str
        ZeroMQ endpoint to connect to.
    ident : int
        Producer index supplied by the caller; currently unused.
    generate_data : callable
        Zero-argument callable returning a DataFrame to send.

    The function only returns by raising (e.g. an error from *generate_data*
    or ZeroMQ). The socket and context are released on the way out.
    """
    ctx = zmq.Context()
    s = ctx.socket(zmq.PUSH)
    try:
        s.connect(url)
        while True:
            send_arrow(s, generate_data())
            # Random pause so the producers interleave irregularly.
            time.sleep(random.random())
    finally:
        # The loop never ends normally, so a trailing close() after it would be
        # unreachable. linger=0 discards frames still queued, so ctx.term()
        # cannot block forever waiting for a peer that is gone.
        s.close(linger=0)
        ctx.term()
def consume(url, source):
    """Pull Arrow-serialized DataFrames from *url* and emit each into *source*.

    Connects a PULL socket to *url* and loops forever: receive a message,
    decode it with ``recv_arrow``, and call ``source.emit`` with the result.

    Parameters
    ----------
    url : str
        ZeroMQ endpoint to connect to.
    source
        Object with an ``emit`` method (presumably a ``streamz.Stream``;
        confirm against callers).

    The function only returns by raising. The socket and context are released
    on the way out.
    """
    ctx = zmq.Context()
    s = ctx.socket(zmq.PULL)
    try:
        s.connect(url)
        while True:
            source.emit(recv_arrow(s))
    finally:
        # The loop never ends normally, so cleanup must live in finally.
        s.close(linger=0)
        ctx.term()
def proxy(in_url, out_url):
    """Simulate a device running in the background.

    Binds a PULL socket on *in_url* and a PUSH socket on *out_url*, then
    forwards messages from the first to the second with ``zmq.proxy``. That
    call blocks until the context is terminated or an error occurs.

    Raises
    ------
    zmq.ZMQError
        If a bind or the proxy loop fails. Context termination is treated as
        a normal shutdown and swallowed.
    """
    ctx = zmq.Context()
    in_s = ctx.socket(zmq.PULL)
    out_s = ctx.socket(zmq.PUSH)
    try:
        in_s.bind(in_url)
        out_s.bind(out_url)
        zmq.proxy(in_s, out_s)
    except zmq.ContextTerminated:
        # Expected shutdown path for a proxy; nothing to report.
        pass
    finally:
        # Close the sockets on every exit path, not only on ContextTerminated;
        # a failed bind used to leak both of them.
        in_s.close(linger=0)
        out_s.close(linger=0)
        ctx.term()
def main(source, generate_data, producers=2,
         in_url='ipc:///tmp/data/0', out_url='ipc:///tmp/data/1'):
    """Run the producer -> proxy -> consumer pipeline until the consumer stops.

    Starts one consumer thread that feeds *source*, one proxy thread, and
    *producers* producer threads that each call *generate_data*. Then blocks
    on the consumer thread.

    Parameters
    ----------
    source
        Passed to ``consume``; each received DataFrame is ``emit``-ed into it.
    generate_data : callable
        Zero-argument callable each producer uses to make a DataFrame.
    producers : int
        Number of producer threads.
    in_url, out_url : str
        Endpoints for the proxy's input (producers connect here) and output
        (the consumer connects here).
    """
    # ZeroMQ's ipc transport cannot bind when the socket file's parent
    # directory is missing. The bind error would only kill the proxy thread,
    # leaving the pipeline hung, so create the directories up front.
    for url in (in_url, out_url):
        if url.startswith('ipc://'):
            ipc_dir = os.path.dirname(url[len('ipc://'):])
            if ipc_dir:
                os.makedirs(ipc_dir, exist_ok=True)

    # Daemon threads let the process exit (e.g. on Ctrl-C, or if the consumer
    # dies) instead of hanging on the never-ending proxy and producer loops.
    consumer = Thread(target=consume, args=(out_url, source), daemon=True)
    proxy_thread = Thread(target=proxy, args=(in_url, out_url), daemon=True)
    producer_threads = [
        Thread(target=produce, args=(in_url, i, generate_data), daemon=True)
        for i in range(producers)
    ]

    consumer.start()
    proxy_thread.start()
    for p in producer_threads:
        p.start()
    consumer.join()
def generate_data(n=3_000_000):
    """Return a random DataFrame with *n* rows.

    Columns
    -------
    key : str
        Drawn uniformly from ``'a'``, ``'b'``, ``'c'``.
    value : float
        Standard-normal samples.

    *n* defaults to the original hard-coded 3 million rows, so zero-argument
    callers (such as ``produce``) behave as before.
    """
    return pd.DataFrame({
        'key': np.random.choice(list('abc'), size=n),
        'value': np.random.randn(n),
    })
if __name__ == '__main__':
    from streamz import Stream

    def summarize(frame):
        # Sum 'value' per 'key' and return a two-column frame (key, value_sum).
        per_key = frame.groupby('key').value.sum()
        return per_key.rename('value_sum').reset_index()

    def all_positive(frame):
        # Keep only frames where every per-key sum is strictly positive.
        return frame.value_sum.gt(0).all()

    source = Stream()
    buffered = source.map(summarize).filter(all_positive).buffer(5)
    buffered.sink(print)

    results = deque(maxlen=5)

    def record_max(frame):
        # Side effect only: returns None, which is what flows downstream.
        results.append(frame.value_sum.max())

    def show_recent(_ignored):
        print(np.round(np.array(list(results)), 2))

    buffered.map(record_max).sink(show_recent)

    main(source, generate_data)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment