Last active
May 2, 2024 19:07
-
-
Save edpyt/8eaf8bb9e88bb6e9983f069e6ba57c93 to your computer and use it in GitHub Desktop.
Strange error. pytest + nats.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import AsyncGenerator, Generator | |
import nats | |
import pytest | |
import pytest_asyncio | |
from testcontainers.nats import NatsContainer | |
@pytest.fixture(name="nats_container", scope="session")
def create_nats_container() -> Generator[NatsContainer, None, None]:
    """Provide one ``NatsContainer`` shared by the whole test session.

    The container object is used as a context manager, so whatever it
    sets up on entry is torn down again once the session finishes and
    the generator is resumed past the ``yield``.
    """
    with NatsContainer() as container:
        yield container
@pytest_asyncio.fixture(name="nats_conn", scope="session")
async def connect_nats(
    nats_container: NatsContainer,
) -> AsyncGenerator[nats.NATS, None]:
    """Yield a NATS client connected to the session's NATS container.

    The connection URL is taken from ``nats_container.nats_uri()``. The
    client is closed after the last test that requested it has finished.
    """
    client = await nats.connect(nats_container.nats_uri())
    yield client
    # Teardown: runs once the session-scoped fixture is finalized.
    await client.close()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
nats-py==2.7.2 | |
testcontainers==4.4.0 | |
pytest==8.2.0 | |
pytest-asyncio==0.23.6 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from nats import NATS | |
import pytest | |
# The ``nats_conn`` fixture is session-scoped, so with pytest-asyncio 0.23.x
# it is created on the session-scoped event loop. A bare
# ``@pytest.mark.asyncio`` runs the test on a fresh function-scoped loop,
# which makes ``next_msg`` await objects bound to a different loop and fail.
# Running the test on the session loop keeps fixture and test on one loop.
# NOTE(review): on pytest-asyncio >= 0.24 the keyword is ``loop_scope``.
@pytest.mark.asyncio(scope="session")
async def test_simple(nats_conn: NATS):
    """Publish one message on ``test`` and check that it is received back."""
    sub = await nats_conn.subscribe("test")
    await nats_conn.publish("test", b"hello world")
    # Previously this call raised because of the event-loop mismatch.
    msg = await sub.next_msg(timeout=5.0)
    assert msg.data == b"hello world"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment