Last active
December 8, 2023 01:18
-
-
Save tekumara/298dbf392f5db259dce32fe4aca65b66 to your computer and use it in GitHub Desktop.
fixture for using aiobotocore with moto see https://github.com/aio-libs/aiobotocore/issues/755#issuecomment-1242592893
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ruff: noqa: SLF001 ARG001 | |
from typing import Callable | |
from unittest.mock import MagicMock | |
import aiobotocore.awsrequest | |
import aiobotocore.endpoint | |
import aiohttp | |
import aiohttp.client_reqrep | |
import aiohttp.typedefs | |
import botocore.awsrequest | |
import botocore.model | |
import pytest | |
""" | |
Patch aiobotocore to work with moto | |
See https://github.com/aio-libs/aiobotocore/issues/755 | |
""" | |
class MockAWSResponse(aiobotocore.awsrequest.AioAWSResponse): | |
def __init__(self, response: botocore.awsrequest.AWSResponse): | |
self._moto_response = response | |
self.status_code = response.status_code | |
self.raw = MockHttpClientResponse(response) | |
# adapt async methods to use moto's response | |
async def _content_prop(self) -> bytes: | |
return self._moto_response.content | |
async def _text_prop(self) -> str: | |
return self._moto_response.text | |
class MockHttpClientResponse(aiohttp.client_reqrep.ClientResponse): | |
def __init__(self, response: botocore.awsrequest.AWSResponse): | |
async def read(n: int = -1) -> bytes: | |
# streaming/range requests. used by s3fs | |
if not self.content._eof: | |
self.content._eof = True | |
return response.content | |
return b"" | |
self.content = MagicMock(aiohttp.StreamReader) | |
self.content.read = read | |
self.content._eof = False | |
@property | |
def raw_headers(self) -> aiohttp.typedefs.RawHeaders: | |
return () | |
@pytest.fixture(scope="session") | |
def _patch_aiobotocore() -> None: | |
def factory(original: Callable) -> Callable: | |
def patched_convert_to_response_dict( | |
http_response: botocore.awsrequest.AWSResponse, operation_model: botocore.model.OperationModel | |
): | |
return original(MockAWSResponse(http_response), operation_model) | |
return patched_convert_to_response_dict | |
aiobotocore.endpoint.convert_to_response_dict = factory(aiobotocore.endpoint.convert_to_response_dict) |
Updated to work with
aiobotocore==2.5.4
moto==4.1.14
s3fs==2023.6.0
@tekumara I am running into an issue utilizing this patch with s3fs.copy due to an issue in
aiobotocore.handlers._looks_like_special_case_error. Any idea how to add this to the patch and resolve that?
@k-scherer I've not hit that, could you share a stack trace?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Tested with aiobotocore 2.3.4, moto 3.1.18, and s3fs 2022.7.1.