|
defmodule Workflow do |
|
@moduledoc false |
|
|
|
alias Oban.Job |
|
alias Oban.Test.Repo |
|
|
|
import Ecto.Query |
|
|
|
defmacro __using__(opts) do |
|
quote location: :keep do |
|
use Oban.Worker, |
|
unquote(opts) |
|
|> Keyword.put_new(:unique, period: 60 * 60) |
|
|> Keyword.delete(:depends_on) |
|
|> Keyword.delete(:flows_into) |
|
|
|
import Workflow |
|
|
|
@depends_on Keyword.get(unquote(opts), :depends_on, []) |
|
@flows_into Keyword.get(unquote(opts), :flows_into, []) |
|
|
|
@impl Oban.Worker |
|
def perform(%{"ref" => ref} = args, _job) do |
|
print_stage(__MODULE__, "executing", ref) |
|
with_deps(@depends_on, @flows_into, args, &random_sleep/0) |
|
print_stage(__MODULE__, "completed", ref) |
|
|
|
:ok |
|
end |
|
|
|
# Hard coded for demonstration purposes |
|
@impl Oban.Worker |
|
def backoff(_attempt), do: 1 |
|
end |
|
end |
|
|
|
# Dependency Helpers |
|
|
|
def with_deps(depends_on, flows_into, args, fun) do |
|
wait_for_deps(depends_on, args) |
|
fun.() |
|
insert_deps(flows_into, args) |
|
end |
|
|
|
defp wait_for_deps(workers, args) do |
|
workers |
|
|> Task.async_stream(&wait_loop(&1, args), ordered: false, timeout: 60_000) |
|
|> Enum.to_list() |
|
end |
|
|
|
defp wait_loop(worker, args, interval \\ 1_000) do |
|
case dep_state(worker, args) do |
|
# The job doesn't exist and needs to be enqueued |
|
nil -> |
|
insert_deps([worker], args) |
|
|
|
Process.sleep(interval) |
|
|
|
wait_loop(worker, args) |
|
|
|
# The job is finished, move along |
|
"completed" -> |
|
:ok |
|
|
|
# The job is executing or currently scheduled, wait a bit |
|
_ -> |
|
Process.sleep(interval) |
|
|
|
wait_loop(worker, args) |
|
end |
|
end |
|
|
|
defp dep_state(worker, %{"ref" => ref}) do |
|
worker = |
|
worker |
|
|> to_string() |
|
|> String.trim_leading("Elixir.") |
|
|
|
Job |
|
|> where([j], j.worker == ^worker) |
|
|> where([j], fragment("?->'ref' = ?", j.args, ^ref)) |
|
|> select([j], j.state) |
|
|> limit(1) |
|
|> Repo.one() |
|
end |
|
|
|
defp insert_deps(flows_into, args) do |
|
for worker <- flows_into do |
|
args |
|
|> worker.new() |
|
|> Oban.insert!() |
|
end |
|
end |
|
|
|
# Demonstration Helpers |
|
|
|
@min_sleep 100 |
|
@max_sleep 5_000 |
|
@raise_chance 20 |
|
|
|
def random_sleep(min \\ @min_sleep, max \\ @max_sleep) do |
|
maybe_raise!() |
|
|
|
min..max |
|
|> Enum.random() |
|
|> Process.sleep() |
|
end |
|
|
|
defp maybe_raise!(chance \\ @raise_chance) do |
|
if :rand.uniform(100) < chance, do: raise(RuntimeError, "Something went wrong!") |
|
end |
|
|
|
def print_stage(module, label, ref) do |
|
stage = module |> Module.split() |> List.last() |
|
|
|
IO.inspect(stage: stage, label: label, ref: ref) |
|
end |
|
end |