Skip to content

Instantly share code, notes, and snippets.

@arnetheduck
Created January 12, 2026 14:15
Show Gist options
  • Select an option

  • Save arnetheduck/3f4671d216b6bb36d14f3dab7117cb14 to your computer and use it in GitHub Desktop.

Select an option

Save arnetheduck/3f4671d216b6bb36d14f3dab7117cb14 to your computer and use it in GitHub Desktop.
chronos accept queue
import chronos, std/sequtils
type
TransportFut = Future[StreamTransport].Raising([CancelledError, TransportError])
Server = ref object
socket: StreamServer
acceptLoop: Future[void].Raising([])
acceptQueue: AsyncQueue[TransportFut]
proc handshake(
transp: StreamTransport
) {.async: (raises: [CancelledError, TransportError]).} =
# Simulate a hanshake that requires a bit of time
await sleepAsync(1.seconds)
discard await transp.write("\r\n")
discard await transp.readLine()
proc handshakeOrClose(
transp: StreamTransport
): Future[StreamTransport] {.async: (raises: [CancelledError, TransportError]).} =
try:
await transp.handshake()
except TransportError as exc:
await transp.closeWait()
raise exc
except CancelledError as exc:
await transp.closeWait()
raise exc
transp
proc acceptWorker(server: Server) {.async: (raises: []).} =
try:
while true:
let transp =
try:
await server.socket.accept()
except TransportError as exc:
# Accept can fail either because the socket being accepted is broken
# _or_ because the server socket is broken - both are bad :/
# In order not to busy-loop, queue the failure
let fut = TransportFut.init()
fut.fail(exc)
await server.acceptQueue.addLast(fut)
continue
# We are owners transp during the handshake, therefore we must close it
# if the handshake fails - once the handshake is done, ownership passes
# to whoever called `accept`
let fut = transp.handshakeOrClose()
try:
await server.acceptQueue.addLast(fut)
except CancelledError as exc: # Cancelled while waiting for addLast
await fut.cancelAndWait()
raise exc
except CancelledError:
discard
proc accept(
server: Server
): Future[StreamTransport] {.async: (raises: [CancelledError, TransportError]).} =
await (await server.acceptQueue.popFirst())
proc start(server: Server) =
server.acceptLoop = server.acceptWorker()
proc stop(server: Server) {.async: (raises: []).} =
if server.acceptLoop == nil:
return
await server.acceptLoop.cancelAndWait()
proc closeOrCancel(f: Future[StreamTransport]) {.async: (raises: []).} =
await f.cancelAndWait()
if f.completed():
await f.value().closeWait()
proc closeWait(server: Server) {.async: (raises: []).} =
await noCancel server.stop()
await noCancel server.socket.closeWait()
await noCancel allFutures(move(server.acceptQueue).mapIt(it.closeOrCancel()))
proc connectTo(
ta: TransportAddress
): Future[StreamTransport] {.async: (raises: [CancelledError, TransportError]).} =
let transp = await connect(ta)
# Until we've finished connecting, we're owners of transp and therefore need
# to close it
await transp.handshakeOrClose()
proc myApp(acceptQueueSize = 5) {.async.} =
let server = Server(
socket: createStreamServer(AnyAddress6),
acceptQueue: newAsyncQueue[TransportFut](acceptQueueSize),
)
server.start()
let now = Moment.now()
var clients: seq[TransportFut]
echo "Starting..."
for i in 0 ..< 20:
clients.add connectTo(initTAddress("127.0.0.1", server.socket.local.port))
for i in 0 ..< 20:
let accepted = await server.accept()
echo "."
await accepted.closeWait()
for c in clients:
await (await c).closeWait()
await server.stop()
await server.closeWait()
echo "Total: ", Moment.now() - now
waitFor myApp()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment