Created
February 26, 2023 14:02
-
-
Save mrmamongo/d8631496a020f453bf4b1fe8b4e8f282 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import enum | |
import logging | |
import nats | |
import nats.errors | |
from nats.aio.msg import Msg | |
from pydantic import BaseModel, ValidationError | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
class InitMsg(BaseModel): | |
id: int | |
name: str | None | |
async def init_user(id: int): | |
return | |
async def init_user_handler(msg: Msg): | |
try: | |
logger.info(f"init_user_handler: {msg}") | |
try: | |
init_msg = InitMsg.parse_raw(msg.data) | |
except ValidationError as e: | |
logger.error(e) | |
return | |
user = await init_user(init_msg.id) | |
nc = await nats.connect("localhost") | |
js = nc.jetstream() | |
await js.publish( | |
f"{user.id}", InitMsg(init_msg.id, init_msg.name).json().encode("utf-8") | |
) | |
except Exception as e: | |
logger.exception(e) | |
async def main(): | |
logging.basicConfig(level=logging.INFO) | |
nc = await nats.connect("localhost") | |
js = nc.jetstream() | |
try: | |
await js.add_stream(name="init_stream", subjects=["init_server"]) | |
except Exception as e: | |
logger.error(e) | |
js = nc.jetstream() | |
psub = await js.pull_subscribe( | |
subject="init_server", durable="server" | |
) | |
while True: | |
try: | |
msgs = await psub.fetch(10) | |
for msg in msgs: | |
await init_user_handler(msg) | |
except nats.errors.TimeoutError: | |
continue | |
except Exception as e: | |
logger.exception(e) | |
if __name__ == "__main__": | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment