Last active
November 26, 2025 12:19
-
-
Save ismasan/5efc45e2eaaae0bc3aa6069bc50e2c36 to your computer and use it in GitHub Desktop.
Example Sourced flow with subtasks as separate streams
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # The main workflow | |
# Workflow actor that moves through :new -> :step1_complete -> :step2_complete,
# fans out one SubTask::Start command per entry in TASK_NAMES, and becomes
# :complete once every sub-task has notified back.
class MarksFlow < Sourced::Actor
  include Sourced::CommandMethods

  # Names of the sub-tasks started after step 2. This single list drives both
  # the fan-out in the :step2_complete reaction and the completion check in
  # :notify_task, so the two cannot drift apart.
  TASK_NAMES = %w[Task1 Task2 Task3].freeze

  # Initial state: status starts at :new; :tasks maps task name => true for
  # each sub-task that has reported back.
  state do |id|
    { id:, status: :new, tasks: {} }
  end

  # Raises RuntimeError ("invalid transition") unless the flow is still :new.
  command :start_step1 do |state, cmd|
    raise "invalid transition" unless state[:status] == :new

    event :step1_complete
  end

  event :step1_complete do |state, event|
    state[:status] = :step1_complete
  end

  # Raises RuntimeError ("invalid transition") unless step 1 has completed.
  command :start_step2 do |state, cmd|
    raise "invalid transition" unless state[:status] == :step1_complete

    event :step2_complete
  end

  event :step2_complete do |state, event|
    state[:status] = :step2_complete
  end

  # React to step2_complete and dispatch a command to one sub-task stream per
  # task name. This stream's ID is included in the payload so that the tasks
  # know what stream to reply to.
  # This could be better!
  reaction :step2_complete do |state, event|
    wf_id = state[:id]
    TASK_NAMES.each do |task_name|
      # Stream IDs keep the original "<workflow stream>-task1" shape.
      dispatch(SubTask::Start, wf_id:, name: task_name)
        .to("#{event.stream_id}-#{task_name.downcase}")
    end
  end

  # Sub-tasks notify back to the main flow with this command. It records the
  # notification and, once all tasks have reported back, progresses to the
  # next step (complete).
  command :notify_task, name: String do |state, cmd|
    # Out-of-state notifications are reported and skipped instead of being
    # recorded. This guard does not raise like the step commands do; that is
    # a deliberate choice here, made so that a stray notification is dropped
    # rather than turned into an error.
    unless state[:status] == :step2_complete
      puts "invalid transition"
      next
    end

    # `next` (not `return`) leaves the handler block safely regardless of how
    # the DSL invokes it. A repeated notification for the same task is ignored.
    next if state[:tasks].key?(cmd.payload.name)

    event :task_notified, cmd.payload

    # This check runs after :task_notified has been emitted, so it relies on
    # state[:tasks] already including the task just notified.
    if state[:tasks].size == TASK_NAMES.size
      puts "Complete!"
      event :complete
    end
  end

  event :task_notified, name: String do |state, event|
    puts "Task done: #{event.payload.name}"
    state[:tasks][event.payload.name] = true
  end

  event :complete do |state, event|
    puts event.inspect
    state[:status] = :complete
  end
end
| # Sub-task spawned by the main workflow | |
# Actor for a single sub-task. It remembers its own name and the ID of the
# workflow stream that started it, then notifies that stream via
# MarksFlow::NotifyTask.
class SubTask < Sourced::Actor
  # Initial state: name and wf_id are unknown until the task is started.
  state { |id| { id:, name: nil, wf_id: nil } }

  # Start the task by recording the command payload (wf_id and name) as a
  # :started event.
  command :start, wf_id: String, name: String do |_task, start_cmd|
    event :started, start_cmd.payload
  end

  # Copy the task name and the originating workflow ID into state.
  event :started, wf_id: String, name: String do |task, started|
    details = started.payload
    task[:name] = details.name
    task[:wf_id] = details.wf_id
  end

  # After a random pause of 0-4 seconds, report back to the workflow stream
  # whose ID was stored in state.
  reaction :started do |task, _started|
    sleep rand(5)
    notification = dispatch(MarksFlow::NotifyTask, name: task[:name])
    notification.to(task[:wf_id])
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment