Skip to content

Instantly share code, notes, and snippets.

@ismasan
Last active November 26, 2025 12:19
Show Gist options
  • Select an option

  • Save ismasan/5efc45e2eaaae0bc3aa6069bc50e2c36 to your computer and use it in GitHub Desktop.

Select an option

Save ismasan/5efc45e2eaaae0bc3aa6069bc50e2c36 to your computer and use it in GitHub Desktop.
Example Sourced flow with subtasks as separate streams
# The main worflow
class MarksFlow < Sourced::Actor
include Sourced::CommandMethods
state do |id|
{ id:, status: :new, tasks: {} }
end
command :start_step1 do |state, cmd|
raise "invalid transition" unless state[:status] == :new
event :step1_complete
end
event :step1_complete do |state, event|
state[:status] = :step1_complete
end
command :start_step2 do |state, cmd|
raise "invalid transition" unless state[:status] == :step1_complete
event :step2_complete
end
event :step2_complete do |state, event|
state[:status] = :step2_complete
end
# React to step2_complete and dispatch command to 3 sub-task streams
# include this stream's ID in the payload
# so that the tasks know what stream to reply to
# This could be better!
reaction :step2_complete do |state, event|
wf_id = state[:id]
dispatch(SubTask::Start, wf_id:, name: 'Task1').to("#{event.stream_id}-task1")
dispatch(SubTask::Start, wf_id:, name: 'Task2').to("#{event.stream_id}-task2")
dispatch(SubTask::Start, wf_id:, name: 'Task3').to("#{event.stream_id}-task3")
end
# Sub-tasks notify back to the main flow
# with this command. Here we check how many tasks have reported back
# and if all of them have, we progress to the next step (complete)
command :notify_task, name: String do |state, cmd|
puts "invalid transition" unless state[:status] == :step2_complete
return if state[:tasks].key?(cmd.payload.name)
event :task_notified, cmd.payload
if state[:tasks].keys.uniq.size == 3
puts "Complete!"
event :complete
end
end
event :task_notified, name: String do |state, event|
puts "Task done: #{event.payload.name}"
state[:tasks][event.payload.name] = true
end
event :complete do |state, event|
puts event.inspect
state[:status] = :complete
end
end
# Sub-task spawned by the main workflow
class SubTask < Sourced::Actor
state do |id|
{ id:, name: nil, wf_id: nil }
end
command :start, wf_id: String, name: String do |state, cmd|
event :started, cmd.payload
end
event :started, wf_id: String, name: String do |state, event|
state[:name] = event.payload.name
state[:wf_id] = event.payload.wf_id
end
reaction :started do |state, event|
sleep rand(5)
dispatch(MarksFlow::NotifyTask, name: state[:name]).to(state[:wf_id])
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment