Created
December 12, 2024 06:36
-
-
Save vojtajina/f2ca29353449a93e8e618083065642f9 to your computer and use it in GitHub Desktop.
nats-ignoring-subject.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
needs_reconnect = True | |
# Keep trying to fetch new tasks forever... | |
while True: | |
if needs_reconnect: | |
try: | |
await connect_nats() | |
js = nc.jetstream() | |
psubs = [] | |
for sub in config.nats_js_queue_subjects: | |
l.info(f"Subscribing to NATS Jetstream {sub}") | |
psubs.append( | |
await js.pull_subscribe( | |
sub, | |
durable=config.nats_js_queue_consumer_name, | |
stream=config.nats_js_queue_stream_name, | |
) | |
) | |
needs_reconnect = False | |
except Exception as e: | |
l.error("Exception during connect/jetstream/pull_subscribe") | |
traceback.print_exception(e) | |
done, pending = await asyncio.wait( | |
[asyncio.create_task(psub.fetch(1, timeout=30)) for psub in psubs], | |
return_when=asyncio.FIRST_COMPLETED, | |
) | |
received_msgs = None | |
for fetch_task in done: | |
try: | |
msgs_ = await fetch_task | |
if received_msgs is None: | |
received_msgs = msgs_ | |
else: | |
l.info("Already got msg, nak") | |
for m in msgs_: | |
await m.nak() | |
except ConnectionClosedError: | |
needs_reconnect = True | |
except (TimeoutError, ErrTimeout): | |
pass | |
# Cancel any pending tasks | |
for fetch_task in pending: | |
fetch_task.cancel() | |
if received_msgs is None: | |
l.info("No msg received from any subjects, retrying...") | |
continue | |
for msg in received_msgs: | |
task = GPUWorkerTask() | |
task.ParseFromString(msg.data) | |
# process task | |
await msg.ack() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment