Skip to content

Instantly share code, notes, and snippets.

@matham
Created December 14, 2017 01:29
Show Gist options
  • Select an option

  • Save matham/9ff11412de769df28918de60f5edb38e to your computer and use it in GitHub Desktop.

Select an option

Save matham/9ff11412de769df28918de60f5edb38e to your computer and use it in GitHub Desktop.
timing of python trio external thread interactions
import threading
import queue as stdlib_queue
import trio
class Executor(object):
_thread = None
name = 'Executor'
_exec_queue = None
def __del__(self):
self.stop_executor(block=False)
def start_executor(self):
queue = self._exec_queue = stdlib_queue.Queue()
# daemon=True because it might get left behind if we cancel, and in
# this case shouldn't block process exit.
thread = self._thread = threading.Thread(
target=self.worker_thread_fn, name=self.name, daemon=True,
args=(queue, ))
thread.start()
def stop_executor(self, block=True):
if not self._thread:
return
if not self._exec_queue:
self._thread = None
return
self._exec_queue.put('eof', block=block)
if block:
self._thread.join()
self._thread = self._exec_queue = None
def report_back_in_trio_thread_fn(self, task_container):
# This function gets scheduled into the trio run loop to deliver the
# thread's result.
def do_release_then_return_result():
# if canceled, do the cancellation otherwise the result.
if task_container[1] is not None:
task_container[1]()
return task_container[2].unwrap()
result = trio.hazmat.Result.capture(do_release_then_return_result)
trio.hazmat.reschedule(task_container[0], result)
def worker_thread_fn(self, queue):
# This is the function that runs in the worker thread to do the actual
# work and then schedule the calls to report_back_in_trio_thread_fn
while True:
sync_fn, args, task_container, token = queue.get(block=True)
if sync_fn == 'eof':
return
if task_container[1] is None:
task_container[2] = trio.hazmat.Result.capture(sync_fn, *args)
try:
token.run_sync_soon(self.report_back_in_trio_thread_fn, task_container)
except trio.RunFinishedError:
# The entire run finished, so our particular tasks are certainly
# long gone - it must have cancelled. Continue eating the queue.
raise # pass
@trio.hazmat.enable_ki_protection
async def run_in_worker_thread(self, sync_fn, *args, cancellable=False):
await trio.hazmat.checkpoint_if_cancelled()
# Holds a reference to the task that's blocked in this function waiting
# for the result as well as to the cancel callback and the result
# (when not canceled).
if self._thread is None:
self.start_executor()
task_container = [trio.hazmat.current_task(), None, None]
self._exec_queue.put(
(sync_fn, args, task_container,
trio.hazmat.current_trio_token()))
def abort(raise_cancel):
if cancellable:
task_container[1] = raise_cancel
return trio.hazmat.Abort.FAILED
return await trio.hazmat.wait_task_rescheduled(abort)
if __name__ == '__main__':
import time
from timeit import timeit
import trio
from src.playground import Executor
executor = Executor()
def echo_queue_thread(queue, answer):
while True:
val = queue.get(block=True)
if val == 'eof':
return
answer.put(val)
def start_thread():
queue, answer = stdlib_queue.Queue(), stdlib_queue.Queue()
thread = threading.Thread(target=echo_queue_thread, args=(queue, answer))
thread.start()
return queue, answer
def get_value(value):
return value
async def run_sequence():
for i in range(1000):
await executor.run_in_worker_thread(get_value, i)
async def run_sequence_sleep():
for i in range(1000):
await trio.sleep(0)
get_value(i)
async def run_sequence_base():
for i in range(1000):
get_value(i)
async def run_sequence_queue(queue, answer):
for i in range(1000):
queue.put(i)
val = answer.get(block=True)
queue.put('eof')
async def run_sequence_time_sleep():
for i in range(1000):
time.sleep(0)
get_value(i)
async def run_sequence_trio():
for i in range(1000):
await trio.run_sync_in_worker_thread(get_value, i)
print(timeit(stmt='trio.run(run_sequence)', setup='from __main__ import executor, trio, run_sequence', number=10))
print(timeit(stmt='trio.run(run_sequence_trio)', setup='from __main__ import executor, trio, run_sequence_trio', number=10))
print(timeit(stmt='trio.run(run_sequence_sleep)', setup='from __main__ import executor, trio, run_sequence_sleep', number=10))
print(timeit(stmt='trio.run(run_sequence_base)', setup='from __main__ import executor, trio, run_sequence_base', number=10))
print(timeit(stmt='trio.run(run_sequence_time_sleep)', setup='from __main__ import executor, trio, run_sequence_time_sleep', number=10))
print(timeit(stmt='args = start_thread(); trio.run(run_sequence_queue, *args)', setup='from __main__ import executor, trio, run_sequence_queue, start_thread', number=10))
1.618648644484486
3.5385297625561436
0.9438036765220037
0.030416185514593685
0.031911098504258106
0.420887142365574
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment