Skip to content

Instantly share code, notes, and snippets.

@d01phin
d01phin / uvloop_socket_ipc.py
Created January 13, 2025 19:05
IPC with uvloop and unix sockets
import asyncio
import os
import uvloop
# Set uvloop as the default event loop policy
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
class UnixSocketIPCServer:
def __init__(self, socket_path):
@d01phin
d01phin / stream.go
Created November 25, 2024 10:46
Generic Go Stream implementation
package main
import "fmt"
// Stream is a generic type, parameterized over the element type T, that
// wraps a single iterator function.
type Stream[T any] struct {
// iter is called with no arguments and returns a T together with a bool.
// NOTE(review): presumably the bool reports whether a value was produced
// (false once the stream is exhausted) — confirm against the methods that
// call iter, which are not visible here.
iter func() (T, bool)
}
func StreamFromSlice[T any](items []T) *Stream[T] {
index := 0
import uvloop
import asyncio
# Set uvloop as the default asyncio event loop policy; done at module level,
# before any of the coroutines defined below are run.
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
async def handle_client(client_reader, client_writer, remote_host, remote_port):
try:
# Connect to the remote server
remote_reader, remote_writer = await asyncio.open_connection(remote_host, remote_port)