Skip to content

Instantly share code, notes, and snippets.

@d01phin
Created January 13, 2025 19:05
Show Gist options
  • Save d01phin/fe6ede20bc21c2a2442bf814f99c6ea7 to your computer and use it in GitHub Desktop.
Save d01phin/fe6ede20bc21c2a2442bf814f99c6ea7 to your computer and use it in GitHub Desktop.
IPC with uvloop and Unix domain sockets
import asyncio
import os
import uvloop
# Install uvloop's event loop policy at import time, so that any event loop
# created afterwards (e.g. by asyncio.run() at the bottom of this file) is a
# uvloop loop instead of the default asyncio one.
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
class UnixSocketIPCServer:
    """Echo server listening on a Unix domain socket.

    Every chunk of data received from a client is decoded, logged, and sent
    back prefixed with ``"Echo: "``.

    Args:
        socket_path: Filesystem path of the socket to bind.
    """

    def __init__(self, socket_path):
        self.socket_path = socket_path
        # Set by start() once the socket is bound; reset to None by stop().
        self.server = None

    def _remove_socket_file(self):
        """Delete the socket file, ignoring the case where it is already gone."""
        # EAFP instead of exists()+remove(): the file may disappear between
        # the check and the removal.
        try:
            os.remove(self.socket_path)
        except FileNotFoundError:
            pass

    async def handle_client(self, reader, writer):
        """Serve one client connection until EOF or cancellation.

        Args:
            reader: Stream reader for the connection.
            writer: Stream writer for the connection.

        Raises:
            asyncio.CancelledError: Re-raised after logging, so the task that
                runs this handler is correctly reported as cancelled.
        """
        peername = writer.get_extra_info('peername')
        print(f"Connected to client: {peername}")
        try:
            while True:
                # Each read returns at most 1024 bytes; an empty result is EOF.
                data = await reader.read(1024)
                if not data:
                    break
                message = data.decode()
                print(f"Received: {message}")
                response = f"Echo: {message}"
                writer.write(response.encode())
                await writer.drain()
        except asyncio.CancelledError:
            print("Client connection closed.")
            # Swallowing cancellation would make the task look like it
            # finished normally; always propagate it.
            raise
        finally:
            print("Closing client connection.")
            writer.close()
            try:
                await writer.wait_closed()
            except ConnectionError:
                # The peer already reset the connection; nothing left to close.
                pass

    async def start(self):
        """Bind the socket and serve clients until the server is closed.

        Any stale file at ``socket_path`` is removed first. This coroutine
        does not return on its own: it ends when the server is closed or the
        surrounding task is cancelled.
        """
        # Ensure the socket path does not exist
        self._remove_socket_file()
        self.server = await asyncio.start_unix_server(
            self.handle_client, path=self.socket_path
        )
        print(f"Server listening on {self.socket_path}")
        async with self.server:
            await self.server.serve_forever()

    async def stop(self):
        """Close the server (if running) and remove the socket file.

        Safe to call more than once, and safe to call before start().
        """
        # Detach the reference first so a repeated stop() is a no-op for the
        # server object itself.
        server, self.server = self.server, None
        if server is not None:
            server.close()
            await server.wait_closed()
        self._remove_socket_file()
        print("Server stopped.")
class UnixSocketIPCClient:
    """One-shot client for a server listening on a Unix domain socket.

    Args:
        socket_path: Filesystem path of the server's socket.
    """

    def __init__(self, socket_path):
        self.socket_path = socket_path

    async def send_message(self, message):
        """Send ``message`` over a fresh connection and return the reply.

        Args:
            message: Text to send; it is encoded with the default codec.

        Returns:
            The decoded reply. Only a single read of up to 1024 bytes is
            performed, so a longer reply is truncated to what that read
            returned.
        """
        reader, writer = await asyncio.open_unix_connection(self.socket_path)
        print(f"Connected to server at {self.socket_path}")
        try:
            print(f"Sending: {message}")
            writer.write(message.encode())
            await writer.drain()
            response = await reader.read(1024)
            reply = response.decode()
            print(f"Received: {reply}")
            # Hand the reply back so callers can use it, not just see it logged.
            return reply
        finally:
            print("Closing connection.")
            writer.close()
            try:
                await writer.wait_closed()
            except ConnectionError:
                # The server already dropped the connection; nothing to wait for.
                pass
async def main():
    """Run the demo: start the echo server, send one message, shut down.

    The server runs in a background task. The client is only started once
    the server has bound its socket, and the server is always stopped, even
    if the client exchange fails.
    """
    socket_path = "/tmp/ipc_socket"
    # Create and start the server
    server = UnixSocketIPCServer(socket_path)
    server_task = asyncio.create_task(server.start())
    try:
        # Wait until start() has bound the socket (it assigns server.server
        # at that point) instead of sleeping for a fixed, arbitrary time.
        while server.server is None and not server_task.done():
            await asyncio.sleep(0.01)
        if server_task.done():
            # start() ended before listening: surface its exception here
            # rather than failing later with a confusing connect error.
            await server_task
        # Create the client and send a message
        client = UnixSocketIPCClient(socket_path)
        await client.send_message("Hello, Server!")
    finally:
        # Stop the server
        await server.stop()
        # Ensure the server task is cancelled
        server_task.cancel()
        try:
            await server_task
        except asyncio.CancelledError:
            pass
# Script entry point: run the demo only when executed directly, not on import.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment