Last active
November 7, 2025 18:24
-
-
Save tzot/c9076646ae26ce86af8c07e485261b5e to your computer and use it in GitHub Desktop.
Short example of aiohttp server plus some worker threads waiting on a queue.Queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import json, time | |
| import functools as ft | |
| import _thread as thr, queue as que | |
| import asyncio as aio | |
| from aiohttp import web | |
| def processor(que_i: que.Queue): | |
| """This is an example of a worker thread that waits on a queue.Queue; | |
| it receives Futures and params, does the work, schedules the completion of the Future.""" | |
| global loop | |
| ident = thr.get_ident() | |
| # print("Thread", ident, "started") | |
| if 1: # SETUP for this example | |
| import sqlite3 as sql | |
| db= sql.connect('file:data/rates.sqlite?mode=ro', uri=True) | |
| cur=db.cursor() | |
| # GENERIC WORKER LOOP | |
| while (queued:= que_i.get()) is not ...: | |
| quantum, item= queued | |
| if 1: # WORK for this example | |
| item['served_by'] = ident | |
| print("before call soon") | |
| item['counts']= list( | |
| cur | |
| .execute("SELECT endpk, COUNT(*) FROM Events GROUP BY 1 ORDER BY 2 DESC LIMIT 1") | |
| .fetchone() | |
| ) | |
| loop.call_soon_threadsafe(quantum.set_result, item) | |
| if 1: # TIME for this example | |
| item['served']= time.time() | |
| # print("after call soon") | |
| loop= None | |
| # This is not an async function! It returns a asyncio.Future (which means is awaitable ☺) | |
| def ingress(que_i, item): | |
| """Generic function that creates a future and pushes it and any args to the que that threads are listening to. | |
| Returns the Future so that it can be awaited.""" | |
| global loop | |
| if loop is None: | |
| loop= aio.get_running_loop() | |
| quantum= loop.create_future() | |
| print("ingress", item) | |
| item['got']= time.time() | |
| que_i.put( (quantum, item) ) | |
| return quantum | |
| que_i= queue.Queue() | |
| for _ in range(3): | |
| print("Starting", thr.start_new_thread(processor, (que_i,))) | |
| async def process(request): | |
| """Request processor.""" | |
| result= await ingress(que_i, {'start': time.time()}) | |
| return web.Response( | |
| text=json.dumps(result, check_circular=False, sort_keys=False, ensure_ascii=False, separators=(',',':')), | |
| content_type='application/json' | |
| ) | |
| app= web.Application() | |
| app.router.add_route('*', '/{rest:.*}', process) | |
| web.run_app(app, host='ratelimit', port=8111) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment