Last active
February 26, 2018 18:24
-
-
Save wfgilman/699ab84cf24a9d59ddb893a21a5d2a79 to your computer and use it in GitHub Desktop.
GenStage Pipeline
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Supervisor which starts the pipeline.
defmodule Pipeline.Supervisor do
  @moduledoc """
  Top-level supervisor for the GenStage pipeline.

  Starts, in order, the `Pipeline.Task.Supervisor` used by the stages for
  their tasks, then the `Collector` -> `Requestor` -> `Loader` stages.
  """
  use Supervisor
  alias Pipeline.{Collector, Requestor, Loader}

  def start_link do
    Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    children = [
      {Task.Supervisor, name: Pipeline.Task.Supervisor},
      stage_spec(Collector),
      stage_spec(Requestor),
      stage_spec(Loader)
    ]

    # :rest_for_one — with GenStage's default `cancel: :permanent`
    # subscriptions a consumer exits when its producer exits, so a crashed
    # child and every child started after it (its downstream stages) are
    # restarted together, counting as a single restart. Under :one_for_one
    # each of those exits would be a separate restart against max_restarts.
    Supervisor.init(children, strategy: :rest_for_one, max_restarts: 5)
  end

  # Child spec for a stage that exposes start_link/0. Built explicitly
  # because the child_spec/1 generated by `use GenStage` would call
  # start_link/1 instead.
  defp stage_spec(module) do
    %{id: module, start: {module, :start_link, []}}
  end
end
# Stage 1: Collect jobs (events) from the database.
defmodule Pipeline.Collector do
  @moduledoc """
  Producer stage that emits waiting jobs read from the database.

  Consumer demand is accumulated in the state. Jobs are fetched as soon as
  new demand arrives. While some demand is still unmet, the database is
  polled again every `@poll_interval` milliseconds. At most one poll is
  scheduled at any time.
  """
  use GenStage

  # Delay in ms between polls while demand is unmet.
  @poll_interval 60_000

  def start_link do
    GenStage.start_link(__MODULE__, 0, name: __MODULE__)
  end

  @impl true
  def init(initial_demand) do
    # `timer` holds the ref of the pending :check_for_jobs poll (or nil), so
    # repeated demand never stacks up additional polling loops.
    {:producer, %{demand: initial_demand, timer: nil}}
  end

  @impl true
  def handle_demand(demand, state) do
    fetch_jobs(%{state | demand: state.demand + demand})
  end

  @impl true
  def handle_info(:check_for_jobs, state) do
    # The poll that was pending has now fired.
    fetch_jobs(%{state | timer: nil})
  end

  # Emits up to `demand` waiting jobs and schedules a follow-up poll when the
  # database could not satisfy all outstanding demand.
  defp fetch_jobs(%{demand: demand} = state) when demand > 0 do
    # NOTE(review): assumes Repo.Job.all_waiting/1 returns at most `demand`
    # jobs and that jobs it returns are not returned again by a later poll
    # (e.g. they get marked as taken) — otherwise they would be emitted twice.
    jobs = Repo.Job.all_waiting(demand)
    state = schedule_poll(%{state | demand: demand - length(jobs)})
    {:noreply, jobs, state}
  end

  # No outstanding demand: skip the database round-trip.
  defp fetch_jobs(state), do: {:noreply, [], state}

  # Starts a poll timer unless demand is satisfied or a poll is already pending.
  defp schedule_poll(%{demand: demand, timer: nil} = state) when demand > 0 do
    %{state | timer: Process.send_after(self(), :check_for_jobs, @poll_interval)}
  end

  defp schedule_poll(state), do: state
end
# Stage 2: Make an HTTP request for each job (event). Returns data.
defmodule Pipeline.Requestor do
  @moduledoc """
  Producer-consumer stage that performs a 3rd-party API request for each job
  received from `Pipeline.Collector` and emits `{data, job}` tuples for the
  requests that succeed.

  The requests of one batch run concurrently under `Pipeline.Task.Supervisor`.
  Requests that crash or time out are passed to `handle_failure/2` and
  dropped from the output.
  """
  use GenStage
  require Logger
  alias Pipeline.Collector

  # Referenced via an attribute rather than an alias so that Elixir's own
  # `Supervisor` module is not shadowed inside this module.
  @task_supervisor Pipeline.Task.Supervisor

  # Time budget in ms for one batch of requests (Task.yield/1's default).
  @request_timeout 5_000

  def start_link do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    {:producer_consumer, :ok, subscribe_to: [{Collector, max_demand: 10}]}
  end

  @impl true
  def handle_events(jobs, _from, state) do
    # Start every request first, then wait for all of them, so the batch runs
    # concurrently instead of one request at a time.
    data =
      jobs
      |> Enum.map(&start_task/1)
      |> Task.yield_many(@request_timeout)
      |> Enum.zip(jobs)
      |> Enum.flat_map(&collect_reply/1)

    {:noreply, data, state}
  end

  ## Helper functions.

  defp start_task(job) do
    Task.Supervisor.async_nolink(@task_supervisor, fn -> call_api(job) end)
  end

  # Placeholder: make the request to the 3rd party API for `job` and return
  # the response data.
  defp call_api(_job) do
    nil
  end

  # Turns one yield_many result into `[{data, job}]` on success, or `[]` after
  # reporting the failure. A task still running after the timeout is shut
  # down so it does not leak and its late reply never reaches this stage.
  defp collect_reply({{task, result}, job}) do
    case result || Task.shutdown(task, :brutal_kill) do
      {:ok, data} ->
        [{data, job}]

      {:exit, reason} ->
        handle_failure(job, reason)
        []

      nil ->
        handle_failure(job, :timeout)
        []
    end
  end

  # NOTE(review): the original called `handle_failure/1`, which is not
  # defined in this module. This placeholder only logs; replace it with the
  # real failure handling for the job.
  defp handle_failure(job, reason) do
    Logger.error(
      "#{inspect(__MODULE__)} request failed for #{inspect(job)}: #{inspect(reason)}"
    )
  end
end
# Stage 3: Loads data (events) into the database.
defmodule Pipeline.Loader do
  @moduledoc """
  Consumer stage that loads each `{datum, job}` received from
  `Pipeline.Requestor` into the database.

  The loads of one batch run concurrently under `Pipeline.Task.Supervisor`.
  Each job is then passed to `put_success/1`, or to `handle_failure/2` when
  its load crashed or timed out.
  """
  use GenStage
  require Logger
  alias Pipeline.Requestor

  # Referenced via an attribute rather than an alias so that Elixir's own
  # `Supervisor` module is not shadowed inside this module.
  @task_supervisor Pipeline.Task.Supervisor

  # Time budget in ms for one batch of loads (Task.yield/1's default).
  @load_timeout 5_000

  def start_link do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    {:consumer, :ok, subscribe_to: [{Requestor, max_demand: 10}]}
  end

  @impl true
  def handle_events(data, _from, state) do
    # Start every load first, then wait for all of them, so the batch runs
    # concurrently instead of one load at a time.
    data
    |> Enum.map(fn {datum, _job} -> start_task(datum) end)
    |> Task.yield_many(@load_timeout)
    |> Enum.zip(data)
    |> Enum.each(&handle_reply/1)

    {:noreply, [], state}
  end

  ## Helper functions.

  defp start_task(datum) do
    Task.Supervisor.async_nolink(@task_supervisor, fn -> load(datum) end)
  end

  # Placeholder: load `datum` into the database.
  defp load(_datum) do
    nil
  end

  # Records the outcome of one load. A task still running after the timeout
  # is shut down so it does not leak and its late reply never reaches this
  # stage's mailbox.
  defp handle_reply({{task, result}, {_datum, job}}) do
    case result || Task.shutdown(task, :brutal_kill) do
      {:ok, _} -> put_success(job)
      {:exit, reason} -> handle_failure(job, reason)
      nil -> handle_failure(job, :timeout)
    end
  end

  # NOTE(review): `put_success/1` is not defined in this module in the
  # original. This placeholder does nothing; replace it with the real
  # success bookkeeping for the job.
  defp put_success(_job), do: :ok

  # NOTE(review): the original called the misspelled, undefined
  # `hanldle_failure/1`. This placeholder only logs; replace it with the
  # real failure handling for the job.
  defp handle_failure(job, reason) do
    Logger.error(
      "#{inspect(__MODULE__)} load failed for #{inspect(job)}: #{inspect(reason)}"
    )
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment