Skip to content

Instantly share code, notes, and snippets.

@vojtajina
Created December 12, 2024 06:36
Show Gist options
  • Save vojtajina/f2ca29353449a93e8e618083065642f9 to your computer and use it in GitHub Desktop.
Save vojtajina/f2ca29353449a93e8e618083065642f9 to your computer and use it in GitHub Desktop.
nats-ignoring-subject.py
needs_reconnect = True
# Keep trying to fetch new tasks forever...
while True:
if needs_reconnect:
try:
await connect_nats()
js = nc.jetstream()
psubs = []
for sub in config.nats_js_queue_subjects:
l.info(f"Subscribing to NATS Jetstream {sub}")
psubs.append(
await js.pull_subscribe(
sub,
durable=config.nats_js_queue_consumer_name,
stream=config.nats_js_queue_stream_name,
)
)
needs_reconnect = False
except Exception as e:
l.error("Exception during connect/jetstream/pull_subscribe")
traceback.print_exception(e)
done, pending = await asyncio.wait(
[asyncio.create_task(psub.fetch(1, timeout=30)) for psub in psubs],
return_when=asyncio.FIRST_COMPLETED,
)
received_msgs = None
for fetch_task in done:
try:
msgs_ = await fetch_task
if received_msgs is None:
received_msgs = msgs_
else:
l.info("Already got msg, nak")
for m in msgs_:
await m.nak()
except ConnectionClosedError:
needs_reconnect = True
except (TimeoutError, ErrTimeout):
pass
# Cancel any pending tasks
for fetch_task in pending:
fetch_task.cancel()
if received_msgs is None:
l.info("No msg received from any subjects, retrying...")
continue
for msg in received_msgs:
task = GPUWorkerTask()
task.ParseFromString(msg.data)
# process task
await msg.ack()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment