import trio
import trio.testing

# A simple protocol where messages are single bytes b"a", b"b", b"c", etc.,
# and each one is acknowledged by echoing it back uppercased. So send b"a",
# get b"A", etc.
# To make it easier to shut down, message b"z" causes the receiver to exit.

async def receiver(stream):
    # The receiver is a simple loop that reads messages from the stream, and
    # acknowledges each one before reading the next. (Of course in a real
    # program it would probably also do something with each message before
    # acknowledging it, but this is just an example.)
    while True:
        print("receiver: waiting for message to arrive")
        # Get message
        message = await stream.receive_some(1)
        # Acknowledge message
        reply = message.upper()
        print(f"receiver: got {message}, sending {reply}")
        await stream.send_all(reply)
        if message == b"z":
            print("receiver: exiting")
            return

# Now we want to send a whole batch of messages at once, and wait for the
# replies to all of them. There are a few different ways to do this.

# The simplest way is to send the first message, wait for the reply, send the
# next message, wait for the reply, etc. This strategy is simple, and probably
# the best place to start. You can also wrap the basic send/receive flow into
# a single function that sends a message, waits for the reply, and returns it
# (see the sketch after this function).
async def send_message_batch_0(stream, messages):
    for message in messages:
        print(f"send_message_batch_0: sending {message}")
        await stream.send_all(message)
        print("send_message_batch_0: waiting for reply")
        reply = await stream.receive_some(1)
        print(f"send_message_batch_0: got reply {reply}")
# Another way to do it is to send all the messages in one big batch, and
# *then* wait for all the replies. Sending multiple messages without waiting
# for the replies is called "pipelining", and it can speed things up if you
# have a high-latency link, because it reduces the number of round trips.
# However, this implementation has a subtle bug! It can deadlock if the stream
# runs out of buffer space: once the reply buffer fills up, the receiver's
# send_all blocks, so it stops reading messages; then the message buffer fills
# up too, and our send_all blocks forever, because we don't start reading
# replies until all the sends finish. This can lead to frustrating
# intermittent problems, because whether you hit it or not depends on the
# size of the batch, the size of the messages, how much buffer the OS decides
# to allocate to this socket connection, the state of intermediate network
# routers, the phase of the moon, etc.
async def send_message_batch_1(stream, messages):  # This one is buggy!
    for message in messages:
        print(f"send_message_batch_1: sending {message}")
        await stream.send_all(message)
    for _ in range(len(messages)):
        print("send_message_batch_1: waiting for reply")
        reply = await stream.receive_some(1)
        print(f"send_message_batch_1: got reply {reply}")

# Here's another way to do pipelining: it's the exact same code as the
# function above, but now instead of running the two loops sequentially, we
# run them concurrently in two different tasks. This fixes the bug, because
# the replies are drained continuously while the sends are still in flight.
async def send_message_batch_2(stream, messages):
    async def send_all():
        for message in messages:
            print(f"send_message_batch_2: sending {message}")
            await stream.send_all(message)

    async def wait_for_all_replies():
        for _ in range(len(messages)):
            print("send_message_batch_2: waiting for reply")
            reply = await stream.receive_some(1)
            print(f"send_message_batch_2: got reply {reply}")

    async with trio.open_nursery() as nursery:
        nursery.start_soon(send_all)
        nursery.start_soon(wait_for_all_replies)

# Trio's lockstep streams (created with `trio.testing.lockstep_stream_pair`)
# are useful to help test for these kinds of subtle flow-control based
# deadlocks. They simulate the worst-case network connection: one that has no
# buffering whatsoever. So now the bug is easy to catch, without having to
# mock the phase of the moon.
async def test_send_message_batch_impl(send_message_batch_impl):
    stream1, stream2 = trio.testing.lockstep_stream_pair()
    async with trio.open_nursery() as nursery:
        # One task pretends to be the server, running the receiver loop
        nursery.start_soon(receiver, stream1)
        # The other task pretends to be the client, sending a batch of
        # messages. The last message is b"z", so the receiver will exit
        # cleanly.
        nursery.start_soon(send_message_batch_impl, stream2, [b"a", b"b", b"z"])

# Try running the program to test these three different implementations. You
# should find that send_message_batch_0 and send_message_batch_2 work, but
# send_message_batch_1 deadlocks and hangs forever.
trio.run(test_send_message_batch_impl, send_message_batch_0)
#trio.run(test_send_message_batch_impl, send_message_batch_1)
#trio.run(test_send_message_batch_impl, send_message_batch_2)
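
# If you'd rather have the buggy version fail fast instead of hanging, you
# can wrap the test in a timeout. (This wrapper is our own addition, not part
# of the original example; trio.fail_after raises trio.TooSlowError once the
# deadline expires, which also cancels the deadlocked tasks.)
async def test_with_timeout(send_message_batch_impl):
    # 5 seconds is an arbitrary margin; the lockstep deadlock happens
    # immediately, so any comfortable value works.
    with trio.fail_after(5):
        await test_send_message_batch_impl(send_message_batch_impl)

# Uncomment to watch send_message_batch_1 raise TooSlowError instead of
# hanging:
#trio.run(test_with_timeout, send_message_batch_1)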
# TODO to make this a good example for docs:
# - once we have helpers for working with lines, use a line-based protocol to
#   make it more realistic.
# - demonstrate running the protocol over TCP, to show how it works for some
#   message/batch sizes, but if you make them big enough then you can provoke
#   the deadlock. (A rough first sketch of this follows below.)
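
# A rough sketch of that TCP demo. (This is our sketch of the idea in the
# TODO above, not the original author's code. trio.serve_tcp and
# trio.testing.open_stream_to_socket_listener are standard Trio APIs; the
# batch sizes below are made-up illustration values.)
async def tcp_demo(send_message_batch_impl, batch_size):
    async def handler(stream):
        await receiver(stream)

    async with trio.open_nursery() as nursery:
        # Port 0 asks the OS to pick an arbitrary free port for us
        listeners = await nursery.start(trio.serve_tcp, handler, 0)
        client_stream = await trio.testing.open_stream_to_socket_listener(
            listeners[0]
        )
        # Every message but the last is b"a"; the final b"z" makes the
        # receiver exit cleanly.
        messages = [b"a"] * (batch_size - 1) + [b"z"]
        await send_message_batch_impl(client_stream, messages)
        # The whole batch was acknowledged, so shut the server down
        nursery.cancel_scope.cancel()

# With a small batch, even the buggy pipelined version usually works over
# TCP, because the socket buffers absorb the backlog:
#trio.run(tcp_demo, send_message_batch_1, 10)
# But crank the batch size up past the socket buffers and it should deadlock:
#trio.run(tcp_demo, send_message_batch_1, 1_000_000)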