Created
September 4, 2024 17:24
-
-
Save korczis/7433f2d38f12126d84be6171a6b21817 to your computer and use it in GitHub Desktop.
Query Stream - Transform your Ecto Query into a Stream!
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Ecto.QueryStream do
  @moduledoc """
  Builds a lazy stream over the results of an Ecto query.

  Results are fetched page by page with `LIMIT`/`OFFSET`, so only one batch of rows is
  loaded per round trip instead of the whole result set.

  ## Caveats

    * Pagination is offset based. Give the query an `order_by` over a unique column
      (or set of columns): without a deterministic order the database is free to
      return rows in a different order for every page, which can duplicate or skip rows.
    * Every batch is a separate `repo.all/1` call and this module opens no transaction,
      so rows inserted or deleted while the stream is being consumed can shift pages.
    * The paging `limit` and `offset` are applied on top of the given query; per the
      Ecto documentation they override any `limit`/`offset` the query already has.

  ## Example Usage

      query = from u in User, where: u.age > 30, order_by: u.id
      stream = Ecto.QueryStream.new(query, Repo)

      stream
      |> Enum.take(5)
      |> IO.inspect()

  ## Functions

    * `new/3` - creates a stream from an Ecto query and a repository.
    * `start_stream/3` - builds the initial stream state.
    * `next_batch/1` - fetches the next batch of results.
    * `end_stream/1` - called when the stream ends.
  """

  import Ecto.Query, only: [from: 2]

  @default_batch_size 100

  @type opts :: Keyword.t()

  @typedoc "Paging state threaded through `Stream.resource/3`."
  @type state :: %{
          query: Ecto.Queryable.t(),
          page: pos_integer(),
          batch_size: pos_integer(),
          repo: Ecto.Repo.t()
        }

  @doc """
  Creates a stream from the given Ecto query and repository.

  The stream is lazy: no query is executed until it is consumed, and rows are then
  fetched in batches of `:batch_size`. Options are validated immediately, so an
  invalid `:batch_size` raises here rather than when the stream is first consumed.

  ## Parameters

    * `query` - the Ecto query (or any queryable) to be streamed.
    * `repo` - the Ecto repository module to execute the query against.
    * `opts` - options for stream configuration (optional).

  ## Options

    * `:batch_size` - positive integer, the number of rows fetched per query.
      Defaults to `#{@default_batch_size}`.

  ## Examples

      query = from u in User, where: u.age > 30, order_by: u.id

      query
      |> Ecto.QueryStream.new(Repo, batch_size: 500)
      |> Enum.take(5)

  Raises `ArgumentError` if `:batch_size` is not a positive integer.
  """
  @spec new(Ecto.Queryable.t(), Ecto.Repo.t(), opts) :: Enumerable.t()
  def new(query, repo, opts \\ []) do
    # Building the state is pure, so do it eagerly: bad options then fail at the
    # call site instead of deep inside whatever eventually consumes the stream.
    initial_state = start_stream(query, repo, opts)

    Stream.resource(
      fn -> initial_state end,
      &next_batch/1,
      &end_stream/1
    )
  end

  @doc """
  Builds the initial stream state.

  The state holds the query, the repository, the current page (starting at `1`)
  and the batch size taken from `opts`.

  ## Parameters

    * `query` - the Ecto query (or any queryable) to be streamed.
    * `repo` - the Ecto repository module to execute the query against.
    * `opts` - options for stream configuration; see `new/3`.

  Raises `ArgumentError` if `:batch_size` is not a positive integer.
  """
  @spec start_stream(Ecto.Queryable.t(), Ecto.Repo.t(), opts) :: state()
  def start_stream(query, repo, opts \\ []) do
    batch_size =
      opts
      |> Keyword.get(:batch_size, @default_batch_size)
      |> validate_batch_size!()

    %{query: query, page: 1, batch_size: batch_size, repo: repo}
  end

  @doc """
  Fetches the next batch of results.

  Runs the query with `limit: batch_size` and an offset derived from the current
  page. Returns `{batch, new_state}` with the page advanced by one, or
  `{:halt, state}` once a page comes back empty.

  ## Parameters

    * `state` - the current stream state, as built by `start_stream/3`.
  """
  @spec next_batch(state()) :: {:halt, state()} | {[term()], state()}
  def next_batch(%{query: query, page: page, batch_size: batch_size, repo: repo} = state) do
    # Pages are 1-based, so page 1 starts at offset 0.
    offset = (page - 1) * batch_size

    case repo.all(from q in query, limit: ^batch_size, offset: ^offset) do
      [] -> {:halt, state}
      batch -> {batch, %{state | page: page + 1}}
    end
  end

  @doc """
  Called when the stream ends.

  The stream holds no resources between batches, so there is nothing to release;
  this always returns `:ok`.

  ## Parameters

    * `state` - the final state of the stream (ignored).
  """
  @spec end_stream(state()) :: :ok
  def end_stream(_state), do: :ok

  # Returns the batch size unchanged when it is usable as a SQL LIMIT that makes
  # progress; a size of 0 would otherwise end the stream silently on the first page.
  defp validate_batch_size!(size) when is_integer(size) and size > 0, do: size

  defp validate_batch_size!(other) do
    raise ArgumentError,
          ":batch_size must be a positive integer, got: #{inspect(other)}"
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment