Skip to content

Instantly share code, notes, and snippets.

@arnm
Last active July 7, 2025 12:56
Show Gist options
  • Save arnm/012bb0244e2d7731ac8db431ccfb3f65 to your computer and use it in GitHub Desktop.
Save arnm/012bb0244e2d7731ac8db431ccfb3f65 to your computer and use it in GitHub Desktop.
DAG-enabled Checklist Tool for CodeCompanion: Lua sources and docs
-- checklist_dag.lua
-- DAG checklist tools with proper type annotations and standardized returns
local dag_manager_module = require('codecompanion.strategies.chat.agents.tools.checklist_lib.dag_manager')
local dag_formatter_module = require('codecompanion.strategies.chat.agents.tools.checklist_lib.dag_formatter')
local dag_executor = require('codecompanion.strategies.chat.agents.tools.checklist_lib.dag_executor')
local storage_module = require('codecompanion.strategies.chat.agents.tools.checklist_lib.storage')
-- Create DAG system instance
---@return table
local function get_dag_system()
local storage = storage_module.new()
local manager = dag_manager_module.new(storage)
local formatter = dag_formatter_module.new()
return {
storage = storage,
manager = manager,
formatter = formatter
}
end
-- Get the shared DAG system instance
local dag_system = nil
---@return table
local function get_shared_dag_system()
if not dag_system then
dag_system = get_dag_system()
end
return dag_system
end
---@class ChecklistDagCreateTool
local ChecklistDagCreateTool = {
name = "checklist_dag_create",
cmds = {
---@param agent table
---@param args table
---@param input string
---@param cb function
function(agent, args, input, cb)
local goal = args.goal
local tasks_input = args.tasks or {}
local subject = args.subject
local body = args.body
if not goal or goal == "" then
return cb({
status = "error",
data = {},
message = "Goal is required"
})
end
if not tasks_input or #tasks_input == 0 then
return cb({
status = "error",
data = {},
message = "At least one task is required"
})
end
if not subject or subject == "" then
return cb({
status = "error",
data = {},
message = "subject is required"
})
end
if not body then
return cb({
status = "error",
data = {},
message = "body is required"
})
end
local system = get_shared_dag_system()
local manager = system.manager
-- Parse tasks with dependencies and modes
local tasks_data = {}
for i, task_input in ipairs(tasks_input) do
if type(task_input) == "string" then
table.insert(tasks_data, {
text = task_input,
dependencies = {},
mode = "readwrite" -- Default to safe mode for string inputs
})
elseif type(task_input) == "table" then
table.insert(tasks_data, {
text = task_input.text or task_input[1] or "",
dependencies = task_input.dependencies or {},
mode = task_input.mode or "readwrite" -- Default to safe mode
})
end
end
-- Get independent tasks for parallel execution
local independent_tasks = manager:get_independent_tasks(tasks_data)
if #independent_tasks > 0 then
-- Prepare tasks for parallel execution
local tasks_to_execute = {}
for _, task_idx in ipairs(independent_tasks) do
table.insert(tasks_to_execute, {
index = task_idx,
text = tasks_data[task_idx].text
})
end
-- Get current chat context
local parent_bufnr = vim.api.nvim_get_current_buf()
local parent_chat = require("codecompanion.strategies.chat").buf_get_chat(parent_bufnr)
-- Execute independent tasks in parallel
dag_executor.execute_tasks_parallel(tasks_to_execute, parent_chat, function(parallel_results)
-- Create checklist with parallel results
local checklist, err = manager:create_checklist(goal, tasks_data, subject, body, parallel_results)
if not checklist then
return cb({
status = "error",
data = {},
message = err
})
end
return cb({
status = "success",
data = {
checklist = checklist,
parallel_results = parallel_results
}
})
end)
-- Return early - don't continue to the else branch
return
else
-- No independent tasks, create checklist normally
local checklist, err = manager:create_checklist(goal, tasks_data, subject, body, {})
if not checklist then
return cb({
status = "error",
data = {},
message = err
})
end
return cb({
status = "success",
data = {
checklist = checklist,
parallel_results = {}
}
})
end
end,
},
function_call = {},
schema = {
type = "function",
["function"] = {
name = "checklist_dag_create",
description = "Create a DAG-enabled checklist with task dependencies and parallel execution of independent tasks",
parameters = {
type = "object",
properties = {
goal = { type = "string", description = "Goal of the checklist" },
tasks = {
type = "array",
items = {
oneOf = {
{ type = "string" },
{
type = "object",
properties = {
text = { type = "string", description = "Task description" },
dependencies = {
type = "array",
items = { type = "integer" },
description = "Array of task indices (1-based) that must complete first"
},
mode = {
type = "string",
enum = { "read", "write", "readwrite" },
description = "Access mode: 'read' (safe for parallel), 'write' or 'readwrite' (requires context)"
}
},
required = { "text" }
}
}
},
description = "Tasks with optional dependencies"
},
subject = { type = "string", description = "Commit subject (summary/title)" },
body = { type = "string", description = "Commit body (detailed explanation)" }
},
required = { "goal", "tasks", "subject", "body" },
additionalProperties = false
},
strict = true
}
},
system_prompt =
[[Use this tool to create and manage a structured checklist for your current coding session. This helps you track progress, organize complex tasks, and demonstrate thoroughness to the user.
When to use:
- For complex multi-step tasks (3 or more steps)
- For non-trivial and complex work
- When the user explicitly requests a checklist
- When the user provides multiple tasks
- After receiving new instructions or requirements
When NOT to use:
- If there is only a single, trivial task
- If the task can be completed in less than 3 trivial steps
- If the task is purely conversational or informational
Checklist behavior:
- The first task will automatically be set to "in_progress".
- Only read-only tasks with no dependencies will be executed in parallel for safety.
Task modes:
- "read": Safe for parallel execution (analysis, search, reading files)
- "write": Requires context (file modifications, destructive operations)
- "readwrite": Requires context (operations that both read and modify)
Usage:
- All fields are required: goal, tasks, subject, body.
- Tasks can specify mode for safety control.
- Returns the created checklist with all tasks and progress.
Examples:
- checklist_dag_create({
goal = "Build authentication system",
tasks = [
{"text": "Analyze current auth code", "mode": "read", "dependencies": []},
{"text": "Design auth schema", "mode": "readwrite", "dependencies": []},
{"text": "Write unit tests", "mode": "write", "dependencies": [2]},
{"text": "Implement auth logic", "mode": "write", "dependencies": [1, 2, 3]}
],
subject = "Auth system implementation",
body = "Build complete authentication system with safe parallel execution."
})
]],
opts = { requires_approval = true },
env = nil,
handlers = {},
output = {
success = function(tool, agent, cmd, stdout)
local response_data = stdout[1]
if response_data and response_data.checklist then
local checklist = response_data.checklist
local parallel_results = response_data.parallel_results or {}
local system = get_shared_dag_system()
local dag_formatter = system.formatter
local manager = system.manager
local progress = manager:get_progress(checklist)
-- LLM gets full structured data including parallel results
local llm_output = vim.inspect({
checklist = checklist,
progress = progress,
parallel_results = parallel_results
})
-- User gets formatted display with parallel results info
local user_formatted = dag_formatter:format_checklist(checklist, progress)
-- Add parallel results info to user display if any exist
if not vim.tbl_isempty(parallel_results) then
user_formatted = user_formatted .. "\n\nParallel execution results:"
for task_idx, result in pairs(parallel_results) do
local truncated = #result > 80 and (result:sub(1, 77) .. "...") or result
user_formatted = user_formatted .. string.format("\n Task %d: %s", task_idx, truncated)
end
end
agent.chat:add_tool_output(tool, llm_output, user_formatted)
else
agent.chat:add_tool_output(tool, "No DAG checklist data available")
end
end,
error = function(tool, agent, cmd, stderr)
local response = stderr[1]
local error_msg = response and response.message or "Unknown error"
agent.chat:add_tool_output(tool, string.format("**Checklist DAG Tool Error**: %s", error_msg))
end,
rejected = function(tool, agent, cmd)
agent.chat:add_tool_output(tool, "**Checklist DAG Tool**: User declined to execute the operation")
end,
},
["output.prompt"] = function(tool, agent)
local tasks_count = tool.args.tasks and #tool.args.tasks or 0
local read_only_count = 0
if tool.args.tasks then
for _, task in ipairs(tool.args.tasks) do
local deps = type(task) == "table" and task.dependencies or {}
local mode = type(task) == "table" and task.mode or "readwrite"
-- Only read-only tasks with no dependencies can execute in parallel
if (#deps == 0) and (mode == "read") then
read_only_count = read_only_count + 1
end
end
end
return string.format(
"Create DAG checklist: '%s' (%d tasks, %d read-only will execute in parallel)?",
tool.args.goal or "(no goal)",
tasks_count,
read_only_count
)
end,
args = {},
tool = {},
}
---@class ChecklistDagStatusTool
local ChecklistDagStatusTool = {
name = "checklist_dag_status",
cmds = {
---@param agent table
---@param args table|nil
---@param input string
---@param cb function
function(agent, args, input, cb)
args = args or {}
local checklist_id = args.checklist_id
local system = get_shared_dag_system()
local manager = system.manager
local checklist, err = manager:get_checklist(checklist_id)
if not checklist then
return cb({
status = "error",
data = {},
message = err
})
end
return cb({
status = "success",
data = checklist
})
end,
},
function_call = {},
schema = {
type = "function",
["function"] = {
name = "checklist_dag_status",
description =
"Use this tool to read the status of a specific DAG checklist. If checklist_id is omitted, the latest incomplete checklist will be used.",
parameters = {
type = "object",
properties = {
checklist_id = {
type = "string",
description = "Checklist ID to show status for (optional, defaults to latest incomplete checklist)"
}
},
required = {},
additionalProperties = false
},
strict = true
}
},
system_prompt =
[[Use this tool to read the status, log, and progress details of a specific checklist.
When to use:
- When you need to see the full details, log, and progress of a specific checklist
- Before making changes, marking tasks complete, or reporting progress
When NOT to use:
- If you want to see all checklists, use checklist_dag_list instead.
Usage:
- checklist_id is optional. If omitted, pass an empty object: {}.
- Returns full checklist details including tasks, log, and progress metrics.
- This is read-only.
Examples:
- checklist_dag_status({}) -- status of latest incomplete DAG checklist
- checklist_dag_status({ checklist_id = "2" }) -- status of DAG checklist with ID 2
]],
opts = {},
env = nil,
handlers = {},
output = {
success = function(tool, agent, cmd, stdout)
local checklist = stdout[1]
if checklist then
local system = get_shared_dag_system()
local dag_formatter = system.formatter
local manager = system.manager
local progress = manager:get_progress(checklist)
-- LLM gets full structured data
local llm_output = vim.inspect({
checklist = checklist,
progress = progress
})
-- User gets formatted display
local user_formatted = dag_formatter:format_checklist(checklist, progress)
agent.chat:add_tool_output(tool, llm_output, user_formatted)
else
agent.chat:add_tool_output(tool, "No DAG checklist data available")
end
end,
error = function(tool, agent, cmd, stderr)
local response = stderr[1]
local error_msg = response and response.message or "Unknown error"
agent.chat:add_tool_output(tool, string.format("**Checklist DAG Status Tool Error**: %s", error_msg))
end,
rejected = function(tool, agent, cmd)
agent.chat:add_tool_output(tool, "**Checklist DAG Status Tool**: User declined to execute the operation")
end,
},
args = {},
tool = {},
}
---@class ChecklistDagListTool
local ChecklistDagListTool = {
name = "checklist_dag_list",
cmds = {
---@param agent table
---@param args table
---@param input string
---@param cb function
function(agent, args, input, cb)
local system = get_shared_dag_system()
local manager = system.manager
local all_checklists = manager:get_all_checklists()
return cb({
status = "success",
data = all_checklists
})
end,
},
function_call = {},
schema = {
type = "function",
["function"] = {
name = "checklist_dag_list",
description = "Use this tool to read the current DAG checklist(s) for the workspace"
}
},
system_prompt =
[[Use this tool to read the current checklist(s) for the workspace.
When to use:
- At the beginning of conversations to see what's pending
- Before starting new tasks to prioritize work
- When the user asks about previous tasks or plans
- Whenever you're uncertain about what to do next
- After completing tasks to update your understanding of remaining work
- After every few messages to ensure you're on track
When NOT to use:
- If you only need the status of a specific checklist, use checklist_dag_status instead.
Usage:
- This tool takes in no parameters. Call it with no arguments.
- Returns a list of checklists with their status, progress, and tasks.
- If no checklists exist yet, an empty list will be returned.
Examples:
- checklist_dag_list()
]],
opts = {},
env = nil,
handlers = {},
output = {
success = function(tool, agent, cmd, stdout)
local checklists = stdout[1]
-- Extract progress for each checklist
local system = get_shared_dag_system()
local manager = system.manager
local checklists_with_progress = {}
for _, checklist in ipairs(checklists) do
local progress = manager:get_progress(checklist)
table.insert(checklists_with_progress, {
checklist = checklist,
progress = progress
})
end
-- LLM sees structured data, user sees formatted list
local llm_output = vim.inspect(checklists_with_progress)
-- Generate detailed user message with numbered entries
local user_msg
if #checklists == 0 then
user_msg = "**Checklist DAG List Tool**: No DAG checklists found"
else
user_msg = string.format("**Checklist DAG List Tool**: Found %d DAG checklist%s:\n",
#checklists, #checklists == 1 and "" or "s")
-- Sort by creation time (newest first) for display
local sorted_for_display = vim.deepcopy(checklists_with_progress)
table.sort(sorted_for_display, function(a, b)
return a.checklist.created_at > b.checklist.created_at
end)
for i, item in ipairs(sorted_for_display) do
local checklist = item.checklist
local progress = item.progress
user_msg = user_msg .. string.format(
"%d. **%s** (ID: %d)\n • Progress: %d/%d tasks complete (%d blocked)\n • Created: %s\n",
i,
checklist.goal or "No goal",
checklist.id,
progress.completed,
progress.total,
progress.blocked,
os.date("%Y-%m-%d %H:%M", checklist.created_at)
)
end
end
agent.chat:add_tool_output(tool, llm_output, user_msg)
end,
error = function(tool, agent, cmd, stderr)
local response = stderr[1]
local error_msg = response and response.message or "Unknown error"
agent.chat:add_tool_output(tool, string.format("**Checklist DAG List Tool Error**: %s", error_msg))
end,
rejected = function(tool, agent, cmd)
agent.chat:add_tool_output(tool, "**Checklist DAG List Tool**: User declined to execute the operation")
end,
},
args = {},
tool = {},
}
---@class ChecklistDagCompleteTaskTool
local ChecklistDagCompleteTaskTool = {
name = "checklist_dag_complete_task",
cmds = {
---@param agent table
---@param args table
---@param input string
---@param cb function
function(agent, args, input, cb)
local checklist_id = args.checklist_id
local task_id = args.task_id
local subject = args.subject
local body = args.body
local system = get_shared_dag_system()
local manager = system.manager
local checklist, err = manager:get_checklist(checklist_id)
if not checklist then
return cb({
status = "error",
data = {},
message = err
})
end
if not task_id then
return cb({
status = "error",
data = {},
message = "task_id is required"
})
end
if not subject or subject == "" then
return cb({
status = "error",
data = {},
message = "subject is required"
})
end
if not body then
return cb({
status = "error",
data = {},
message = "body is required"
})
end
local success, msg = manager:complete_task(agent, checklist, task_id, subject, body)
if not success then
return cb({
status = "error",
data = {},
message = msg
})
end
return cb({
status = "success",
data = checklist
})
end,
},
function_call = {},
schema = {
type = "function",
["function"] = {
name = "checklist_dag_complete_task",
description =
"Use this tool to mark the current in-progress task as complete in a DAG checklist. If checklist_id is omitted, the latest incomplete checklist will be used.",
parameters = {
type = "object",
properties = {
checklist_id = { type = "string", description = "Checklist ID to update (optional, defaults to latest incomplete checklist)" },
task_id = { type = "string", description = "Task ID to mark complete (must be in progress)" },
subject = { type = "string", description = "Commit subject (summary/title)" },
body = { type = "string", description = "Commit body (detailed explanation)" }
},
required = { "task_id", "subject", "body" },
additionalProperties = false
},
strict = true
}
},
system_prompt =
[[Use this tool to mark the current in-progress task as complete in a checklist. Only one task can be completed at a time. When a task is completed, the next pending task (if any) is automatically set to "in_progress". If checklist_id is omitted, the latest incomplete checklist will be used.
When to use:
- Immediately after completing the current in-progress task
- After verification and testing
- When user confirms acceptance
When NOT to use:
- If there is no checklist or no in-progress task
Checklist behavior:
- Only tasks that are "in_progress" can be completed.
- The next pending task will automatically be set to "in_progress".
Usage:
- checklist_id is optional.
- All other fields are required: task_id, subject, body.
- Returns the updated checklist with all tasks and progress.
Examples:
- checklist_dag_complete_task({
task_id = "1",
subject = "Completed auth schema design",
body = "Designed comprehensive authentication schema with user roles."
})
- checklist_dag_complete_task({
checklist_id = "2",
task_id = "3",
subject = "Updated docs",
body = "Documentation updated for new auth flow."
})
]],
opts = { requires_approval = true },
env = nil,
handlers = {},
output = {
success = function(tool, agent, cmd, stdout)
local checklist = stdout[1]
if checklist then
local system = get_shared_dag_system()
local dag_formatter = system.formatter
local manager = system.manager
-- Get next in-progress task or completion message
local next_idx, next_task = manager:get_next_in_progress_task(checklist)
-- LLM gets full structured data
local llm_output = vim.inspect({
checklist = checklist,
next_task_idx = next_idx,
next_task = next_task
})
-- User gets formatted completion message
local user_formatted = dag_formatter:format_task_completion(checklist, next_idx, next_task)
agent.chat:add_tool_output(tool, llm_output, user_formatted)
else
agent.chat:add_tool_output(tool, "No DAG checklist data available")
end
end,
error = function(tool, agent, cmd, stderr)
local response = stderr[1]
local error_msg = response and response.message or "Unknown error"
agent.chat:add_tool_output(tool, string.format("**Checklist DAG Complete Task Tool Error**: %s", error_msg))
end,
rejected = function(tool, agent, cmd)
agent.chat:add_tool_output(tool, "**Checklist DAG Complete Task Tool**: User declined to execute the operation")
end,
},
["output.prompt"] = function(tool, agent)
return string.format(
"Complete DAG task %s in checklist %s?",
tool.args.task_id or "(n/a)",
tool.args.checklist_id or "latest"
)
end,
args = {},
tool = {},
}
local M = {
checklist_dag_list = {
description = "Read the current DAG checklist(s) for the workspace",
callback = ChecklistDagListTool
},
checklist_dag_create = {
description = "Create a DAG-enabled checklist with task dependencies and parallel execution",
callback = ChecklistDagCreateTool
},
checklist_dag_status = {
description = "Read the status of a specific DAG checklist (or latest incomplete)",
callback = ChecklistDagStatusTool
},
checklist_dag_complete_task = {
description = "Mark the current in-progress task as complete in a DAG checklist",
callback = ChecklistDagCompleteTaskTool
},
}
return M
-- checklist/dag_executor.lua
-- Handles parallel execution of independent tasks during DAG creation
local M = {}
-- Execute tasks in parallel using the task tool pattern
function M.execute_tasks_parallel(tasks_to_execute, chat, callback)
if #tasks_to_execute == 0 then
return callback({})
end
local Chat = require("codecompanion.strategies.chat")
local results = {}
local completed_count = 0
local total_count = #tasks_to_execute
local parent = chat
local augroups = {} -- Track all augroups for cleanup
local backup_timers = {} -- Track backup timers for cleanup
local completion_timer = nil -- Debounce timer for completion
local global_timeout = nil -- Global timeout timer
local function cleanup_resources()
-- Clean up augroups
for _, aug_id in pairs(augroups) do
pcall(vim.api.nvim_del_augroup_by_id, aug_id)
end
augroups = {}
-- Clean up backup timers
for _, timer in pairs(backup_timers) do
if timer and not timer:is_closing() then
timer:stop()
timer:close()
end
end
backup_timers = {}
-- Clean up global timeout
if global_timeout and not global_timeout:is_closing() then
global_timeout:stop()
global_timeout:close()
global_timeout = nil
end
end
local function check_completion()
-- Cancel existing timer if any
if completion_timer then
completion_timer:stop()
completion_timer:close()
completion_timer = nil
end
if completed_count >= total_count then
-- Use a small delay to ensure all async operations complete
completion_timer = vim.loop.new_timer()
completion_timer:start(100, 0, vim.schedule_wrap(function()
cleanup_resources()
if completion_timer then
completion_timer:close()
completion_timer = nil
end
callback(results)
end))
end
end
-- Execute each task in parallel
for i, task_info in ipairs(tasks_to_execute) do
local task_idx = task_info.index
local task_text = task_info.text
local system_prompt = [[
You are a fully autonomous agent.
- You cannot interact with a user or ask for clarification.
- You must make all decisions, plan, and execute confidently using any of the available tools.
- You have only one chance to complete the task, so you must keep trying and not end your turn until you have reached a satisfactory result.
- Do not stop or return control until you are confident the task is complete.
]]
local tools_pref = "@{write} @{edit} @{multiedit} @{read} @{grep} @{list} @{glob} "
.. "@{checklist_create} @{checklist_status} @{checklist_complete_task} "
.. "@{webfetch} @{cmd_runner}\n\n"
local messages = {
{ role = "system", content = system_prompt },
{ role = "user", content = tools_pref .. task_text }
}
local child = Chat.new({
messages = messages,
adapter = {
name = "copilot",
model = {
name = "gpt-4.1"
}
}
})
local id = child.id
local aug = vim.api.nvim_create_augroup("DagTaskChat_" .. id, { clear = true })
augroups[task_idx] = aug -- Store augroup for later cleanup
local function latest_llm_reply(child_chat)
-- has to have stopped, could have gotten even from another chat
if child_chat.current_request then
return nil
end
local msgs = child_chat.messages
for j = #msgs, 1, -1 do
local m = msgs[j]
if m.role == "llm" and m.content and m.content ~= "" then
return m.content
end
if m.role == "tool" then
-- tool arrived but no LLM yet: keep waiting
return nil
end
end
return nil
end
local function try_complete_task()
local reply = latest_llm_reply(child)
if not reply then
return false -- Not ready yet
end
-- Double-check we haven't already processed this result
if results[task_idx] then
return true -- Already completed
end
-- Store the result and increment counter
results[task_idx] = reply
completed_count = completed_count + 1
-- Check if all tasks are complete
check_completion()
return true
end
-- Create multiple event handlers for better reliability
vim.api.nvim_create_autocmd("User", {
group = aug,
pattern = "CodeCompanionRequestFinished",
callback = function(ev)
-- Add small delay to ensure message processing is complete
vim.defer_fn(function()
try_complete_task()
end, 50)
end,
})
-- Backup completion check via timer (in case event is missed)
local backup_timer = vim.loop.new_timer()
backup_timers[task_idx] = backup_timer -- Track for cleanup
backup_timer:start(1000, 1000, vim.schedule_wrap(function()
if try_complete_task() then
backup_timer:stop()
backup_timer:close()
backup_timers[task_idx] = nil -- Remove from tracking
end
end))
child:submit()
child.ui:hide()
end
-- Global timeout to prevent hanging (60 seconds)
global_timeout = vim.loop.new_timer()
global_timeout:start(60000, 0, vim.schedule_wrap(function()
cleanup_resources()
-- Return whatever results we have so far
callback(results)
end))
-- If parent UI exists, keep it open
if parent and parent.ui then
parent.ui:open({})
end
end
return M
-- checklist/dag_formatter.lua
-- Handles DAG checklist output formatting and dependency visualization
local dag_types = require('codecompanion.strategies.chat.agents.tools.checklist_lib.dag_types')
local M = {}
---@class DagChecklistFormatter
local DagChecklistFormatter = {}
DagChecklistFormatter.__index = DagChecklistFormatter
-- Create a new DagChecklistFormatter instance
function DagChecklistFormatter.new()
local self = setmetatable({}, DagChecklistFormatter)
return self
end
-- Get status icon for a task status (includes blocked status)
function DagChecklistFormatter:get_status_icon(status)
if status == dag_types.TASK_STATUS.COMPLETED then
return "[✓]"
elseif status == dag_types.TASK_STATUS.IN_PROGRESS then
return "[~]"
elseif status == dag_types.TASK_STATUS.BLOCKED then
return "[!]"
else
return "[ ]"
end
end
-- Format dependency information for a task
function DagChecklistFormatter:format_dependencies(task, task_idx, checklist)
if not task.dependencies or #task.dependencies == 0 then
return ""
end
local dep_strs = {}
for _, dep_idx in ipairs(task.dependencies) do
local dep_task = checklist.tasks[dep_idx]
local dep_status = dep_task and self:get_status_icon(dep_task.status) or "?"
table.insert(dep_strs, string.format("%d%s", dep_idx, dep_status))
end
return string.format(" (deps: %s)", table.concat(dep_strs, ","))
end
-- Get mode icon for a task access mode
function DagChecklistFormatter:get_mode_icon(mode)
if mode == dag_types.TASK_MODE.READ then
return "R"
elseif mode == dag_types.TASK_MODE.WRITE then
return "W"
elseif mode == dag_types.TASK_MODE.READWRITE then
return "RW"
else
return "?"
end
end
-- Format a single DAG checklist for display
function DagChecklistFormatter:format_checklist(checklist, progress)
if not checklist then
return "No checklist data"
end
local output = string.format("DAG CHECKLIST %d: %s\nCreated: %s\n\nTasks:",
checklist.id,
checklist.goal or "No goal",
os.date("%m/%d %H:%M", checklist.created_at)
)
if #checklist.tasks == 0 then
output = output .. "\n (none)"
else
-- Show tasks in execution order if available, otherwise by index
local display_order = checklist.execution_order and #checklist.execution_order > 0
and checklist.execution_order
or {}
-- If no execution order, fall back to index order
if #display_order == 0 then
for i = 1, #checklist.tasks do
table.insert(display_order, i)
end
end
for _, i in ipairs(display_order) do
local task = checklist.tasks[i]
if task then
local status_icon = self:get_status_icon(task.status)
local mode_icon = self:get_mode_icon(task.mode)
local deps_info = self:format_dependencies(task, i, checklist)
output = output .. string.format("\n%d. %s [%s] %s%s",
i, status_icon, mode_icon, task.text, deps_info)
end
end
end
-- Show dependency graph
if checklist.dependency_graph and not vim.tbl_isempty(checklist.dependency_graph) then
output = output .. "\n\nDependency graph:"
for task_idx, dependents in pairs(checklist.dependency_graph) do
if #dependents > 0 then
output = output .. string.format("\n %d enables: %s",
task_idx, table.concat(dependents, ","))
end
end
end
if checklist.log and #checklist.log > 0 then
output = output .. "\n\nLog:"
local sorted_log = vim.deepcopy(checklist.log)
table.sort(sorted_log, function(a, b)
return a.timestamp > b.timestamp
end)
for _, entry in ipairs(sorted_log) do
local details = {}
if entry.subject and entry.subject ~= "" then
table.insert(details, entry.subject)
end
if entry.completed_task_ids and #entry.completed_task_ids > 0 then
table.insert(details, "completed: " .. table.concat(entry.completed_task_ids, ","))
end
if entry.file_paths and #entry.file_paths > 0 then
table.insert(details, "files: " .. table.concat(entry.file_paths, ","))
end
if entry.parallel_results and not vim.tbl_isempty(entry.parallel_results) then
local results = {}
for task_idx, result in pairs(entry.parallel_results) do
local truncated = #result > 30 and (result:sub(1, 27) .. "...") or result
table.insert(results, string.format("%d:%s", task_idx, truncated))
end
table.insert(details, "parallel: " .. table.concat(results, ";"))
end
local detail_str = #details > 0 and (" (" .. table.concat(details, "; ") .. ")") or ""
output = output .. string.format("\n %s %s%s",
os.date("%m/%d %H:%M", entry.timestamp),
entry.action,
detail_str
)
end
end
if progress then
output = output .. string.format("\n\nProgress: %d/%d complete",
progress.completed, progress.total)
if progress.in_progress > 0 then
output = output .. string.format(", %d in progress", progress.in_progress)
end
if progress.blocked > 0 then
output = output .. string.format(", %d blocked", progress.blocked)
end
end
return output
end
-- Format a DAG checklist summary for list view
function DagChecklistFormatter:format_checklist_summary(checklist, progress)
local blocked_str = progress.blocked > 0 and string.format(", %d blocked", progress.blocked) or ""
return string.format("%d. %s (%d/%d%s) - %s [DAG]",
checklist.id,
checklist.goal or "No goal",
progress.completed,
progress.total,
blocked_str,
os.date("%m/%d %H:%M", checklist.created_at)
)
end
-- Format multiple DAG checklists for list view
function DagChecklistFormatter:format_checklist_list(checklists_with_progress)
if vim.tbl_isempty(checklists_with_progress) then
return "No DAG checklists found. Use create tool to make one."
end
-- Sort by creation time (newest first)
local sorted_summaries = vim.deepcopy(checklists_with_progress)
table.sort(sorted_summaries, function(a, b)
return a.checklist.created_at > b.checklist.created_at
end)
local output = string.format("DAG Checklists (%d):\n", #sorted_summaries)
for _, item in ipairs(sorted_summaries) do
output = output .. self:format_checklist_summary(item.checklist, item.progress) .. "\n"
end
return output
end
-- Format task completion result for DAG
function DagChecklistFormatter:format_task_completion(checklist, next_task_idx, next_task)
if next_task then
local deps_info = self:format_dependencies(next_task, next_task_idx, checklist)
return string.format("Next: %d. %s%s", next_task_idx, next_task.text, deps_info)
else
return "DAG checklist complete."
end
end
-- Factory function
function M.new()
return DagChecklistFormatter.new()
end
return M
-- checklist/dag_manager.lua
-- Handles DAG checklist business logic and dependency management
local dag_types = require('codecompanion.strategies.chat.agents.tools.checklist_lib.dag_types')
local storage_module = require('codecompanion.strategies.chat.agents.tools.checklist_lib.storage')
local M = {}
---@class DagChecklistManager
---@field storage ChecklistStorage
---@field checklists table<integer, DagChecklist>
---@field next_id integer
local DagChecklistManager = {}
DagChecklistManager.__index = DagChecklistManager
-- Create a new DagChecklistManager instance
function DagChecklistManager.new(storage)
local self = setmetatable({}, DagChecklistManager)
self.storage = storage or storage_module.new()
self.checklists, self.next_id = self.storage:load()
return self
end
-- Validate dependencies and detect cycles
function DagChecklistManager:validate_dependencies(tasks_data)
local num_tasks = #tasks_data
local visited = {}
local rec_stack = {}
local function has_cycle(task_idx, adj_list)
visited[task_idx] = true
rec_stack[task_idx] = true
local deps = adj_list[task_idx] or {}
for _, dep_idx in ipairs(deps) do
if not visited[dep_idx] then
if has_cycle(dep_idx, adj_list) then
return true
end
elseif rec_stack[dep_idx] then
return true
end
end
rec_stack[task_idx] = false
return false
end
-- Build adjacency list
local adj_list = {}
for i, task_data in ipairs(tasks_data) do
local deps = task_data.dependencies or {}
for _, dep in ipairs(deps) do
if dep < 1 or dep > num_tasks then
return false, string.format("Invalid dependency: task %d depends on non-existent task %d", i, dep)
end
if dep == i then
return false, string.format("Self-dependency detected: task %d depends on itself", i)
end
end
adj_list[i] = deps
end
-- Check for cycles
for i = 1, num_tasks do
if not visited[i] then
if has_cycle(i, adj_list) then
return false, "Circular dependency detected"
end
end
end
return true, nil
end
-- Get topological sort order for task execution
function DagChecklistManager:get_execution_order(tasks)
local num_tasks = #tasks
local in_degree = {}
local adj_list = {}
local order = {}
-- Initialize
for i = 1, num_tasks do
in_degree[i] = 0
adj_list[i] = {}
end
-- Build graph and calculate in-degrees
for i, task in ipairs(tasks) do
local deps = task.dependencies or {}
for _, dep_idx in ipairs(deps) do
table.insert(adj_list[dep_idx], i)
in_degree[i] = in_degree[i] + 1
end
end
-- Kahn's algorithm
local queue = {}
for i = 1, num_tasks do
if in_degree[i] == 0 then
table.insert(queue, i)
end
end
while #queue > 0 do
local current = table.remove(queue, 1)
table.insert(order, current)
for _, neighbor in ipairs(adj_list[current]) do
in_degree[neighbor] = in_degree[neighbor] - 1
if in_degree[neighbor] == 0 then
table.insert(queue, neighbor)
end
end
end
return order
end
-- Get tasks with no dependencies that are safe for parallel execution
function DagChecklistManager:get_independent_tasks(tasks)
local independent = {}
for i, task in ipairs(tasks) do
local deps = task.dependencies or {}
local mode = task.mode or dag_types.TASK_MODE.READWRITE -- Default to safe mode
-- Only allow parallel execution for read-only tasks with no dependencies
if #deps == 0 and mode == dag_types.TASK_MODE.READ then
table.insert(independent, i)
end
end
return independent
end
-- Check if a task's dependencies are satisfied
function DagChecklistManager:are_dependencies_satisfied(checklist, task_idx)
local task = checklist.tasks[task_idx]
if not task or not task.dependencies then
return true
end
for _, dep_idx in ipairs(task.dependencies) do
local dep_task = checklist.tasks[dep_idx]
if not dep_task or dep_task.status ~= dag_types.TASK_STATUS.COMPLETED then
return false
end
end
return true
end
-- Get next tasks that can be started (dependencies satisfied)
function DagChecklistManager:get_ready_tasks(checklist)
local ready = {}
for i, task in ipairs(checklist.tasks) do
if task.status == dag_types.TASK_STATUS.PENDING or task.status == dag_types.TASK_STATUS.BLOCKED then
if self:are_dependencies_satisfied(checklist, i) then
table.insert(ready, i)
end
end
end
return ready
end
-- Update task statuses based on dependency resolution
function DagChecklistManager:update_task_statuses(checklist)
local ready_tasks = self:get_ready_tasks(checklist)
-- Update blocked tasks to pending if dependencies are satisfied
for _, task_idx in ipairs(ready_tasks) do
local task = checklist.tasks[task_idx]
if task.status == dag_types.TASK_STATUS.BLOCKED then
task.status = dag_types.TASK_STATUS.PENDING
end
end
-- Set blocked status for tasks with unsatisfied dependencies
for i, task in ipairs(checklist.tasks) do
if task.status == dag_types.TASK_STATUS.PENDING then
if not self:are_dependencies_satisfied(checklist, i) then
task.status = dag_types.TASK_STATUS.BLOCKED
end
end
end
end
-- Get progress statistics for a DAG checklist
function DagChecklistManager:get_progress(checklist)
local total = #checklist.tasks
local completed = 0
local pending = 0
local in_progress = 0
local blocked = 0
for _, task in ipairs(checklist.tasks) do
if task.status == dag_types.TASK_STATUS.COMPLETED then
completed = completed + 1
elseif task.status == dag_types.TASK_STATUS.PENDING then
pending = pending + 1
elseif task.status == dag_types.TASK_STATUS.IN_PROGRESS then
in_progress = in_progress + 1
elseif task.status == dag_types.TASK_STATUS.BLOCKED then
blocked = blocked + 1
end
end
return {
total = total,
completed = completed,
pending = pending,
in_progress = in_progress,
blocked = blocked
}
end
-- Get the latest incomplete checklist
function DagChecklistManager:get_latest_incomplete()
local latest = nil
for _, checklist in pairs(self.checklists) do
local progress = self:get_progress(checklist)
if progress.completed < progress.total then
if not latest or checklist.created_at > latest.created_at then
latest = checklist
end
end
end
return latest
end
-- Get a checklist by ID or latest incomplete if no ID provided
function DagChecklistManager:get_checklist(id)
if not id or id == "" then
local latest = self:get_latest_incomplete()
if not latest then
return nil, "No incomplete checklist found"
end
return latest, nil
end
local checklist_id = tonumber(id)
if not checklist_id then
return nil, "Invalid checklist ID format"
end
local checklist = self.checklists[checklist_id]
if not checklist then
return nil, "Checklist not found"
end
return checklist, nil
end
-- Get all checklists as an array
function DagChecklistManager:get_all_checklists()
local checklists_array = {}
for _, checklist in pairs(self.checklists) do
table.insert(checklists_array, checklist)
end
return checklists_array
end
-- Create a new DAG checklist
function DagChecklistManager:create_checklist(goal, tasks_data, subject, body, parallel_results)
local id = self.next_id
self.next_id = self.next_id + 1
-- Validate dependencies
local valid, err = self:validate_dependencies(tasks_data)
if not valid then
return nil, err
end
local checklist = {
id = id,
goal = goal,
created_at = os.time(),
tasks = {},
log = {
{
action = dag_types.LOG_ACTIONS.CREATE,
subject = subject,
body = body,
timestamp = os.time(),
parallel_results = parallel_results
}
},
dependency_graph = {},
execution_order = {}
}
-- Process and add tasks
for i, task_data in ipairs(tasks_data or {}) do
local task_text = task_data.text or task_data
if task_text and task_text:match("%S") then
local task = {
text = task_text:gsub("^%s*[-*+]?%s*", ""),
dependencies = task_data.dependencies or {},
mode = task_data.mode or dag_types.TASK_MODE.READWRITE, -- Default to safe mode
created_at = os.time()
}
-- Handle parallel execution results (mark as completed but don't store result in task)
if parallel_results and parallel_results[i] then
task.status = dag_types.TASK_STATUS.COMPLETED
task.completed_at = os.time()
else
task.status = dag_types.TASK_STATUS.PENDING
end
table.insert(checklist.tasks, task)
end
end
-- Calculate execution order
checklist.execution_order = self:get_execution_order(checklist.tasks)
-- Update task statuses based on dependencies
self:update_task_statuses(checklist)
-- Set first ready task to in_progress
local ready_tasks = self:get_ready_tasks(checklist)
if #ready_tasks > 0 then
checklist.tasks[ready_tasks[1]].status = dag_types.TASK_STATUS.IN_PROGRESS
end
self.checklists[id] = checklist
self:save()
return checklist, nil
end
-- Find the next in-progress task
function DagChecklistManager:get_next_in_progress_task(checklist)
for i, task in ipairs(checklist.tasks) do
if task.status == dag_types.TASK_STATUS.IN_PROGRESS then
return i, task
end
end
return nil, nil
end
-- Extract file paths from agent references
local function extract_file_paths_from_refs(agent)
local paths = {}
local seen = {}
if agent and agent.chat and agent.chat.refs then
for _, ref in pairs(agent.chat.refs) do
local path = nil
if ref.path then
path = ref.path
elseif ref.bufnr then
path = vim.api.nvim_buf_get_name(ref.bufnr)
end
if path and not seen[path] then
table.insert(paths, path)
seen[path] = true
end
end
end
return paths
end
-- Complete a task in DAG checklist
function DagChecklistManager:complete_task(agent, checklist, task_id, subject, body)
local task_id_num = tonumber(task_id)
if not task_id_num or task_id_num < 1 or task_id_num > #checklist.tasks then
return false, "Invalid task ID"
end
local task = checklist.tasks[task_id_num]
if task.status ~= dag_types.TASK_STATUS.IN_PROGRESS then
return false, "Only tasks that are in progress can be completed"
end
task.status = dag_types.TASK_STATUS.COMPLETED
task.completed_at = os.time()
-- Update task statuses based on new completion
self:update_task_statuses(checklist)
-- Set next ready task to in_progress
local ready_tasks = self:get_ready_tasks(checklist)
local next_in_progress = nil
for _, ready_idx in ipairs(ready_tasks) do
if checklist.tasks[ready_idx].status == dag_types.TASK_STATUS.PENDING then
checklist.tasks[ready_idx].status = dag_types.TASK_STATUS.IN_PROGRESS
next_in_progress = ready_idx
break
end
end
table.insert(checklist.log, {
action = dag_types.LOG_ACTIONS.COMPLETE_TASK,
subject = subject,
body = body,
timestamp = os.time(),
file_paths = extract_file_paths_from_refs(agent),
completed_task_ids = { task_id_num },
started_task_id = next_in_progress,
})
self:save()
return true, "Task marked complete"
end
-- Save checklists to storage
function DagChecklistManager:save()
return self.storage:save(self.checklists, self.next_id)
end
-- Reload checklists from storage
function DagChecklistManager:reload()
self.checklists, self.next_id = self.storage:load()
end
-- Factory function
function M.new(storage)
return DagChecklistManager.new(storage)
end
return M
-- checklist/dag_types.lua
-- Type definitions for DAG checklist system
---@class DagChecklistTask
---@field text string
---@field status "pending"|"in_progress"|"completed"|"blocked"
---@field dependencies integer[] -- Array of task indices that must complete first
---@field mode "read"|"write"|"readwrite" -- Access mode for safety during parallel execution
---@field created_at integer
---@field completed_at? integer
---@class DagChecklistLogEntry
---@field action string
---@field subject string
---@field body string
---@field timestamp integer
---@field file_paths? string[]
---@field completed_task_ids? integer[]
---@field started_task_id? integer
---@field parallel_results? table<integer, string> -- Results from parallel task execution
---@class DagChecklist
---@field id integer
---@field goal string
---@field created_at integer
---@field tasks DagChecklistTask[]
---@field log DagChecklistLogEntry[]
---@field dependency_graph table<integer, integer[]> -- task_id -> dependent_task_ids
---@field execution_order integer[] -- Topologically sorted order for execution
---@class DagChecklistProgress
---@field total integer
---@field completed integer
---@field pending integer
---@field in_progress integer
---@field blocked integer
local M = {}
-- Task status constants (extends base types)
M.TASK_STATUS = {
PENDING = "pending",
IN_PROGRESS = "in_progress",
COMPLETED = "completed",
BLOCKED = "blocked" -- New status for tasks waiting on dependencies
}
-- Log action constants (extends base types)
M.LOG_ACTIONS = {
CREATE = "create",
COMPLETE_TASK = "complete_task",
UPDATE = "update",
PARALLEL_EXECUTION = "parallel_execution" -- New action for parallel task execution
}
-- Task access mode constants for safety
M.TASK_MODE = {
READ = "read", -- Safe for parallel execution - read-only operations
WRITE = "write", -- Requires context - modifies files/state
READWRITE = "readwrite" -- Requires context - both reads and writes
}
-- Dependency resolution constants
M.DEPENDENCY_STATUS = {
SATISFIED = "satisfied",
UNSATISFIED = "unsatisfied",
CIRCULAR = "circular"
}
return M
-- checklist/shared_types.lua
-- Minimal shared type definitions for standardized command responses
---@class StandardToolResponse
---@field status "success" | "error"
---@field data any -- Actual data varies by tool, error message string on error
local M = {}
-- Tool response status constants
M.RESPONSE_STATUS = {
SUCCESS = "success",
ERROR = "error"
}
-- Create a standardized tool response
---@param status "success" | "error"
---@param data any
---@return StandardToolResponse
function M.create_response(status, data)
return {
status = status,
data = data
}
end
return M
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment