Created
December 14, 2017 01:29
-
-
Save matham/9ff11412de769df28918de60f5edb38e to your computer and use it in GitHub Desktop.
Timing of Python trio interactions with external threads
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import threading | |
| import queue as stdlib_queue | |
| import trio | |
class Executor(object):
    """Run synchronous callables for trio tasks on one dedicated thread.

    Work items are pushed onto a stdlib queue and consumed, in order, by a
    single daemon worker thread. Each result is handed back to the waiting
    trio task through the token's ``run_sync_soon``.

    NOTE(review): written against the ``trio.hazmat`` API; newer trio
    releases are believed to expose these helpers elsewhere
    (``trio.lowlevel`` / the ``outcome`` package) - confirm before upgrading.
    """

    # Worker thread, or None while no executor is running.
    _thread = None

    # Name given to the worker thread.
    name = 'Executor'

    # Queue of pending work items read by the worker, or None while stopped.
    _exec_queue = None

    def __del__(self):
        # Best effort: ask the worker to exit, but never wait for it here.
        self.stop_executor(block=False)

    def start_executor(self):
        """Create the work queue and start the worker thread."""
        queue = self._exec_queue = stdlib_queue.Queue()
        # daemon=True because it might get left behind if we cancel, and in
        # this case shouldn't block process exit.
        thread = self._thread = threading.Thread(
            target=self.worker_thread_fn, name=self.name, daemon=True,
            args=(queue, ))
        thread.start()

    def stop_executor(self, block=True):
        """Ask the worker thread to exit and forget about it.

        The ``'eof'`` sentinel is queued behind any pending work, so items
        already queued are still processed before the worker returns.

        :param block: when true, wait for the worker thread to finish.
        """
        if self._thread is None:
            return
        if self._exec_queue is None:
            self._thread = None
            return

        self._exec_queue.put('eof', block=block)
        if block:
            self._thread.join()
        self._thread = self._exec_queue = None

    def report_back_in_trio_thread_fn(self, task_container):
        """Deliver a finished work item to the task that is waiting for it.

        This function gets scheduled into the trio run loop by the worker
        thread.

        :param task_container: ``[task, cancel_callback_or_None, result]``
            as built by :meth:`run_in_worker_thread`.
        """
        def do_release_then_return_result():
            # If canceled, do the cancellation, otherwise return the result.
            if task_container[1] is not None:
                task_container[1]()
            return task_container[2].unwrap()

        result = trio.hazmat.Result.capture(do_release_then_return_result)
        trio.hazmat.reschedule(task_container[0], result)

    def worker_thread_fn(self, queue):
        """Worker thread main loop.

        Pulls work items off ``queue``, runs them, and schedules
        :meth:`report_back_in_trio_thread_fn` to report each result. Returns
        when the ``'eof'`` sentinel is received.

        :param queue: queue yielding either the ``'eof'`` sentinel or
            ``(sync_fn, args, task_container, token)`` tuples.
        """
        while True:
            item = queue.get(block=True)
            # The sentinel is a bare string rather than a 4-tuple, so it has
            # to be recognised *before* unpacking; unpacking 'eof' into four
            # names would raise ValueError and kill the thread noisily.
            if item == 'eof':
                return

            sync_fn, args, task_container, token = item
            # Skip the call entirely if the task was already cancelled.
            if task_container[1] is None:
                task_container[2] = trio.hazmat.Result.capture(sync_fn, *args)

            try:
                token.run_sync_soon(
                    self.report_back_in_trio_thread_fn, task_container)
            except trio.RunFinishedError:
                # The entire run finished, so our particular tasks are
                # certainly long gone - it must have cancelled. Continue
                # eating the queue; re-raising here would kill the worker
                # while ``_thread`` still points at it, and every later
                # request would then wait forever.
                continue

    @trio.hazmat.enable_ki_protection
    async def run_in_worker_thread(self, sync_fn, *args, cancellable=False):
        """Run ``sync_fn(*args)`` on the worker thread and return its result.

        The worker thread is started on first use.

        :param sync_fn: synchronous callable to execute on the worker.
        :param args: positional arguments passed to ``sync_fn``.
        :param cancellable: when true, a cancellation request is recorded so
            that the cancellation, rather than the function's result, is
            delivered once the worker reports back. The task still waits for
            the worker in either case, since the abort callback always
            returns ``Abort.FAILED``.
        :returns: whatever ``sync_fn`` returned; an exception raised by
            ``sync_fn`` is re-raised here.
        """
        await trio.hazmat.checkpoint_if_cancelled()
        if self._thread is None:
            self.start_executor()

        # Holds a reference to the task that's blocked in this function
        # waiting for the result as well as to the cancel callback and the
        # result (when not canceled).
        task_container = [trio.hazmat.current_task(), None, None]
        self._exec_queue.put(
            (sync_fn, args, task_container,
             trio.hazmat.current_trio_token()))

        def abort(raise_cancel):
            if cancellable:
                task_container[1] = raise_cancel
            return trio.hazmat.Abort.FAILED

        return await trio.hazmat.wait_task_rescheduled(abort)
if __name__ == '__main__':
    # Micro-benchmark comparing the cost of 1000 round trips through several
    # mechanisms; each variant is timed over 10 ``trio.run`` invocations.
    import time
    from timeit import timeit

    # Use the Executor class defined in this file. Importing it again from a
    # project-local module would shadow it and fail when this file is run on
    # its own.
    executor = Executor()

    def echo_queue_thread(queue, answer):
        """Echo every value from ``queue`` to ``answer`` until 'eof'."""
        while True:
            val = queue.get(block=True)
            if val == 'eof':
                return
            answer.put(val)

    def start_thread():
        """Start an echo thread and return its (request, answer) queues."""
        queue, answer = stdlib_queue.Queue(), stdlib_queue.Queue()
        thread = threading.Thread(
            target=echo_queue_thread, args=(queue, answer))
        thread.start()
        return queue, answer

    def get_value(value):
        """Trivial payload: return the argument unchanged."""
        return value

    async def run_sequence():
        # Round trips through the custom single-thread Executor.
        for i in range(1000):
            await executor.run_in_worker_thread(get_value, i)

    async def run_sequence_sleep():
        # One trio checkpoint per call, no threads involved.
        for i in range(1000):
            await trio.sleep(0)
            get_value(i)

    async def run_sequence_base():
        # Baseline: plain function calls.
        for i in range(1000):
            get_value(i)

    async def run_sequence_queue(queue, answer):
        # Blocking stdlib-queue round trips to the echo thread.
        for i in range(1000):
            queue.put(i)
            answer.get(block=True)
        queue.put('eof')

    async def run_sequence_time_sleep():
        # One time.sleep(0) per call.
        for i in range(1000):
            time.sleep(0)
            get_value(i)

    async def run_sequence_trio():
        # Round trips through trio's own worker-thread helper.
        for i in range(1000):
            await trio.run_sync_in_worker_thread(get_value, i)

    # (statement to time, extra names its setup imports from __main__)
    benchmarks = (
        ('trio.run(run_sequence)', 'run_sequence'),
        ('trio.run(run_sequence_trio)', 'run_sequence_trio'),
        ('trio.run(run_sequence_sleep)', 'run_sequence_sleep'),
        ('trio.run(run_sequence_base)', 'run_sequence_base'),
        ('trio.run(run_sequence_time_sleep)', 'run_sequence_time_sleep'),
        ('args = start_thread(); trio.run(run_sequence_queue, *args)',
         'run_sequence_queue, start_thread'),
    )
    for stmt, names in benchmarks:
        setup = 'from __main__ import executor, trio, ' + names
        print(timeit(stmt=stmt, setup=setup, number=10))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| 1.618648644484486 | |
| 3.5385297625561436 | |
| 0.9438036765220037 | |
| 0.030416185514593685 | |
| 0.031911098504258106 | |
| 0.420887142365574 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment