Skip to content

Instantly share code, notes, and snippets.

@Apakottur
Last active May 3, 2025 12:52
Show Gist options
  • Save Apakottur/1ca70682665c1dcd58fce207a910e0cb to your computer and use it in GitHub Desktop.
Save Apakottur/1ca70682665c1dcd58fce207a910e0cb to your computer and use it in GitHub Desktop.
Moto + aiobotocore
from collections.abc import Awaitable, Callable, Iterator
from dataclasses import dataclass
from typing import Any, TypeVar
import aiobotocore
import aiobotocore.awsrequest
import aiobotocore.endpoint
import aiobotocore.httpchecksum
import aiobotocore.response
import botocore.awsrequest
import botocore.httpchecksum
T = TypeVar("T")
R = TypeVar("R")
@dataclass
class _PatchedAWSResponseContent:
"""Patched version of `botocore.awsrequest.AWSResponse.content`"""
content: bytes | Awaitable[bytes]
def __await__(self) -> Iterator[bytes]:
async def _generate_async() -> bytes:
if isinstance(self.content, Awaitable):
return await self.content
else:
return self.content
return _generate_async().__await__()
def decode(self, encoding: str) -> str:
assert isinstance(self.content, bytes)
return self.content.decode(encoding)
class PatchedAWSResponse:
"""Patched version of `botocore.awsrequest.AWSResponse`"""
def __init__(self, response: botocore.awsrequest.AWSResponse) -> None:
self._response = response
self.status_code = response.status_code
self.content = _PatchedAWSResponseContent(response.content)
self.raw = response.raw
self.headers = response.headers
if not hasattr(self.raw, "raw_headers"):
self.raw.raw_headers = {}
def _factory(
original: Callable[[botocore.awsrequest.AWSResponse, T], Awaitable[R]],
) -> Callable[[botocore.awsrequest.AWSResponse, T], Awaitable[R]]:
"""Factory for patching `aiobotocore.endpoint.convert_to_response_dict`"""
async def patched_convert_to_response_dict(http_response: botocore.awsrequest.AWSResponse, operation_model: T) -> R:
return await original(PatchedAWSResponse(http_response), operation_model) # type: ignore[arg-type]
return patched_convert_to_response_dict
aiobotocore.endpoint.convert_to_response_dict = _factory(aiobotocore.endpoint.convert_to_response_dict) # type: ignore[assignment, arg-type]
async def _patched_read(self: aiobotocore.response.StreamingBody, _amt: Any = None) -> Any:
"""Patched version of `aiobotocore.response.StreamingBody.read`"""
return self.__wrapped__.read()
aiobotocore.response.StreamingBody.read = _patched_read # type: ignore[assignment]
# Remove async versions of functions which are not compatible with Moto.
del aiobotocore.httpchecksum.AioAwsChunkedWrapper._make_chunk # noqa: SLF001
del aiobotocore.httpchecksum.AioAwsChunkedWrapper.read
moto (5.1.1)
aiobotocore (2.21.0)
mypy (1.15.0)
pyright (1.1.396)
ruff (0.9.9)
@Apakottur
Copy link
Author

Been posting in this thread for a few years now - aio-libs/aiobotocore#755, decided to put my most up-to-date code in a gist.

Works with these packages:

moto (5.1.0)
aiobotocore (2.20.0)
mypy (1.15.0)
pyright (1.1.394)
ruff (0.9.7)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment