Created
April 20, 2025 21:54
-
-
Save pauldzy/7ce508069cdb4280fb820d484f407ad2 to your computer and use it in GitHub Desktop.
async_cipservice
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations; | |
import os,sys; | |
import httpx; | |
import asyncio; | |
import anyio; | |
import time; | |
import sqlite3; | |
import json; | |
from collections.abc import Callable,Sequence; | |
from functools import partial, wraps; | |
from typing import Any; | |
############################################################################### | |
# Shared module-level AsyncClient: no cap on total connections, but keep at
# most 20 alive for reuse across requests.
client = httpx.AsyncClient(
    limits=httpx.Limits(max_connections=None, max_keepalive_connections=20),
);
############################################################################### | |
# From agronholm/sqlite.py | |
# From agronholm/sqlite.py
class Connection:
    """Async facade over a thread-bound sqlite3.Connection.

    Every operation is run on a worker thread via anyio, gated by a
    one-slot CapacityLimiter so the underlying sqlite3 handle is never
    used from two threads at once.
    """

    def __init__(self, _real_connection: sqlite3.Connection) -> None:
        self._real_connection = _real_connection;
        # Single slot: serializes all access to the sqlite3 handle.
        self._limiter = anyio.CapacityLimiter(1);

    @wraps(sqlite3.Connection.close)
    async def close(self):
        return await anyio.to_thread.run_sync(
            self._real_connection.close, limiter=self._limiter);

    @wraps(sqlite3.Connection.commit)
    async def commit(self):
        return await anyio.to_thread.run_sync(
            self._real_connection.commit, limiter=self._limiter);

    @wraps(sqlite3.Connection.rollback)
    async def rollback(self):
        return await anyio.to_thread.run_sync(
            self._real_connection.rollback, limiter=self._limiter);

    async def cursor(
        self
        ,factory: Callable[[sqlite3.Connection], sqlite3.Cursor] = sqlite3.Cursor
    ) -> Cursor:
        """Create a cursor on the worker thread and wrap it in an async Cursor."""
        raw = await anyio.to_thread.run_sync(
            self._real_connection.cursor, factory, limiter=self._limiter);
        return Cursor(raw, self._limiter);
class Cursor:
    """Async facade over a sqlite3.Cursor, sharing its Connection's limiter."""

    def __init__(
        self
        ,real_cursor: sqlite3.Cursor
        ,limiter: anyio.CapacityLimiter
    ) -> None:
        self._real_cursor = real_cursor;
        self._limiter = limiter;

    @property
    def description(self) -> str:
        return self._real_cursor.description;

    @property
    def rowcount(self) -> int:
        return self._real_cursor.rowcount;

    @property
    def arraysize(self) -> int:
        return self._real_cursor.arraysize;

    @wraps(sqlite3.Cursor.close)
    async def close(self) -> None:
        await anyio.to_thread.run_sync(
            self._real_cursor.close, limiter=self._limiter);

    @wraps(sqlite3.Cursor.execute)
    async def execute(
        self
        ,sql: str
        ,parameters: Sequence[Any] = ()
        ,/
    ) -> Cursor:
        wrapped = await anyio.to_thread.run_sync(
            self._real_cursor.execute, sql, parameters, limiter=self._limiter);
        return Cursor(wrapped, self._limiter);

    @wraps(sqlite3.Cursor.executemany)
    async def executemany(self, sql: str, parameters: Sequence[Any], /) -> Cursor:
        wrapped = await anyio.to_thread.run_sync(
            self._real_cursor.executemany, sql, parameters, limiter=self._limiter);
        return Cursor(wrapped, self._limiter);

    @wraps(sqlite3.Cursor.executescript)
    async def executescript(self, sql_script: str, /) -> Cursor:
        wrapped = await anyio.to_thread.run_sync(
            self._real_cursor.executescript, sql_script, limiter=self._limiter);
        return Cursor(wrapped, self._limiter);

    @wraps(sqlite3.Cursor.fetchone)
    async def fetchone(self) -> tuple[Any, ...] | None:
        return await anyio.to_thread.run_sync(
            self._real_cursor.fetchone, limiter=self._limiter);

    @wraps(sqlite3.Cursor.fetchmany)
    async def fetchmany(self, size: int) -> list[tuple[Any, ...]]:
        return await anyio.to_thread.run_sync(
            self._real_cursor.fetchmany, size, limiter=self._limiter);

    @wraps(sqlite3.Cursor.fetchall)
    async def fetchall(self) -> list[tuple[Any, ...]]:
        return await anyio.to_thread.run_sync(
            self._real_cursor.fetchall, limiter=self._limiter);
async def connect(database: str) -> Connection:
    """Open *database* on a worker thread and wrap it in an async Connection.

    check_same_thread=False is required because the handle is created here
    but used from anyio worker threads afterwards.
    """
    opener = partial(sqlite3.connect, database, check_same_thread=False);
    real_connection = await anyio.to_thread.run_sync(opener);
    return Connection(real_connection);
############################################################################### | |
def sync_do_work(pid,url,data,headers):
    """Synchronously POST *data* as JSON to *url* and return the outcome.

    Parameters:
        pid: caller-supplied identifier echoed back in the result.
        url: endpoint to POST to.
        data: JSON-serializable request body.
        headers: request headers to send.

    Returns (pid, status_code, response_text).

    Bug fix: *headers* was accepted but never passed to httpx.post; it is
    now forwarded with the request.
    """
    response = httpx.post(url,json=data,headers=headers);
    print("s " + str(pid));
    return (pid,response.status_code,response.text);
############################################################################### | |
def sync_pull_all_work(items):
    """Service each (pid, url, data, headers) tuple one at a time.

    Returns the list of sync_do_work results, in input order.
    """
    return [
        sync_do_work(pid,url,data,headers)
        for pid,url,data,headers in items
    ];
############################################################################### | |
async def do_work(pid,url,data,headers):
    """Asynchronously POST *data* as JSON to *url* via the shared client.

    Parameters:
        pid: caller-supplied identifier echoed back in the result.
        url: endpoint to POST to.
        data: JSON-serializable request body.
        headers: request headers to send.

    Returns (pid, status_code, response_text).

    Bug fix: *headers* was accepted but never passed to client.post; it is
    now forwarded with the request.
    """
    response = await client.post(url,json=data,headers=headers);
    print("a " + str(pid));
    return (pid,response.status_code,response.text);
############################################################################### | |
async def pull_all_work(items):
    """Launch do_work for every request tuple concurrently and gather results.

    Result order matches input order (asyncio.gather preserves it).
    """
    coros = (do_work(pid,url,data,headers) for pid,url,data,headers in items);
    return await asyncio.gather(*coros);
############################################################################### | |
async def read_min_max(curs):
    """Return (min_objectid, max_objectid) over requests with no response yet.

    The anti-join (LEFT JOIN ... IS NULL) keeps only request rows that have
    no matching row in the responses table.
    """
    await curs.execute(
        "SELECT MIN(a.objectid) AS min_objectid, MAX(a.objectid) AS max_objectid FROM requests a LEFT JOIN responses b ON a.objectid = b.objectid WHERE b.objectid IS NULL"
    );
    row = await curs.fetchone();
    return (row[0],row[1]);
############################################################################### | |
async def read_requests(curs,start_oid,stop_oid):
    """Fetch request rows with objectid in [start_oid, stop_oid], inclusive.

    Parameters:
        curs: async cursor (see Cursor above).
        start_oid / stop_oid: inclusive objectid bounds.

    Returns a list of (objectid, url, parsed_data, headers) tuples, with the
    stored JSON `data` column decoded into Python objects.

    Bug fix: the query used "$1"/"$2" placeholders with a positional tuple,
    which Python's sqlite3 deprecated in 3.12 and rejects in newer versions;
    qmark ("?") style is the supported form for sequence parameters.
    """
    await curs.execute(
        "SELECT a.objectid,a.url,a.data,a.headers FROM requests a WHERE a.objectid >= ? AND a.objectid <= ?"
        ,(start_oid,stop_oid)
    );
    rows = await curs.fetchall();
    return [(oid,url,json.loads(data),headers) for oid,url,data,headers in rows];
############################################################################### | |
async def insert_responses(curs,responses):
    """Bulk-insert (objectid, status_code, jsontext) rows into responses.

    Parameters:
        curs: async cursor (see Cursor above).
        responses: sequence of 3-tuples matching the column list.

    Returns the number of rows inserted, or None when there was nothing
    to insert (callers must handle the None case).

    Bug fix: the statement used "$1,$2,$3" placeholders with sequence
    parameters, which Python's sqlite3 deprecated in 3.12 and rejects in
    newer versions; qmark ("?") style is the supported form.
    """
    if responses is None or len(responses) == 0:
        return None;
    await curs.executemany("INSERT INTO responses(objectid,status_code,jsontext) VALUES (?,?,?)",responses);
    return len(responses);
############################################################################### | |
def chunker(iterable,chunksize):
    """Yield successive slices of *iterable* of length *chunksize*.

    The final slice may be shorter. Works on anything sliceable with a
    len() (lists, strings, ranges, ...).
    """
    for start in range(0, len(iterable), chunksize):
        yield iterable[start:start + chunksize];
############################################################################### | |
async def main(
    mgdb_file
    ,batch_size
    ,start_oid = None
    ,stop_oid = None
):
    """Service pending requests from the geodatabase in batches.

    Pulls rows from the `requests` table in chunks of *batch_size*, POSTs
    them concurrently via pull_all_work, and stores the results in the
    `responses` table.

    Parameters:
        mgdb_file: path to the sqlite/geodatabase file.
        batch_size: requests per chunk; None or < 1 falls back to 2.
        start_oid / stop_oid: optional inclusive objectid bounds; any bound
            left as None is derived from the rows that have no response yet
            (read_min_max).

    Returns the total number of response rows inserted.

    Fixes vs. original:
      * insert_responses returns None for an empty batch, which made
        `inserted_count += inserted` raise TypeError; now coerced to 0.
      * cursor and connection are now closed in a finally block so the
        sqlite handle is released even on error.
    """
    mgdb = await connect(mgdb_file);
    curs = await mgdb.cursor();
    try:
        if batch_size is None or batch_size < 1:
            batch_size = 2;
        if start_oid is None \
        or stop_oid is None:
            (calc_start,calc_stop) = await read_min_max(curs);
            if start_oid is None:
                start_oid = calc_start;
            if stop_oid is None:
                stop_oid = calc_stop;
        print("start: " + str(start_oid) + " stop: " + str(stop_oid));
        inserted_count = 0;
        chk = 1;
        for chunk in chunker(range(start_oid,stop_oid+1),batch_size):
            print("chunk: " + str(chk) + " for " + str(min(chunk)) + " to " + str(max(chunk)));
            requests = await read_requests(
                curs = curs
                ,start_oid = min(chunk)
                ,stop_oid = max(chunk)
            );
            async_start = time.time();
            responses = await pull_all_work(requests);
            inserted = await insert_responses(curs,responses);
            async_end = time.time();
            print(f" lapsed: {async_end - async_start}");
            chk += 1;
            # insert_responses returns None when the batch was empty;
            # treat that as zero rows instead of crashing the total.
            inserted_count += inserted or 0;
        await mgdb.commit();
        return inserted_count;
    finally:
        # Release the sqlite handle even if a chunk failed mid-run.
        await curs.close();
        await mgdb.close();
############################################################################### | |
if __name__ == "__main__": | |
print("Running asynchronously...") | |
z = asyncio.run( | |
main('async_cipservice.geodatabase',10,100,200) | |
); | |
print(" inserted " + str(z) + " records."); | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment