So what is GenStage? From the official documentation, it is a "specification and computational flow for Elixir", but what does that mean to us?
That description is vague enough to cover a lot of ground, so here we'll dive in and build something on top of GenStage to understand its goals.
We could go into the technical and theoretical implications of this, but instead let's try a pragmatic approach and just get it working.
First, let's imagine we have a server that constantly emits numbers. It starts at the number we give it as its state, then counts up by one from there onward. This is what we would call our producer. Each time it emits a number, that is an event, and we want to handle it with a consumer. A consumer simply takes what a producer emits and does something with it. In our case, we will display the count. There is a lot more to GenStage at a technical and applied level, and we will build up the specifics and definitions in later lessons; for now we just want a running example we can build on.
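Before any code, it helps to picture the flow: consumers ask producers for events (this is called demand), and producers respond by emitting events downstream:

[Producer] <--- demand ---- [Consumer]
[Producer] ---- events ---> [Consumer]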
We'll begin by generating a simple project that has a supervision tree:
$ mix new genstage_example --sup
$ cd genstage_example
Let's set up some basic things for the future of our application.
Since GenStage is generally used as a transformation pipeline, let's imagine we have a background worker of some sort.
This worker will need to persist whatever it changes, so we should get a database set up, but we can worry about that in a later lesson.
To start, all we need to do is add gen_stage to our deps in mix.exs:
. . .
defp deps do
  [
    {:gen_stage, "~> 0.7"},
  ]
end
. . .
Now, we should fetch our dependencies and compile before we start setup:
$ mix do deps.get, compile
Let's build a producer, our simple first building block to help us utilize this new tool!
To get started, we want to create a producer that emits a constant stream of events for our consumer to handle.
This is quite simple with the rudimentary example of a counter.
Let's create a namespaced directory under lib and then go from there, so that our file paths match our module names:
$ mkdir lib/genstage_example
$ touch lib/genstage_example/producer.ex
Now we can add the code:
defmodule GenstageExample.Producer do
  alias Experimental.GenStage
  use GenStage

  def start_link do
    # naming the process allows us to handle failure
    GenStage.start_link(__MODULE__, 0, name: __MODULE__)
  end

  def init(counter) do
    {:producer, counter}
  end

  def handle_demand(demand, state) do
    events = Enum.to_list(state..state + demand - 1)
    {:noreply, events, state + demand}
  end
end
Let's break this down line by line. To begin with, we have our initial declarations:
. . .
defmodule GenstageExample.Producer do
  alias Experimental.GenStage
  use GenStage
. . .
What this does is a couple of simple things.
First, we declare our module, and soon after we alias Experimental.GenStage. This is simply because we will be calling it more than once, and the alias makes that more convenient.
The use GenStage line is much akin to use GenServer: it brings in the default behaviour and functions to save us from a large amount of boilerplate.
If we go further, we see the first two primary functions for startup:
. . .
def start_link do
  GenStage.start_link(__MODULE__, 0, name: __MODULE__)
end

def init(counter) do
  {:producer, counter}
end
. . .
These two functions offer a very simple start.
First, we have our standard start_link/0 function.
Inside it, we call GenStage.start_link/3, beginning with the argument __MODULE__, which registers the process under our module's name like any other named process; that naming is what lets us refer to the producer (and handle failure) later.
The second argument is the initial state: our counter, starting at 0.
In init/1 we simply set the counter as our state, and label ourselves as a producer.
Finally, we get to the real meat of our code's functionality:
. . .
def handle_demand(demand, state) do
  events = Enum.to_list(state..state + demand - 1)
  {:noreply, events, state + demand}
end
. . .
handle_demand/2 must be implemented by all producer-type modules that utilize GenStage.
In this case, we answer the consumer's demand with a run of incrementing integers and store the incremented counter as our new state.
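To make that concrete, here is a quick trace of a single call (a demand of 3 is just an illustration):

# state = 0, demand = 3
events = Enum.to_list(0..0 + 3 - 1) #=> [0, 1, 2]
# new state: 0 + 3 = 3, so the next demand picks up where this left off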
This might not make a ton of sense until we build our consumer, so let's move on to that now.
The consumer will handle the events emitted by our producer. For now, we won't worry about things like broadcast strategies or what the internals are truly doing. We'll start by showing all the code and then break it down.
defmodule GenstageExample.Consumer do
  alias Experimental.GenStage
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    {:consumer, state, subscribe_to: [GenstageExample.Producer]}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      IO.inspect {self(), event, state}
    end
    {:noreply, [], state}
  end
end
To start, let's look at the beginning functions just like last time:
defmodule GenstageExample.Consumer do
  alias Experimental.GenStage
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    {:consumer, state, subscribe_to: [GenstageExample.Producer]}
  end
. . .
To begin, much like in our producer, we set up our start_link/0 and init/1 functions.
In start_link/0 we simply start the stage with our module like last time, and set a state.
The state is arbitrary for the consumer, and can be literally whatever we please, in this case :state_doesnt_matter.
In init/1 we take the state and set up our expected tuple.
It expects us to return our :consumer atom first, then the state given.
Our subscribe_to clause is optional.
What it does is subscribe us to our producer module.
The reason for this is that if something crashes, the consumer will simply attempt to re-subscribe and then resume receiving emitted events.
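As an aside, each entry in subscribe_to can also be a tuple of the producer and subscription options, which lets us tune how many events this consumer asks for at a time. The numbers here are purely illustrative:

def init(state) do
  {:consumer, state, subscribe_to: [{GenstageExample.Producer, max_demand: 10, min_demand: 5}]}
end

Back to our consumer as written, the real work happens in handle_events/3: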
. . .
def handle_events(events, _from, state) do
  for event <- events do
    IO.inspect {self(), event, state}
  end
  {:noreply, [], state}
end
. . .
This is the meat of our consumer, handle_events/3.
handle_events/3 must be implemented by any consumer or producer_consumer type of GenStage module.
What it does for us is quite simple.
We take a list of events and iterate through them.
For each one, we inspect the pid of our consumer, the event (in this case the current count), and the state.
After that, we return {:noreply, [], state}: as a consumer we emit no events, so the second element of the tuple is an empty list, and we simply pass the state on.
To get all of this to work we only have to make one simple change.
Open up lib/genstage_example.ex and add both modules as workers; they will automatically start with our application:
. . .
children = [
  worker(GenstageExample.Producer, []),
  worker(GenstageExample.Consumer, []),
]
. . .
With this, if things are all correct, we can run IEx and we should see everything working:

$ iex -S mix
iex(1)> {#PID<0.205.0>, 0, :state_doesnt_matter}
{#PID<0.205.0>, 1, :state_doesnt_matter}
{#PID<0.205.0>, 2, :state_doesnt_matter}
{#PID<0.205.0>, 3, :state_doesnt_matter}
{#PID<0.205.0>, 4, :state_doesnt_matter}
{#PID<0.205.0>, 5, :state_doesnt_matter}
{#PID<0.205.0>, 6, :state_doesnt_matter}
{#PID<0.205.0>, 7, :state_doesnt_matter}
{#PID<0.205.0>, 8, :state_doesnt_matter}
{#PID<0.205.0>, 9, :state_doesnt_matter}
{#PID<0.205.0>, 10, :state_doesnt_matter}
{#PID<0.205.0>, 11, :state_doesnt_matter}
{#PID<0.205.0>, 12, :state_doesnt_matter}
. . .
From here, we have a working flow.
There is a producer emitting our counter, and our consumer is displaying all of this and continuing the flow.
Now, what if we wanted multiple consumers?
Right now, if we examine the IO.inspect/1 output, we see that every single event is handled by a single PID.
This isn't very Elixir-y.
We have massive concurrency built-in, we should probably leverage that as much as possible.
Let's make some adjustments so that we can have multiple workers by modifying lib/genstage_example.ex:
. . .
children = [
  worker(GenstageExample.Producer, []),
  worker(GenstageExample.Consumer, [], id: 1),
  worker(GenstageExample.Consumer, [], id: 2),
]
. . .
Now, let's fire up IEx again:
$ iex -S mix
iex(1)> {#PID<0.205.0>, 0, :state_doesnt_matter}
{#PID<0.205.0>, 1, :state_doesnt_matter}
{#PID<0.205.0>, 2, :state_doesnt_matter}
{#PID<0.207.0>, 3, :state_doesnt_matter}
. . .
As you can see, we have multiple PIDs now, simply by adding a line of code and giving our consumers IDs. But we can take this even further:
. . .
children = [
  worker(GenstageExample.Producer, []),
]

# System.schedulers_online/0 gives us the number of cores on the machine
consumers = for id <- 1..(System.schedulers_online() * 12) do
  worker(GenstageExample.Consumer, [], id: id)
end

opts = [strategy: :one_for_one, name: GenstageExample.Supervisor]
Supervisor.start_link(children ++ consumers, opts)
. . .
What we are doing here is quite simple.
First, we get the number of cores on the machine with System.schedulers_online/0, and from there we simply create workers just like we had before.
Now we have 12 workers per core, which makes much more effective use of the machine.
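On a four-core machine, for example, that works out to 48 consumers:

System.schedulers_online() #=> 4 on a four-core machine
# 4 cores * 12 = 48 consumer workers

Running the app again, the counts are now spread across many PIDs: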
. . .
{#PID<0.229.0>, 63697, :state_doesnt_matter}
{#PID<0.224.0>, 53190, :state_doesnt_matter}
{#PID<0.223.0>, 72687, :state_doesnt_matter}
{#PID<0.238.0>, 69688, :state_doesnt_matter}
{#PID<0.196.0>, 62696, :state_doesnt_matter}
{#PID<0.212.0>, 52713, :state_doesnt_matter}
{#PID<0.233.0>, 72175, :state_doesnt_matter}
{#PID<0.214.0>, 51712, :state_doesnt_matter}
{#PID<0.227.0>, 66190, :state_doesnt_matter}
{#PID<0.234.0>, 58694, :state_doesnt_matter}
{#PID<0.211.0>, 55694, :state_doesnt_matter}
{#PID<0.240.0>, 64698, :state_doesnt_matter}
{#PID<0.193.0>, 50692, :state_doesnt_matter}
{#PID<0.207.0>, 56683, :state_doesnt_matter}
{#PID<0.213.0>, 71684, :state_doesnt_matter}
{#PID<0.235.0>, 53712, :state_doesnt_matter}
{#PID<0.208.0>, 51197, :state_doesnt_matter}
{#PID<0.200.0>, 61689, :state_doesnt_matter}
. . .
We lack the ordering we would have with a single consumer, but every increment is being hit and processed.
We can take this a step further and change our broadcasting strategy from the default in our producer:
. . .
def init(counter) do
  {:producer, counter, dispatcher: GenStage.BroadcastDispatcher}
end
. . .
What this does is accumulate demand from all consumers before broadcasting its events to all of them. If we fire up IEx we can see the implication:
. . .
{#PID<0.200.0>, 1689, :state_doesnt_matter}
{#PID<0.230.0>, 1690, :state_doesnt_matter}
{#PID<0.196.0>, 1679, :state_doesnt_matter}
{#PID<0.215.0>, 1683, :state_doesnt_matter}
{#PID<0.237.0>, 1687, :state_doesnt_matter}
{#PID<0.205.0>, 1682, :state_doesnt_matter}
{#PID<0.206.0>, 1695, :state_doesnt_matter}
{#PID<0.216.0>, 1682, :state_doesnt_matter}
{#PID<0.217.0>, 1689, :state_doesnt_matter}
{#PID<0.233.0>, 1681, :state_doesnt_matter}
{#PID<0.223.0>, 1689, :state_doesnt_matter}
{#PID<0.193.0>, 1194, :state_doesnt_matter}
. . .
Note that some numbers now show up more than once: with the broadcast dispatcher, every consumer receives every event.
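As an aside, GenStage.BroadcastDispatcher also lets each consumer filter what it receives by passing a :selector function in its subscription options. A minimal sketch, with an even-number filter chosen purely for illustration:

def init(state) do
  selector = fn event -> rem(event, 2) == 0 end
  {:consumer, state, subscribe_to: [{GenstageExample.Producer, selector: selector}]}
end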
To go further we'll need to bring in a database to store our progress and status.
This is quite simple using Ecto.
To get started let's add it and the PostgreSQL adapter to mix.exs:
. . .
defp deps do
  [
    {:gen_stage, "~> 0.7"},
    {:ecto, "~> 2.0"},
    {:postgrex, "~> 0.12.1"},
  ]
end
. . .
Fetch the dependencies and compile:
$ mix do deps.get, compile
And now we can add a repo for setup in lib/repo.ex:
defmodule GenstageExample.Repo do
  use Ecto.Repo,
    otp_app: :genstage_example
end
With this in place, we can set up our config next in config/config.exs:
use Mix.Config

config :genstage_example, ecto_repos: [GenstageExample.Repo]

config :genstage_example, GenstageExample.Repo,
  adapter: Ecto.Adapters.Postgres,
  database: "genstage_example",
  username: "your_username",
  password: "your_password",
  hostname: "localhost",
  port: 5432
And if we add a supervisor to lib/genstage_example.ex we can now start working with the DB:
. . .
def start(_type, _args) do
  import Supervisor.Spec, warn: false

  children = [
    supervisor(GenstageExample.Repo, []),
    worker(GenstageExample.Producer, []),
  ]

  # without starting the supervisor, nothing above would actually run
  opts = [strategy: :one_for_one, name: GenstageExample.Supervisor]
  Supervisor.start_link(children, opts)
end
. . .
But we should also make an interface to do that, so let's import our query interface and repo to the producer:
. . .
import Ecto.Query
import GenstageExample.Repo
. . .
Now we need to create our migration. Note that mix ecto.gen.migration only generates an empty migration file, so we will fill in the table ourselves:

$ mix ecto.gen.migration setup_tasks
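Inside the generated file (under priv/repo/migrations/), a minimal migration matching the status and payload fields we use later might look like this:

defmodule GenstageExample.Repo.Migrations.SetupTasks do
  use Ecto.Migration

  def change do
    create table(:tasks) do
      add :status, :text
      add :payload, :binary
    end
  end
end

Then create the database and run the migration:

$ mix do ecto.create, ecto.migrate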
Now that we have a functional database, we can start storing things. Let's also undo our dispatcher change in the producer; we only made it to demonstrate that there are strategies beyond the default.
. . .
def init(counter) do
  {:producer, counter}
end
. . .
Now that we have all this boilerplate completed, and a simple producer/consumer wired together, we should come up with a model to run all of this.
At the end of the day we are trying to make a task runner.
To do this, we probably want to abstract the task interface and the DB interface into their own modules.
To start, let's create our Task module to model our actual tasks to be run:
defmodule GenstageExample.Task do
  def enqueue(status, payload) do
    GenstageExample.TaskDBInterface.insert_tasks(status, payload)
  end

  def take(limit) do
    GenstageExample.TaskDBInterface.take_tasks(limit)
  end
end
This is a really simple interface to abstract a given task's functionality. We only have two functions, and the module they call doesn't exist yet, but it gives us the shape of the interface we need. The functions break down as follows:

enqueue/2 - enqueue a task to be run
take/1 - take a given number of tasks to run from the database
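In use, that interface will look something like this (the payload is a binary, which we'll build with :erlang.term_to_binary/1 further down):

GenstageExample.Task.enqueue("waiting", payload)
GenstageExample.Task.take(10)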
With this, we can enqueue things to be run and grab tasks to run. Now let's create the interface to our database in its own module:
defmodule GenstageExample.TaskDBInterface do
  import Ecto.Query

  def take_tasks(limit) do
    {:ok, {count, events}} =
      GenstageExample.Repo.transaction fn ->
        ids = GenstageExample.Repo.all waiting(limit)
        GenstageExample.Repo.update_all by_ids(ids), [set: [status: "running"]], [returning: [:id, :payload]]
      end
    {count, events}
  end

  def insert_tasks(status, payload) do
    GenstageExample.Repo.insert_all "tasks", [
      %{status: status, payload: payload}
    ]
  end

  def update_task_status(id, status) do
    GenstageExample.Repo.update_all by_ids([id]), set: [status: status]
  end

  defp by_ids(ids) do
    from t in "tasks", where: t.id in ^ids
  end

  defp waiting(limit) do
    from t in "tasks",
      where: t.status == "waiting",
      limit: ^limit,
      select: t.id,
      lock: "FOR UPDATE SKIP LOCKED"
  end
end
This one is a bit more complex, but we'll break it down piece by piece. We have 3 main functions, and 2 private helpers:
take_tasks/1
insert_tasks/2
update_task_status/2
With take_tasks/1 we have the bulk of our logic.
This function will be called to grab queued tasks and mark them as running.
Let's look at the code:
. . .
def take_tasks(limit) do
  {:ok, {count, events}} =
    GenstageExample.Repo.transaction fn ->
      ids = GenstageExample.Repo.all waiting(limit)
      GenstageExample.Repo.update_all by_ids(ids), [set: [status: "running"]], [returning: [:id, :payload]]
    end
  {count, events}
end
. . .
We do a few things here.
First, we wrap everything in a transaction so that concurrent consumers cannot grab the same tasks, avoiding race conditions.
Inside the transaction, we get the ids of all tasks waiting to be executed, up to some limit, and set their status to running.
We return the count of tasks taken and the events to be run in the consumer.
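Because we pass returning: [:id, :payload] to update_all/3, the events come back as maps holding just those fields, so a successful call looks something like:

{2, [%{id: 1, payload: <<131, ...>>}, %{id: 2, payload: <<131, ...>>}]}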
Next we have insert_tasks/2:
. . .
def insert_tasks(status, payload) do
  GenstageExample.Repo.insert_all "tasks", [
    %{status: status, payload: payload}
  ]
end
. . .
This one is simpler. We just insert a task to be run with a given payload binary.
Finally, we have update_task_status/2, which is also quite simple:
. . .
def update_task_status(id, status) do
  GenstageExample.Repo.update_all by_ids([id]), set: [status: status]
end
. . .
Here we simply update a task to the status we want using a given id.
Our helpers are called primarily inside of take_tasks/1, but by_ids/1 is also used by update_task_status/2 in the public API.
. . .
defp by_ids(ids) do
  from t in "tasks", where: t.id in ^ids
end

defp waiting(limit) do
  from t in "tasks",
    where: t.status == "waiting",
    limit: ^limit,
    select: t.id,
    lock: "FOR UPDATE SKIP LOCKED"
end
. . .
Neither of these has a ton of complexity.
by_ids/1 simply grabs all tasks whose id appears in a given list of IDs.
waiting/1 finds all tasks with the status waiting, up to a given limit.
There is one note to make on waiting/1, however.
The FOR UPDATE SKIP LOCKED lock means each transaction locks the rows it selects and skips any rows already locked by other transactions, a feature available in PostgreSQL 9.5+.
Outside of this, it is a very simple SELECT statement.
Now that we have our DB interface defined, we can move on to the producer, the consumer, and the last bits of configuration.
We will need to do a bit of configuration in lib/genstage_example.ex to tie things together, as well as to give us the final functionality we will need to run jobs.
This is what we will end up with:
. . .
# near the top of the module:
alias GenstageExample.{Producer, Consumer, Repo}

def start(_type, _args) do
  import Supervisor.Spec, warn: false

  # 12 workers / system core
  consumers = for id <- 0..(System.schedulers_online() * 12) do
    worker(Consumer, [], id: id)
  end

  producers = [
    worker(Producer, []),
  ]

  supervisors = [
    supervisor(Repo, []),
    supervisor(Task.Supervisor, [[name: GenstageExample.TaskSupervisor]]),
  ]

  children = supervisors ++ producers ++ consumers
  opts = [strategy: :one_for_one, name: GenstageExample.Supervisor]
  Supervisor.start_link(children, opts)
end

def start_later(module, function, args) do
  payload = {module, function, args} |> :erlang.term_to_binary
  Repo.insert_all("tasks", [
    %{status: "waiting", payload: payload}
  ])
  notify_producer()
end

def notify_producer do
  send(Producer, :enqueued)
end

defdelegate enqueue(module, function, args), to: Producer
. . .
Let's tackle this from the top down.
First, start/2:
. . .
def start(_type, _args) do
  import Supervisor.Spec, warn: false

  # 12 workers / system core
  consumers = for id <- 0..(System.schedulers_online() * 12) do
    worker(Consumer, [], id: id)
  end

  producers = [
    worker(Producer, []),
  ]

  supervisors = [
    supervisor(Repo, []),
    supervisor(Task.Supervisor, [[name: GenstageExample.TaskSupervisor]]),
  ]

  children = supervisors ++ producers ++ consumers
  opts = [strategy: :one_for_one, name: GenstageExample.Supervisor]
  Supervisor.start_link(children, opts)
end
. . .
First of all, you will notice we are now defining producers, consumers, and supervisors separately. I find this convention to work quite well to illustrate the intentions of various processes and trees we are starting here. In these 3 lists we set up 12 consumers / CPU core, set up a single producer, and then our supervisors for the Repo, as well as one new one.
This new supervisor is run through Task.Supervisor, which is built into Elixir.
We give it a name so it is easily referred to in our GenStage code: GenstageExample.TaskSupervisor.
Finally, we define our children as the concatenation of all these lists.
Next, we have start_later/3:
. . .
def start_later(module, function, args) do
  payload = {module, function, args} |> :erlang.term_to_binary
  Repo.insert_all("tasks", [
    %{status: "waiting", payload: payload}
  ])
  notify_producer()
end
. . .
This function takes a module, a function, and a list of arguments.
It then encodes them as a binary using Erlang's built-in :erlang.term_to_binary/1.
From here, we insert the task with the status waiting, and we notify the producer that a task has been inserted to run.
Now let's check out notify_producer/0:
. . .
def notify_producer do
  send(Producer, :enqueued)
end
. . .
This function is quite simple.
We send our producer a message, :enqueued, so that it knows new work has been inserted.
The atom itself is arbitrary, but it has to match the handle_info/2 clause we will define in the producer, and :enqueued makes the meaning clear.
Last, but not least, we do some simple delegation:
. . .
defdelegate enqueue(module, function, args), to: Producer
. . .
This simply means that calling GenstageExample.enqueue(module, function, args) will be delegated to the function of the same name in our producer.
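Note that the producer's own enqueue/3 is never shown in this lesson, so the delegation needs a target; a hypothetical version, consistent with the pieces we've built so far, might look like this:

# in GenstageExample.Producer (hypothetical, not shown in this lesson)
def enqueue(module, function, args) do
  payload = {module, function, args} |> :erlang.term_to_binary()
  GenstageExample.Task.enqueue("waiting", payload)
  send(__MODULE__, :enqueued)
end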
Our producer doesn't need a ton of work.
First, we'll alter our handle_demand/2 to actually do something with our events:
. . .
def handle_demand(demand, state) when demand > 0 do
  serve_jobs(demand + state)
end
. . .
We haven't defined serve_jobs/1 yet, but we'll get there.
The concept is simple: when we receive demand greater than 0, we serve jobs to the tune of the demand plus the number of jobs the current state says are pending.
Since we send the producer a message when we run start_later/3, we will want to respond to it with a handle_info/2 callback:
. . .
def handle_info(:enqueued, state) do
  {count, events} = GenstageExample.Task.take(state)
  {:noreply, events, state - count}
end
. . .
With this, we respond by taking however many tasks our state says are pending and emitting them as events.
Now let's define serve_jobs/1:
. . .
def serve_jobs(limit) do
  {count, events} = GenstageExample.Task.take(limit)
  Process.send_after(@name, :enqueued, 60_000)
  {:noreply, events, limit - count}
end
. . .
Besides taking up to limit tasks to serve, we schedule a message to our producer one minute from now, telling it to handle :enqueued again so pending work is polled periodically.
Note that we address the process with @name, which we will need to add at the top as a module attribute:
. . .
@name __MODULE__
. . .
Because notify_producer/0 uses send/2, its :enqueued message is picked up by the handle_info/2 clause above. If we instead notified the producer with GenStage.cast/2, we would handle the same message with a matching handle_cast/2 clause:
. . .
def handle_cast(:enqueued, state) do
  serve_jobs(state)
end
. . .
This simply serves jobs when the producer is told that state number of tasks are enqueued.
Our consumer is where we do the work.
Now that our producer is dispatching stored tasks, we want the consumer to handle them.
There is a good bit of work to be done here tying into our work so far.
The core of the consumer is handle_events/3; let's flesh out the functionality we want there and define the rest as we go:
. . .
def handle_events(events, _from, state) do
  for event <- events do
    %{id: id, payload: payload} = event
    {module, function, args} = payload |> deconstruct_payload
    task = start_task(module, function, args)
    yield_to_and_update_task(task, id)
  end
  {:noreply, [], state}
end
. . .
At its core, this setup simply wants to run the task we decode from each event's binary payload. To do this we get the data from the event, deconstruct it, and then start and yield to a task. These functions aren't defined yet, so let's create them:
. . .
def deconstruct_payload(payload) do
  payload |> :erlang.binary_to_term
end
. . .
We can use :erlang.binary_to_term/1, the built-in inverse of our term_to_binary/1 call, to get our module, function, and args back out.
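The round trip looks like this in IEx:

iex> binary = :erlang.term_to_binary({IO, :puts, ["hi"]})
iex> :erlang.binary_to_term(binary)
{IO, :puts, ["hi"]}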
Now we need to start the task:
. . .
def start_task(mod, func, args) do
  Task.Supervisor.async_nolink(TaskSupervisor, mod, func, args)
end
. . .
Here we leverage the supervisor we created at the beginning to run this in a task.
Now we need to define yield_to_and_update_task/2:
. . .
def yield_to_and_update_task(task, id) do
  task
  |> Task.yield(1000)
  |> yield_to_status(task)
  |> update(id)
end
. . .
This brings in more pieces we've yet to define, but the core is simple.
We wait up to 1 second for the task to run.
Task.yield/2 returns {:ok, reply}, {:exit, reason}, or nil on timeout, and we handle each case accordingly.
After that we update the task's status via our DB interface to keep things current.
Let's define yield_to_status/2 for each of the scenarios we mentioned:
. . .
def yield_to_status({:ok, _}, _) do
  "success"
end

def yield_to_status({:exit, _}, _) do
  "error"
end

def yield_to_status(nil, task) do
  Task.shutdown(task)
  "timeout"
end
. . .
These simply translate the result returned from the task into a status string. If a task takes more than a second, we shut it down, because otherwise it would just hang forever.
From this, we can see our finalized consumer:
defmodule GenstageExample.Consumer do
  alias Experimental.GenStage
  use GenStage

  alias GenstageExample.{Producer, TaskSupervisor}

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    {:consumer, state, subscribe_to: [Producer]}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      %{id: id, payload: payload} = event
      {module, function, args} = payload |> deconstruct_payload
      task = start_task(module, function, args)
      yield_to_and_update_task(task, id)
    end
    {:noreply, [], state}
  end

  defp yield_to_and_update_task(task, id) do
    task
    |> Task.yield(1000)
    |> yield_to_status(task)
    |> update(id)
  end

  defp start_task(mod, func, args) do
    Task.Supervisor.async_nolink(TaskSupervisor, mod, func, args)
  end

  defp yield_to_status({:ok, _}, _) do
    "success"
  end

  defp yield_to_status({:exit, _}, _) do
    "error"
  end

  defp yield_to_status(nil, task) do
    Task.shutdown(task)
    "timeout"
  end

  defp update(status, id) do
    GenstageExample.TaskDBInterface.update_task_status(id, status)
  end

  defp deconstruct_payload(payload) do
    payload |> :erlang.binary_to_term
  end
end
Now, if we go into IEx:
$ iex -S mix
iex> GenstageExample.enqueue(IO, :puts, ["wuddup"])
#=>
16:39:31.014 [debug] QUERY OK db=137.4ms
INSERT INTO "tasks" ("payload","status") VALUES ($1,$2) [<<131, 104, 3, 100, 0, 9, 69, 108, 105, 120, 105, 114, 46, 73, 79, 100, 0, 4, 112, 117, 116, 115, 108, 0, 0, 0, 1, 109, 0, 0, 0, 6, 119, 117, 100, 100, 117, 112, 106>>, "waiting"]
:ok
16:39:31.015 [debug] QUERY OK db=0.4ms queue=0.1ms
begin []
16:39:31.025 [debug] QUERY OK source="tasks" db=9.6ms
SELECT t0."id" FROM "tasks" AS t0 WHERE (t0."status" = 'waiting') LIMIT $1 FOR UPDATE SKIP LOCKED [49000]
16:39:31.026 [debug] QUERY OK source="tasks" db=0.8ms
UPDATE "tasks" AS t0 SET "status" = $1 WHERE (t0."id" = ANY($2)) RETURNING t0."id", t0."payload" ["running", [5]]
16:39:31.040 [debug] QUERY OK db=13.5ms
commit []
iex(2)> wuddup
16:39:31.060 [debug] QUERY OK source="tasks" db=1.3ms
UPDATE "tasks" AS t0 SET "status" = $1 WHERE (t0."id" = ANY($2)) ["success", [5]]
It works and we are storing and running tasks!
Here is a link to all code in my own repo.