Skip to content

Instantly share code, notes, and snippets.

@lewoudar
Last active November 20, 2023 19:11
Show Gist options
  • Save lewoudar/afd6c157cb3039704760373fd89e711a to your computer and use it in GitHub Desktop.
Save lewoudar/afd6c157cb3039704760373fd89e711a to your computer and use it in GitHub Desktop.
An example of a server supporting tcp and udp with anyio
import signal
import anyio
async def signal_handler(scope):
with anyio.open_signal_receiver(signal.SIGINT, signal.SIGTERM) as signals:
async for signum in signals:
if signum == signal.SIGINT:
print('Ctrl+C pressed!')
else:
print('Terminated!')
scope.cancel()
return
async def udp_server():
print('starting udp server')
async with await anyio.create_udp_socket(
local_host='localhost', local_port=1234, reuse_port=True) as udp:
async for packet, (host, port) in udp:
print('received: ', packet)
message = b'Hello, ' + packet
print('will send:', message)
await udp.sendto(message, host, port)
async def handle(client):
async with client:
name = await client.receive(1024)
print('received from TCP:', name)
await client.send(b'Hello, %s\n' % name)
async def tcp_server():
print('starting tcp server')
listener = await anyio.create_tcp_listener(local_port=1234, reuse_port=True)
await listener.serve(handle)
async def main():
async with anyio.create_task_group() as tg:
tg.start_soon(signal_handler, tg.cancel_scope)
tg.start_soon(udp_server)
tg.start_soon(tcp_server)
anyio.run(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment