Created
April 5, 2017 12:15
-
-
Save pohmelie/a7f53e1e64fe4906f0b9ed6d7e8a4469 to your computer and use it in GitHub Desktop.
aiozmq XPUB/XSUB example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
xpub connection made
xsub connection made
xsub binding tcp://127.0.0.1:45098
pub connection made
xpub binding tcp://127.0.0.1:34462
sub connection made
xpub message received [b'\x01test']
xpub writing [b'\x01test'] to xsub
xsub message received [b'test', b'0']
xsub writing [b'test', b'0'] to xpub
sub message received [b'test', b'0']
xsub message received [b'test', b'1']
xsub writing [b'test', b'1'] to xpub
sub message received [b'test', b'1']
xsub message received [b'test', b'2']
xsub writing [b'test', b'2'] to xpub
sub message received [b'test', b'2']
xpub message received [b'\x00test']
xpub writing [b'\x00test'] to xsub
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import aiozmq | |
import zmq | |
class Protocol(aiozmq.ZmqProtocol):
    """Logging ZeroMQ protocol that can forward messages to another protocol.

    Every event is printed prefixed with ``name``. When ``sink`` is set to
    another ``Protocol`` instance, each received message is also written to
    that instance's transport.
    """

    def __init__(self, name):
        """Remember the label used in log output.

        Args:
            name: Label printed in front of every log line of this protocol.
        """
        self.name = name
        # Peer protocol that received messages are forwarded to (optional).
        self.sink = None
        # Defined up front so the attribute exists before ``connection_made``
        # runs; ``connection_lost`` resets it to ``None`` again.
        self.transport = None

    def connection_made(self, transport):
        """Log the event and keep the transport for later writes."""
        print(self.name, "connection made")
        self.transport = transport

    def connection_lost(self, exc):
        """Log the event and forget the transport."""
        print(self.name, "connection lost")
        self.transport = None

    def msg_received(self, data):
        """Log ``data`` and forward it to ``sink`` when one is attached.

        The message is dropped (and the drop is logged) when the sink has no
        transport, i.e. it is not connected yet or has already disconnected.
        """
        print(self.name, "message received", data)
        if not self.sink:
            return
        target = self.sink.transport
        if target is None:
            # Without this guard a disconnected sink would raise
            # AttributeError on ``None.write``.
            print(self.name, "dropping", data, "-", self.sink.name, "has no transport")
            return
        print(self.name, "writing", data, "to", self.sink.name)
        target.write(data)

    @property
    def binding(self):
        """Return the first endpoint reported by the transport, logging it.

        Raises whatever ``next`` raises (``StopIteration``) if the transport
        reports no bindings.
        """
        binding = next(iter(self.transport.bindings()))
        print(self.name, "binding", binding)
        return binding
async def test_aiozmq():
    """Run a PUB -> XSUB -> XPUB -> SUB round trip through two proxy sockets.

    An XPUB and an XSUB socket are bound to wildcard ports on 127.0.0.1 and
    cross-linked through their protocols' ``sink`` attributes, so whatever one
    receives is written to the other. A PUB socket connects to the XSUB
    endpoint and a SUB socket connects to the XPUB endpoint. The SUB socket
    subscribes to ``b"test"``, three ``test`` messages and one ``foo`` message
    are published, and the subscription is removed again.

    All transports that were created are closed on exit, including when a
    later step raises.
    """
    transports = []
    try:
        xpub, xpub_p = await aiozmq.create_zmq_connection(
            lambda: Protocol("xpub"),
            zmq.XPUB,
            bind="tcp://127.0.0.1:*",
        )
        transports.append(xpub)
        xsub, xsub_p = await aiozmq.create_zmq_connection(
            lambda: Protocol("xsub"),
            zmq.XSUB,
            bind="tcp://127.0.0.1:*",
        )
        transports.append(xsub)
        # Cross-link the two proxy sides: each forwards what it receives to
        # the other one.
        xpub_p.sink, xsub_p.sink = xsub_p, xpub_p
        pub, _ = await aiozmq.create_zmq_connection(
            lambda: Protocol("pub"),
            zmq.PUB,
            connect=xsub_p.binding,
        )
        transports.append(pub)
        sub, _ = await aiozmq.create_zmq_connection(
            lambda: Protocol("sub"),
            zmq.SUB,
            connect=xpub_p.binding,
        )
        transports.append(sub)
        sub.subscribe(b"test")
        # Presumably gives the subscription time to travel through the proxy
        # before anything is published.
        await asyncio.sleep(1)
        for i in range(3):
            pub.write([b"test", str(i).encode()])
        # Topic nobody subscribed to.
        pub.write([b"foo", b"123"])
        await asyncio.sleep(1)
        sub.unsubscribe(b"test")
        await asyncio.sleep(1)
    finally:
        # Close the clients first, then the proxy sockets.
        for transport in reversed(transports):
            transport.close()
if __name__ == "__main__":
    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated and no longer creates one on recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(test_aiozmq())
    finally:
        # Release the loop's resources even if the example fails.
        loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment