Skip to content

Instantly share code, notes, and snippets.

@mrmamongo
Created February 26, 2023 14:02
Show Gist options
  • Save mrmamongo/d8631496a020f453bf4b1fe8b4e8f282 to your computer and use it in GitHub Desktop.
Save mrmamongo/d8631496a020f453bf4b1fe8b4e8f282 to your computer and use it in GitHub Desktop.
import asyncio
import enum
import logging
import nats
import nats.errors
from nats.aio.msg import Msg
from pydantic import BaseModel, ValidationError
# Configure the root logger at INFO level as soon as the module is imported,
# so the handlers below emit their info/error records.
logging.basicConfig(level=logging.INFO)
# Module-scoped logger shared by the message handler and the main loop.
logger: logging.Logger = logging.getLogger(__name__)
class InitMsg(BaseModel):
    """Payload carried by messages on the ``init_server`` subject.

    The handler parses incoming message data into this model and serializes
    a fresh instance when it re-publishes the message.
    """

    # Identifier of the user to initialise; passed to ``init_user``.
    id: int
    # Optional name. The explicit ``None`` default keeps the field optional
    # regardless of pydantic major version (v1 treats a bare ``str | None``
    # as optional, v2 would make it required).
    name: str | None = None
async def init_user(id: int) -> None:
    """Placeholder for user initialisation.

    Currently performs no work and always resolves to ``None``.

    Args:
        id: Identifier of the user to initialise (unused by this stub). The
            name shadows the builtin but is part of the call signature.
    """
    return None
async def init_user_handler(msg: Msg) -> None:
    """Process one message pulled from the ``init_server`` subject.

    The payload is parsed into an ``InitMsg``, ``init_user`` is awaited with
    the parsed id, and the message is re-published through JetStream on a
    subject named after the user id. The incoming message is acknowledged
    only after the publish succeeded, so a failed attempt is redelivered.

    Args:
        msg: JetStream message whose ``data`` is expected to be a
            JSON-encoded ``InitMsg``.

    Returns:
        None. Exceptions are logged and never propagate to the caller.
    """
    try:
        logger.info(f"init_user_handler: {msg}")
        try:
            init_msg = InitMsg.parse_raw(msg.data)
        except ValidationError as e:
            logger.error(e)
            # A payload that fails validation will fail again on every
            # redelivery, so tell the server to stop redelivering it.
            await msg.term()
            return

        user = await init_user(init_msg.id)
        # ``init_user`` may resolve to ``None``; fall back to the id from the
        # payload instead of dereferencing a missing user object.
        user_id = init_msg.id if user is None else user.id

        # pydantic models only accept keyword arguments.
        payload = InitMsg(id=init_msg.id, name=init_msg.name).json().encode("utf-8")

        nc = await nats.connect("localhost")
        try:
            js = nc.jetstream()
            # NOTE(review): a JetStream publish needs a stream bound to the
            # target subject; no such stream is created in this file, so
            # confirm one exists for the per-user subjects.
            await js.publish(f"{user_id}", payload)
        finally:
            # Always release the per-message connection, even when the
            # publish fails.
            await nc.close()

        # Acknowledge last, so a failure above leaves the message pending.
        await msg.ack()
    except Exception as e:
        logger.exception(e)
async def main(retry_delay: float = 1.0) -> None:
    """Consume ``init_server`` messages from JetStream until cancelled.

    Connects to the NATS server on ``localhost``, tries to create the
    ``init_stream`` stream for the ``init_server`` subject, then pulls
    batches of up to 10 messages from the durable ``server`` consumer and
    passes each one to ``init_user_handler``.

    Args:
        retry_delay: Seconds to wait after an unexpected error in the fetch
            loop before trying again, so a persistent failure does not spin
            the loop and flood the log.

    Returns:
        None. The loop only ends when the task is cancelled or a
        non-``Exception`` error propagates; the connection is closed either
        way.
    """
    # Does nothing if the root logger already has handlers configured.
    logging.basicConfig(level=logging.INFO)
    nc = await nats.connect("localhost")
    try:
        js = nc.jetstream()
        try:
            await js.add_stream(name="init_stream", subjects=["init_server"])
        except Exception as e:
            # Best effort: log the failure and carry on to the subscription.
            logger.error(e)

        psub = await js.pull_subscribe(subject="init_server", durable="server")
        while True:
            try:
                msgs = await psub.fetch(10)
                for msg in msgs:
                    await init_user_handler(msg)
            except nats.errors.TimeoutError:
                # No messages arrived within the fetch timeout; poll again.
                continue
            except Exception as e:
                logger.exception(e)
                await asyncio.sleep(retry_delay)
    finally:
        # Release the connection when the loop is cancelled or fails.
        await nc.close()
# Script entry point: run the consumer loop when executed directly.
if __name__ == "__main__":
    consumer = main()
    asyncio.run(consumer)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment