@korczis
Created September 4, 2024 17:24
Query Stream - Transform your Ecto query into a stream!
defmodule Ecto.QueryStream do
  @moduledoc """
  Creates a lazy stream from an Ecto query.

  The stream fetches the query's results page by page, so large datasets can be
  processed in batches instead of being loaded into memory all at once.

  ## Example Usage

      query = from u in User, where: u.age > 30
      stream = Ecto.QueryStream.new(query, Repo)

      stream
      |> Enum.take(5)
      |> IO.inspect()

  ## Functions

  - `new/3`: Creates a stream from an Ecto query and a repository.
  - `start_stream/3`: Initializes the stream state.
  - `next_batch/1`: Fetches the next batch of results.
  - `end_stream/1`: Cleans up any resources when the stream ends.
  """

  import Ecto.Query

  @type opts :: Keyword.t()

  @doc """
  Creates a stream from the given Ecto query and repository.

  The returned stream is lazy: it fetches the query's results in batches as it
  is consumed, which allows large datasets to be processed efficiently.

  ## Parameters

  - `query` (Ecto.Query): The Ecto query to be streamed.
  - `repo` (Ecto.Repo): The Ecto repository to execute the query against.
  - `opts` (opts): Options for stream configuration. (Optional)

  Available Options:

  - `:batch_size` (integer): The size of each batch. Defaults to 100.

  ## Examples

      iex> query = from u in User, where: u.age > 30
      ...> stream = Ecto.QueryStream.new(query, Repo)
      ...> Enum.take(stream, 5)
      [%User{id: 1, age: 35}, %User{id: 2, age: 40}, ...]
  """
  @spec new(Ecto.Query.t(), Ecto.Repo.t(), opts) :: Enumerable.t()
  def new(query, repo, opts \\ []) do
    Stream.resource(
      fn -> start_stream(query, repo, opts) end,
      &next_batch/1,
      &end_stream/1
    )
  end

  @doc """
  Initializes the stream state.

  The state holds the query, the repository, the current page (starting at 1),
  and the batch size.

  ## Parameters

  - `query` (Ecto.Query): The Ecto query to be streamed.
  - `repo` (Ecto.Repo): The Ecto repository to execute the query against.
  - `opts` (opts): Options for stream configuration.
  """
  @spec start_stream(Ecto.Query.t(), Ecto.Repo.t(), opts) :: map()
  def start_stream(query, repo, opts \\ []) do
    batch_size = Keyword.get(opts, :batch_size, 100)
    %{query: query, page: 1, batch_size: batch_size, repo: repo}
  end
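
  @doc """
  Fetches the next batch of results.

  Runs the query with a `LIMIT` and `OFFSET` derived from the current page and
  batch size. An empty batch halts the stream; otherwise the batch is emitted
  and the page counter is advanced.

  Because paging relies on `OFFSET`, the query should carry a deterministic
  `order_by`. Without one, the database may return rows in a different order on
  each call, so records can be skipped or repeated across batches.

  ## Parameters

  - `state` (map): The current state of the stream, including the query, repository, page, and batch size.
  """
  @spec next_batch(map()) :: {:halt, map()} | {Enumerable.t(), map()}
  def next_batch(%{query: query, page: page, batch_size: batch_size, repo: repo} = state) do
    # Pages are 1-based, so page 1 starts at offset 0.
    offset = (page - 1) * batch_size
    batch = repo.all(from q in query, limit: ^batch_size, offset: ^offset)

    case batch do
      # No rows left: stop the stream.
      [] -> {:halt, state}
      # Emit this batch and move on to the next page.
      _ -> {batch, %{state | page: page + 1}}
    end
  end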

  @doc """
  Cleans up any resources when the stream ends.

  Called when the stream finishes or is halted early. The stream holds no open
  resources, so this is currently a no-op.

  ## Parameters

  - `state` (map): The final state of the stream.
  """
  @spec end_stream(map()) :: :ok
  def end_stream(_state) do
    :ok
  end
end
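
The paging logic in `next_batch/1` can be tried without a database. The following is a minimal sketch, not part of the gist: `PagedStreamSketch` and `fetch_page` are hypothetical names, and `fetch_page` stands in for the `repo.all/1` call with `limit` and `offset`. It reads from an in-memory list and records each requested offset, which shows that only the pages actually consumed are fetched.

```elixir
defmodule PagedStreamSketch do
  # Hypothetical stand-in for Ecto.QueryStream: `fetch_page.(offset, limit)`
  # replaces `repo.all(from q in query, limit: ^batch_size, offset: ^offset)`.
  def new(fetch_page, batch_size \\ 100) do
    Stream.resource(
      # Same initial state shape as start_stream/3, minus query and repo.
      fn -> %{page: 1, batch_size: batch_size} end,
      fn %{page: page, batch_size: size} = state ->
        offset = (page - 1) * size

        case fetch_page.(offset, size) do
          [] -> {:halt, state}
          batch -> {batch, %{state | page: page + 1}}
        end
      end,
      # Nothing to clean up, as in end_stream/1.
      fn _state -> :ok end
    )
  end
end

# In-memory "table" of 10 rows, already in a fixed order.
rows = Enum.to_list(1..10)

# Records every offset that the stream asks for.
{:ok, log} = Agent.start_link(fn -> [] end)

fetch_page = fn offset, limit ->
  Agent.update(log, &[offset | &1])
  Enum.slice(rows, offset, limit)
end

# Take 5 items with a batch size of 3: two pages are enough.
first_five = PagedStreamSketch.new(fetch_page, 3) |> Enum.take(5)
offsets_requested = log |> Agent.get(& &1) |> Enum.reverse()

IO.inspect(first_five, label: "items")
IO.inspect(offsets_requested, label: "offsets requested")
```

With a batch size of 3, taking five items requests offsets 0 and 3 only; the third page is never fetched. The in-memory list keeps a fixed order, which is the property a real query would need an `order_by` to guarantee.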