|
import asyncio |
|
from typing import List, Tuple |
|
|
|
import jsonschema |
|
from aiocache import cached |
|
from pydantic import BaseSettings, Field |
|
from fastapi import FastAPI, Request |
|
from fastapi.openapi.docs import get_swagger_ui_html |
|
from fastapi.routing import APIRoute |
|
from httpx import AsyncClient |
|
|
|
|
|
# Published location of the OpenAPI 3.0 meta-schema (a JSON Schema document).
# The URL must resolve to the raw JSON itself: a GitHub "/blob/" page URL serves
# an HTML viewer, which `response.json()` in `openapi_meta_schema` cannot parse.
OPENAPI_META_SCHEMA_URL = "https://spec.openapis.org/oas/3.0/schema/2021-09-28"
|
|
|
|
|
class Settings(BaseSettings):
    # Configuration of the service, declared as pydantic settings fields.

    # No default is given, so a value has to be supplied.
    openapi_urls: List[str] = Field(
        description="JSON-encoded list of URLs to OpenAPI schemas to combine.",
    )

    # Used as the `info.title` of the combined document and in the Swagger UI page title.
    specification_title: str = Field(
        default="Combined OpenAPI Specification",
        description="Title of the combined OpenAPI specification.",
    )

    # Passed as the `ttl` of the cache wrapped around `fetch_openapi_spec`.
    cache_refresh_seconds: int = Field(
        default=300,
        description="Number of seconds before cache of OpenAPI specifications is refreshed.",
    )
|
|
|
|
|
# Module-level settings instance, built once at import time. The route
# handlers and the cache decorator on `fetch_openapi_spec` read from it.
SETTINGS = Settings()


# The FastAPI application; the `@APP.get(...)` decorators below register
# their routes on this instance.
APP = FastAPI()
|
|
|
|
|
@APP.get("/raw")
async def openapi_json():
    """Retrieve the combined specification in JSON.

    Every specification listed in ``SETTINGS.openapi_urls`` is fetched
    concurrently and the results are combined into a single document:
    ``paths`` and ``components`` are merged recursively, and ``info.version``
    is the element-wise sum of the individual versions.

    Returns:
        A dictionary with the ``openapi``, ``info``, ``paths`` and
        ``components`` entries of the combined specification.

    Raises:
        AssertionError: If the fetched specifications do not all declare the
            same ``openapi`` version. This includes the case where no
            specification was fetched at all.
    """
    specs = await asyncio.gather(*(fetch_openapi_spec(url) for url in SETTINGS.openapi_urls))
    versions = {spec["openapi"] for spec in specs}
    # Raised explicitly (instead of via `assert`) so the check is not stripped
    # when Python runs with `-O`; the exception type and message are unchanged.
    if len(versions) != 1:
        raise AssertionError("Got specifications with different openapi versions.")
    return {
        "openapi": versions.pop(),
        "info": {
            "title": SETTINGS.specification_title,
            "version": sum_versions(*(spec["info"]["version"] for spec in specs)),
        },
        "paths": merge(*(spec["paths"] for spec in specs)),
        # `components` is an optional field of an OpenAPI document, so a
        # specification without it contributes nothing instead of raising KeyError.
        "components": merge(*(spec.get("components", {}) for spec in specs)),
    }
|
|
|
|
|
@APP.get("/")
async def swagger_ui(request: Request):
    """Serve a Swagger UI page that displays the combined specification."""
    # Look up the path under which `openapi_json` is mounted rather than
    # hard-coding it, so the page follows any change to that route.
    path_by_endpoint = {}
    for route in APP.routes:
        if isinstance(route, APIRoute):
            path_by_endpoint[route.endpoint] = route.path
    spec_path = path_by_endpoint[openapi_json]

    # Prefix with the ASGI root path (if any), without a trailing slash.
    prefix = request.scope.get("root_path", "").rstrip("/")

    return get_swagger_ui_html(
        openapi_url=prefix + spec_path,
        title=f"{SETTINGS.specification_title} - Swagger UI",
        oauth2_redirect_url=None,
        init_oauth=None,
    )
|
|
|
|
|
@cached(ttl=SETTINGS.cache_refresh_seconds)
async def fetch_openapi_spec(url: str) -> dict:
    """Fetch, validate and parse an OpenAPI specification.

    Supports JSON encoded specifications.

    Args:
        url: Location of the JSON-encoded specification to download.

    Returns:
        The decoded specification. The ``cached`` decorator is configured
        with ``SETTINGS.cache_refresh_seconds`` as its ``ttl``.

    Raises:
        AssertionError: If the response status code is not in the 2xx range.
        Any error raised by ``validate_openapi_specification`` propagates.
    """
    async with AsyncClient() as client:
        response = await client.get(url)
    # Raised explicitly (instead of via `assert`) so the check is not stripped
    # when Python runs with `-O`; the exception type is unchanged.
    if not 200 <= response.status_code < 300:
        raise AssertionError(f"Unexpected status code {response.status_code} when fetching {url}")
    spec = response.json()
    # `validate_openapi_specification` is a coroutine function: without `await`
    # the validation never executes and the coroutine object is just discarded.
    # NOTE(review): validation now really runs, so OPENAPI_META_SCHEMA_URL has
    # to serve the raw JSON meta-schema - confirm that it does.
    await validate_openapi_specification(spec)
    return spec
|
|
|
|
|
async def validate_openapi_specification(specification: dict) -> None:
    """Check an OpenAPI specification against the OpenAPI meta-schema.

    Returns ``None`` when ``jsonschema.validate`` accepts the specification;
    any error it raises is left to propagate to the caller.
    """
    meta_schema = await openapi_meta_schema()
    jsonschema.validate(schema=meta_schema, instance=specification)
|
|
|
|
|
@cached(ttl=None)
async def openapi_meta_schema():
    """Fetch the meta-schema for OpenAPI specifications.

    Returns:
        The JSON-decoded body served at ``OPENAPI_META_SCHEMA_URL``. The
        ``cached`` decorator is configured with ``ttl=None``.

    Raises:
        httpx.HTTPStatusError: If the server answers with an error status.
    """
    async with AsyncClient() as client:
        response = await client.get(OPENAPI_META_SCHEMA_URL)
    # Fail loudly on an error response so that an error payload is never
    # decoded and then cached as if it were the meta-schema.
    response.raise_for_status()
    return response.json()
|
|
|
|
|
def sum_versions(*versions: str) -> str:
    """Element-wise sum on semantic versions.

    Unique assuming individual versions are never rolled back.

    Args:
        *versions: Version strings of the form ``"major.minor.patch"``. A
            version that is not exactly three dot-separated integers is
            counted as ``0.0.0``.

    Returns:
        The summed version as ``"major.minor.patch"``; ``"0.0.0"`` when called
        without any versions.
    """

    def parse_version(version: str) -> Tuple[int, int, int]:
        # Both a non-integer component and a wrong number of components
        # surface as ValueError (from `int` and from the unpacking).
        try:
            major, minor, patch = map(int, version.split("."))
        except ValueError:
            return 0, 0, 0
        return major, minor, patch

    # Accumulate explicitly so that an empty argument list yields zeros
    # instead of failing to unpack an empty `zip`.
    total_major = total_minor = total_patch = 0
    for major, minor, patch in map(parse_version, versions):
        total_major += major
        total_minor += minor
        total_patch += patch
    return f"{total_major}.{total_minor}.{total_patch}"
|
|
|
|
|
def merge(*dictionaries: dict) -> dict:
    """Merge several dictionaries recursively.

    For each key, the values from all dictionaries containing it are
    collected. If every one of them is a dictionary they are merged
    recursively; otherwise the value from the last dictionary containing the
    key wins.

    Args:
        *dictionaries: The dictionaries to merge; none of them is modified.

    Returns:
        A new dictionary whose keys appear in the order in which they are
        first seen across ``dictionaries``. Empty when called without
        arguments.
    """
    result = {}
    # Walk the inputs directly instead of a set union of their keys: this
    # keeps the output order deterministic and also works for zero inputs.
    for dictionary in dictionaries:
        for key in dictionary:
            if key in result:
                continue  # Already resolved when the key was first seen.
            values = [candidate[key] for candidate in dictionaries if key in candidate]
            if all(isinstance(value, dict) for value in values):
                result[key] = merge(*values)
            else:
                result[key] = values[-1]
    return result