@belm0
Last active February 6, 2023 15:10

Structured concurrency and Lua (part 2)

John Belmonte, 2022-Sep

In the previous installment, we introduced some basics of structured concurrency, and considered how this paradigm might fit within the Lua programming language.

An important point was encapsulation: the caller of a function shouldn't be concerned with whether the implementation employs concurrency. We shouldn't have to give up features of the language or runtime just because our program contains concurrency.

A primary example is exception handling. In a basic Lua program without concurrency, we have the following expectations about error propagation:

  • "protected" function calls are possible, using pcall(). If an exception happens within the function scope, even transitively, we can catch it at runtime and decide what to do.

  • if the program has an unhandled exception, a "traceback" will be available so that we can diagnose the problem. It contains not only the origin of the error, but also the exact code path travelled to reach it.
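
As a baseline, both expectations are easy to confirm in plain Lua with a minimal sketch (no concurrency involved; the function names here are illustrative only):

    -- pcall() catches errors raised anywhere within the protected call,
    -- even transitively through nested function calls.
    local function inner()
        error('oops')
    end

    local function outer()
        inner()  -- the error propagates up through here
    end

    local ok, err = pcall(outer)
    print(ok, err)  -- false, and the message is prefixed with its source location
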

However, once concurrency is added to a program, the problems begin. Any concurrency framework allowing coroutines to outlive the function that spawned them will violate these expectations. pcall() becomes useless. Moreover, since each coroutine has its own call stack, and the relation among them is unknown (e.g., a spawned coroutine may in turn spawn another), the code path taken to reach the error is obscured.
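
To make the failure mode concrete, here is a minimal sketch of the pcall() problem: coroutine.resume() converts the child's error into a return value rather than raising it, so an enclosing pcall() reports success.

    -- An error inside a coroutine does not propagate to the resumer's pcall():
    -- coroutine.resume() reports it as a (false, message) pair instead of raising.
    local co = coroutine.create(function()
        error('oops')
    end)

    local ok = pcall(function()
        coroutine.resume(co)  -- the child's error is swallowed right here
    end)
    print(ok)  -- true: the pcall() observed no error at all
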

use the structure!

The previous installment concluded with an example where a nursery's child task was altered to intentionally raise an error. The traceback exhibited the problems mentioned above, since our fledgling concurrency library was not taking advantage of the hierarchical structure of the tasks. Every task has a known parent—all the way up through the stack frame enclosing the nursery scope, and ultimately to the program's root task. The implementation should use this to propagate errors, and piece together a complete traceback.

Now that such an improvement has been applied, let's run the error example again:

trio = require('trio')

function await_error_example()
    -- child task raises an error
    do
        local nursery <close> = trio.open_nursery()
        nursery.start_soon(function()
            trio.await_sleep(1)
            error('oops')
        end)
    end
    print('done')
end

trio.run(await_error_example)
$ lua example_3.lua
lua: example_3.lua:9: oops
task traceback:
	[C]: in function 'error'
	example_3.lua:9: in function <example_3.lua:7>
	example_3.lua:10: in function 'await_error_example'
stack traceback:
	[C]: in function 'error'
	./trio.lua:194: in function 'trio.run'
	example_3.lua:15: in main chunk
	[C]: in ?

In addition to the normal "stack traceback" section, the unhandled exception output is preceded by a "task traceback" covering the path of the error within the concurrency framework. In this example, it includes the culprit anonymous function passed to start_soon(), as well as the exit location of the nursery scope, correctly attributed to await_error_example().

It may seem that any coroutine scheduler could do this easily: just add the traceback of the coroutine that failed resume() to the error string. But that assumes coroutines are only one level deep. Let's try a more complicated example, with nested tasks:

trio = require('trio')

function await_deeply_nested_error()
    do
        local nursery <close> = trio.open_nursery()
        nursery.start_soon(function()
            trio.await_sleep(1)
            error('oops')
        end)
    end
end

function await_error_example()
    do
        local nursery <close> = trio.open_nursery()
        nursery.start_soon(await_deeply_nested_error)
    end
    print('done')
end

trio.run(await_error_example)
$ lua example_4.lua
lua: example_4.lua:8: oops
task traceback:
	[C]: in function 'error'
	example_4.lua:8: in function <example_4.lua:6>
	example_4.lua:9: in function 'await_deeply_nested_error'
	example_4.lua:16: in function 'await_error_example'
stack traceback:
	[C]: in function 'error'
	./trio.lua:194: in function 'trio.run'
	example_4.lua:21: in main chunk
	[C]: in ?

The traceback above spans code executed in multiple tasks (coroutines), showing the precise path that resulted in the error. Moreover, since the error propagates through every intermediate task between its origin and the root of the program, pcall() can intervene at any point:

function await_error_example()
    result, err = pcall(function()
        local nursery <close> = trio.open_nursery()
        nursery.start_soon(await_deeply_nested_error)
        -- ...
    end)
    if not result then
        print('ignoring an error:', err)
    end
    print('done')
end
$ lua example_5.lua
ignoring an error:	example_5.lua:8: oops
done

As far as error handling goes, we've demonstrated that structured concurrency allows concurrent programs to match the ease of non-concurrent ones.

implementation

Building on the toy structured concurrency module of the first installment, the implementation now propagates errors through the task hierarchy, and appends a "task traceback" to errors raised out of trio.run(). (TODO: such error objects should be a table with __tostring, so that the traceback is generated lazily, only if needed.)
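
Such an error object might look like the following sketch (TaskError and new_task_error are hypothetical names, not part of the module):

    -- Hypothetical sketch of the TODO: an error object that assembles its
    -- task traceback lazily, only when __tostring is actually invoked.
    local TaskError = {}
    TaskError.__tostring = function(self)
        local parts = {tostring(self.message), 'task traceback:'}
        for _, coro in ipairs(self.error_chain) do
            -- strip the "stack traceback:" header from each coroutine's trace
            local s = debug.traceback(coro):gsub('^stack traceback:\n', '')
            table.insert(parts, s)
        end
        return table.concat(parts, '\n')
    end

    local function new_task_error(message, error_chain)
        return setmetatable({message = message, error_chain = error_chain}, TaskError)
    end

With this, tostring() on the object would produce output like _task_traceback's, but only on demand.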

The exception propagation works by deferring any error raised by a task's coroutine.resume(), injecting it into the parent task when the parent is next advanced.
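
The mechanism can be sketched in isolation (a simplified model with plain task tables; advance() stands in for the scheduler's resume step, and is not part of the module's API):

    -- Simplified model of the propagation step: a failed resume() does not
    -- raise immediately; the error is parked on the parent task and delivered
    -- (then re-raised) the next time the parent is advanced.
    local NO_ERROR = {}

    local function advance(task)
        local ok, result = coroutine.resume(task.coro, task.pending_error)
        task.pending_error = NO_ERROR
        if not ok and task.parent then
            task.parent.pending_error = result  -- defer to the parent's next turn
        end
        return ok, result
    end

    local parent = {
        pending_error = NO_ERROR,
        coro = coroutine.create(function()
            local err = coroutine.yield()  -- paused; resumed with pending_error
            if err ~= NO_ERROR then error(err, 0) end
        end),
    }
    local child = {
        pending_error = NO_ERROR,
        parent = parent,
        coro = coroutine.create(function() error('oops', 0) end),
    }

    advance(parent)                  -- parent starts and pauses
    advance(child)                   -- child fails; its error is deferred
    local ok, err = advance(parent)  -- parent re-raises the injected error
    print(ok, err)                   -- false	oops
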

When assembling task tracebacks, care is taken to omit stack frames belonging to the concurrency implementation. This eliminates distracting clutter, especially when an error is traversing multiple tasks.
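
Assembling the traceback at all is possible because debug.traceback() accepts a coroutine argument, and Lua preserves the stack of a coroutine that died with an error, so its frames can still be inspected (and filtered) after resume() returns:

    -- debug.traceback() can recover the stack of a coroutine even after it
    -- has died with an error; Lua does not unwind a failed coroutine's stack.
    local co = coroutine.create(function()
        local function fail() error('oops') end
        fail()
    end)

    local ok, err = coroutine.resume(co)
    print(ok, err)              -- false, plus the error message
    print(debug.traceback(co))  -- full stack of the dead coroutine
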

See code changes relative to the previous article installment.

up next: cancellation

When a task raises an exception (or is otherwise cancelled), the children and siblings of that task should, in turn, be cancelled and allowed to finalize. Currently, that isn't the case. For example:

trio = require('trio')

function await_cancellation_example()
    -- among two parallel tasks, one fails
    pcall(function()
        local nursery <close> = trio.open_nursery()
        nursery.start_soon(function()
            print('child 1 start')
            trio.await_sleep(1)
            error('oops')
        end)
        nursery.start_soon(function()
            print('child 2 start')
            trio.await_sleep(2)
            print('child 2 end')
        end)
        print('waiting for child tasks')
    end)
    print('done')
end

trio.run(await_cancellation_example)
$ lua example_6.lua
waiting for child tasks
child 2 start
child 1 start
done
child 2 end

One of the child tasks was allowed to run to completion, despite its sibling having raised an error into the nursery (which happens to be subsequently caught). Even worse, the task lifetime has leaked outside the nursery scope, with "child 2 end" appearing after "done".

We'll address these deficiencies and expand on cancellation—another area that benefits significantly from structured concurrency—in the next installment of this series.


continue reading: Structured concurrency and Lua (part 3)


article © 2022 John Belmonte, all rights reserved

trio.lua:

-- trio - toy structured concurrency implementation based on Python Trio
--
-- requirements:
-- * Lua >= 5.4.3 (with "yield from __close" support)
-- * fractional _time() and _sleep() - see below for placeholder Unix
--   implementations using popen().
--
-- API naming conventions:
-- * functions that may yield are prefixed with "await_"
-- * functions that require storing the result in a to-be-closed variable
--   are prefixed with "open_"
--
-- TODO: cancel scopes
-- TODO: exception groups
-- TODO: lua_trio compatible networking, file I/O, etc. (See cosock for ideas
--   on adapting luasocket.)

local function _sleep(seconds)
    os.execute('sleep ' .. tonumber(seconds))
end

-- returns fractional time in seconds from an arbitrary reference point
-- (This implementation requires `date` from GNU coreutils.)
local function _time()
    local f = assert(io.popen('date +%s%3N', 'r'))
    local s = assert(f:read('*a'))
    f:close()
    return tonumber(s) / 1000
end

local function _debug_print(...)
    if false then
        print(...)
    end
end

-- generate chained task traceback from error object and ascending coroutine list
-- NOTE: Filtering of internal frames is best-effort, and expects the trio module
--   chunk name to include the substring "trio.lua".
local function _task_traceback(error_obj, error_chain)
    local traceback = {tostring(error_obj), 'task traceback:'}
    for i, error_coro in ipairs(error_chain) do
        local s = debug.traceback(error_coro)
        -- remove header
        s = s:sub(s:find('[\n\r]') + 1)
        -- First task traceback is kept as is, the remaining have
        -- internal frames removed.
        local elide = i > 1
        local internal_frame_observed = false
        for line in s:gmatch("[^\r\n]+") do
            if elide then
                local last_internal_frame_observed = internal_frame_observed
                -- TODO: fix fragile matching
                internal_frame_observed = string.find(line, 'trio%.lua') ~= nil
                if last_internal_frame_observed and not internal_frame_observed then
                    elide = false
                end
            end
            if not elide then
                table.insert(traceback, line)
            end
        end
    end
    return table.concat(traceback, '\n')
end

local _TASK_PAUSE = -1
local _NO_ERROR = {}

local trio = nil
trio = {
    -- TODO: encapsulate into EventLoop class created by run()
    _tasks = {},  -- list sorted by wait_until, nearest time last
    _paused_tasks = {},  -- set
    _tasks_by_coro = {},  -- map

    -- TODO: optional task name
    _new_task = function(f, parent_nursery, wait_until)
        local task = {
            coro=coroutine.create(f),
            parent_nursery=parent_nursery,
            wait_until=wait_until,
            pending_error=_NO_ERROR,
            error_chain = nil,  -- list of coroutine objects (ascending level), else nil
            child_nursuries={},
            _done = trio.Event()
        }
        trio._tasks_by_coro[task.coro] = task
        return task
    end,

    _schedule_task = function(task)
        -- TODO: find insertion point by binary search
        local tasks = trio._tasks
        for i = #tasks, 1, -1 do
            if tasks[i].wait_until >= task.wait_until then
                table.insert(tasks, i+1, task)
                return
            end
        end
        table.insert(tasks, 1, task)
    end,

    -- TODO: support abort
    await_task_rescheduled = function()
        -- TODO: encapsulate yield handling
        local pending_error = coroutine.yield(_TASK_PAUSE)
        if pending_error ~= _NO_ERROR then
            -- NOTE: Disable prepending location to error strings, since
            --   that already happened when the original error was raised.
            error(pending_error, 0)
        end
    end,

    reschedule = function(task)
        assert(trio._paused_tasks[task])
        trio._paused_tasks[task] = nil
        task.wait_until = 0
        trio._schedule_task(task)
    end,

    Event = function()
        local _is_set = false
        local _waiting_tasks = {}
        return {
            set = function()
                if not _is_set then
                    _is_set = true
                    -- reschedule tasks
                    for _, task in ipairs(_waiting_tasks) do
                        trio.reschedule(task)
                    end
                end
            end,
            is_set = function()
                return _is_set
            end,
            await = function()
                if not _is_set then
                    table.insert(_waiting_tasks, trio.current_task())
                    trio.await_task_rescheduled()
                end
            end
        }
    end,

    current_time = _time,

    current_task = function()
        return trio._tasks_by_coro[coroutine.running()]
    end,

    await_sleep = function(seconds)
        assert(seconds >= 0)
        local pending_error = coroutine.yield(seconds)
        if pending_error ~= _NO_ERROR then
            error(pending_error, 0)
        end
    end,

    run = function(f)
        do
            local main_task_nursery <close> = trio.open_nursery()
            -- TODO: use the current coroutine rather than creating a new one?
            main_task_nursery.task = trio._new_task(f, main_task_nursery, 0)
            trio._schedule_task(main_task_nursery.task)
            local tasks = trio._tasks
            while #tasks > 0 do
                local wait_delta = tasks[#tasks].wait_until - trio.current_time()
                if wait_delta > 0 then
                    _debug_print('run: sleep until next event', wait_delta)
                    _sleep(wait_delta)
                end
                -- collect batch of tasks which are ready to run
                local current_time = trio.current_time()
                local tasks_to_run = {}
                while #tasks > 0 and tasks[#tasks].wait_until - current_time <= 0 do
                    table.insert(tasks_to_run, table.remove(tasks))
                end
                -- run the ready tasks
                -- TODO: shuffle execution to avoid reliance on implementation details
                for _, task in ipairs(tasks_to_run) do
                    local coro = task.coro
                    local resume_status, resume_result = coroutine.resume(coro, task.pending_error)
                    if not resume_status then
                        error_obj = resume_result
                        -- TODO: Always raise multi-error object. __tostring will assemble the
                        --   traceback from the dead coroutines if it's called.
                        local error_chain = {table.unpack(task.error_chain or {})}
                        table.insert(error_chain, coro)
                        if task == main_task_nursery.task then
                            -- raise unhandled error to caller of event loop
                            error(_task_traceback(error_obj, error_chain), 0)
                        else
                            local parent_task = task.parent_nursery.task
                            parent_task.pending_error = error_obj
                            parent_task.error_chain = error_chain
                        end
                    end
                    if coroutine.status(coro) == 'dead' then
                        _debug_print('run: retire', coro)
                        task.parent_nursery.child_tasks[task] = nil
                        trio._tasks_by_coro[coro] = nil
                        task._done.set()
                    elseif resume_result == _TASK_PAUSE then
                        _debug_print('run: pausing', coro)
                        trio._paused_tasks[task] = true
                    else
                        task.wait_until = trio.current_time() + resume_result
                        _debug_print('run:', coro, 'scheduled sleep for', resume_result)
                        trio._schedule_task(task)
                    end
                end
            end
        end
    end,

    open_nursery = function()
        -- TODO: needs to attach to active event loop somehow. Global map
        --   of Lua thread to event loop?
        local this
        this = setmetatable({
            task = trio.current_task(),
            child_tasks = {},  -- set
            start_soon = function(f)
                local task = trio._new_task(f, this, 0)
                trio._schedule_task(task)
                this.child_tasks[task] = true
            end
        }, {
            __close = function(a)
                _debug_print('__close nursery', a)
                -- block until all child tasks done
                _debug_print(' running tasks:')
                for task, _ in pairs(this.child_tasks) do
                    _debug_print(' ', task)
                    task._done.await()
                end
            end
        })
        return this
    end
}

return trio
--- sc_and_lua_1/trio.lua	2022-09-25 09:34:05.000000000 +0900
+++ sc_and_lua_2/trio.lua	2022-09-25 10:04:55.000000000 +0900
@@ -10,9 +10,8 @@
 -- * functions that require storing the result in a to-be-closed variable
 --   are prefixed with "open_"
 --
--- TODO: nested nurseries
--- TODO: error handling
 -- TODO: cancel scopes
+-- TODO: exception groups
 -- TODO: lua_trio compatible networking, file I/O, etc. (See cosock for ideas
 --   on adapting luasocket.)
@@ -35,7 +34,38 @@
     end
 end
 
+-- generate chained task traceback from error object and ascending coroutine list
+-- NOTE: Filtering of internal frames is best-effort, and expects the trio module
+--   chunk name to include the substring "trio.lua".
+local function _task_traceback(error_obj, error_chain)
+    local traceback = {tostring(error_obj), 'task traceback:'}
+    for i, error_coro in ipairs(error_chain) do
+        local s = debug.traceback(error_coro)
+        -- remove header
+        s = s:sub(s:find('[\n\r]') + 1)
+        -- First task traceback is kept as is, the remaining have
+        -- internal frames removed.
+        local elide = i > 1
+        local internal_frame_observed = false
+        for line in s:gmatch("[^\r\n]+") do
+            if elide then
+                local last_internal_frame_observed = internal_frame_observed
+                -- TODO: fix fragile matching
+                internal_frame_observed = string.find(line, 'trio%.lua') ~= nil
+                if last_internal_frame_observed and not internal_frame_observed then
+                    elide = false
+                end
+            end
+            if not elide then
+                table.insert(traceback, line)
+            end
+        end
+    end
+    return table.concat(traceback, '\n')
+end
+
 local _TASK_PAUSE = -1
+local _NO_ERROR = {}
 
 local trio = nil
@@ -50,6 +80,8 @@
             coro=coroutine.create(f),
             parent_nursery=parent_nursery,
             wait_until=wait_until,
+            pending_error=_NO_ERROR,
+            error_chain = nil,  -- list of coroutine objects (ascending level), else nil
             child_nursuries={},
             _done = trio.Event()
         }
@@ -69,7 +101,13 @@
     end,
 
     -- TODO: support abort
     await_task_rescheduled = function()
-        coroutine.yield(_TASK_PAUSE)
+        -- TODO: encapsulate yield handling
+        local pending_error = coroutine.yield(_TASK_PAUSE)
+        if pending_error ~= _NO_ERROR then
+            -- NOTE: Disable prepending location to error strings, since
+            --   that already happened when the original error was raised.
+            error(pending_error, 0)
+        end
     end,
 
     reschedule = function(task)
         assert(trio._paused_tasks[task])
@@ -113,13 +151,18 @@
     await_sleep = function(seconds)
         assert(seconds >= 0)
-        coroutine.yield(seconds)
+        local pending_error = coroutine.yield(seconds)
+        if pending_error ~= _NO_ERROR then
+            error(pending_error, 0)
+        end
     end,
 
     run = function(f)
         do
             local main_task_nursery <close> = trio.open_nursery()
-            trio._schedule_task(trio._new_task(f, main_task_nursery, 0))
+            -- TODO: use the current coroutine rather than creating a new one?
+            main_task_nursery.task = trio._new_task(f, main_task_nursery, 0)
+            trio._schedule_task(main_task_nursery.task)
             local tasks = trio._tasks
             while #tasks > 0 do
                 local wait_delta = tasks[#tasks].wait_until - trio.current_time()
@@ -139,18 +182,33 @@
                 -- TODO: shuffle execution to avoid reliance on implementation details
                 for _, task in ipairs(tasks_to_run) do
                     local coro = task.coro
-                    local _, time_to_sleep = assert(coroutine.resume(coro))
+                    local resume_status, resume_result = coroutine.resume(coro, task.pending_error)
+                    if not resume_status then
+                        error_obj = resume_result
+                        -- TODO: Always raise multi-error object. __tostring will assemble the
+                        --   traceback from the dead coroutines if it's called.
+                        local error_chain = {table.unpack(task.error_chain or {})}
+                        table.insert(error_chain, coro)
+                        if task == main_task_nursery.task then
+                            -- raise unhandled error to caller of event loop
+                            error(_task_traceback(error_obj, error_chain), 0)
+                        else
+                            local parent_task = task.parent_nursery.task
+                            parent_task.pending_error = error_obj
+                            parent_task.error_chain = error_chain
+                        end
+                    end
                     if coroutine.status(coro) == 'dead' then
                         _debug_print('run: retire', coro)
                         task.parent_nursery.child_tasks[task] = nil
                         trio._tasks_by_coro[coro] = nil
                         task._done.set()
-                    elseif time_to_sleep == _TASK_PAUSE then
+                    elseif resume_result == _TASK_PAUSE then
                         _debug_print('run: pausing', coro)
                         trio._paused_tasks[task] = true
                     else
-                        task.wait_until = trio.current_time() + time_to_sleep
-                        _debug_print('run:', coro, 'scheduled sleep for', time_to_sleep)
+                        task.wait_until = trio.current_time() + resume_result
+                        _debug_print('run:', coro, 'scheduled sleep for', resume_result)
                         trio._schedule_task(task)
                     end
                 end
@@ -163,7 +221,7 @@
         -- of Lua thread to event loop?
         local this
         this = setmetatable({
-            -- TODO: track parent_task
+            task = trio.current_task(),
             child_tasks = {},  -- set
             start_soon = function(f)


code and examples

Copyright 2022 John Belmonte

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

spc476 commented Sep 21, 2022

I tried example_6.lua with some slight modifications:

trio = require('trio')

function async_cancellation_example()
    -- among two parallel tasks, one fails
    pcall(function()
        local nursery <close> = trio.open_nursery()
        nursery.start_soon(function()
            print('child 1 start')
            trio.async_sleep(1)
            error('oops')
            print('child 1 end')
        end)
        nursery.start_soon(function()
            print('child 2 start')
            trio.async_sleep(2)
            print('child 2 end')
        end)
        print('waiting for child tasks')
    end)
    print('done')
end

trio.run(async_cancellation_example)
print("here")

If I comment out error('oops') I get the following output:

waiting for child tasks
child 2 start
child 1 start
child 1 end
child 2 end
done
here

Yet if error('oops') isn't commented out, I get:

waiting for child tasks
child 2 start
child 1 start
done
child 2 end
here

I would expect done to always appear just before here, not child 2 end, because it's not done when it prints done. I can see this breaking a scenario I would use this for---making two concurrent requests for data (say, two REST calls) because doing them sequentially would take too much time. If one of them failed, I would think they were both done when that isn't the case.


belm0 commented Sep 21, 2022

@spc476 thank you-- yes, that's exactly the point I was making at the end of the article:

Even worse, the task lifetime has leaked outside the nursery scope, with "child 2 end" appearing after "done".

We'll fill in the deficiencies as the series (and toy implementation) progresses.
