Created
November 2, 2019 10:50
-
-
Save cblavier/c8fc29de8f0249d8f212c606c6285a10 to your computer and use it in GitHub Desktop.
GenStage producer streaming / buffering data from an Ecto query. Can be used with Flow.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Flow.EctoProducer do
  @moduledoc """
  GenStage producer streaming / buffering data from an Ecto query.

  Rows are read in chunks using keyset pagination: every chunk is ordered by
  the configured key and, after the first chunk, restricted to rows whose key
  is strictly greater than the last key already fetched. The key must therefore
  be present in the query results, non-nil and unique, otherwise rows could be
  skipped or repeated.

  The producer is initialised with a `{repo, query, options}` tuple, e.g.
  `GenStage.start_link(Flow.EctoProducer, {MyRepo, query, chunk_size: 1_000})`.

  ## Options

    * `:chunk_size` - minimal number of rows retrieved by Ecto at once
      (default: `#{5000}`). Ignored if the chunk size is lower than the
      pending demand, in which case the demand is used instead.
    * `:key` - field used to sort database entries and perform pagination
      (default: `:id`).

  Once the query is exhausted and the internal buffer is empty, the producer
  stops itself with reason `:normal`.
  """

  use GenStage
  import Ecto.Query

  @default_chunk_size 5000
  @default_key :id

  # Delay (in milliseconds) before the `:stop` message is delivered to the producer.
  # NOTE(review): presumably leaves time for the last events to be dispatched - confirm.
  @stop_delay 10

  # Builds the initial producer state from `{repo, query, options}`.
  # The query is wrapped in a subquery so ordering, limit and the pagination
  # filter are applied on top of whatever the caller's query selects.
  @impl true
  def init({repo, query, options}) do
    state = %{
      repo: repo,
      query: from(subquery(query)),
      buffer: [],
      chunk_size: Keyword.get(options, :chunk_size, @default_chunk_size),
      key: Keyword.get(options, :key, @default_key),
      last_key: nil,
      demand: 0,
      exhausted: false
    }

    {:producer, state, buffer_size: state.chunk_size}
  end

  # Handles demand until the supplying query is exhausted.
  # Serves from the buffer first; when the buffer cannot cover the total pending
  # demand, synchronously fetches one more chunk (at least as large as the
  # missing demand) from the database.
  @impl true
  def handle_demand(incoming_demand, %{exhausted: false} = state) do
    total_demand = incoming_demand + state.demand

    case supply_for_demand(total_demand, state.buffer) do
      {_partial_supply, _rest, missing} when missing > 0 ->
        {fetched, last_key, exhausted} = fetch_for_demand(state, missing)

        {supply, new_buffer, new_demand} =
          supply_for_demand(total_demand, state.buffer ++ fetched)

        # Nothing left to fetch and nothing left to emit: shut the producer down.
        if exhausted and new_buffer == [], do: schedule_stop()

        new_state = %{
          state
          | demand: new_demand,
            buffer: new_buffer,
            last_key: last_key,
            exhausted: exhausted
        }

        {:noreply, supply, new_state}

      {supply, new_buffer, _no_missing_demand} ->
        {:noreply, supply, %{state | demand: 0, buffer: new_buffer}}
    end
  end

  # Handles demand when the query is exhausted but the buffer is not empty yet.
  # No database access happens here; the producer only drains its buffer.
  def handle_demand(incoming_demand, %{exhausted: true} = state) do
    {supply, new_buffer, new_demand} =
      supply_for_demand(incoming_demand + state.demand, state.buffer)

    if new_buffer == [], do: schedule_stop()

    {:noreply, supply, %{state | demand: new_demand, buffer: new_buffer}}
  end

  # Stops the producer when the query is exhausted and the buffer is empty.
  @impl true
  def handle_info(:stop, state) do
    {:stop, :normal, state}
  end

  # Takes up to `demand` events from the front of `buffer`.
  # Returns `{supply, remaining_buffer, unmet_demand}`.
  defp supply_for_demand(demand, buffer) do
    {supply, new_buffer} = Enum.split(buffer, demand)
    {supply, new_buffer, demand - length(supply)}
  end

  # Fetches one chunk sized to cover `demand` (never smaller than the configured
  # chunk size). Returns `{rows, last_key, exhausted?}`; the query is considered
  # exhausted when fewer rows than requested came back.
  defp fetch_for_demand(state, demand) do
    new_chunk_size = max(demand, state.chunk_size)
    fetched = fetch_from_db(%{state | chunk_size: new_chunk_size})
    last_key = get_last_key(fetched, state.key, state.last_key)

    {fetched, last_key, length(fetched) < new_chunk_size}
  end

  # First page: no pagination cursor yet.
  defp fetch_from_db(%{repo: repo, last_key: nil} = state) do
    state
    |> base_fetch_query()
    |> repo.all()
  end

  # Following pages: only rows strictly after the last key already fetched.
  defp fetch_from_db(%{repo: repo, key: key, last_key: last_key} = state) do
    state
    |> base_fetch_query()
    |> where([r], field(r, ^key) > ^last_key)
    |> repo.all()
  end

  # Orders by the pagination key and limits the result to one chunk.
  defp base_fetch_query(%{query: query, key: key, chunk_size: chunk_size}) do
    query
    |> order_by([r], field(r, ^key))
    |> limit(^chunk_size)
  end

  # Returns the pagination cursor after a fetch.
  # An empty fetch keeps the previous cursor. A row without the key (KeyError)
  # or with a nil key value raises: silently falling back to `nil` would make
  # the next fetch restart from the first page and emit duplicates forever.
  defp get_last_key([], _key, previous_key), do: previous_key

  defp get_last_key(fetched, key, _previous_key) do
    case fetched |> List.last() |> Map.fetch!(key) do
      nil ->
        raise ArgumentError,
              "pagination key #{inspect(key)} is nil in fetched row; " <>
                "the key must be non-nil to paginate"

      value ->
        value
    end
  end

  # Schedules the `:stop` message handled by `handle_info/2`.
  defp schedule_stop do
    Process.send_after(self(), :stop, @stop_delay)
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment