Skip to content

Instantly share code, notes, and snippets.

@edpyt
Last active May 2, 2024 19:07
Show Gist options
  • Save edpyt/8eaf8bb9e88bb6e9983f069e6ba57c93 to your computer and use it in GitHub Desktop.
Save edpyt/8eaf8bb9e88bb6e9983f069e6ba57c93 to your computer and use it in GitHub Desktop.
Strange error. pytest + nats.py
from typing import AsyncGenerator, Generator
import nats
import pytest
import pytest_asyncio
from testcontainers.nats import NatsContainer
@pytest.fixture(name="nats_container", scope="session")
def create_nats_container() -> Generator[NatsContainer, None, None]:
    """Provide one ``NatsContainer`` shared by the whole test session.

    The container object is used as a context manager, so its exit hook
    runs once the session-scoped fixture is torn down.
    """
    container = NatsContainer()
    with container as running_container:
        yield running_container
@pytest_asyncio.fixture(name="nats_conn", scope="session")
async def connect_nats(
    nats_container: NatsContainer,
) -> AsyncGenerator[nats.NATS, None]:
    """Yield a NATS client connected to the session's container.

    The connection is opened once per session against the URI reported by
    ``nats_container`` and closed again during fixture teardown.
    """
    client = await nats.connect(nats_container.nats_uri())
    yield client
    # Teardown: runs after the last test that uses this fixture.
    await client.close()
nats-py==2.7.2
testcontainers==4.4.0
pytest==8.2.0
pytest-asyncio==0.23.6
from nats import NATS
import pytest
# The session-scoped ``nats_conn`` fixture is created on pytest-asyncio's
# session-scoped event loop.  With pytest-asyncio 0.23.x a bare
# ``@pytest.mark.asyncio`` runs the test on a *function*-scoped loop, so the
# connection's internal tasks/futures belong to a different loop and
# ``next_msg`` fails.  Running the test on the session loop fixes that.
# NOTE: pytest-asyncio >= 0.24 renames this marker argument to ``loop_scope``.
@pytest.mark.asyncio(scope="session")
async def test_simple(nats_conn: NATS):
    """Publish one message on ``test`` and receive it through a subscription."""
    sub = await nats_conn.subscribe("test")
    await nats_conn.publish("test", b"hello world")
    # Previously raised here because of the event-loop mismatch described above.
    msg = await sub.next_msg(timeout=5.0)
    assert msg.data == b"hello world"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment