Skip to content

Instantly share code, notes, and snippets.

@sorentwo
Created February 18, 2020 21:54
Show Gist options
  • Save sorentwo/f45dace3c2060eb6fca027571e10aa2c to your computer and use it in GitHub Desktop.
Save sorentwo/f45dace3c2060eb6fca027571e10aa2c to your computer and use it in GitHub Desktop.
Oban Bi-Directional Workflow

This models a generic "workflow" module that can resolve dependencies, fan-in and fan-out in a declarative way.

Some general notes about the approach:

  • A ref is used as a key to glue jobs together (any unique argument would do)
  • Args are passed through to each worker to customize worker behavior or output (e.g. to disable quoting)
  • Each job is guaranteed to run only once per hour (the unique period can be changed to anything)
  • Starting any job in the flow will resolve the full dependency graph (this example models a fully connected graph)
  • Dependencies are checked in parallel. While its dependencies execute, the dependent will wait for a maximum of 60s (could be any sane value)

There are some demonstration specific bits of code in here as well:

  • The backoff period is hard coded to 1s
  • Each job sleeps randomly for 100ms to 5s to simulate "work"
  • Each job will fail randomly 20% of the time to simulate flakiness and resiliency
# Output, starting from Workflow.D
[stage: "D", label: "executing", ref: 1]
[stage: "A", label: "executing", ref: 1]
[stage: "B", label: "executing", ref: 1]
[stage: "A", label: "completed", ref: 1]
[stage: "C", label: "executing", ref: 1]
[stage: "B", label: "completed", ref: 1]
[stage: "G", label: "executing", ref: 1]
[stage: "D", label: "completed", ref: 1]
[stage: "E", label: "executing", ref: 1]
[stage: "E", label: "completed", ref: 1]
[stage: "F", label: "executing", ref: 1]
[stage: "C", label: "completed", ref: 1]
[stage: "H", label: "executing", ref: 1]
[stage: "F", label: "completed", ref: 1]
[stage: "H", label: "executing", ref: 1]
[stage: "G", label: "executing", ref: 1]
[stage: "H", label: "executing", ref: 1]
[stage: "G", label: "executing", ref: 1]
[stage: "H", label: "completed", ref: 1]
[stage: "G", label: "completed", ref: 1]
# Stage A: declares no dependencies and fans out to stages B and D.
defmodule Workflow.A do
  use Workflow, depends_on: [], flows_into: [Workflow.B, Workflow.D]
end
# Stage B: waits on stage A, then fans out to stages C and G.
defmodule Workflow.B do
  use Workflow, depends_on: [Workflow.A], flows_into: [Workflow.C, Workflow.G]
end
# Stage C: waits on stage B and enqueues nothing afterwards.
defmodule Workflow.C do
  use Workflow, depends_on: [Workflow.B], flows_into: []
end
# Stage D: waits on stage A, then fans out to stages E, F and H.
defmodule Workflow.D do
  use Workflow, depends_on: [Workflow.A], flows_into: [Workflow.E, Workflow.F, Workflow.H]
end
# Stage E: waits on stage D, then fans out to stages G and H.
defmodule Workflow.E do
  use Workflow, depends_on: [Workflow.D], flows_into: [Workflow.G, Workflow.H]
end
# Stage F: waits on stage D, then enqueues stage G.
defmodule Workflow.F do
  use Workflow, depends_on: [Workflow.D], flows_into: [Workflow.G]
end
# Stage G: a fan-in stage that enqueues nothing afterwards.
#
# Workflow.B, Workflow.E and Workflow.F each list this stage in their
# `flows_into`, so all three are declared as dependencies here. Listing only
# Workflow.E would let G finish before B or F had completed, which breaks the
# bi-directional declaration used by every other stage in the graph.
defmodule Workflow.G do
  use Workflow,
    depends_on: [Workflow.B, Workflow.E, Workflow.F],
    flows_into: []
end
# Stage H: waits on stages D and E and enqueues nothing afterwards.
defmodule Workflow.H do
  use Workflow, depends_on: [Workflow.D, Workflow.E], flows_into: []
end
defmodule Workflow do
  @moduledoc """
  Declarative dependency resolution for Oban workers.

  A stage opts in with:

      use Workflow, depends_on: [Upstream], flows_into: [Downstream]

  The generated `perform/2` waits until every `depends_on` worker has a
  `"completed"` job with the same `"ref"` argument (enqueueing any that do not
  exist yet), runs the stage's work, and then enqueues every `flows_into`
  worker with the same args.

  Any remaining options are forwarded to `Oban.Worker`. Unless the caller
  supplies `:unique`, jobs are made unique for one hour.
  """

  alias Oban.Job
  alias Oban.Test.Repo
  import Ecto.Query

  @doc """
  Turns the calling module into an Oban worker that takes part in a workflow.

  `:depends_on` and `:flows_into` (both default to `[]`) are stored as module
  attributes and stripped from the options handed to `Oban.Worker`.
  """
  defmacro __using__(opts) do
    quote location: :keep do
      use Oban.Worker,
          unquote(opts)
          |> Keyword.put_new(:unique, period: 60 * 60)
          |> Keyword.delete(:depends_on)
          |> Keyword.delete(:flows_into)

      import Workflow

      @depends_on Keyword.get(unquote(opts), :depends_on, [])
      @flows_into Keyword.get(unquote(opts), :flows_into, [])

      @impl Oban.Worker
      def perform(%{"ref" => ref} = args, _job) do
        print_stage(__MODULE__, "executing", ref)

        with_deps(@depends_on, @flows_into, args, &random_sleep/0)

        print_stage(__MODULE__, "completed", ref)

        :ok
      end

      # Hard coded for demonstration purposes
      @impl Oban.Worker
      def backoff(_attempt), do: 1
    end
  end

  # Dependency Helpers

  @doc """
  Runs `fun` between dependency resolution and downstream fan-out.

  1. Blocks until every worker in `depends_on` has a `"completed"` job for the
     `"ref"` found in `args`.
  2. Calls `fun` with no arguments.
  3. Inserts one job per worker in `flows_into`, passing `args` through.

  Returns the list of values returned by `Oban.insert!/1` for the
  `flows_into` jobs.
  """
  @spec with_deps([module()], [module()], map(), (-> any())) :: list()
  def with_deps(depends_on, flows_into, args, fun) do
    wait_for_deps(depends_on, args)

    fun.()

    insert_deps(flows_into, args)
  end

  # Polls every dependency concurrently. Each poller gets at most 60s; since no
  # `:on_timeout` is given, Task.async_stream/3's default applies and a poller
  # that runs longer exits the calling process.
  defp wait_for_deps(workers, args) do
    workers
    |> Task.async_stream(&wait_loop(&1, args), ordered: false, timeout: 60_000)
    |> Enum.to_list()
  end

  # Polls the state of `worker`'s job every `interval` milliseconds until it is
  # "completed". The interval is threaded through the recursion so a
  # non-default value applies to every iteration, not just the first.
  defp wait_loop(worker, args, interval \\ 1_000) do
    case dep_state(worker, args) do
      # The job doesn't exist and needs to be enqueued
      nil ->
        insert_deps([worker], args)
        Process.sleep(interval)
        wait_loop(worker, args, interval)

      # The job is finished, move along
      "completed" ->
        :ok

      # Any other state (executing, scheduled, ...): wait a bit and check again
      _ ->
        Process.sleep(interval)
        wait_loop(worker, args, interval)
    end
  end

  # Returns the state of one job for `worker` whose args carry the given
  # "ref", or nil when no such job exists.
  defp dep_state(worker, %{"ref" => ref}) do
    # Compare against the module name without its "Elixir." prefix.
    worker =
      worker
      |> to_string()
      |> String.trim_leading("Elixir.")

    Job
    |> where([j], j.worker == ^worker)
    |> where([j], fragment("?->'ref' = ?", j.args, ^ref))
    |> select([j], j.state)
    |> limit(1)
    |> Repo.one()
  end

  # Builds and inserts one job per worker, each with the same args.
  defp insert_deps(flows_into, args) do
    for worker <- flows_into do
      args
      |> worker.new()
      |> Oban.insert!()
    end
  end

  # Demonstration Helpers

  @min_sleep 100
  @max_sleep 5_000
  @raise_chance 20

  @doc """
  Simulates flaky work: may raise, otherwise sleeps for a random number of
  milliseconds between `min` and `max` (inclusive).

  Raises `RuntimeError` roughly #{@raise_chance}% of the time.
  """
  @spec random_sleep(non_neg_integer(), non_neg_integer()) :: :ok
  def random_sleep(min \\ @min_sleep, max \\ @max_sleep) do
    maybe_raise!()

    min..max
    |> Enum.random()
    |> Process.sleep()
  end

  # Raises with a probability of `chance` percent. :rand.uniform(100) yields an
  # integer in 1..100, so `<=` is required for `chance` to be an exact
  # percentage (`<` would make a chance of 20 fire only 19% of the time).
  defp maybe_raise!(chance \\ @raise_chance) do
    if :rand.uniform(100) <= chance, do: raise(RuntimeError, "Something went wrong!")
  end

  @doc """
  Prints a progress line for the stage: the last segment of `module`'s name,
  the `label` and the `ref`. Returns the printed keyword list.
  """
  @spec print_stage(module(), String.t(), term()) :: keyword()
  def print_stage(module, label, ref) do
    stage = module |> Module.split() |> List.last()

    IO.inspect(stage: stage, label: label, ref: ref)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment