Skip to content

Instantly share code, notes, and snippets.

@tzot
Last active November 7, 2025 18:24
Show Gist options
  • Select an option

  • Save tzot/c9076646ae26ce86af8c07e485261b5e to your computer and use it in GitHub Desktop.

Select an option

Save tzot/c9076646ae26ce86af8c07e485261b5e to your computer and use it in GitHub Desktop.
Short example of aiohttp server plus some worker threads waiting on a queue.Queue
import json, time
import functools as ft
import _thread as thr, queue as que
import asyncio as aio
from aiohttp import web
def processor(que_i: que.Queue):
    """Worker-thread loop that serves ``(future, item)`` pairs taken from *que_i*.

    Each queued entry is a 2-tuple of an ``asyncio.Future`` and a dict.  The
    worker fills in the dict and then schedules completion of the future on
    the event loop the future belongs to.  If the work raises, the exception
    is delivered through the future, so the awaiting coroutine is woken up
    instead of waiting forever and the worker keeps serving later entries.

    The loop ends when the ``...`` (Ellipsis) sentinel is taken from the
    queue; the database connection is closed on the way out.

    Args:
        que_i: Queue of ``(future, item)`` tuples, terminated by ``...``.
    """

    def _settle(future, outcome, error=None):
        """Complete *future*; runs on the event-loop thread."""
        # The awaiting side may already have cancelled the future; completing
        # it again would raise InvalidStateError inside the loop.
        if future.done():
            return
        if error is None:
            future.set_result(outcome)
        else:
            future.set_exception(error)

    ident = thr.get_ident()
    # print("Thread", ident, "started")

    # SETUP for this example: one read-only connection per worker thread.
    import sqlite3 as sql
    db = sql.connect('file:data/rates.sqlite?mode=ro', uri=True)
    try:
        cur = db.cursor()

        # GENERIC WORKER LOOP
        while (queued := que_i.get()) is not ...:
            quantum, item = queued
            # Use the loop the future is bound to rather than a module global.
            hand_off = quantum.get_loop().call_soon_threadsafe
            try:
                # WORK for this example
                item['served_by'] = ident
                print("before call soon")
                row = cur.execute(
                    "SELECT endpk, COUNT(*) FROM Events GROUP BY 1 ORDER BY 2 DESC LIMIT 1"
                ).fetchone()
                # fetchone() returns None when the query yields no rows.
                item['counts'] = None if row is None else list(row)

                # TIME for this example: recorded *before* the hand-off, so
                # the item is no longer mutated once the loop thread owns it.
                item['served'] = time.time()
            except Exception as exc:
                hand_off(_settle, quantum, None, exc)
            else:
                hand_off(_settle, quantum, item)
            # print("after call soon")
    finally:
        db.close()
# Event loop captured by the first ingress() call; None until then.
loop = None


# Note: this is a plain function, not a coroutine.  It returns an
# asyncio.Future, which is itself awaitable.
def ingress(que_i, item):
    """Queue *item* for the worker threads and return a Future for the result.

    A new Future is created on the running event loop (which is remembered in
    the module-level ``loop`` on first use), the current time is stored under
    ``item['got']``, and the ``(future, item)`` pair is put on *que_i*.

    Args:
        que_i: Queue the worker threads are reading from.
        item: Dict describing the job; it is modified in place.

    Returns:
        The Future that was queued together with *item*.
    """
    global loop
    if loop is None:
        # Must be called while an event loop is running.
        loop = aio.get_running_loop()

    pending = loop.create_future()
    print("ingress", item)
    item['got'] = time.time()
    entry = (pending, item)
    que_i.put(entry)
    return pending
# Shared work queue read by the worker threads.  The queue module is imported
# under the alias `que`, so the class has to be reached through that name.
que_i = que.Queue()

# Start three worker threads, each running processor() on the shared queue.
for _ in range(3):
    print("Starting", thr.start_new_thread(processor, (que_i,)))
async def process(request):
    """Handle a request by queueing a job and replying with its result as JSON.

    A fresh job dict carrying the start time is passed to ingress(); the
    coroutine then waits for the returned Future and serializes whatever it
    resolves to as compact JSON.
    """
    job = {'start': time.time()}
    result = await ingress(que_i, job)

    body = json.dumps(
        result,
        separators=(',', ':'),
        ensure_ascii=False,
        sort_keys=False,
        check_circular=False,
    )
    return web.Response(text=body, content_type='application/json')
# Build the application with one catch-all route: any HTTP method and any
# path is handled by process().
app = web.Application()
app.add_routes([web.route('*', '/{rest:.*}', process)])

# Blocks here, serving requests until the server is stopped.
web.run_app(app, host='ratelimit', port=8111)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment