Skip to content

Instantly share code, notes, and snippets.

@slapers
Created November 18, 2017 18:19
Show Gist options
  • Save slapers/4b3e58a38e4c1203acbe193e950611e6 to your computer and use it in GitHub Desktop.
Save slapers/4b3e58a38e4c1203acbe193e950611e6 to your computer and use it in GitHub Desktop.
Build a direct acyclic graph to run a simple workflow in elixir
defmodule WorkStep do
defstruct [:name, :in, :out, :mod, :fun, :arg]
require Logger
def add_to_graph(%Graph{} = graph, %__MODULE__{} = step) do
graph
|> Graph.add_vertex(step)
|> Graph.add_edges(step |> edges_in)
|> Graph.add_edges(step |> edges_out)
end
def run(%WorkStep{} = step, vars \\ []) do
args = parse_args(step.arg, vars)
[{step.out, apply(step.mod, step.fun, args)}]
end
defp edges_in(%__MODULE__{} = step) do
step.in |> Enum.map(&({&1, step}))
end
defp edges_out(%__MODULE__{} = step) do
[{step, step.out}]
end
defp parse_args(args, vars)
when is_list(args)
when is_list(vars)
do
parse_args(args, vars, [])
end
defp parse_args([], _vars, acc), do: acc |> Enum.reverse
defp parse_args([{:in, varname} | t], vars, acc) do
parse_args(t, vars, [Keyword.get(vars, varname) | acc])
end
defp parse_args([h | t], vars, acc) do
parse_args(t, vars, [h | acc])
end
end
defmodule Workflow do
defstruct [:name, :in, :out, :steps]
def to_graph(%__MODULE__{} = wf) do
wf.steps
|> Enum.reduce(Graph.new(type: :directed), fn(step, acc) ->
WorkStep.add_to_graph(acc, step)
end)
end
end
defmodule WorkflowExecution do
defstruct [:workflow, :graph, :vars]
def new(%Workflow{} = wf, vars \\ []) do
we = %__MODULE__{
workflow: wf,
graph: wf |> Workflow.to_graph,
vars: vars
}
we.graph
|> Graph.is_acyclic?
|> case do
true -> we
false -> raise("Workflow should be a DAG")
end
end
def run(%WorkflowExecution{} = we, vars \\ []) do
run_linear(%{we | vars: Keyword.merge(we.vars, vars)})
end
defp run_linear(we) do
we.graph
|> Graph.topsort
|> run_linear(we)
end
defp run_linear([], we), do: we.vars
defp run_linear([atm], we) when is_atom(atm), do: we.vars
defp run_linear([%WorkStep{} = step | t], we) do
vars = WorkStep.run(step, we.vars)
run_linear(t, %{we | vars: Keyword.merge(we.vars, vars)})
end
defp run_linear([varname | t], we) do
case Keyword.has_key?(we.vars, varname) do
true -> run_linear(t, we)
false -> raise("Workflow has missing var #{inspect varname}")
end
end
end
defmodule WorkFlowTest do
use ExUnit.Case, async: false
@sample %Workflow{
name: "Create 2 random numbers, sum them and add one",
steps: [
%WorkStep{
name: "Create random :a",
in: [],
out: :a,
mod: :random,
fun: :uniform,
arg: []
},
%WorkStep{
name: "Sum :a and :b",
in: [:a, :b],
out: :sum,
mod: Kernel,
fun: :+,
arg: [{:in, :a}, {:in, :b}]
},
%WorkStep{
name: "Add one to :sum",
in: [:sum],
out: :sumplusone,
mod: Kernel,
fun: :+,
arg: [{:in, :sum}, 1]
},
%WorkStep{
name: "Create random :b",
in: [],
out: :b,
mod: :random,
fun: :uniform,
arg: []
}
]
}
test "execute linearly" do
assert([
a: 0.4435846174457203,
b: 0.7230402056221108,
sum: 1.166624823067831,
sumplusone: 2.1666248230678313
]) === @sample
|> WorkflowExecution.new
|> WorkflowExecution.run
end
end
@slapers
Copy link
Author

slapers commented Nov 18, 2017

Execution looks like:
screen shot 2017-11-18 at 19 34 39

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment