Skip to content

Instantly share code, notes, and snippets.

@njsmith
Last active May 13, 2023 18:59
Show Gist options
  • Select an option

  • Save njsmith/3494ebd27f32c2ac168c71de116ccc5d to your computer and use it in GitHub Desktop.

Select an option

Save njsmith/3494ebd27f32c2ac168c71de116ccc5d to your computer and use it in GitHub Desktop.
# Rough draft of a Queue object that can be used simultaneously simultaneously from
# sync threads + *multiple* trio threads, all at once.
#
# If you don't have multiple threads each doing their own separate calls to trio.run,
# then don't use this; there are simpler solutions. This was mostly an exercise to
# figure out if and how this could be done.
#
# Also note: completely untested, probably has bugs
from collections import OrderedDict
from functools import partial
import outcome
import threading
import trio
class CrossThreadUnbufferedFIFOQueue:
    """Unbuffered FIFO queue shared between sync threads and trio tasks.

    A put only completes once a get has taken its value (there is no
    buffer), and waiters on each side are served in arrival order.
    ``sync_put``/``sync_get`` block the calling thread; ``async_put``/
    ``async_get`` suspend the calling trio task, which may belong to any
    ``trio.run`` in any thread.

    Internally every blocked caller registers a callback:

    * a *putter* is a zero-argument callable that wakes the blocked
      producer and returns its value;
    * a *getter* is a one-argument callable that hands the value to the
      blocked consumer and wakes it.

    All callbacks are invoked while ``self._lock`` is held, so they must
    never block.
    """

    def __init__(self):
        # Guards both waiter tables. Must be a real lock instance: it is
        # used as a context manager by every method below.
        self._lock = threading.Lock()
        # Used as FIFO queues; value is always None
        self._putters = OrderedDict()
        self._getters = OrderedDict()

    @staticmethod
    def _wake_task(token, task, result):
        """Reschedule a parked trio ``task`` with ``result`` from any thread.

        ``token`` is the TrioToken of the run that owns ``task`` and
        ``result`` is the ``outcome`` the task resumes with.
        """
        try:
            same_run = trio.lowlevel.current_token() is token
        except RuntimeError:
            # current_token() raises when the caller is a plain thread that
            # is not inside any trio.run.
            same_run = False
        if same_run:
            trio.lowlevel.reschedule(task, result)
        else:
            # run_sync_soon is thread-safe and does not block, so it is safe
            # to call here even though the queue lock is held; a blocking
            # hand-off could deadlock against a trio thread that is waiting
            # for that same lock.
            token.run_sync_soon(trio.lowlevel.reschedule, task, result)

    def sync_put(self, value):
        """Hand ``value`` to a consumer, blocking this thread until taken."""
        with self._lock:
            if self._getters:
                getter, _ = self._getters.popitem(last=False)
                getter(value)
                return
            # Blocking path
            # A pre-acquired lock acts as a one-shot event: the consumer
            # releases it, which lets the second acquire() below return.
            waker = threading.Lock()
            waker.acquire()

            def putter():
                waker.release()
                return value

            self._putters[putter] = None
        # Wait outside self._lock so that consumers can get in.
        waker.acquire()

    def sync_get(self):
        """Return the next value, blocking this thread until one arrives."""
        with self._lock:
            if self._putters:
                putter, _ = self._putters.popitem(last=False)
                return putter()
            # Blocking path
            waker = threading.Lock()
            waker.acquire()
            value_shared = None

            def getter(value):
                nonlocal value_shared
                value_shared = value
                waker.release()

            self._getters[getter] = None
        # Wait outside self._lock so that producers can get in.
        waker.acquire()
        return value_shared

    async def async_put(self, value):
        """Hand ``value`` to a consumer, suspending this task until taken.

        If the wait is cancelled before a consumer arrives, the pending put
        is withdrawn. The non-blocking path returns without awaiting.
        """
        with self._lock:
            if self._getters:
                getter, _ = self._getters.popitem(last=False)
                getter(value)
                return
            # Blocking path
            token = trio.lowlevel.current_token()
            task = trio.lowlevel.current_task()

            def putter():
                self._wake_task(token, task, outcome.Value(None))
                return value

            self._putters[putter] = None

        def abort_fn(_):
            with self._lock:
                if putter in self._putters:
                    # Remove only our own entry; the table itself stays.
                    del self._putters[putter]
                    return trio.lowlevel.Abort.SUCCEEDED
                # A consumer already took the value and a wakeup is on its
                # way, so the cancellation cannot take effect here.
                return trio.lowlevel.Abort.FAILED

        await trio.lowlevel.wait_task_rescheduled(abort_fn)

    async def async_get(self):
        """Return the next value, suspending this task until one arrives.

        If the wait is cancelled before a producer arrives, the pending get
        is withdrawn. The non-blocking path returns without awaiting.
        """
        with self._lock:
            if self._putters:
                putter, _ = self._putters.popitem(last=False)
                return putter()
            # Blocking path
            token = trio.lowlevel.current_token()
            task = trio.lowlevel.current_task()

            def getter(value):
                self._wake_task(token, task, outcome.Value(value))

            self._getters[getter] = None

        def abort_fn(_):
            with self._lock:
                if getter in self._getters:
                    # Remove only our own entry; the table itself stays.
                    del self._getters[getter]
                    return trio.lowlevel.Abort.SUCCEEDED
                # A producer already delivered a value and a wakeup is on
                # its way, so the cancellation cannot take effect here.
                return trio.lowlevel.Abort.FAILED

        return await trio.lowlevel.wait_task_rescheduled(abort_fn)
@richardsheridan
Copy link
Copy Markdown

@njsmith
Copy link
Copy Markdown
Author

njsmith commented Jan 24, 2021

Oh haha whoops, those first two classes are vestigial leftovers from an earlier draft, theyr'e not actually used at all :-) I'll delete.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment