Created
January 12, 2026 14:15
-
-
Save arnetheduck/3f4671d216b6bb36d14f3dab7117cb14 to your computer and use it in GitHub Desktop.
chronos accept queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import chronos, std/sequtils | |
| type | |
| TransportFut = Future[StreamTransport].Raising([CancelledError, TransportError]) | |
| Server = ref object | |
| socket: StreamServer | |
| acceptLoop: Future[void].Raising([]) | |
| acceptQueue: AsyncQueue[TransportFut] | |
| proc handshake( | |
| transp: StreamTransport | |
| ) {.async: (raises: [CancelledError, TransportError]).} = | |
| # Simulate a hanshake that requires a bit of time | |
| await sleepAsync(1.seconds) | |
| discard await transp.write("\r\n") | |
| discard await transp.readLine() | |
| proc handshakeOrClose( | |
| transp: StreamTransport | |
| ): Future[StreamTransport] {.async: (raises: [CancelledError, TransportError]).} = | |
| try: | |
| await transp.handshake() | |
| except TransportError as exc: | |
| await transp.closeWait() | |
| raise exc | |
| except CancelledError as exc: | |
| await transp.closeWait() | |
| raise exc | |
| transp | |
| proc acceptWorker(server: Server) {.async: (raises: []).} = | |
| try: | |
| while true: | |
| let transp = | |
| try: | |
| await server.socket.accept() | |
| except TransportError as exc: | |
| # Accept can fail either because the socket being accepted is broken | |
| # _or_ because the server socket is broken - both are bad :/ | |
| # In order not to busy-loop, queue the failure | |
| let fut = TransportFut.init() | |
| fut.fail(exc) | |
| await server.acceptQueue.addLast(fut) | |
| continue | |
| # We are owners transp during the handshake, therefore we must close it | |
| # if the handshake fails - once the handshake is done, ownership passes | |
| # to whoever called `accept` | |
| let fut = transp.handshakeOrClose() | |
| try: | |
| await server.acceptQueue.addLast(fut) | |
| except CancelledError as exc: # Cancelled while waiting for addLast | |
| await fut.cancelAndWait() | |
| raise exc | |
| except CancelledError: | |
| discard | |
| proc accept( | |
| server: Server | |
| ): Future[StreamTransport] {.async: (raises: [CancelledError, TransportError]).} = | |
| await (await server.acceptQueue.popFirst()) | |
| proc start(server: Server) = | |
| server.acceptLoop = server.acceptWorker() | |
| proc stop(server: Server) {.async: (raises: []).} = | |
| if server.acceptLoop == nil: | |
| return | |
| await server.acceptLoop.cancelAndWait() | |
| proc closeOrCancel(f: Future[StreamTransport]) {.async: (raises: []).} = | |
| await f.cancelAndWait() | |
| if f.completed(): | |
| await f.value().closeWait() | |
| proc closeWait(server: Server) {.async: (raises: []).} = | |
| await noCancel server.stop() | |
| await noCancel server.socket.closeWait() | |
| await noCancel allFutures(move(server.acceptQueue).mapIt(it.closeOrCancel())) | |
| proc connectTo( | |
| ta: TransportAddress | |
| ): Future[StreamTransport] {.async: (raises: [CancelledError, TransportError]).} = | |
| let transp = await connect(ta) | |
| # Until we've finished connecting, we're owners of transp and therefore need | |
| # to close it | |
| await transp.handshakeOrClose() | |
| proc myApp(acceptQueueSize = 5) {.async.} = | |
| let server = Server( | |
| socket: createStreamServer(AnyAddress6), | |
| acceptQueue: newAsyncQueue[TransportFut](acceptQueueSize), | |
| ) | |
| server.start() | |
| let now = Moment.now() | |
| var clients: seq[TransportFut] | |
| echo "Starting..." | |
| for i in 0 ..< 20: | |
| clients.add connectTo(initTAddress("127.0.0.1", server.socket.local.port)) | |
| for i in 0 ..< 20: | |
| let accepted = await server.accept() | |
| echo "." | |
| await accepted.closeWait() | |
| for c in clients: | |
| await (await c).closeWait() | |
| await server.stop() | |
| await server.closeWait() | |
| echo "Total: ", Moment.now() - now | |
| waitFor myApp() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment