Skip to content

Instantly share code, notes, and snippets.

@cpcloud
Last active February 20, 2018 19:04
Show Gist options
  • Save cpcloud/d30d6f8e923b503f522ffc6d80a7b488 to your computer and use it in GitHub Desktop.
Save cpcloud/d30d6f8e923b503f522ffc6d80a7b488 to your computer and use it in GitHub Desktop.
0cp: Arrow + streamz + 0mq
import random
import time
from collections import deque
from threading import Thread
import zmq
import numpy as np
import pandas as pd
import pyarrow as pa
def send_arrow(socket, df, flags=0, copy=True, track=False):
    """Encode ``df`` as an Arrow IPC payload and send it on ``socket``.

    Parameters
    ----------
    socket : zmq.Socket
        Socket to send on.
    df : pandas.DataFrame
        Frame to serialize with ``pyarrow.serialize_pandas``.
    flags, copy, track
        Forwarded unchanged to ``socket.send``.

    Returns
    -------
    Whatever ``socket.send`` returns (under pyzmq, a ``MessageTracker`` when
    ``track=True``).
    """
    return socket.send(pa.serialize_pandas(df), flags, copy=copy, track=track)
def recv_arrow(socket, flags=0, copy=True, track=False):
    """Receive one Arrow IPC payload from ``socket`` and decode it.

    Parameters
    ----------
    socket : zmq.Socket
        Socket to receive from. With the default ``flags=0`` the call blocks
        until a message arrives.
    flags, copy, track
        Forwarded unchanged to ``socket.recv``.

    Returns
    -------
    pandas.DataFrame
        The frame reconstructed by ``pyarrow.deserialize_pandas``.
    """
    payload = socket.recv(flags=flags, copy=copy, track=track)
    return pa.deserialize_pandas(payload)
def produce(url, ident, generate_data):
    """Repeatedly push DataFrames from ``generate_data`` to ``url``.

    Parameters
    ----------
    url : str
        ZeroMQ endpoint that a PUSH socket ``connect``s to.
    ident : int
        Producer identifier. Not used by this function.
    generate_data : callable
        Zero-argument callable returning a ``pandas.DataFrame``.

    Loops forever. It only exits by raising, e.g. if ``generate_data`` or the
    send fails; the socket is closed on that way out.
    """
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUSH)
    try:
        sock.connect(url)
        while True:
            send_arrow(sock, generate_data())
            # Random pause in [0, 1) seconds to mimic an irregular producer.
            time.sleep(random.random())
    finally:
        # The loop never ends normally, so cleanup must live in ``finally``
        # to run at all.
        sock.close()
def consume(url, source):
    """Pull Arrow-encoded DataFrames from ``url`` and emit them into ``source``.

    Parameters
    ----------
    url : str
        ZeroMQ endpoint that a PULL socket ``connect``s to.
    source : object
        Anything with an ``emit`` method; the demo passes a ``streamz.Stream``.
        Each received DataFrame is handed to ``source.emit``.

    Loops forever. It only exits by raising, e.g. if receiving, decoding or
    ``source.emit`` fails; the socket is closed on that way out.
    """
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PULL)
    try:
        sock.connect(url)
        while True:
            source.emit(recv_arrow(sock))
    finally:
        # The loop never ends normally, so cleanup must live in ``finally``
        # to run at all.
        sock.close()
def proxy(in_url, out_url):
    """Simulate a device running in the background.

    Binds a PULL socket at ``in_url`` and a PUSH socket at ``out_url``, then
    hands both to ``zmq.proxy``, which forwards every message from the first
    to the second.

    ``zmq.ContextTerminated`` is treated as a normal shutdown and swallowed.
    The context is local to this function and nothing here terminates it, so
    in practice this runs until an error occurs. Both sockets are closed on
    every exit path, including a failed ``bind`` (e.g. port already in use).
    """
    ctx = zmq.Context()
    in_s = ctx.socket(zmq.PULL)
    out_s = ctx.socket(zmq.PUSH)
    try:
        in_s.bind(in_url)
        out_s.bind(out_url)
        zmq.proxy(in_s, out_s)
    except zmq.ContextTerminated:
        # Expected shutdown signal; nothing further to do beyond cleanup.
        pass
    finally:
        in_s.close()
        out_s.close()
def main(source, generate_data, producers=2, *,
         in_url='tcp://127.0.0.1:5555', out_url='tcp://127.0.0.1:5556'):
    """Wire producers -> proxy -> consumer on background threads and wait.

    Parameters
    ----------
    source : object
        Passed to ``consume``; receives each DataFrame via ``source.emit``.
    generate_data : callable
        Zero-argument DataFrame factory given to every producer.
    producers : int, default 2
        Number of producer threads to start.
    in_url : str, keyword-only
        Endpoint the proxy binds for producers to push into.
    out_url : str, keyword-only
        Endpoint the proxy binds for the consumer to pull from.

    Blocks on ``consumer.join()``. ``consume`` loops until it raises, so this
    normally does not return.
    """
    consumer = Thread(target=consume, args=(out_url, source))
    proxy_thread = Thread(target=proxy, args=(in_url, out_url))
    # Kept separate from the ``producers`` count rather than rebinding it.
    producer_threads = [
        Thread(target=produce, args=(in_url, ident, generate_data))
        for ident in range(producers)
    ]
    consumer.start()
    proxy_thread.start()
    for thread in producer_threads:
        thread.start()
    consumer.join()
def generate_data(size=30, keys='abc', rng=None):
    """Return a random demo DataFrame with ``key`` and ``value`` columns.

    Parameters
    ----------
    size : int, default 30
        Number of rows. It is used for both columns, so their lengths always
        agree.
    keys : iterable of str, default 'abc'
        Pool each row's ``key`` is drawn from, uniformly with replacement.
    rng : numpy.random.Generator or numpy.random.RandomState, optional
        Source of randomness. When omitted, NumPy's global random state is
        used, so ``np.random.seed`` controls the output.

    Returns
    -------
    pandas.DataFrame
        ``size`` rows: ``key`` holds draws from ``keys`` and ``value`` holds
        standard-normal floats.
    """
    source = np.random if rng is None else rng
    # ``standard_normal`` exists on the np.random module, RandomState and
    # Generator alike. On the global state it consumes the same stream as
    # ``np.random.randn``.
    return pd.DataFrame({
        'key': source.choice(list(keys), size=size),
        'value': source.standard_normal(size),
    })
if __name__ == '__main__':
    from streamz import Stream

    def summarize(frame):
        # Per-key total of ``value``, returned as a frame with a
        # ``value_sum`` column.
        totals = frame.groupby('key').value.sum()
        return totals.rename('value_sum').reset_index()

    def all_positive(frame):
        # Keep only summaries whose every per-key total is > 0.
        return frame.value_sum.gt(0).all()

    source = Stream()
    buffered = source.map(summarize).filter(all_positive).buffer(5)
    buffered.sink(print)

    # Rolling window holding the max ``value_sum`` of the last 5 summaries.
    results = deque(maxlen=5)

    def record_max(frame):
        # Side effect only; ``deque.append`` returns None, and that None is
        # what flows downstream.
        return results.append(frame.value_sum.max())

    def show_results(_):
        print(np.round(np.array(list(results)), 2))

    buffered.map(record_max).sink(show_results)
    main(source, generate_data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment