Created
January 13, 2025 19:05
-
-
Save d01phin/fe6ede20bc21c2a2442bf814f99c6ea7 to your computer and use it in GitHub Desktop.
IPC with uvloop and unix sockets
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import os | |
import uvloop | |
# Set uvloop as the default event loop policy | |
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) | |
class UnixSocketIPCServer: | |
def __init__(self, socket_path): | |
self.socket_path = socket_path | |
self.server = None | |
async def handle_client(self, reader, writer): | |
peername = writer.get_extra_info('peername') | |
print(f"Connected to client: {peername}") | |
try: | |
while True: | |
data = await reader.read(1024) | |
if not data: | |
break | |
message = data.decode() | |
print(f"Received: {message}") | |
response = f"Echo: {message}" | |
writer.write(response.encode()) | |
await writer.drain() | |
except asyncio.CancelledError: | |
print("Client connection closed.") | |
finally: | |
print("Closing client connection.") | |
writer.close() | |
await writer.wait_closed() | |
async def start(self): | |
# Ensure the socket path does not exist | |
if os.path.exists(self.socket_path): | |
os.remove(self.socket_path) | |
self.server = await asyncio.start_unix_server(self.handle_client, path=self.socket_path) | |
print(f"Server listening on {self.socket_path}") | |
async with self.server: | |
await self.server.serve_forever() | |
async def stop(self): | |
if self.server: | |
self.server.close() | |
await self.server.wait_closed() | |
if os.path.exists(self.socket_path): | |
os.remove(self.socket_path) | |
print("Server stopped.") | |
class UnixSocketIPCClient: | |
def __init__(self, socket_path): | |
self.socket_path = socket_path | |
async def send_message(self, message): | |
reader, writer = await asyncio.open_unix_connection(self.socket_path) | |
print(f"Connected to server at {self.socket_path}") | |
try: | |
print(f"Sending: {message}") | |
writer.write(message.encode()) | |
await writer.drain() | |
response = await reader.read(1024) | |
print(f"Received: {response.decode()}") | |
finally: | |
print("Closing connection.") | |
writer.close() | |
await writer.wait_closed() | |
async def main(): | |
socket_path = "/tmp/ipc_socket" | |
# Create and start the server | |
server = UnixSocketIPCServer(socket_path) | |
server_task = asyncio.create_task(server.start()) | |
# Allow the server to start | |
await asyncio.sleep(1) | |
# Create the client and send a message | |
client = UnixSocketIPCClient(socket_path) | |
await client.send_message("Hello, Server!") | |
# Stop the server | |
await server.stop() | |
# Ensure the server task is cancelled | |
server_task.cancel() | |
try: | |
await server_task | |
except asyncio.CancelledError: | |
pass | |
if __name__ == "__main__": | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment