Skip to content

Instantly share code, notes, and snippets.

@rinaldifonseca
Forked from ybur-yug/project.md
Created July 28, 2024 23:25

Revisions

  1. @ybur-yug ybur-yug revised this gist Oct 31, 2016. 1 changed file with 1 addition and 2 deletions.
    3 changes: 1 addition & 2 deletions project.md
    Original file line number Diff line number Diff line change
    @@ -940,7 +940,6 @@ UPDATE "tasks" AS t0 SET "status" = $1 WHERE (t0."id" = ANY($2)) ["success", [5]
    It works and we are storing and running tasks!
    ## TODO publish to hex
    [Here](https://github.com/ybur-yug/genstage_example/tree/87c5f96c74e8fa90cd5b5fd108cd9ba104f78a65) is a link to all code thus far.
    [Here](https://github.com/ybur-yug/genstage_example/tree/87c5f96c74e8fa90cd5b5fd108cd9ba104f78a65) is a link to all code in my own repo.
  2. @ybur-yug ybur-yug created this gist Oct 31, 2016.
    946 changes: 946 additions & 0 deletions project.md
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,946 @@
    # GenStage Tutorial

    ## Introduction
    So what is GenStage? From the official documentation, it is a "specification and computational flow for Elixir", but what does that mean to us?
    There is a lot to something that can be described as that vague, and here we'll take a dive in and build something on top of it to understand its goals.
    We could go into the technical and theoretical implications of this, but instead lets try a pragmatic approach to really just get it to work.

    First, Let's imagine we have a server that constantly emits numbers.
    It starts at the state of the number we give it, then counts up in one from there onward.
    This is what we would call our producer.
    Each time it emits a number, this is an event, and we want to handle it with a consumer.
    A consumer simply takes what a producer emits and does something to it.
    In our case, we will display the count.
    There is a lot more to GenStage at a technical and applied level, but we will build up on the specifics and definitions further in later lessons, for now we just want a running example we can build up on.

    ## Getting Started: A Sample GenStage Project
    We'll begin by generating a simple project that has a supervision tree:

    ```shell
    $ mix new genstage_example --sup
    $ cd genstage_example
    ```

    Let's set up some basic things for the future of our application.
    Since GenStage is generally used as a transformation pipeline, lets imagine we have a background worker of some sort.
    This worker will need to persist whatever it changes, so we should get a database set up, but we can worry about that in a later lesson.
    To start, all we need to do is add `gen_stage` to our deps.

    ```elixir
    . . .
    defp deps do
    [
    {:gen_stage, "~> 0.7},
    ]
    end
    . . .
    ```
    Now, we should fetch our dependencies and compile before we start setup:
    ```shell
    $ mix do deps.get, compile
    ```
    Lets build a producer, our simple beginning building block to help us utilize this new tool!
    ## Building A Producer
    To get started what we want to do is create a producer that emits a constant stream of events for our consumer to handle.
    This is quite simple with the rudimentary example of a counter.
    Let's create a namespaced directory under `lib` and then go from there, this way our module naming matches our names of the modules themselves:
    ```shell
    $ mkdir lib/genstage_example
    $ touch lib/genstage_example/producer.ex
    ```
    Now we can add the code:
    ```elixir
    defmodule GenstageExample.Producer do
    alias Experimental.GenStage
    use GenStage
    def start_link do
    GenStage.start_link(__MODULE__, 0, name: __MODULE__)
    # naming allows us to handle failure
    end
    def init(counter) do
    {:producer, counter}
    end
    def handle_demand(demand, state) do
    events = Enum.to_list(state..state + demand - 1)
    {:noreply, events, (state + demand)}
    end
    end
    ```
    Let's break this down line by line.
    To begin with, we have our initial declarations:
    ```elixir
    . . .
    defmodule GenstageExample.Producer do
    alias Experimental.GenStage
    use GenStage
    . . .
    ```
    What this does is a couple simple things.
    First, we declare our module, and soon after we alias `Experimental.GenStage`.
    This is simply because we will be calling it more than once and makes it more convenient.
    The `use GenStage` line is much akin to `use GenServer`.
    This line allows us to import the default behaviour and functions to save us from a large amount of boilerplate.
    If we go further, we see the first two primary functions for startup:
    ```elixir
    . . .
    def start_link do
    GenStage.start_link(__MODULE__, :the_state_doesnt_matter)
    end
    def init(counter) do
    {:producer, counter}
    end
    . . .
    ```
    These two functions offer a very simple start.
    First, we have our standard `start_link/0` function.
    Inside here, we use`GenStage.start_link/` beginning with our argument `__MODULE__`, which will give it the name of our current module.
    Next, we set a state, which is arbitrary in this case, and can be any value.
    The `__MODULE__` argument is used for name registration like any other module.
    The second argument is the arguments, which in this case are meaningless as we do not care about it.
    In `init/1` we simply set the counter as our state, and label ourselves as a producer.
    Finally, we have where the real meat of our code's functionality is:
    ```elixir
    . . .
    def handle_demand(demand, state) do
    events = Enum.to_list(state..state + demand - 1)
    {:noreply, events, (state + demand)}
    end
    . . .
    ```
    `handle_demand/2` must be implemented by all producer type modules that utilize GenStage.
    In this case, we are simply sending out an incrementing counter.
    This might not make a ton of sense until we build our consumer, so lets move on to that now.
    ## Building A Consumer
    The consumer will handle the events that are broadcasted out by our producer.
    For now, we wont worry about things like broadcast strategies, or what the internals are truly doing.
    We'll start by showing all the code and then break it down.
    ```elixir
    defmodule GenstageExample.Consumer do
    alias Experimental.GenStage
    use GenStage
    def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
    end
    def init(state) do
    {:consumer, state, subscribe_to: [GenstageExample.Producer]}
    end
    def handle_events(events, _from, state) do
    for event <- events do
    IO.inspect {self(), event, state}
    end
    {:noreply, [], state}
    end
    end
    ```
    To start, let's look at the beginning functions just like last time:
    ```elixir
    defmodule GenstageExample.Consumer do
    alias Experimental.GenStage
    use GenStage
    def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
    end
    def init(state) do
    {:consumer, state, subscribe_to: [GenstageExample.Producer]}
    end
    . . .
    ```
    To begin, much like in our producer, we set up our `start_link/0` and `init/1` functions.
    In `start_link` we simple register the module name like last time, and set a state.
    The state is arbitrary for the consumer, and can be literally whatever we please, in this case `:state_doesnt_matter`.
    In `init/1` we simply take the state and set up our expected tuple.
    It expected use to register our `:consumer` atom first, then the state given.
    Our `subscribe_to` clause is optional.
    What this does is subscribe us to our producer module.
    The reason for this is if something crashes, it will simply attempt to re-subscribe and then resume receiving emitted events.
    ```elixir
    . . .
    def handle_events(events, _from, state) do
    for event <- events do
    IO.inspect {self(), event, state}
    end
    {:noreply, [], state}
    end
    . . .
    ```
    This is the meat of our consumer, `handle_events/3`.
    `handle_events/3` must be implemented by any `consumer` or `producer_consumer` type of GenStage module.
    What this does for us is quite simple.
    We take a list of events, and iterate through these.
    From there, we inspect the `pid` of our consumer, the event (in this case the current count), and the state.
    After that, we don't reply because we are a consumer and do not handle anything, and we don't emit events to the second argument is empty, then we simply pass on the state.
    ## Wiring It Together
    To get all of this to work we only have to make one simple change.
    Open up `lib/genstage_example.ex` and we can add them as workers and they will automatically start with our application:
    ```elixir
    . . .
    children = [
    worker(GenstageExample.Producer, []),
    worker(GenstageExample.Consumer, []),
    ]
    . . .
    ```
    With this, if things are all correct, we can run IEx and we should see everything working:
    ```elixir
    iex(1)> {#PID<0.205.0>, 0, :state_doesnt_matter}
    {#PID<0.205.0>, 1, :state_doesnt_matter}
    {#PID<0.205.0>, 2, :state_doesnt_matter}
    {#PID<0.205.0>, 3, :state_doesnt_matter}
    {#PID<0.205.0>, 4, :state_doesnt_matter}
    {#PID<0.205.0>, 5, :state_doesnt_matter}
    {#PID<0.205.0>, 6, :state_doesnt_matter}
    {#PID<0.205.0>, 7, :state_doesnt_matter}
    {#PID<0.205.0>, 8, :state_doesnt_matter}
    {#PID<0.205.0>, 9, :state_doesnt_matter}
    {#PID<0.205.0>, 10, :state_doesnt_matter}
    {#PID<0.205.0>, 11, :state_doesnt_matter}
    {#PID<0.205.0>, 12, :state_doesnt_matter}
    . . .
    ```
    ## Tinkering: For Science and Understanding
    From here, we have a working flow.
    There is a producer emitting our counter, and our consumber is displaying all of this and continuing the flow.
    Now, what if we wanted multiple consumers?
    Right now, if we examine the `IO.inspect/1` output, we see that every single event is handled by a single PID.
    This isn't very Elixir-y.
    We have massive concurrency built-in, we should probably leverage that as much as possible.
    Let's make some adjustments so that we can have multiple workers by modifying `lib/genstage_example.ex`
    ```elixir
    . . .
    children = [
    worker(GenstageExample.Producer, []),
    worker(GenstageExample.Consumer, [], id: 1),
    worker(GenstageExample.Consumer, [], id: 2),
    ]
    . . .
    ```
    Now, let's fire up IEx again:
    ```elixir
    $ iex -S mix
    iex(1)> {#PID<0.205.0>, 0, :state_doesnt_matter}
    {#PID<0.205.0>, 1, :state_doesnt_matter}
    {#PID<0.205.0>, 2, :state_doesnt_matter}
    {#PID<0.207.0>, 3, :state_doesnt_matter}
    . . .
    ```
    As you can see, we have multiple PIDs now, simply by adding a line of code and giving our consumers IDs.
    But we can take this even further:
    ```elixir
    . . .
    children = [
    worker(GenstageExample.Producer, []),
    ]
    consumers = for id <- 1..(System.schedulers_online * 12) do
    # helper to get the number of cores on machine
    worker(GenstageExample.Consumer, [], id: id)
    end
    opts = [strategy: :one_for_one, name: GenstageExample.Supervisor]
    Supervisor.start_link(children ++ consumers, opts)
    . . .
    ```
    What we are doing here is quite simple.
    First, we get the number of core on the machine with `System.schedulers_online/0`, and from there we simply create a worker just like we had.
    Now we have 12 workers per core. This is much more effective.
    ```elixir
    . . .
    {#PID<0.229.0>, 63697, :state_doesnt_matter}
    {#PID<0.224.0>, 53190, :state_doesnt_matter}
    {#PID<0.223.0>, 72687, :state_doesnt_matter}
    {#PID<0.238.0>, 69688, :state_doesnt_matter}
    {#PID<0.196.0>, 62696, :state_doesnt_matter}
    {#PID<0.212.0>, 52713, :state_doesnt_matter}
    {#PID<0.233.0>, 72175, :state_doesnt_matter}
    {#PID<0.214.0>, 51712, :state_doesnt_matter}
    {#PID<0.227.0>, 66190, :state_doesnt_matter}
    {#PID<0.234.0>, 58694, :state_doesnt_matter}
    {#PID<0.211.0>, 55694, :state_doesnt_matter}
    {#PID<0.240.0>, 64698, :state_doesnt_matter}
    {#PID<0.193.0>, 50692, :state_doesnt_matter}
    {#PID<0.207.0>, 56683, :state_doesnt_matter}
    {#PID<0.213.0>, 71684, :state_doesnt_matter}
    {#PID<0.235.0>, 53712, :state_doesnt_matter}
    {#PID<0.208.0>, 51197, :state_doesnt_matter}
    {#PID<0.200.0>, 61689, :state_doesnt_matter}
    . . .
    ```
    Though we lack any ordering like we would have with a single core, but every increment is being hit and processed.
    We can take this a step further and change our broadcasting strategy from the default in our producer:
    ```elixir
    . . .
    def init(counter) do
    {:producer, counter, dispatcher: GenStage.BroadcastDispatcher}
    end
    . . .
    ```
    What this does is it accumulates demand from all consumers before broadcasting its events to all of them.
    If we fire up IEx we can see the implication:
    ```elixir
    . . .
    {#PID<0.200.0>, 1689, :state_doesnt_matter}
    {#PID<0.230.0>, 1690, :state_doesnt_matter}
    {#PID<0.196.0>, 1679, :state_doesnt_matter}
    {#PID<0.215.0>, 1683, :state_doesnt_matter}
    {#PID<0.237.0>, 1687, :state_doesnt_matter}
    {#PID<0.205.0>, 1682, :state_doesnt_matter}
    {#PID<0.206.0>, 1695, :state_doesnt_matter}
    {#PID<0.216.0>, 1682, :state_doesnt_matter}
    {#PID<0.217.0>, 1689, :state_doesnt_matter}
    {#PID<0.233.0>, 1681, :state_doesnt_matter}
    {#PID<0.223.0>, 1689, :state_doesnt_matter}
    {#PID<0.193.0>, 1194, :state_doesnt_matter}
    . . .
    ```
    Note that some numbers are showing twice now, this is why.
    ## Setting Up Postgres to Extend Our Producer
    To go further we'll need to bring in a database to store our progress and status.
    This is quite simple using [Ecto](LINKTOLESSON).
    To get started let's add it and the Postgresql adapter to `mix.exs`:
    ```elixir
    . . .
    defp deps do
    [
    {:gen_stage, "~> 0.7"},
    {:ecto, "~> 2.0"},
    {:postgrex, "~> 0.12.1"},
    ]
    end
    . . .
    ```
    Fetch the dependencies and compile:
    ```shell
    $ mix do deps.get, compile
    ```
    And now we can add a repo for setup in `lib/repo.ex`:
    ```elixir
    defmodule GenstageExample.Repo do
    use Ecto.Repo,
    otp_app: :genstage_example
    end
    ```
    and with this we can set up our config next in `config/config.exs`:
    ```elixir
    use Mix.Config
    config :genstage_example, ecto_repos: [GenstageExample.Repo]
    config :genstage_example, GenstageExample.Repo,
    adapter: Ecto.Adapters.Postgres,
    database: "genstage_example",
    username: "your_username",
    password: "your_password",
    hostname: "localhost",
    port: "5432"
    ```
    And if we add a supservisor to `lib/genstage_example.ex` we can now start working with the DB:
    ```elixir
    . . .
    def start(_type, _args) do
    import Supervisor.Spec, warn: false
    children = [
    supervisor(GenstageExample.Repo, []),
    worker(GenstageExample.Producer, []),
    ]
    end
    . . .
    ```
    But we should also make an interface to do that, so let's import our query interface and repo to the producer:
    ```elixir
    . . .
    import Ecto.Query
    import GenstageExample.Repo
    . . .
    ```
    Now we need to create our migration:
    ```shell
    $ mix ecto.gen.migration setup_tasks status:text payload:binary
    ```
    Now that we have a functional database, we can start storing things.
    Let's remove our change in Broadcaster, as we only were doing that to demonstrate that there are others outside the normal default in our Producer.
    ```elixir
    . . .
    def init(counter) do
    {:producer, counter}
    end
    . . .
    ```
    ### Modelling the Rest of the Functionality
    Now that we have all this boilerplate work completed we should come up with a model to run all of this now that we have a simple wired-together producer/consumer model.
    At the end of the day we are trying to make a task runner.
    To do this, we probably want to abstract the interface for tasks and DB interfacing into their own modules.
    To start, let's create our `Task` module to model our actual tasks to be run:
    ```elixir
    defmodule GenstageExample.Task do
    def enqueue(status, payload) do
    GenstageExample.TaskDBInterface.insert_tasks(status, payload)
    end
    def take(limit) do
    GenstageExample.TaskDBInterface.take_tasks(limit)
    end
    end
    ```
    This is a _really_ simple interface to abstract a given task's functionality.
    We only have 2 functions.
    Now, the module they are calling doesn't exist yet, it gives us the ideas we need to build a very simple interface.
    These can be broken down as follows:
    1. `enqueue/2` - Enqueue a task to be run
    3. `take/1` - Take a given number of tasks to run from the database
    Now this gives us the interface we need: we can set things to be run, and grab tasks to be run and we can define the rest of the interface.
    Let's create an interface with our database in its own module:
    ```elixir
    defmodule GenstageExample.TaskDBInterface do
    import Ecto.Query
    def take_tasks(limit) do
    {:ok, {count, events}} =
    GenstageExample.Repo.transaction fn ->
    ids = GenstageExample.Repo.all waiting(limit)
    GenstageExample.Repo.update_all by_ids(ids), [set: [status: "running"]], [returning: [:id, :payload]]
    end
    {count, events}
    end
    def insert_tasks(status, payload) do
    GenstageExample.Repo.insert_all "tasks", [
    %{status: status, payload: payload}
    ]
    end
    def update_task_status(id, status) do
    GenstageExample.Repo.update_all by_ids([id]), set: [status: status]
    end
    defp by_ids(ids) do
    from t in "tasks", where: t.id in ^ids
    end
    defp waiting(limit) do
    from t in "tasks",
    where: t.status == "waiting",
    limit: ^limit,
    select: t.id,
    lock: "FOR UPDATE SKIP LOCKED"
    end
    end
    ```
    This one is a bit more complex, but we'll break it down piece by piece.
    We have 3 main functions, and 2 private helpers:
    #### Main Functions
    1. `take_tasks/1`
    2. `insert_tasks/2`
    3. `update_task_status/2`
    With `take_tasks/1` we have the bulk of our logic.
    This function will be called to grab tasks we have queued to run them.
    Let's look at the code:
    ```elixir
    . . .
    def take_tasks(limit) do
    {:ok, {count, events}} =
    GenstageExample.Repo.transaction fn ->
    ids = GenstageExample.Repo.all waiting(limit)
    GenstageExample.Repo.update_all by_ids(ids), [set: [status: "running"]], [returning: [:id, :payload]]
    end
    {count, events}
    end
    . . .
    ```
    We do a few things here.
    First, we go in and we wrap everything in a transaction.
    This maintains state in the database so we avoid race conditions and other bad things.
    Inside here, we get the ids of all tasks waiting to be executed up to some limit, and set them to `running` as their status.
    We return the `count` of total tasks and the events to be run in the consumer.
    Next we have `insert_tasks/2`:
    ```elixir
    . . .
    def insert_tasks(status, payload) do
    GenstageExample.Repo.insert_all "tasks", [
    %{status: status, payload: payload}
    ]
    end
    . . .
    ```
    This one is a bit more simple.
    We just insert a task to be run with a given payload binary.
    Finally, we have `update_task_status/2`, which is also quite simple:
    ```elixir
    . . .
    def update_task_status(id, status) do
    GenstageExample.Repo.update_all by_ids([id]), set: [status: status]
    end
    . . .
    ```
    Here we simple update tasks to the status we want using a given id.
    #### Helpers
    Our helpers are all called primarily inside of `take_tasks/1`, but also used elsewhere in the main public API.
    ```elixir
    . . .
    defp by_ids(ids) do
    from t in "tasks", where: t.id in ^ids
    end
    defp waiting(limit) do
    from t in "tasks",
    where: t.status == "waiting",
    limit: ^limit,
    select: t.id,
    lock: "FOR UPDATE SKIP LOCKED"
    end
    . . .
    ```
    Neither of these has a ton of complexity.
    `by_ids/1` simply grabs all tasks that match in a given list of IDs.
    `waiting/1` finds all tasks that have the status waiting up to a given limit.
    However, there is one note to make on `waiting/1`.
    We leverage a lock on all tasks being updated so we skip those, a feature available in psql 9.5+.
    Outside of this, it is a very simple `SELECT` statement.
    Now that we have our DB interface defined as it is used in the primary API, we can move onto the producer, consumer, and last bits of configuration.
    ### Producer, Consumer, and Final Configuration
    #### Final Config
    We will need to do a bit of configuration in `lib/genstage_example.ex` to clarify things as well as give us the final functionalities we will need to run jobs.
    This is what we will end up with:
    ```elixir
    . . .
    def start(_type, _args) do
    import Supervisor.Spec, warn: false
    # 12 workers / system core
    consumers = for id <- (0..System.schedulers_online * 12) do
    worker(GenstageExample.Consumer, [], id: id)
    end
    producers = [
    worker(Producer, []),
    ]
    supervisors = [
    supervisor(GenstageExample.Repo, []),
    supervisor(Task.Supervisor, [[name: GenstageExample.TaskSupervisor]]),
    ]
    children = supervisors ++ producers ++ consumers
    opts = [strategy: :one_for_one, name: GenstageExample.Supervisor]
    Supervisor.start_link(children, opts)
    end
    def start_later(module, function, args) do
    payload = {module, function, args} |> :erlang.term_to_binary
    Repo.insert_all("tasks", [
    %{status: "waiting", payload: payload}
    ])
    notify_producer
    end
    def notify_producer do
    send(Producer, :data_inserted)
    end
    defdelegate enqueue(module, function, args), to: Producer
    . . .
    ```
    Let's tackle this from the top down.
    First, `start/2`:
    ```elixir
    . . .
    def start(_type, _args) do
    import Supervisor.Spec, warn: false
    # 12 workers / system core
    consumers = for id <- (0..System.schedulers_online * 12) do
    worker(GenstageExample.Consumer, [], id: id)
    end
    producers = [
    worker(Producer, []),
    ]
    supervisors = [
    supervisor(GenstageExample.Repo, []),
    supervisor(Task.Supervisor, [[name: GenstageExample.TaskSupervisor]]),
    ]
    children = supervisors ++ producers ++ consumers
    opts = [strategy: :one_for_one, name: GenstageExample.Supervisor]
    Supervisor.start_link(children, opts)
    end
    . . .
    ```
    First of all, you will notice we are now defining producers, consumers, and supervisors separately.
    I find this convention to work quite well to illustrate the intentions of various processes and trees we are starting here.
    In these 3 lists we set up 12 consumers / CPU core, set up a single producer, and then our supervisors for the Repo, as well as one new one.
    This new supervisor is run through `Task.Supervisor`, which is built into Elixir.
    We give it a name so it is easily referred to in our GenStage code, `GenstageExample.TaskSupervisor`.
    Now, we define our children as the concatenation of all these lists.
    Next, we have `start_later/3`:
    ```elixir
    . . .
    def start_later(module, function, args) do
    payload = {module, function, args} |> :erlang.term_to_binary
    Repo.insert_all("tasks", [
    %{status: "waiting", payload: payload}
    ])
    notify_producer
    end
    . . .
    ```
    This function takes a module, a function, and an argument.
    It then encodes them as a binary using some built-in erlang magic.
    From here, we then insert the task as `waiting`, and we notify a producer that a task has been inserted to run.
    Now let's check out `notify_producer/0`:
    ```elixir
    . . .
    def notify_producer do
    send(Producer, :data_inserted)
    end
    . . .
    ```
    This method is quite simple.
    We send our producer a message, `:data_inserted`, simply so that it knows what we did.
    The message here is arbitrary, but I chose this atom to make the meaning clear.
    Last, but not least we do some simple delegation:
    ```
    . . .
    defdelegate enqueue(module, functions, args), to : Producer
    . . .
    ```
    This simply makes it so if we call `GenstageExample.enqueue(module, function, args)` that it will be delegated to the same method in our producer.
    ### Producer Setup
    Our producer doesn't need a ton of work.
    first, we'll alter our `handle_demand/2` to actually do something with our events:
    ```elixir
    . . .
    def handle_demand(demand, state) when demand > 0 do
    serve_jobs(demand + state)
    end
    . . .
    ```
    We haven't defined `serve_jobs/2` yet, but we'll get there.
    The concept is simple, when we get a demand and demand is > 0, we do some work to the tune of demand + the current state's number of jobs.
    Now that we will be sending a message to the producer when we run `start_later/3`, we will want to respond to it with a `handle_info/2` call:
    ```elixir
    . . .
    def handle_info(:enqueued, state) do
    {count, events} = GenstageExample.Task.take(state)
    {:noreply, events, state - count}
    end
    . . .
    ```
    With this, we simply respond by taking the number of tasks we are told to get ready to run.
    Now let's define `serve_jobs/1`:
    ```elixir
    . . .
    def serve_jobs limit do
    {count, events} = GenstageExample.Task.take(limit)
    Process.send_after(@name, :enqueued, 60_000)
    {:noreply, events, limit - count}
    end
    . . .
    ```
    Now, we are sending a process in one minute that to our producer telling it that it should respond to `:enqueued`.
    Note that we call the process module with `@name`, which we will need to add at the top as a module attribute:
    ```elixir
    . . .
    @name __MODULE__
    . . .
    ```
    Let's define that last function to handle the `:enqueued` message now, too:
    ```elixir
    . . .
    def handle_cast(:enqueued, state) do
    serve_jobs(state)
    end
    . . .
    ```
    This will simply serve jobs when we tell the producer they have `state` number of enqueued and to respond.
    ## Setting Up the Consumer for Real Work
    Our consumer is where we do the work.
    Now that we have our producer storing tasks, we want to have the consumer handle this as well.
    There is a good bit of work to be done here tying into our work so far.
    The core of the consumer is `handle_events/3`, lets flesh out the functionality we wish to have there and define it as we go further:
    ```elixir
    . . .
    def handle_events(events, _from, state) do
    for event <- events do
    %{id: id, payload: payload} = event
    {module, function, args} = payload |> deconstruct_payload
    task = start_task(module, function, args)
    yield_to_and_update_task(task, id)
    end
    {:noreply, [], state}
    end
    . . .
    ```
    At its core, this setup simple just wants to run a task we decode the binary of.
    To do this we get the data from the event, deconstruct it, and then start and yield to a task.
    These functions aren't defined yet, so let's create them:
    ```elixir
    . . .
    def deconstruct_payload payload do
    payload |> :erlang.binary_to_term
    end
    . . .
    ```
    We can use Erlang's built-in inverse of our other `term_to_binary/1` function to get our module, function, and args back out.
    Now we need to start the task:
    ```elixir
    . . .
    def start_task(mod, func, args) do
    Task.Supervisor.async_nolink(TaskSupervisor, mod, func, args)
    end
    . . .
    ```
    Here we leverage the supervisor we created at the beginning to run this in a task.
    Now we need to define `yield_to_and_update_task/2`:
    ```elixir
    . . .
    def yield_to_and_update_task(task, id) do
    task
    |> Task.yield(1000)
    |> yield_to_status(task)
    |> update(id)
    end
    . . .
    ```
    Now this brings in more pieces we've yet to define, but the core is simple.
    We wait 1 second for the task to run.
    From here, we respond to the status it returns (which will either be `:ok`, `:exit`, or `nil`) and handle it as such.
    After that we update our task via our DB interface to get things current.
    Let's define `yield_to_status/2` for each of the scenarios we mentioned:
    ```elixir
    . . .
    def yield_to_status({:ok, _}, _) do
    "success"
    end
    def yield_to_status({:exit, _}, _) do
    "error"
    end
    def yield_to_status(nil, task) do
    Task.shutdown(task)
    "timeout"
    end
    . . .
    ```
    These simple handle the atom being returned from the process and respond appropriately.
    If it takes more than a second, we need to shut it down because otherwise it would just hang forever.
    From this, we can see our finalized consumer:
    ```elixir
    defmodule GenstageExample.Consumer do
    alias Experimental.GenStage
    use GenStage
    alias GenstageExample.{Producer, TaskSupervisor}
    def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
    end
    def init(state) do
    {:consumer, state, subscribe_to: [Producer]}
    end
    def handle_events(events, _from, state) do
    for event <- events do
    %{id: id, payload: payload} = event
    {module, function, args} = payload |> deconstruct_payload
    task = start_task(module, function, args)
    yield_to_and_update_task(task, id)
    end
    {:noreply, [], state}
    end
    defp yield_to_and_update_task(task, id) do
    task
    |> Task.yield(1000)
    |> yield_to_status(task)
    |> update(id)
    end
    defp start_task(mod, func, args) do
    Task.Supervisor.async_nolink(TaskSupervisor, mod , func, args)
    end
    defp yield_to_status({:ok, _}, _) do
    "success"
    end
    defp yield_to_status({:exit, _}, _) do
    "error"
    end
    defp yield_to_status(nil, task) do
    Task.shutdown(task)
    "timeout"
    end
    defp update(status, id) do
    GenstageExample.TaskDBInterface.update_task_status(id, status)
    end
    defp deconstruct_payload payload do
    payload |> :erlang.binary_to_term
    end
    end
    ```
    Now, if we go into IEx:
    ```elixir
    $ iex -S mix
    iex> GenstageExample.enqueue(IO, :puts, ["wuddup"])
    #=>
    16:39:31.014 [debug] QUERY OK db=137.4ms
    INSERT INTO "tasks" ("payload","status") VALUES ($1,$2) [<<131, 104, 3, 100, 0, 9, 69, 108, 105, 120, 105, 114, 46, 73, 79, 100, 0, 4, 112, 117, 116, 115, 108, 0, 0, 0, 1, 109, 0, 0, 0, 6, 119, 117, 100, 100, 117, 112, 106>>, "waiting"]
    :ok
    16:39:31.015 [debug] QUERY OK db=0.4ms queue=0.1ms
    begin []
    16:39:31.025 [debug] QUERY OK source="tasks" db=9.6ms
    SELECT t0."id" FROM "tasks" AS t0 WHERE (t0."status" = 'waiting') LIMIT $1 FOR UPDATE SKIP LOCKED [49000]
    16:39:31.026 [debug] QUERY OK source="tasks" db=0.8ms
    UPDATE "tasks" AS t0 SET "status" = $1 WHERE (t0."id" = ANY($2)) RETURNING t0."id", t0."payload" ["running", [5]]
    16:39:31.040 [debug] QUERY OK db=13.5ms
    commit []
    iex(2)> wuddup
    16:39:31.060 [debug] QUERY OK source="tasks" db=1.3ms
    UPDATE "tasks" AS t0 SET "status" = $1 WHERE (t0."id" = ANY($2)) ["success", [5]]
    ```
    It works and we are storing and running tasks!
    ## TODO publish to hex
    [Here](https://github.com/ybur-yug/genstage_example/tree/87c5f96c74e8fa90cd5b5fd108cd9ba104f78a65) is a link to all code thus far.