Skip to content

Instantly share code, notes, and snippets.

@pauldzy
Created April 20, 2025 21:54
Show Gist options
  • Save pauldzy/7ce508069cdb4280fb820d484f407ad2 to your computer and use it in GitHub Desktop.
async_cipservice
from __future__ import annotations;
import os,sys;
import httpx;
import asyncio;
import anyio;
import time;
import sqlite3;
import json;
from collections.abc import Callable,Sequence;
from functools import partial, wraps;
from typing import Any;
###############################################################################
# Module-wide async HTTP client shared by all requests: unlimited total
# connections, keep-alive pool capped at 20.
client = httpx.AsyncClient(
    limits=httpx.Limits(max_connections=None, max_keepalive_connections=20)
)
###############################################################################
# From agronholm/sqlite.py
# From agronholm/sqlite.py
class Connection:
    """Async facade over a blocking ``sqlite3.Connection``.

    Every blocking call is offloaded to a worker thread via
    ``anyio.to_thread.run_sync``; a one-slot ``CapacityLimiter`` serializes
    those offloads so the underlying connection is only ever touched by one
    thread at a time.
    """

    def __init__(self, _real_connection: sqlite3.Connection) -> None:
        self._real_connection = _real_connection
        # One permit: serialize all thread offloads for this connection.
        self._limiter = anyio.CapacityLimiter(1)

    @wraps(sqlite3.Connection.close)
    async def close(self):
        return await anyio.to_thread.run_sync(
            self._real_connection.close, limiter=self._limiter
        )

    @wraps(sqlite3.Connection.commit)
    async def commit(self):
        return await anyio.to_thread.run_sync(
            self._real_connection.commit, limiter=self._limiter
        )

    @wraps(sqlite3.Connection.rollback)
    async def rollback(self):
        return await anyio.to_thread.run_sync(
            self._real_connection.rollback, limiter=self._limiter
        )

    async def cursor(
        self,
        factory: Callable[[sqlite3.Connection], sqlite3.Cursor] = sqlite3.Cursor,
    ) -> Cursor:
        """Create a cursor on a worker thread and wrap it in the async Cursor."""
        raw_cursor = await anyio.to_thread.run_sync(
            self._real_connection.cursor, factory, limiter=self._limiter
        )
        return Cursor(raw_cursor, self._limiter)
class Cursor:
    """Async facade over a blocking ``sqlite3.Cursor``.

    Blocking calls run on a worker thread, serialized by the limiter shared
    with the owning ``Connection``.
    """

    def __init__(
        self,
        real_cursor: sqlite3.Cursor,
        limiter: anyio.CapacityLimiter,
    ) -> None:
        self._real_cursor = real_cursor
        # Shared with the parent Connection so connection and cursor calls
        # never run on two threads at once.
        self._limiter = limiter

    @property
    def description(self) -> tuple[tuple[Any, ...], ...] | None:
        # Fix: the original annotated this as ``-> str``; sqlite3 returns a
        # tuple of 7-item column descriptors, or None before any query.
        return self._real_cursor.description

    @property
    def rowcount(self) -> int:
        return self._real_cursor.rowcount

    @property
    def arraysize(self) -> int:
        return self._real_cursor.arraysize

    @wraps(sqlite3.Cursor.close)
    async def close(self) -> None:
        await anyio.to_thread.run_sync(self._real_cursor.close, limiter=self._limiter)

    @wraps(sqlite3.Cursor.execute)
    async def execute(
        self,
        sql: str,
        parameters: Sequence[Any] = (),
        /,
    ) -> Cursor:
        real_cursor = await anyio.to_thread.run_sync(
            self._real_cursor.execute, sql, parameters, limiter=self._limiter
        )
        return Cursor(real_cursor, self._limiter)

    @wraps(sqlite3.Cursor.executemany)
    async def executemany(self, sql: str, parameters: Sequence[Any], /) -> Cursor:
        real_cursor = await anyio.to_thread.run_sync(
            self._real_cursor.executemany, sql, parameters, limiter=self._limiter
        )
        return Cursor(real_cursor, self._limiter)

    @wraps(sqlite3.Cursor.executescript)
    async def executescript(self, sql_script: str, /) -> Cursor:
        real_cursor = await anyio.to_thread.run_sync(
            self._real_cursor.executescript, sql_script, limiter=self._limiter
        )
        return Cursor(real_cursor, self._limiter)

    @wraps(sqlite3.Cursor.fetchone)
    async def fetchone(self) -> tuple[Any, ...] | None:
        return await anyio.to_thread.run_sync(
            self._real_cursor.fetchone, limiter=self._limiter
        )

    @wraps(sqlite3.Cursor.fetchmany)
    async def fetchmany(self, size: int) -> list[tuple[Any, ...]]:
        return await anyio.to_thread.run_sync(
            self._real_cursor.fetchmany, size, limiter=self._limiter
        )

    @wraps(sqlite3.Cursor.fetchall)
    async def fetchall(self) -> list[tuple[Any, ...]]:
        return await anyio.to_thread.run_sync(
            self._real_cursor.fetchall, limiter=self._limiter
        )
async def connect(database: str) -> Connection:
    """Open *database* on a worker thread and wrap it in the async Connection."""
    # check_same_thread=False: the connection is later driven from worker
    # threads, serialized by the Connection's capacity limiter.
    opener = partial(sqlite3.connect, database, check_same_thread=False)
    raw_connection = await anyio.to_thread.run_sync(opener)
    return Connection(raw_connection)
###############################################################################
def sync_do_work(pid, url, data, headers):
    """POST *data* as JSON to *url* synchronously; return (pid, status, body).

    NOTE(review): *headers* is accepted but never forwarded to the request —
    confirm whether that is intentional.
    """
    response = httpx.post(url, json=data)
    print("s " + str(pid))
    return (pid, response.status_code, response.text)
###############################################################################
def sync_pull_all_work(items):
    """Run sync_do_work over every (pid, url, data, headers) tuple, in order."""
    return [
        sync_do_work(pid, url, data, headers)
        for pid, url, data, headers in items
    ]
###############################################################################
async def do_work(pid, url, data, headers):
    """POST *data* as JSON to *url* via the shared async client.

    Returns (pid, status_code, response_text).
    NOTE(review): *headers* is accepted but never forwarded to the request —
    confirm whether that is intentional.
    """
    response = await client.post(url, json=data)
    print("a " + str(pid))
    return (pid, response.status_code, response.text)
###############################################################################
async def pull_all_work(items):
    """Fan out do_work over all (pid, url, data, headers) tuples concurrently."""
    tasks = [do_work(pid, url, data, headers) for pid, url, data, headers in items]
    return await asyncio.gather(*tasks)
###############################################################################
async def read_min_max(curs):
    """Return (min, max) objectid among requests that have no response row yet."""
    sql = (
        "SELECT MIN(a.objectid) AS min_objectid, MAX(a.objectid) AS max_objectid FROM requests a "
        "LEFT JOIN responses b ON a.objectid = b.objectid WHERE b.objectid IS NULL"
    )
    await curs.execute(sql)
    row = await curs.fetchone()
    return (row[0], row[1])
###############################################################################
async def read_requests(curs, start_oid, stop_oid):
    """Fetch request rows with objectid in [start_oid, stop_oid], inclusive.

    Returns a list of (objectid, url, data, headers) tuples with the JSON
    ``data`` column already decoded.
    """
    # Fix: sqlite3 binds positional sequence parameters with "?" (qmark
    # style); the original "$1"/"$2" placeholders are named-style and do
    # not bind from a plain tuple, raising ProgrammingError at runtime.
    await curs.execute(
        "SELECT a.objectid,a.url,a.data,a.headers FROM requests a "
        "WHERE a.objectid >= ? AND a.objectid <= ?",
        (start_oid, stop_oid),
    )
    rows = await curs.fetchall()
    return [(row[0], row[1], json.loads(row[2]), row[3]) for row in rows]
###############################################################################
async def insert_responses(curs, responses):
    """Bulk-insert (objectid, status_code, jsontext) rows; return the count.

    Returns None (inserting nothing) when *responses* is None or empty.
    """
    if responses is None or len(responses) == 0:
        return None
    # Fix: "?" qmark placeholders bind positional tuples in sqlite3; the
    # original "$1,$2,$3" named-style placeholders fail with executemany
    # over plain tuples.
    await curs.executemany(
        "INSERT INTO responses(objectid,status_code,jsontext) VALUES (?,?,?)",
        responses,
    )
    return len(responses)
###############################################################################
def chunker(iterable, chunksize):
    """Yield successive slices of *iterable* at most *chunksize* items long."""
    for start in range(0, len(iterable), chunksize):
        yield iterable[start:start + chunksize]
###############################################################################
async def main(
    mgdb_file,
    batch_size,
    start_oid=None,
    stop_oid=None,
):
    """Process pending requests in batches and persist the responses.

    Opens the sqlite database, determines the objectid range still missing
    responses (unless an explicit range is given), then for each batch of
    ids: reads the requests, POSTs them concurrently, stores the responses,
    and commits.  Returns the total number of rows inserted.
    """
    mgdb = await connect(mgdb_file)
    try:
        curs = await mgdb.cursor()
        if batch_size is None or batch_size < 1:
            batch_size = 2  # conservative default batch size
        if start_oid is None or stop_oid is None:
            (calc_start, calc_stop) = await read_min_max(curs)
            if start_oid is None:
                start_oid = calc_start
            if stop_oid is None:
                stop_oid = calc_stop
        print("start: " + str(start_oid) + " stop: " + str(stop_oid))
        inserted_count = 0
        chk = 1
        for chunk in chunker(range(start_oid, stop_oid + 1), batch_size):
            print("chunk: " + str(chk) + " for " + str(min(chunk)) + " to " + str(max(chunk)))
            requests = await read_requests(
                curs=curs,
                start_oid=min(chunk),
                stop_oid=max(chunk),
            )
            async_start = time.time()
            responses = await pull_all_work(requests)
            inserted = await insert_responses(curs, responses)
            async_end = time.time()
            print(f" lapsed: {async_end - async_start}")
            chk += 1
            # Fix: insert_responses returns None for an empty batch; the
            # original "inserted_count += inserted" raised TypeError then.
            inserted_count += inserted or 0
            await mgdb.commit()
        return inserted_count
    finally:
        # Fix: release the sqlite connection even if a request/insert fails
        # (the original never closed it).
        await mgdb.close()
###############################################################################
if __name__ == "__main__":
    # Script entry point: process objectids 100-200 in batches of 10.
    print("Running asynchronously...")
    total_inserted = asyncio.run(
        main("async_cipservice.geodatabase", 10, 100, 200)
    )
    print(" inserted " + str(total_inserted) + " records.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment