Skip to content

Instantly share code, notes, and snippets.

@vitawasalreadytaken
Created October 16, 2024 11:09
Show Gist options
  • Save vitawasalreadytaken/6d4c0ac7c79e74c83f726db0abbb889a to your computer and use it in GitHub Desktop.
Save vitawasalreadytaken/6d4c0ac7c79e74c83f726db0abbb889a to your computer and use it in GitHub Desktop.
import _interpreters as interpreters
# This will be hopefully replaced soon with 'pip install interpreters_pep_734' and 'from interpreters import queues'
# once https://github.com/ericsnowcurrently/interpreters/pull/15 is merged
from test.support.interpreters import queues
import time
import threading
def receiver() -> None:
from test.support.interpreters import queues
import threading
import time
import statistics
try:
latencies_ns = []
tid = threading.get_ident()
# The queue_id variable is injected (shared) by the calling interpreter!
print(tid, "Receiver starting, queue_id:", queue_id)
queue = queues.Queue(queue_id)
while True:
message, t = queue.get()
if message is None:
break
delta_ns = time.perf_counter_ns() - t
latencies_ns.append(delta_ns)
except Exception as exc:
print(tid, "Receiver failed:", exc)
finally:
print(tid, "Receiver exiting")
if latencies_ns:
print(tid, "Latency stats [µs]:", [round(q / 1000) for q in statistics.quantiles(latencies_ns)])
def run_interpreter(queue: queues.Queue) -> None:
subinterpreter = interpreters.create()
interpreters.exec(subinterpreter, receiver, shared={"queue_id": queue.id})
if __name__ == "__main__":
tid = threading.get_ident()
queue = queues.create(syncobj=True)
subinterpreter_thread = threading.Thread(target=run_interpreter, args=(queue,))
subinterpreter_thread.start()
time.sleep(1)
for i in range(10_000_000):
queue.put((i, time.perf_counter_ns()))
queue.put((None, 0))
subinterpreter_thread.join()
print(tid, "Main thread exiting")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment