Skip to content

Instantly share code, notes, and snippets.

@parity3
Last active September 9, 2017 18:07
Show Gist options
  • Save parity3/9f7d41eced8367bcb3fd744c4773b83a to your computer and use it in GitHub Desktop.
Save parity3/9f7d41eced8367bcb3fd744c4773b83a to your computer and use it in GitHub Desktop.
The maybe-you-have-data-ready spin-check solves most problems obnoxiously and robustly with blatant disregard for CPU
import trio,itertools,functools
# noinspection PyProtectedMember
from trio._core._run import Nursery
count = itertools.count()
tasks = []
async def spin_check(nurs : Nursery, event : trio.Event,container : list, func : callable, reset : callable):
while True:
reset()
if container:
print("satisfied already")
return
try:
rsp = func() # will fail if nothing is ready for us
except BlockingIOError:
print("IO would block going back to sleep")
except trio.WouldBlock:
print("would block going back to sleep")
except trio.Cancelled:
print("cancelled!")
raise
else:
container.append(rsp)
print("cancelling other tasks")
nurs.cancel_scope.cancel()
return
await event.wait()
event.clear() # this will not prevent other check tasks from waking up in this cycle
async def socket_read_listener(sock : trio.socket.SocketType, event : trio.Event, reset_container : list):
# responsible for setting the event to wake up the spin_check
current_task=trio.current_task()
def remove_from_container(_):
try:
reset_container.remove(current_task)
except ValueError:
pass
return trio.hazmat.Abort.SUCCEEDED
try:
while True:
# noinspection PyProtectedMember
await trio.hazmat.wait_socket_readable(sock._sock)
event.set()
reset_container.append(current_task)
await trio.hazmat.yield_indefinitely(remove_from_container)
finally:
remove_from_container(None)
async def q_putall(q1,sout):
print("waking up a task")
await q1.put(next(count))
await sout.sendall(b"test")
def bind_event_to_queue(q : trio.Queue, event : trio.Event) -> trio.Queue:
# noinspection PyProtectedMember
_get_semaphore = q._get_semaphore
old_release = _get_semaphore.release
@functools.wraps(old_release)
def notify_getter():
old_release()
event.set()
_get_semaphore.release = notify_getter
return q
def bind_event_to_sock_read(nurs : Nursery,sin : trio.socket.SocketType, event : trio.Event) -> callable:
reset_container = []
nurs.spawn(socket_read_listener, sin, event, reset_container)
def reset_func():
reset_container and trio.hazmat.reschedule(reset_container[0])
return reset_func
async def main_async_q():
open_nursery = trio.open_nursery
results = []
event = trio.Event()
q1 = trio.Queue(10)
socket = trio.socket
sin,sout = socket.socketpair()
async with open_nursery() as nurs: # type: Nursery
nurs.spawn(spin_check, nurs, event, results, q1.get_nowait,lambda:None)
bind_event_to_queue(q1,event)
reset_func = bind_event_to_sock_read(nurs,sin,event)
# noinspection PyProtectedMember
nurs.spawn(spin_check, nurs, event, results, functools.partial(sin._sock.recv, 4096),reset_func)
nurs.spawn(q_putall,q1,sout)
print(f"results (should be 1 in length): {results}")
def main_sync():
run = trio.run
run(main_async_q)
if __name__ == '__main__':
main_sync()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment