Last active
May 6, 2023 23:38
-
-
Save Wind010/d9e37df5653f8bae257f68264f9facdd to your computer and use it in GitHub Desktop.
Mocking
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from typing import AsyncIterable, List | |
| import aiohttp | |
| import asyncio | |
| import platform | |
| import time | |
| import nest_asyncio | |
| #__import__('IPython').embed() | |
class HttpClient:
    """Small aiohttp-based client used to demonstrate mocking of async code.

    Both public coroutines request ``{base_uri}/people/1/`` and return the
    ``films`` entry of the decoded JSON response.
    """

    def __init__(self, base_uri: str):
        """Store the API root and adjust asyncio for nested/Windows usage.

        Args:
            base_uri: Root URL of the API. A trailing slash is tolerated.
        """
        self.base_uri = base_uri

        # Resolve error: cannot run the event loop while another loop is running
        nest_asyncio.apply()

        # NOTE: the event loop policy is an interpreter-wide setting, so
        # constructing a client on Windows affects every loop created later.
        if platform.system() == 'Windows':
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

    def _luke_url(self) -> str:
        """Return the URL of the ``people/1`` resource below ``base_uri``."""
        # Strip trailing slashes so a base such as ".../api/" does not
        # produce ".../api//people/1/".
        return f"{self.base_uri.rstrip('/')}/people/1/"

    async def _fetch_luke(self) -> dict:
        """GET the ``people/1`` resource and return its decoded JSON body."""
        async with aiohttp.ClientSession() as session:
            async with session.get(self._luke_url()) as response:
                return await response.json()

    async def get_luke_skywalker_film_appearance(self) -> AsyncIterable[str]:
        """Fetch the resource, print its ``films`` entry and return it.

        Returns:
            The value stored under ``'films'`` in the JSON response.

        Raises:
            KeyError: If the response JSON has no ``'films'`` key.

        NOTE(review): the annotation says ``AsyncIterable[str]`` but the
        value returned is whatever the JSON decoder produced for ``films``
        (presumably a list of URL strings) -- confirm and align the
        annotation with the callers.
        """
        response_dict = await self._fetch_luke()
        films: List[str] = response_dict['films']
        print(films)
        return films

    async def get_luke_skywalker_film_appearance_eventloop_error(self) -> List[str]:
        """Fetch the resource, print the whole response and return ``films``.

        On Windows the coroutine additionally pauses for one second before
        returning (see the workaround comment below).

        Returns:
            The value stored under ``'films'`` in the JSON response.

        Raises:
            KeyError: If the response JSON has no ``'films'`` key.
        """
        response_dict = await self._fetch_luke()
        print(response_dict)
        films: List[str] = response_dict['films']

        # Workaround 'RuntimeError: Event loop is closed' on Windows.
        # https://github.com/encode/httpx/issues/914
        if platform.system() == 'Windows':
            # The sleep has to be awaited: a bare ``asyncio.sleep(1)`` call
            # only creates a coroutine object and never actually pauses.
            await asyncio.sleep(1)

        return films
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from datetime import datetime | |
| from typing import AsyncIterable, List | |
| from common.httpclient import HttpClient | |
class Wrapper:
    """Timing wrapper around an object exposing
    ``get_luke_skywalker_film_appearance()`` as a coroutine.
    """

    def __init__(self, httpClient: "HttpClient"):
        """Keep a reference to the client whose calls are wrapped.

        Args:
            httpClient: Client providing the awaitable
                ``get_luke_skywalker_film_appearance`` method.
        """
        self.httpclient = httpClient

    async def wrap_async(self) -> List[str]:
        """Await the client call, print the elapsed time, return the result.

        Returns:
            Whatever the client coroutine returned, unchanged.
        """
        start = datetime.now()
        res = await self.httpclient.get_luke_skywalker_film_appearance()
        print(f'Time: {datetime.now() - start}')
        return res

    async def wrap_async_async(self) -> List[str]:
        """Await the client call, print the elapsed time, collect into a list.

        The awaited result may be an asynchronous iterable (consumed with
        ``async for``) or an ordinary iterable such as a list; either way a
        new list of its items is returned.

        Returns:
            A list holding every item produced by the client's result.
        """
        start = datetime.now()
        res = await self.httpclient.get_luke_skywalker_film_appearance()
        print(f'Time: {datetime.now() - start}')

        if hasattr(res, '__aiter__'):
            return [r async for r in res]
        # A plain iterable cannot be driven by ``async for`` (TypeError),
        # so copy it synchronously instead.
        return list(res)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import pytest | |
| from unittest.mock import MagicMock, Mock, AsyncMock | |
| from common.httpclient import HttpClient | |
| from monitor.wrapper import Wrapper | |
| from datetime import datetime, timezone | |
class TestWrapper:
    """Tests for ``Wrapper`` (mocked client) plus end-to-end calls that hit
    the real API through ``HttpClient``.
    """

    # No trailing slash: HttpClient appends "/people/1/" to the base itself.
    BaseUri: str = 'https://swapi.dev/api'

    @staticmethod
    def _make_film_mock() -> MagicMock:
        """Build the mock item the fake client hands back to ``Wrapper``."""
        # Create mock object with create_date property (timezone-aware UTC).
        create_date: datetime = datetime.now(timezone.utc)
        rtn_mock = MagicMock(create_date=create_date)
        # ``name`` is a Mock constructor argument, so assign it afterwards
        # to make it behave like an ordinary attribute.
        rtn_mock.name = "TEST"
        return rtn_mock

    @pytest.mark.asyncio
    async def test_wrap_async(self):
        """ Test where the dependency under test will return a list."""
        # Arrange
        mock_httpclient = AsyncMock(name='HttpClient')
        rtn_mock = self._make_film_mock()
        mock_httpclient.get_luke_skywalker_film_appearance.return_value = [rtn_mock]
        wrapper = Wrapper(mock_httpclient)

        # Act
        films = await wrapper.wrap_async()

        # Assert
        assert films is not None
        mock_httpclient.get_luke_skywalker_film_appearance.assert_called_once()
        assert films[0].name == rtn_mock.name

    @pytest.mark.asyncio
    async def test_wrap_async_async(self):
        """ Test where the dependency under test will return a list from async for-loop list comprehension. """
        # Arrange
        mock_httpclient = AsyncMock(name='HttpClient')
        rtn_mock = self._make_film_mock()
        mock_iterator = MagicMock()
        # Set the return value of what is returned from asynchronous iterator.
        mock_iterator.__aiter__.return_value = [rtn_mock]
        mock_httpclient.get_luke_skywalker_film_appearance.return_value = mock_iterator
        wrapper = Wrapper(mock_httpclient)

        # Act
        films = await wrapper.wrap_async_async()

        # Assert
        assert films is not None
        mock_httpclient.get_luke_skywalker_film_appearance.assert_called_once()
        assert films[0].name == rtn_mock.name

    @pytest.mark.asyncio
    @pytest.mark.e2e
    async def test_wrap_async_e2e_actual(self):
        """ Test that actually calls the API """
        # Arrange
        httpClient = HttpClient(TestWrapper.BaseUri)
        wrapper = Wrapper(httpClient)

        # Act
        res = await wrapper.wrap_async()

        # Assert
        assert res is not None
        assert len(res) == 4
        # The last path segment of every film URL is the film id.
        assert {int(f.rstrip('/').split('/')[-1]) for f in res} == {1, 2, 3, 6}

    @pytest.mark.asyncio
    @pytest.mark.e2e
    async def test_get_luke_skywalker_film_appearance_eventloop_error(self):
        """ This test would go into separate HttpClient test file. """
        # Arrange
        httpClient = HttpClient(TestWrapper.BaseUri)

        # Act
        res = await httpClient.get_luke_skywalker_film_appearance_eventloop_error()

        # Assert
        assert res is not None
        assert len(res) == 4
        # The last path segment of every film URL is the film id.
        assert {int(f.rstrip('/').split('/')[-1]) for f in res} == {1, 2, 3, 6}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment