Skip to content

Instantly share code, notes, and snippets.

@BruOp
Last active March 14, 2017 16:11
Show Gist options
  • Save BruOp/fdf6513e2df4274f9266c9cb5ee8a7fb to your computer and use it in GitHub Desktop.
Save BruOp/fdf6513e2df4274f9266c9cb5ee8a7fb to your computer and use it in GitHub Desktop.
Elixir GenStage Example
require Logger
defmodule JobWorkerPool do
def start_link(worker_count, subscribe_options) do
# Start the producer
{:ok, producer_pid} = GenStage.start_link(__MODULE__.JobProducer, :ok)
# Override consumer subscribe options
subscribe_options = Keyword.put(subscribe_options, :to, producer_pid)
# Start the worker consumers
Enum.each(1..worker_count, fn _ ->
# Start consumer
{:ok, consumer_pid} = GenStage.start_link(__MODULE__.Worker, :ok)
# Subscribe to producer
GenStage.sync_subscribe(consumer_pid, subscribe_options)
end)
{:ok, producer_pid}
end
@doc """
A method called to enqueue new jobs
"""
def enqueue_jobs(pid, jobs) do
:ok = GenStage.call(pid, {:enqueue_jobs, jobs})
end
defmodule JobProducer do
use GenStage
def init(:ok), do: {:producer, {:queue.new(), 0}}
def handle_call({:enqueue_jobs, jobs}, _from, {queue, pending_demand}) do
Logger.info "Enqueued #{length jobs} jobs"
# Take all the jobs and add them to the queue
queue = Enum.reduce(jobs, queue, &:queue.in(&1, &2))
# After we increased our queue, let's take our jobs based on our pending demand
# and process the backlog
{reversed_jobs, state} = take_jobs(queue, pending_demand, [])
{:reply, :ok, Enum.reverse(reversed_jobs), state}
end
def handle_demand(demand, {queue, pending_demand}) do
Logger.info "Handling #{demand} demand with a queue of size #{:queue.len(queue)}"
{reversed_jobs, state} = take_jobs(queue, pending_demand + demand, [])
{:noreply, Enum.reverse(reversed_jobs), state}
end
@doc """
If there isn't any demand, just return the jobs and the queue with 0 pending demand
"""
defp take_jobs(queue, 0, jobs), do: {jobs, {queue, 0}}
@doc """
If there is pending_demand, process the queue
"""
defp take_jobs(queue, n, jobs) when n > 0 do
# Take an item out of the queue one at a time until we take the demand out of the queue
case :queue.out(queue) do
# If the queue is empty, store the queue and the demand
{:empty, ^queue} -> {jobs, {queue, n}}
# If the queue is not empty, take the single job out of the queue
# prepend it to an array, and keep calling the same method to take a job out
# until the number of jobs taken out meets the demand
{{:value, job}, queue} -> take_jobs(queue, n - 1, [job | jobs])
end
end
end
defmodule Worker do
use GenStage
def init(:ok), do: {:consumer, nil}
def handle_events(jobs, _from, nil) do
Logger.info "Handling #{length jobs} job events"
Enum.each(jobs, &(&1.()))
{:noreply, [], nil}
end
end
end
{:ok, pid} = JobWorkerPool.start_link(4, max_demand: 10)
jobs = Enum.map(1..100, fn i ->
fn -> IO.puts "performed job #{i}" end
end)
JobWorkerPool.enqueue_jobs(pid, jobs)
Process.sleep(:infinity)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment