Created
August 30, 2023 06:32
-
-
Save commonism/2ea36e85b29bcfa4a53a7326b584989b to your computer and use it in GitHub Desktop.
pagination parameter caching
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# https://github.com/Dorthu/openapi3/issues/110
import asyncio | |
import dataclasses | |
import random | |
import sys | |
import string | |
import uuid | |
if sys.version_info >= (3, 9): | |
from typing import List, Annotated | |
else: | |
from typing import List | |
from typing_extensions import Annotated | |
from pathlib import Path | |
import uvloop | |
from hypercorn.asyncio import serve | |
from hypercorn.config import Config | |
import pydantic | |
from fastapi import FastAPI, Request, Response, Query, UploadFile, Body | |
from fastapi.responses import PlainTextResponse | |
import pytest | |
import pytest_asyncio | |
import aiopenapi3 | |
# ASGI application under test. A single relative server entry is declared;
# the openapi3-based tests below overwrite servers[0]["url"] with an absolute URL.
app = FastAPI(
    title="pagination test",
    version="1.0.0",
    servers=[{"url": "/", "description": "Default, relative server"}],
)
@pytest.fixture(scope="session")
def config(unused_tcp_port_factory):
    """Build the session-wide Hypercorn configuration.

    The server is bound to ``localhost`` on a port obtained from the
    ``unused_tcp_port_factory`` fixture.
    """
    hypercorn_config = Config()
    port = unused_tcp_port_factory()
    hypercorn_config.bind = [f"localhost:{port}"]
    return hypercorn_config
@pytest_asyncio.fixture(scope="session")
async def server(event_loop, config):
    """Serve ``app`` with Hypercorn for the duration of the test session.

    Installs uvloop, starts ``serve`` as a task on ``event_loop`` and yields
    ``config`` so dependants can read the bind address. On teardown the
    shutdown event is set, the serve task is awaited, and the event loop
    policy captured at setup is put back.
    """
    policy = asyncio.get_event_loop_policy()
    uvloop.install()
    try:
        shutdown = asyncio.Event()
        task = event_loop.create_task(
            serve(app, config, shutdown_trigger=shutdown.wait)
        )
        try:
            yield config
        finally:
            # Only reached once both names are bound, so teardown cannot
            # fail with a NameError that would mask a startup error.
            shutdown.set()
            await task
    finally:
        # Restore the policy even if startup or the serve task raised.
        asyncio.set_event_loop_policy(policy)
@pytest_asyncio.fixture(scope="session")
async def client(event_loop, server):
    """Load an aiopenapi3 client from the description document served by ``server``."""
    description_url = f"http://{server.bind[0]}/openapi.json"
    return await aiopenapi3.OpenAPI.load_async(description_url)
class Content(pydantic.BaseModel):
    """A single page item carrying a random payload."""

    # Defaults to 16 random ASCII letters, generated anew for each instance.
    data: str = pydantic.Field(
        default_factory=lambda: "".join(
            random.choice(string.ascii_letters) for _ in range(16)
        )
    )
class Page(pydantic.BaseModel):
    """One page of results returned by the pagination endpoint."""

    # Token to send as the ``next`` query parameter for the following page;
    # None on the last page.
    next: str | None
    # Items contained in this page.
    items: List[Content]
@dataclasses.dataclass
class Session:
    """Server-side progress of one pagination run."""

    # Number of items the whole run hands out.
    total: int
    # Number of items already covered by the pages issued so far.
    offset: int = 0
# Open pagination runs, keyed by the token most recently handed to the client.
SESSIONS = {}


@app.get("/pagination", operation_id="pagination")
def pagination(
    request: Request,
    response: Response,
    next: str | None = Query(None),
    perpage: int = Query(12),
) -> Page:
    """Return one page of random items and the token for the page after it.

    Without ``next`` a new run is started whose total length is drawn at
    random between 3 and 7 times ``perpage``. With ``next`` the matching run
    is continued; each token is single-use and is removed from ``SESSIONS``
    as soon as it is presented. The final page holds only the remaining
    items and carries ``next=None``.

    Raises:
        ValueError: if ``next`` is given but matches no stored run.
    """
    if next is None:
        # First request: this response already counts as the first page.
        session = Session(
            total=random.randint(3 * perpage, 7 * perpage), offset=perpage
        )
        token = str(uuid.uuid4())
        SESSIONS[token] = session
        return Page(next=token, items=[Content() for _ in range(perpage)])

    session = SESSIONS.pop(next, None)
    if session is None:
        raise ValueError("Not found next")

    if session.offset + perpage < session.total:
        # More pages follow: advance and re-register under a fresh token.
        session.offset += perpage
        token = str(uuid.uuid4())
        SESSIONS[token] = session
    else:
        # Last page: hand out whatever is left and end the run.
        perpage = session.total - session.offset
        token = None
    return Page(next=token, items=[Content() for _ in range(perpage)])
@pytest.mark.asyncio
async def test_aiopenapi3_pagination(event_loop, server, client):
    """Follow ``next`` tokens through every page with the async aiopenapi3 client."""
    # The same dict is reused and mutated between requests.
    query = {}
    # Tallied while paging; not asserted on.
    char_count = 0
    while True:
        page = await client._.pagination(parameters=query)
        char_count += sum(len(item.data) for item in page.items)
        if page.next is None:
            break
        query["next"] = page.next
@pytest.mark.asyncio
@pytest.mark.skipif(sys.version_info < (3, 9), reason="requires asyncio.to_thread")
async def test_sync_pagination(event_loop, server):
    """Page through the endpoint with a sync-loaded aiopenapi3 client.

    Loading and every request run via ``asyncio.to_thread``.
    """
    description_url = f"http://{server.bind[0]}/openapi.json"
    client = await asyncio.to_thread(aiopenapi3.OpenAPI.load_sync, description_url)

    # The same dict is reused and mutated between requests.
    query = {}
    # Tallied while paging; not asserted on.
    char_count = 0
    while True:
        page = await asyncio.to_thread(client._.pagination, parameters=query)
        char_count += sum(len(item.data) for item in page.items)
        if page.next is None:
            break
        query["next"] = page.next
@pytest.mark.asyncio
@pytest.mark.skipif(sys.version_info < (3, 9), reason="requires asyncio.to_thread")
async def test_openapi3_pagination_to_thread(event_loop, server):
    """Page through the endpoint with an openapi3 client, one thread call per request."""
    import openapi3
    import requests

    base_url = f"http://{server.bind[0]}"
    reply = await asyncio.to_thread(requests.get, f"http://{server.bind[0]}/openapi.json")
    spec = reply.json()
    # The served document declares a relative server URL; replace it with
    # the absolute address of the test server.
    spec["servers"][0]["url"] = base_url
    client = openapi3.OpenAPI(spec)

    # The same dict is reused and mutated between requests.
    query = {}
    # Tallied while paging; not asserted on.
    char_count = 0
    while True:
        page = await asyncio.to_thread(client.call_pagination, parameters=query)
        char_count += sum(len(item.data) for item in page.items)
        if page.next is None:
            break
        query["next"] = page.next
@pytest.mark.asyncio
@pytest.mark.skipif(sys.version_info < (3, 9), reason="requires asyncio.to_thread")
async def test_openapi3_pagination_async(event_loop, server):
    """Page through the endpoint with an openapi3 client, awaiting each call directly."""
    import openapi3
    import requests

    base_url = f"http://{server.bind[0]}"
    reply = await asyncio.to_thread(requests.get, f"http://{server.bind[0]}/openapi.json")
    spec = reply.json()
    # The served document declares a relative server URL; replace it with
    # the absolute address of the test server.
    spec["servers"][0]["url"] = base_url
    client = openapi3.OpenAPI(spec)

    # The same dict is reused and mutated between requests.
    query = {}
    # Tallied while paging; not asserted on.
    char_count = 0
    while True:
        # NOTE(review): the result of call_pagination is awaited directly here,
        # whereas test_openapi3_pagination_to_thread runs the same call through
        # asyncio.to_thread - confirm that call_pagination returns an awaitable.
        page = await client.call_pagination(parameters=query)
        char_count += sum(len(item.data) for item in page.items)
        if page.next is None:
            break
        query["next"] = page.next
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment