Created
November 18, 2017 18:19
-
-
Save slapers/4b3e58a38e4c1203acbe193e950611e6 to your computer and use it in GitHub Desktop.
Build a directed acyclic graph to run a simple workflow in Elixir
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule WorkStep do
  @moduledoc """
  A single step of a workflow.

  A step calls `mod.fun` with the arguments described by `:arg` and publishes
  the result under the variable name stored in `:out`. The names listed in
  `:in` are only used to wire the step into the dependency graph; the values
  actually passed to the function are selected by `{:in, name}` placeholders
  inside `:arg`.
  """

  defstruct [:name, :in, :out, :mod, :fun, :arg]
  require Logger

  @doc """
  Adds `step` to `graph`.

  The step itself is added as a vertex, one edge is added from every name in
  `step.in` to the step, and one edge is added from the step to `step.out`.
  Returns the updated graph.
  """
  def add_to_graph(%Graph{} = graph, %__MODULE__{} = step) do
    graph
    |> Graph.add_vertex(step)
    |> Graph.add_edges(edges_in(step))
    |> Graph.add_edges(edges_out(step))
  end

  @doc """
  Runs `step` against the keyword list `vars`.

  Every `{:in, name}` entry in `step.arg` is replaced by
  `Keyword.get(vars, name)` (so a missing variable becomes `nil`); all other
  entries are passed through unchanged. Returns a one-element keyword list
  `[{step.out, result}]`.
  """
  def run(%__MODULE__{} = step, vars \\ []) do
    args = parse_args(step.arg, vars)
    [{step.out, apply(step.mod, step.fun, args)}]
  end

  # Edges pointing from each input variable name to the step.
  defp edges_in(%__MODULE__{} = step) do
    Enum.map(step.in, &{&1, step})
  end

  # Single edge pointing from the step to its output variable name.
  defp edges_out(%__MODULE__{} = step) do
    [{step, step.out}]
  end

  # Entry point of the argument resolver. Both `args` and `vars` must be lists.
  # The guard uses `and` on purpose: stacking two `when` clauses would mean
  # "either one is a list", letting malformed input through.
  defp parse_args(args, vars) when is_list(args) and is_list(vars) do
    parse_args(args, vars, [])
  end

  # The accumulator is built by prepending, so it is reversed once at the end.
  defp parse_args([], _vars, acc), do: Enum.reverse(acc)

  # Placeholder: substitute the current value of the named variable.
  defp parse_args([{:in, varname} | t], vars, acc) do
    parse_args(t, vars, [Keyword.get(vars, varname) | acc])
  end

  # Literal argument: pass through as-is.
  defp parse_args([h | t], vars, acc) do
    parse_args(t, vars, [h | acc])
  end
end
defmodule Workflow do
  @moduledoc """
  A named collection of `WorkStep`s that can be turned into a directed graph.
  """

  defstruct [:name, :in, :out, :steps]

  @doc """
  Builds a directed graph from the workflow by folding every step in
  `steps` into a fresh graph via `WorkStep.add_to_graph/2`.
  """
  def to_graph(%__MODULE__{steps: steps}) do
    Enum.reduce(steps, Graph.new(type: :directed), fn step, graph ->
      WorkStep.add_to_graph(graph, step)
    end)
  end
end
defmodule WorkflowExecution do
  @moduledoc """
  Holds a workflow, the graph derived from it, and the variables known so
  far, and executes the steps one after another in topological order.
  """

  defstruct [:workflow, :graph, :vars]

  @doc """
  Creates an execution for `wf` with the initial keyword list `vars`.

  Raises a `RuntimeError` when the graph built from the workflow is not
  acyclic.
  """
  def new(%Workflow{} = wf, vars \\ []) do
    graph = Workflow.to_graph(wf)

    case Graph.is_acyclic?(graph) do
      true -> %__MODULE__{workflow: wf, graph: graph, vars: vars}
      false -> raise("Workflow should be a DAG")
    end
  end

  @doc """
  Runs the execution and returns the resulting keyword list of variables.

  `vars` is merged over the variables the execution was created with before
  any step runs.
  """
  def run(%__MODULE__{} = we, vars \\ []) do
    execution = %{we | vars: Keyword.merge(we.vars, vars)}
    walk(Graph.topsort(execution.graph), execution)
  end

  # Nothing left to visit: the accumulated variables are the result.
  defp walk([], execution), do: execution.vars

  # A trailing variable name ends the walk without a presence check.
  defp walk([last], execution) when is_atom(last), do: execution.vars

  # A step: run it and merge what it produced into the known variables.
  defp walk([%WorkStep{} = step | rest], execution) do
    produced = WorkStep.run(step, execution.vars)
    walk(rest, %{execution | vars: Keyword.merge(execution.vars, produced)})
  end

  # A variable name: it must already be known at this point of the walk.
  defp walk([var | rest], execution) do
    if Keyword.has_key?(execution.vars, var) do
      walk(rest, execution)
    else
      raise("Workflow has missing var #{inspect var}")
    end
  end
end
defmodule WorkFlowTest do
  use ExUnit.Case, async: false

  # The step producing :b is listed last, after the steps that depend on it,
  # so a passing run shows that execution order comes from the graph and not
  # from the order of the list.
  #
  # `:rand` is used rather than the deprecated `:random` module.
  @sample %Workflow{
    name: "Create 2 random numbers, sum them and add one",
    steps: [
      %WorkStep{
        name: "Create random :a",
        in: [],
        out: :a,
        mod: :rand,
        fun: :uniform,
        arg: []
      },
      %WorkStep{
        name: "Sum :a and :b",
        in: [:a, :b],
        out: :sum,
        mod: Kernel,
        fun: :+,
        arg: [{:in, :a}, {:in, :b}]
      },
      %WorkStep{
        name: "Add one to :sum",
        in: [:sum],
        out: :sumplusone,
        mod: Kernel,
        fun: :+,
        arg: [{:in, :sum}, 1]
      },
      %WorkStep{
        name: "Create random :b",
        in: [],
        out: :b,
        mod: :rand,
        fun: :uniform,
        arg: []
      }
    ]
  }

  test "execute linearly" do
    result =
      @sample
      |> WorkflowExecution.new()
      |> WorkflowExecution.run()

    # The random inputs are not pinned to specific values; the test checks the
    # set of produced variables and the arithmetic relations between them.
    # (Writing `assert([...]) === pipeline` would only assert the literal list
    # and silently discard the comparison.)
    assert result |> Keyword.keys() |> Enum.sort() === [:a, :b, :sum, :sumplusone]
    assert is_float(result[:a])
    assert is_float(result[:b])
    assert result[:sum] === result[:a] + result[:b]
    assert result[:sumplusone] === result[:sum] + 1
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Execution looks like:
