Skip to content

Instantly share code, notes, and snippets.

@kelvan
Created October 29, 2020 14:35
Show Gist options
  • Save kelvan/49e3efb99c329b4c2476d49458b19c19 to your computer and use it in GitHub Desktop.
Save kelvan/49e3efb99c329b4c2476d49458b19c19 to your computer and use it in GitHub Desktop.
Async client for apistar
import apistar # type: ignore
import typesystem
from apistar.client.transports import BlockAllCookies, HTTPTransport
from httpx import AsyncClient
class AsyncHTTPTransport(HTTPTransport):
    """HTTP transport whose ``send`` awaits the request on an async session.

    Unless the caller supplies a session, an ``httpx.AsyncClient`` is created
    and used for every request.
    """

    def __init__(self, *args, **kwargs):
        """Set up the transport, defaulting the session to an ``AsyncClient``.

        Accepts the same positional and keyword arguments as
        ``HTTPTransport.__init__``.

        Raises:
            TypeError: if the arguments do not match the parent signature.
        """
        import inspect

        parent_init = super().__init__
        # Resolve arguments by name so that ``session``, ``auth`` and
        # ``allow_cookies`` are honoured even when passed positionally
        # (``Client.init_transport`` forwards ``*args`` untouched).
        bound = inspect.signature(parent_init).bind(*args, **kwargs)
        bound.apply_defaults()
        options = bound.arguments

        session = options.get('session')
        # Remember whether the session is ours, so aclose() never closes a
        # client that the caller still owns.
        self._owns_session = session is None
        if self._owns_session:
            session = AsyncClient()
        allow_cookies = options.get('allow_cookies', True)

        # Hand the async session to the parent so it does not build a
        # throwaway blocking one, and keep it from applying the cookie policy
        # itself: it calls ``session.cookies.set_policy``, which httpx's
        # ``Cookies`` wrapper does not provide.
        options['session'] = session
        options['allow_cookies'] = True
        parent_init(*bound.args, **bound.kwargs)

        if not allow_cookies:
            cookies = session.cookies
            # httpx keeps the real CookieJar on ``.jar``; fall back to the
            # object itself for sessions that expose a jar directly.
            jar = getattr(cookies, 'jar', cookies)
            jar.set_policy(BlockAllCookies())
        self.session = session

    async def send(
        self, method, url, query_params=None, content=None, encoding=None
    ):
        """Issue a request and return the decoded response content.

        Raises:
            apistar.exceptions.ErrorResponse: if the response status code is
                in the 400-599 range; the decoded body is attached as
                ``content``.
        """
        options = self.get_request_options(query_params, content, encoding)
        response = await self.session.request(method, url, **options)
        result = self.decode_response_content(response)

        status_code = response.status_code
        if 400 <= status_code <= 599:
            # httpx responses expose ``reason_phrase``; ``reason`` is kept as
            # a fallback for caller-supplied sessions with a different API.
            reason = getattr(response, 'reason_phrase', None)
            if reason is None:
                reason = getattr(response, 'reason', '')
            raise apistar.exceptions.ErrorResponse(
                title=f'{status_code} {reason}',
                status_code=status_code,
                content=result,
            )
        return result

    async def aclose(self):
        """Close the session, but only if this transport created it."""
        if self._owns_session:
            await self.session.aclose()
class Client(apistar.Client):
    """apistar client whose ``request`` is a coroutine."""

    def init_transport(self, *args, **kwargs):
        """Create the transport, forwarding every argument unchanged."""
        transport = AsyncHTTPTransport(*args, **kwargs)
        return transport

    async def request(self, operation_id: str, **params):
        """Validate ``params`` for an operation, then await the request.

        Raises:
            apistar.exceptions.ClientError: if ``params`` contains unknown
                names or omits a required field of the operation.
        """
        operation = self.lookup_operation(operation_id)

        # Any value is accepted per field; only the set of names is checked.
        properties = {}
        for field in operation.fields:
            properties[field.name] = typesystem.Any()
        required_names = []
        for field in operation.fields:
            if field.required:
                required_names.append(field.name)

        schema = typesystem.Object(
            properties=properties,
            required=required_names,
            additional_properties=False,
        )
        try:
            schema.validate(params)
        except typesystem.ValidationError as error:
            # Surface validation failures as a client error, without chaining.
            raise apistar.exceptions.ClientError(
                messages=error.messages()
            ) from None

        http_method = operation.method
        target_url = self.get_url(operation, params)
        query = self.get_query_params(operation, params)
        body, body_encoding = self.get_content_and_encoding(operation, params)

        result = await self.transport.send(
            http_method,
            target_url,
            query_params=query,
            content=body,
            encoding=body_encoding,
        )
        return result
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment