Skip to content

Instantly share code, notes, and snippets.

@belm0
Last active September 20, 2023 07:42
Show Gist options
  • Save belm0/20bc069ca676fcfc591f29930069f788 to your computer and use it in GitHub Desktop.
Save belm0/20bc069ca676fcfc591f29930069f788 to your computer and use it in GitHub Desktop.
Structured concurrency and Lua (part 3)

Structured concurrency and Lua (part 3)

John Belmonte, 2022-Oct

In the previous installment, we demonstrated how structured concurrency enables magical tracebacks, spanning both stack frames and task hierarchy. Not only do we regain the ability to diagnose errors, but since exceptions propagate through the nursery of each task, we also have Lua's full protected-call facility at our disposal—just as if the code was not using coroutines.

Nurseries are powerful because they let us orchestrate a set of tasks, ensuring that their lifetime is well-understood, and allowing every task to properly finalize on cancellation of the group. Cancellation can happen if one of the child tasks has an exception, or is otherwise explicitly requested. Explicit cancellation turns out to be fairly common—done just as often as exiting imperative code loops.

Here, we'll try to gain a better understanding of how cancellation happens, and introduce the relevant API.

fixing task cancellation

The previous installment concluded by uncovering a few holes our concurrency library regarding cancellation. Unexpectedly, a nursery's child task ran to completion, despite its sibling raising an error. The task's lifetime even leaked beyond the nursery's scope. Let's run the same example after some library corrections:

trio = require('trio')

function await_cancellation_example()
    -- among two parallel tasks, one fails
    pcall(function()
        local nursery <close> = trio.open_nursery()
        nursery.start_soon(function()
            print('child 1 start')
            trio.await_sleep(1)
            error('oops')
        end)
        nursery.start_soon(function()
            print('child 2 start')
            trio.await_sleep(2)
            print('child 2 end')
        end)
        print('waiting for child tasks')
    end)
    print('done')
end

trio.run(await_cancellation_example)
$ time lua example_6.lua
waiting for child tasks
child 2 start
child 1 start
done

real    0m1.093s
user    0m0.031s
sys     0m0.050s

Now, as expected, the program exits after one second, prompted by the task raising an error. Its sibling is cancelled. Though the error is raised up through the nursery, it's finally suppressed by the wrapping pcall.

We'll focus on the cancelled task. How do things look from its point of view?

the Cancelled exception

In the concurrency framework's design, task cancellation is signaled by an exception, called Cancelled. Exceptions are well-suited for this job because:

  • we've already worked hard to propagate exceptions up the task hierarchy
  • in the simple case, a task doesn't need to do anything special to support being cancelled
  • cancelled tasks can perform finalization, utilizing all of Lua's associated features

Here, we instrument "child 2" of the previous example, to see what's going on:

function await_cancellation_example()
    pcall(function()
        local nursery <close> = trio.open_nursery()
        nursery.start_soon(function()
            print('child 1 start')
            trio.await_sleep(1)
            error('oops')
        end)
        nursery.start_soon(function()
            print('child 2 start')
            local status, err = pcall(function() trio.await_sleep(2) end)
            assert(not status)
            print('child 2 did not finish:', err)
            error(err)  -- important: re-raise
        end)
        print('waiting for child tasks')
    end)
    print('done')
end
$ lua example_7.lua
waiting for child tasks
child 2 start
child 1 start
child 2 did not finish:	Cancelled
done

This confirms "child 2" received the Cancelled exception. Upon receiving an error from one of its children, the nursery enters a cancelled state, and arranges for Cancelled to be injected into the remaining active children. The coroutine of each such child is soon resumed, encounters the injected exception, and has its stack unwound. The final destination of any Cancelled exception is the nursery that triggered it. The nursery treats this as a normal event, and doesn't raise the exception to its parent.

As shown in the example, if your code happens to be catching errors, it's required to always re-raise Cancelled. In Lua, this kind of operation is prone to mistakes, so care is needed. (Hint: make utilities like finally(f), on_error(f), or on_success(f) that use to-be-closed variables to call the given finalizer, automatically propagating errors.)

nursery cancel()

Nurseries represent a set of spawned tasks that can be cancelled as a unit. You may be expecting that tasks would explicitly cancel their parent nursery by raising Cancelled, but actually, this is not allowed. Rather, the nursery object has an idempotent cancel() method for this purpose. The rationale is as follows:

  • it's useful for actors outside of a nursery to cancel it
  • if the body of a nursery itself raised Cancelled, it would cancel not only that nursery's children, but any parent nursery as well (of which it is a child)
  • nursery.cancel() allows an entire tree of the task hierarchy to be queued for cancellation atomically. Otherwise, propagation of the cancellation may take several scheduler passes, with some children continuing for a brief time, even though an ancestor is cancelled. (NOTE: not yet implemented)

So given cancel(), let's implement await_any(), complementing the await_all() utility introduced in part 1 of this series:

function await_any(...)
    local nursery <close> = trio.open_nursery()
    for _, await_f in ipairs({...}) do
        nursery.start_soon(function()
            await_f()
            nursery.cancel()
        end)
    end
end

Like await_all(), this function is intended for a set of heterogeneous tasks, run for their side effects. It has short-circuit behavior, returning as soon as one of the given awaitables returns. This is implemented by a child task calling cancel() on the nursery once its corresponding awaitable completes.

Synopsis:

event = trio.Event()
-- (then hand the event to some other task ...)

await_any(
    event.await,
    await_serve_foo,
)

Note that cancel() is a synchronous function, only queueing a request to cancel the nursery. It won't propagate until the code calling cancel() yields execution. Even then, there is no guarantee that each nursery task will receive a Cancelled exception. For example, some tasks may have already exited normally (or by error) in the same scheduler pass as the task requesting cancel.

exceptions can be concurrent

On the subject of exceptions, there's an elephant in the room: if you have concurrency, then errors can be concurrent, and it's feasible to have multiple, unrelated exceptions in flight.

There are two ways this might happen. The first is where the exceptions are truly simultaneous, happening within the same scheduler pass. Since tasks within a pass are resumed in arbitrary order, it follows that the exception initially encountered will also be arbitrary. The second way is related to allowing tasks to finalize when there is an error or cancellation. The nursery will wait for all children to finalize, and may be presented with a set of exceptions: the error that induced cancellation, Cancelled from the corresponding sibling tasks, and other errors that may arise during finalization of each task.

How serious is this? It depends on the application. If errors tend to go unhandled, then it probably doesn't matter which one surfaced. In contrast, if a program includes the catching of specific errors, then such handling may be significant for maintaining invariants, and some errors may be more important than others. In such applications, choosing the winning exception arbitrarily may cause problems. Python has acknowledged concurrent exceptions as worthy of investment, recently introducing ExceptionGroup and concurrency-aware catch syntax.

As for our Lua concurrency library, we'll keep the implementation simple, with a model of "propagate the first exception". (Internally, Cancelled will be given lower priority than other exceptions.) But this is an area to keep in mind for improvement.

implementation

Various changes to the implementation were needed to fill out task cancellation support. There are several cases to get right: an error or cancel() call can arrive during the nursery's body, or after it's already blocked in the finalizer.

Use of the Cancelled exception exposed correctness issues in the scheduler. Rather than keeping only a list of active tasks sorted by wake time, it's also necessary to have a collection of tasks that are pending error injection, so that the errors can be promptly delivered on the next scheduler pass.

Proper unwinding of coroutines ending in error is now performed, via coroutine.close(), so that to-be-closed variables within tasks will be finalized. I learned that debug.traceback() must be called before such unwinding. Unfortunately, this traceback overhead is necessary even if the error is ultimately caught and suppressed.

See code changes relative to the previous article installment.

up next: timeouts and cancel scopes

Managing timeouts is something that is surprisingly hard in a program. It requires forethought by the author of every API you use. Even when timeout control is available, composing timeouts correctly across a sequence of blocking calls presents a challenge. There's a way to solve this generally, without explicitly plumbing timeout and cancel support down the call stack, or intricate bookkeeping. And while we're at it, it would be nice to cancel such operations on any condition, not only elapsed time. For this we'll introduce cancel scopes—only possible thanks to structured concurrency—in the next installment of this series.


article © 2022 John Belmonte, all rights reserved

trio = require('trio')
function await_cancellation_example()
-- among two parallel tasks, one fails
pcall(function()
local nursery <close> = trio.open_nursery()
nursery.start_soon(function()
print('child 1 start')
trio.await_sleep(1)
error('oops')
end)
nursery.start_soon(function()
print('child 2 start')
trio.await_sleep(2)
print('child 2 end')
end)
print('waiting for child tasks')
end)
print('done')
end
trio.run(await_cancellation_example)
trio = require('trio')
function await_cancellation_example()
pcall(function()
local nursery <close> = trio.open_nursery()
nursery.start_soon(function()
print('child 1 start')
trio.await_sleep(1)
error('oops')
end)
nursery.start_soon(function()
print('child 2 start')
local status, err = pcall(function() trio.await_sleep(2) end)
assert(not status)
print('child 2 did not finish', err)
error(err) -- important: re-raise
end)
print('waiting for child tasks')
end)
print('done')
end
trio.run(await_cancellation_example)
-- trio - toy structured concurrency implementation based on Python Trio
--
-- requirements:
-- * Lua >= 5.4.3 (with "yield from __close" support)
-- * fractional _time() and _sleep() - see below for placeholder Unix
-- implementations using popen().
--
-- API naming conventions:
-- * functions that may yield are prefixed with "await_"
-- * functions that require storing the result in a to-be-closed variable
-- are prefixed with "open_"
--
-- TODO: cancel scopes, shielding
-- TODO: exception groups
-- TODO: consider service nursery semantics (see https://tricycle.readthedocs.io/en/latest/reference.html#cancellation-helpers)
-- TODO: autojump for testing (see https://trio.readthedocs.io/en/stable/reference-testing.html#trio.testing.MockClock.autojump_threshold)
-- TODO: task and run context variables (see trio.lowlevel.RunVar, etc.)
-- TODO: lua_trio compatible networking, file I/O, etc. (See cosock for ideas
-- on adapting luasocket.)
local function _sleep(seconds)
os.execute('sleep ' .. tonumber(seconds))
end
-- returns fractional time in seconds from an arbitrary reference point
-- (This implementation requires `date` from GNU coreutils.)
local function _time()
local f = assert(io.popen('date +%s%3N', 'r'))
local s = assert(f:read('*a'))
f:close()
return tonumber(s) / 1000
end
local function _debug_print(...)
if false then
print(...)
end
end
-- Generate aggregate task traceback, given the error object and
-- ascending coroutine traceback list.
-- NOTE: Filtering of internal frames is best-effort, and expects the trio module
-- chunk name to include the substring "trio.lua".
local function _task_traceback(error_obj, error_chain)
local traceback = {tostring(error_obj), 'task traceback:'}
for i, coro_traceback in ipairs(error_chain) do
-- remove header
local index = coro_traceback:find('[\n\r]')
if not index then
goto continue
end
coro_traceback = coro_traceback:sub(index + 1)
-- First task traceback is kept as is, the remaining have
-- internal frames removed.
local elide = i > 1
local internal_frame_observed = false
for line in coro_traceback:gmatch("[^\r\n]+") do
if elide then
local last_internal_frame_observed = internal_frame_observed
-- TODO: fix fragile matching
internal_frame_observed = string.find(line, 'trio%.lua') ~= nil
if last_internal_frame_observed and not internal_frame_observed then
elide = false
end
end
if not elide then
table.insert(traceback, line)
end
end
::continue::
end
return table.concat(traceback, '\n')
end
local _TASK_PAUSE = -1
local _NO_ERROR = setmetatable({}, {__tostring=function() return 'NoError' end})
-- TODO: make Cancelled public
local _CANCELLED = setmetatable({}, {__tostring=function() return 'Cancelled' end})
local trio = nil
trio = {
-- TODO: Eliminate this module-level state, it should be local to a run()
-- invocation. Currently, run() is not reentrant.
_tasks = {}, -- list sorted by wait_until, nearest time last
_paused_tasks = {}, -- set
_pending_error_tasks = {}, -- list
_tasks_by_coro = {}, -- map
-- TODO: optional task name
_new_task = function(f, parent_nursery, wait_until)
local task = {
coro=coroutine.create(f),
parent_nursery=parent_nursery,
--child_nursuries={},
-- TODO: the following fields should be private
wait_until=wait_until,
pending_error=_NO_ERROR,
error_chain = nil, -- list of coroutine tracebacks (ascending level), else nil
_done = trio.Event()
}
_debug_print('run: spawn', task.coro)
trio._tasks_by_coro[task.coro] = task
return task
end,
_schedule_task = function(task)
-- TODO: find insertion point by binary search
local tasks = trio._tasks
for i = #tasks, 1, -1 do
if tasks[i].wait_until >= task.wait_until then
table.insert(tasks, i+1, task)
return
end
end
table.insert(tasks, 1, task)
end,
-- TODO: support abort
await_task_rescheduled = function()
-- TODO: encapsulate yield handling
if trio.current_task().pending_error ~= _NO_ERROR then
error(_CANCELLED, 0)
end
local pending_error = coroutine.yield(_TASK_PAUSE)
if pending_error ~= _NO_ERROR then
-- NOTE: Disable prepending location to error strings, since
-- that already happened when the original error was raised.
error(pending_error, 0)
end
end,
reschedule = function(task)
assert(trio._paused_tasks[task])
trio._paused_tasks[task] = nil
task.wait_until = 0
trio._schedule_task(task)
end,
Event = function()
local _is_set = false
local _waiting_tasks = {}
return {
set = function()
if not _is_set then
_is_set = true
-- reschedule tasks
for _, task in ipairs(_waiting_tasks) do
trio.reschedule(task)
end
end
end,
is_set = function()
return _is_set
end,
await = function()
if not _is_set then
table.insert(_waiting_tasks, trio.current_task())
trio.await_task_rescheduled()
end
end
}
end,
current_time = _time,
current_task = function()
return trio._tasks_by_coro[coroutine.running()]
end,
await_until_time = function(until_time)
if trio.current_task().pending_error ~= _NO_ERROR then
error(_CANCELLED, 0)
end
local pending_error = coroutine.yield(until_time)
if pending_error ~= _NO_ERROR then
error(pending_error, 0)
end
end,
-- TODO: consider renaming to await_seconds
await_sleep = function(seconds)
assert(seconds >= 0)
trio.await_until_time(trio.current_time() + seconds)
end,
await_forever = function()
trio.await_until_time(math.huge)
end,
run = function(f)
do
local main_task_nursery <close> = trio.open_nursery()
-- TODO: use the current coroutine rather than creating a new one?
main_task_nursery.task = trio._new_task(f, main_task_nursery, 0)
trio._schedule_task(main_task_nursery.task)
local tasks = trio._tasks
while #tasks + #trio._pending_error_tasks > 0 do
-- sleep until next event, unless there are tasks with pending errors
if #trio._pending_error_tasks == 0 then
local wait_delta = tasks[#tasks].wait_until - trio.current_time()
if wait_delta > 0 then
_debug_print('run: sleep until next event', wait_delta)
_sleep(wait_delta)
end
end
-- collect batch of tasks which are ready to run or have pending error
local current_time = trio.current_time()
local tasks_to_run = trio._pending_error_tasks
trio._pending_error_tasks = {}
while #tasks > 0 and tasks[#tasks].wait_until - current_time <= 0 do
local task = table.remove(tasks)
-- NOTE: filtering pending_error tasks, since these were already added
if task.pending_error == _NO_ERROR then
table.insert(tasks_to_run, task)
end
end
-- run the ready tasks
-- TODO: shuffle execution to avoid reliance on implementation details
for _, task in ipairs(tasks_to_run) do
local coro = task.coro
_debug_print('run: resuming', coro)
local resume_status, resume_result = coroutine.resume(coro, task.pending_error)
if not resume_status and resume_result == _CANCELLED then
coroutine.close(coro)
_debug_print('run: `cancelled` from', coro)
elseif not resume_status then
error_obj = resume_result
-- must grab the traceback before unwinding stack
-- (even though the traceback may never be needed...)
local traceback = debug.traceback(coro)
-- ensure to-be-closed finalization
-- TODO: this may raise a new error?
coroutine.close(coro)
_debug_print('run: error from', coro, error_obj)
-- TODO: Always raise multi-error object. __tostring will assemble the
-- aggregate traceback, if it's called.
local error_chain = {table.unpack(task.error_chain or {})}
table.insert(error_chain, traceback)
if task == main_task_nursery.task then
-- raise unhandled error to caller of event loop
-- TODO: still have to wait for finalization...?
error(_task_traceback(error_obj, error_chain), 0)
elseif not task.parent_nursery._cancel_requested then
-- TODO: If there is an existing Cancelled error, and the new
-- error is not Cancelled, give priority to the new error.
task.parent_nursery.cancel()
local parent_task = task.parent_nursery.parent_task
parent_task.pending_error = error_obj
parent_task.error_chain = error_chain
table.insert(trio._pending_error_tasks, parent_task)
end
end
if coroutine.status(coro) == 'dead' then
_debug_print('run: retire', coro)
-- TODO: If the task was woken by pending error, it will linger in
-- the `tasks` list even after being retired. This is sloppy,
-- so consider removing it, at cost of O(log(N)).
task.parent_nursery.child_tasks[task] = nil
trio._tasks_by_coro[coro] = nil
task._done.set()
-- remove task from pending error list
-- (case where task exited normally before pending error injected)
-- TODO: make _pending_error_tasks a set, to avoid O(N)
for i, err_task in ipairs(trio._pending_error_tasks) do
if err_task == task then
table.remove(trio._pending_error_tasks, i)
break
end
end
elseif resume_result == _TASK_PAUSE then
_debug_print('run: pausing', coro)
trio._paused_tasks[task] = true
else
task.wait_until = resume_result
local dt = task.wait_until - trio.current_time()
_debug_print('run:', coro, 'scheduled sleep for', dt)
trio._schedule_task(task)
end
end
-- remove any tasks at the head that ended by error
while #tasks > 0 and tasks[#tasks]._done.is_set() do
table.remove(tasks)
end
end
end
end,
open_nursery = function()
-- TODO: needs to attach to active event loop somehow. Global map
-- of Lua thread to event loop?
local this
this = setmetatable({
parent_task = trio.current_task(),
child_tasks = {}, -- set
_cancel_requested = false,
-- TODO: consider accepting function args
start_soon = function(f)
local task = trio._new_task(f, this, 0)
trio._schedule_task(task)
this.child_tasks[task] = true
end,
-- NOTE: Nursery cancel() is synchronous, and will not propagate until
-- the caller yields execution.
cancel = function()
if this._cancel_requested then
return
end
-- TODO: eagerly cancel all descendants, since otherwise some tasks
-- may continue running for several scheduler passes.
for task, _ in pairs(this.child_tasks) do
-- TODO: Once we have multi-error, add Cancelled even when there
-- is a pending error?
if task.pending_error == _NO_ERROR then
task.pending_error = _CANCELLED
table.insert(trio._pending_error_tasks, task)
end
end
this._cancel_requested = true
end
}, {
__close = function(_, err)
_debug_print('nursery: __close', err == nil)
_debug_print(' pending child tasks: {')
for task, _ in pairs(this.child_tasks) do
_debug_print(' ', task.coro)
end
_debug_print(' }')
-- on local error in nursery body, cancel all children
-- TODO: perhaps this cancel() should be invoked from the scheduler,
-- otherwise children of a cancelled tree may still run briefly.
-- NOTE: __close can't distinguish between nil error and no error
if err ~= nil then
this.cancel()
end
-- block until all child tasks done
-- If there is an exception, it's from a child task. In that
-- case, defer the exception, and wait again.
-- (This is "propagate first exception" approach-- eventually we want
-- exception groups instead.)
local status, err = pcall(function()
for task, _ in pairs(this.child_tasks) do
task._done.await()
end
end)
if not status then
_debug_print('nursery: child exception during __close', err)
-- NOTE: the run loop will have already called cancel()
-- clear pending error so we can block again
this.parent_task.pending_error = _NO_ERROR
for task, _ in pairs(this.child_tasks) do
task._done.await()
end
error(err, 0)
end
end
})
return this
end
}
return trio
--- sc_and_lua_2/trio.lua 2022-09-25 22:45:52.000000000 +0900
+++ sc_and_lua_3/trio.lua 2022-10-02 21:45:56.000000000 +0900
@@ -10,8 +10,11 @@
-- * functions that require storing the result in a to-be-closed variable
-- are prefixed with "open_"
--
--- TODO: cancel scopes
+-- TODO: cancel scopes, shielding
-- TODO: exception groups
+-- TODO: consider service nursery semantics (see https://tricycle.readthedocs.io/en/latest/reference.html#cancellation-helpers)
+-- TODO: autojump for testing (see https://trio.readthedocs.io/en/stable/reference-testing.html#trio.testing.MockClock.autojump_threshold)
+-- TODO: task and run context variables (see trio.lowlevel.RunVar, etc.)
-- TODO: lua_trio compatible networking, file I/O, etc. (See cosock for ideas
-- on adapting luasocket.)
@@ -34,20 +37,24 @@
end
end
--- generate chained task traceback from error object and ascending coroutine list
+-- Generate aggregate task traceback, given the error object and
+-- ascending coroutine traceback list.
-- NOTE: Filtering of internal frames is best-effort, and expects the trio module
-- chunk name to include the substring "trio.lua".
local function _task_traceback(error_obj, error_chain)
local traceback = {tostring(error_obj), 'task traceback:'}
- for i, error_coro in ipairs(error_chain) do
- local s = debug.traceback(error_coro)
+ for i, coro_traceback in ipairs(error_chain) do
-- remove header
- s = s:sub(s:find('[\n\r]') + 1)
+ local index = coro_traceback:find('[\n\r]')
+ if not index then
+ goto continue
+ end
+ coro_traceback = coro_traceback:sub(index + 1)
-- First task traceback is kept as is, the remaining have
-- internal frames removed.
local elide = i > 1
local internal_frame_observed = false
- for line in s:gmatch("[^\r\n]+") do
+ for line in coro_traceback:gmatch("[^\r\n]+") do
if elide then
local last_internal_frame_observed = internal_frame_observed
-- TODO: fix fragile matching
@@ -60,31 +67,40 @@
table.insert(traceback, line)
end
end
+ ::continue::
end
return table.concat(traceback, '\n')
end
local _TASK_PAUSE = -1
-local _NO_ERROR = {}
+local _NO_ERROR = setmetatable({}, {__tostring=function() return 'NoError' end})
+-- TODO: make Cancelled public
+local _CANCELLED = setmetatable({}, {__tostring=function() return 'Cancelled' end})
local trio = nil
trio = {
- -- TODO: encapsulate into EventLoop class created by run()
+ -- TODO: Eliminate this module-level state, it should be local to a run()
+ -- invocation. Currently, run() is not reentrant.
_tasks = {}, -- list sorted by wait_until, nearest time last
_paused_tasks = {}, -- set
+ _pending_error_tasks = {}, -- list
_tasks_by_coro = {}, -- map
+
-- TODO: optional task name
_new_task = function(f, parent_nursery, wait_until)
local task = {
coro=coroutine.create(f),
parent_nursery=parent_nursery,
+ --child_nursuries={},
+
+ -- TODO: the following fields should be private
wait_until=wait_until,
pending_error=_NO_ERROR,
- error_chain = nil, -- list of coroutine objects (ascending level), else nil
- child_nursuries={},
+ error_chain = nil, -- list of coroutine tracebacks (ascending level), else nil
_done = trio.Event()
}
+ _debug_print('run: spawn', task.coro)
trio._tasks_by_coro[task.coro] = task
return task
end,
@@ -102,6 +118,9 @@
-- TODO: support abort
await_task_rescheduled = function()
-- TODO: encapsulate yield handling
+ if trio.current_task().pending_error ~= _NO_ERROR then
+ error(_CANCELLED, 0)
+ end
local pending_error = coroutine.yield(_TASK_PAUSE)
if pending_error ~= _NO_ERROR then
-- NOTE: Disable prepending location to error strings, since
@@ -149,14 +168,26 @@
return trio._tasks_by_coro[coroutine.running()]
end,
- await_sleep = function(seconds)
- assert(seconds >= 0)
- local pending_error = coroutine.yield(seconds)
+ await_until_time = function(until_time)
+ if trio.current_task().pending_error ~= _NO_ERROR then
+ error(_CANCELLED, 0)
+ end
+ local pending_error = coroutine.yield(until_time)
if pending_error ~= _NO_ERROR then
error(pending_error, 0)
end
end,
+ -- TODO: consider renaming to await_seconds
+ await_sleep = function(seconds)
+ assert(seconds >= 0)
+ trio.await_until_time(trio.current_time() + seconds)
+ end,
+
+ await_forever = function()
+ trio.await_until_time(math.huge)
+ end,
+
run = function(f)
do
local main_task_nursery <close> = trio.open_nursery()
@@ -164,54 +195,95 @@
main_task_nursery.task = trio._new_task(f, main_task_nursery, 0)
trio._schedule_task(main_task_nursery.task)
local tasks = trio._tasks
- while #tasks > 0 do
- local wait_delta = tasks[#tasks].wait_until - trio.current_time()
- if wait_delta > 0 then
- _debug_print('run: sleep until next event', wait_delta)
- _sleep(wait_delta)
+ while #tasks + #trio._pending_error_tasks > 0 do
+ -- sleep until next event, unless there are tasks with pending errors
+ if #trio._pending_error_tasks == 0 then
+ local wait_delta = tasks[#tasks].wait_until - trio.current_time()
+ if wait_delta > 0 then
+ _debug_print('run: sleep until next event', wait_delta)
+ _sleep(wait_delta)
+ end
end
- -- collect batch of tasks which are ready to run
+ -- collect batch of tasks which are ready to run or have pending error
local current_time = trio.current_time()
- local tasks_to_run = {}
+ local tasks_to_run = trio._pending_error_tasks
+ trio._pending_error_tasks = {}
while #tasks > 0 and tasks[#tasks].wait_until - current_time <= 0 do
- table.insert(tasks_to_run, table.remove(tasks))
+ local task = table.remove(tasks)
+ -- NOTE: filtering pending_error tasks, since these were already added
+ if task.pending_error == _NO_ERROR then
+ table.insert(tasks_to_run, task)
+ end
end
-- run the ready tasks
-- TODO: shuffle execution to avoid reliance on implementation details
for _, task in ipairs(tasks_to_run) do
local coro = task.coro
+ _debug_print('run: resuming', coro)
local resume_status, resume_result = coroutine.resume(coro, task.pending_error)
- if not resume_status then
+ if not resume_status and resume_result == _CANCELLED then
+ coroutine.close(coro)
+ _debug_print('run: `cancelled` from', coro)
+ elseif not resume_status then
error_obj = resume_result
+ -- must grab the traceback before unwinding stack
+ -- (even though the traceback may never be needed...)
+ local traceback = debug.traceback(coro)
+ -- ensure to-be-closed finalization
+ -- TODO: this may raise a new error?
+ coroutine.close(coro)
+ _debug_print('run: error from', coro, error_obj)
-- TODO: Always raise multi-error object. __tostring will assemble the
- -- traceback from the dead coroutines if it's called.
+ -- aggregate traceback, if it's called.
local error_chain = {table.unpack(task.error_chain or {})}
- table.insert(error_chain, coro)
+ table.insert(error_chain, traceback)
if task == main_task_nursery.task then
-- raise unhandled error to caller of event loop
+ -- TODO: still have to wait for finalization...?
error(_task_traceback(error_obj, error_chain), 0)
- else
- local parent_task = task.parent_nursery.task
+ elseif not task.parent_nursery._cancel_requested then
+ -- TODO: If there is an existing Cancelled error, and the new
+ -- error is not Cancelled, give priority to the new error.
+ task.parent_nursery.cancel()
+ local parent_task = task.parent_nursery.parent_task
parent_task.pending_error = error_obj
parent_task.error_chain = error_chain
+ table.insert(trio._pending_error_tasks, parent_task)
end
end
if coroutine.status(coro) == 'dead' then
_debug_print('run: retire', coro)
+ -- TODO: If the task was woken by pending error, it will linger in
+ -- the `tasks` list even after being retired. This is sloppy,
+ -- so consider removing it, at cost of O(log(N)).
task.parent_nursery.child_tasks[task] = nil
trio._tasks_by_coro[coro] = nil
task._done.set()
+ -- remove task from pending error list
+ -- (case where task exited normally before pending error injected)
+ -- TODO: make _pending_error_tasks a set, to avoid O(N)
+ for i, err_task in ipairs(trio._pending_error_tasks) do
+ if err_task == task then
+ table.remove(trio._pending_error_tasks, i)
+ break
+ end
+ end
elseif resume_result == _TASK_PAUSE then
_debug_print('run: pausing', coro)
trio._paused_tasks[task] = true
else
- task.wait_until = trio.current_time() + resume_result
- _debug_print('run:', coro, 'scheduled sleep for', resume_result)
+ task.wait_until = resume_result
+ local dt = task.wait_until - trio.current_time()
+ _debug_print('run:', coro, 'scheduled sleep for', dt)
trio._schedule_task(task)
end
end
+ -- remove any tasks at the head that ended by error
+ while #tasks > 0 and tasks[#tasks]._done.is_set() do
+ table.remove(tasks)
+ end
end
end
end,
@@ -221,22 +293,69 @@
-- of Lua thread to event loop?
local this
this = setmetatable({
- task = trio.current_task(),
+ parent_task = trio.current_task(),
child_tasks = {}, -- set
+ _cancel_requested = false,
+ -- TODO: consider accepting function args
start_soon = function(f)
local task = trio._new_task(f, this, 0)
trio._schedule_task(task)
this.child_tasks[task] = true
+ end,
+
+ -- NOTE: Nursery cancel() is synchronous, and will not propagate until
+ -- the caller yields execution.
+ cancel = function()
+ if this._cancel_requested then
+ return
+ end
+ -- TODO: eagerly cancel all descendants, since otherwise some tasks
+ -- may continue running for several scheduler passes.
+ for task, _ in pairs(this.child_tasks) do
+ -- TODO: Once we have multi-error, add Cancelled even when there
+ -- is a pending error?
+ if task.pending_error == _NO_ERROR then
+ task.pending_error = _CANCELLED
+ table.insert(trio._pending_error_tasks, task)
+ end
+ end
+ this._cancel_requested = true
end
}, {
- __close = function(a)
- _debug_print('__close nursery', a)
- -- block until all child tasks done
- _debug_print(' running tasks:')
+ __close = function(_, err)
+ _debug_print('nursery: __close', err == nil)
+ _debug_print(' pending child tasks: {')
for task, _ in pairs(this.child_tasks) do
- _debug_print(' ', task)
- task._done.await()
+ _debug_print(' ', task.coro)
+ end
+ _debug_print(' }')
+ -- on local error in nursery body, cancel all children
+ -- TODO: perhaps this cancel() should be invoked from the scheduler,
+ -- otherwise children of a cancelled tree may still run briefly.
+ -- NOTE: __close can't distinguish between nil error and no error
+ if err ~= nil then
+ this.cancel()
+ end
+ -- block until all child tasks done
+ -- If there is an exception, it's from a child task. In that
+ -- case, defer the exception, and wait again.
+ -- (This is "propagate first exception" approach-- eventually we want
+ -- exception groups instead.)
+ local status, err = pcall(function()
+ for task, _ in pairs(this.child_tasks) do
+ task._done.await()
+ end
+ end)
+ if not status then
+ _debug_print('nursery: child exception during __close', err)
+ -- NOTE: the run loop will have already called cancel()
+ -- clear pending error so we can block again
+ this.parent_task.pending_error = _NO_ERROR
+ for task, _ in pairs(this.child_tasks) do
+ task._done.await()
+ end
+ error(err, 0)
end
end
})

article

Copyright 2022 John Belmonte, All rights reserved

code and examples

Copyright 2022 John Belmonte

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

@svermeulen
Copy link

For anyone interested in using this - I continued @belm0's work here and packaged it into a library

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment