Created
October 9, 2017 09:18
-
-
Save narrowtux/286666711864246d3dbb6859dda0d694 to your computer and use it in GitHub Desktop.
Module for Ecto and GenStage to work together when you want to use Repo.stream in conjunction with a GenStage subscriber or a Flow.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule RepoStream do | |
defmodule Producer do | |
use GenStage | |
defstruct [:demand, :pid] | |
def start_link() do | |
GenStage.start_link(__MODULE__, self()) | |
end | |
def init(caller) do | |
{:producer, %{ | |
demand: 0, | |
pid: caller | |
}, []} | |
end | |
def handle_call({:set_receiver, pid}, state) do | |
{:reply, :ok, [], %{state | pid: pid}} | |
end | |
def handle_demand(d, state) do | |
send state.pid, {:demand, d} | |
{:noreply, [], %{state | demand: d}} | |
end | |
def handle_subscribe(_, _, _, state) do | |
send state.pid, :ready | |
{:automatic, state} | |
end | |
def handle_info({:supply, items}, state) do | |
remaining_demand = | |
(state.demand - length(items)) | |
|> :erlang.max(0) | |
if remaining_demand > 0 do | |
send state.pid, {:demand, remaining_demand} | |
end | |
{:noreply, items, %{state | demand: remaining_demand}} | |
end | |
def terminate(_, state) do | |
:shutdown | |
end | |
end | |
@doc """ | |
Sets up a producer stage that produces the messages from a repo stream | |
Returns the producer stage which you can use for flow stuff | |
""" | |
def query_into_stage(query, repo) do | |
chunk_size = 500 | |
stream = repo.stream(query) | |
pid = self() | |
Task.async(fn -> | |
repo.transaction(fn -> | |
{:ok, producer} = __MODULE__.Producer.start_link() | |
Process.send_after pid, {:release, producer}, 100 | |
forward(stream, producer) | |
end) | |
end) | |
receive do | |
{:release, prod} -> prod | |
end | |
end | |
defp forward(stream, producer) do | |
# type acc :: {acc :: list[any], demand :: positive_integer} | |
stream | |
|> Stream.transform( | |
fn -> | |
receive do | |
:ready -> :ok | |
end | |
{[], 0} | |
end, | |
fn | |
event, {[], 0} -> | |
receive do | |
{:demand, 1} -> | |
send producer, {:supply, [event]} | |
{[], {[], 0}} | |
{:demand, d} -> | |
{[], {[event], d}} | |
end | |
event, {acc, d} when d > length(acc) + 1 -> | |
{[], {[event | acc], d}} | |
event, {acc, d} when d == length(acc) + 1 -> | |
send producer, {:supply, [event | acc]} | |
{[], {[], 0}} | |
end, | |
fn {acc, _} -> | |
if length(acc) > 0 do | |
send producer, {:supply, acc} | |
end | |
GenStage.stop(producer) | |
end | |
) | |
|> Stream.run() | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I was about to spend an evening writing the same thing, thank you!