Last active
January 7, 2020 17:08
-
-
Save AndreiPashkin/04c287def6d165fc2832 to your computer and use it in GitHub Desktop.
Python asyncio coroutines pipelining and exception propagation sandbox
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# coding=utf-8 | |
"""Client that demonstrates processing pipeline.""" | |
import asyncio | |
from functools import partial | |
import logging | |
logging.basicConfig(level='DEBUG') | |
class Pipeline(object): | |
def __init__(self, *units): | |
in_ = output = self.input = asyncio.Queue() | |
tasks = [] | |
for unit in units: | |
output = asyncio.Queue() | |
tasks.append(asyncio.async(unit(in_, output))) | |
in_ = output | |
self.output = output | |
self.future = asyncio.gather(*tasks) | |
self.future.add_done_callback(lambda _: self.future.cancel()) | |
@asyncio.coroutine | |
def put(self, item): | |
yield from self.input.put(item) | |
@staticmethod | |
def _propagate_exc(future, target_future): | |
if future.cancelled(): | |
target_future.cancel() | |
elif future.exception(): | |
target_future.set_exception(future.exception()) | |
else: | |
target_future.set_excpetion( | |
RuntimeError('One of the unit was prematurely ' | |
'stopped execution.')) | |
@asyncio.coroutine | |
def _waiter(self, target): | |
result = (yield from self.output.get()) | |
target.set_result(result) | |
@asyncio.coroutine | |
def get(self): | |
try: | |
result = self.output.get_nowait() | |
except asyncio.QueueEmpty: | |
result = asyncio.Future() | |
self.future.add_done_callback(partial(self._propagate_exc, | |
target_future=result)) | |
asyncio.async(self._waiter(result)) | |
return result | |
def empty(self): | |
return self.output.empty() | |
@asyncio.coroutine | |
def upper(input, output): | |
while True: | |
char = yield from input.get() | |
print('Got char: ', char) | |
yield from output.put(char.upper()) | |
@asyncio.coroutine | |
def words(input, output): | |
"""Raises after collecting 3 words.""" | |
counter = 0 | |
word = [] | |
while True: | |
if counter >= 3: | |
print('Oops!..') | |
raise RuntimeError('Oops!..') | |
e = yield from input.get() | |
if e.isalnum(): | |
word.append(e) | |
print('Current word: ', word) | |
elif len(word) > 0: | |
yield from output.put(''.join(word)) | |
counter += 1 | |
word = [] | |
@asyncio.coroutine | |
def tcp_echo_client(loop): | |
reader, writer = yield from asyncio.open_connection('127.0.0.1', 5555, | |
loop=loop) | |
pipeline = Pipeline(upper, words) | |
@asyncio.coroutine | |
def read(): | |
while True: | |
data = yield from reader.read(100) | |
data = data.decode('utf-8') | |
print('Data: ', data) | |
for byte in data: | |
yield from pipeline.put(byte) | |
asyncio.async(read()) | |
while True: | |
print('Pipeline output: ', (yield from pipeline.get())) | |
print('Close the socket') | |
writer.close() | |
@asyncio.coroutine | |
def background_stuff(): | |
while True: | |
yield from asyncio.sleep(3) | |
print('Other background stuff...') | |
loop = asyncio.get_event_loop() | |
asyncio.async(background_stuff()) | |
loop.run_until_complete(tcp_echo_client(loop)) | |
loop.close() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# coding=utf-8 | |
"""Server that accepts a client and send it strings from user input.""" | |
import socket | |
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
host = '' | |
port = 5555 | |
s.bind((host, port)) | |
s.listen(1) | |
print('Listening...') | |
conn, addr = s.accept() | |
print('Client ({}) connected.'.format(addr)) | |
while True: | |
conn.send(bytes(input('Enter data to send: '), 'UTF-8')) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment