Created
February 23, 2022 01:03
-
-
Save mikeckennedy/033ad92c165a9041fafc5c429e6c3c28 to your computer and use it in GitHub Desktop.
More realistic adaptation from https://gist.github.com/mikeckennedy/c76739766ce072f980aa4df1a6dc9516
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# requires the uvloop package | |
import asyncio | |
import threading | |
import time | |
import uuid | |
from typing import Any, Coroutine | |
import uvloop | |
uvloop.install() | |
initialized = False | |
__add_lock = threading.Lock() | |
__receive_lock = threading.Lock() | |
pending_items: dict[uuid.uuid4, Coroutine[Any, Any, Any]] = {} | |
finished_items: dict[uuid.uuid4, Any] = {} | |
def run(async_coroutine: Coroutine[Any, Any, Any]): | |
""" | |
Convert an async method to a synchronous one. | |
Example: | |
async def some_async_method(x, y): ... | |
result = syncify.run( some_async_method(1, 2) ) | |
Args: | |
async_coroutine (): | |
Returns: The value returned by `async_coroutine` | |
""" | |
item_id = __add_work(async_coroutine) | |
while not __is_done(item_id): | |
time.sleep(0.0005) | |
continue | |
result = __get_result(item_id) | |
if isinstance(result, Exception): | |
raise SyncifyRuntimeError() from result | |
return result | |
class SyncifyRuntimeError(Exception): | |
pass | |
def worker_loop(): | |
print(f"Starting syncify background thread.") | |
loop: uvloop.Loop = uvloop.new_event_loop() | |
while True: | |
with __add_lock: | |
count = len(pending_items) | |
if count == 0: | |
time.sleep(0.001) | |
continue | |
try: | |
with __add_lock: | |
work: list[(uuid.uuid4, Coroutine[Any, Any, Any])] = list(pending_items.items()) | |
for k, w in work: | |
del pending_items[k] | |
running: dict[uuid.uuid4, asyncio.Task] = { | |
k: loop.create_task(w) | |
for k, w in work | |
} | |
for k, t in running.items(): | |
try: | |
loop.run_until_complete(asyncio.wait([t])) | |
result = t.result() | |
with __receive_lock: | |
finished_items[k] = result | |
except Exception as x: | |
with __receive_lock: | |
finished_items[k] = x | |
except Exception as x: | |
print("Error processing pending tasks:") | |
print(x) | |
def __add_work(async_coroutine: Coroutine[Any, Any, Any]) -> uuid.uuid4: | |
new_id = uuid.uuid4() | |
with __add_lock: | |
pending_items[new_id] = async_coroutine | |
return new_id | |
def __is_done(item_id: uuid.uuid4) -> bool: | |
with __receive_lock: | |
return item_id in finished_items | |
def __get_result(item_id: uuid.uuid4) -> Any: | |
with __receive_lock: | |
result = finished_items[item_id] | |
del finished_items[item_id] | |
return result | |
worker_thread = threading.Thread(name="syncify-thread", target=worker_loop, daemon=True) | |
worker_thread.start() |
Ah, that's even better and explains why that was a topic on a recent Python Bytes.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
It is a weird mix isn't it. This is exactly what I was thinking. Rather than using some kind of set
active_threads
, I was planning on usingactive_thread
as a thread local withthreading.local
see this article.