This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@spec delete(pit_t()) :: :ok | {:error, HTTPoison.AsyncResponse | HTTPoison.MaybeRedirect | HTTPoison.Response} | |
def delete(pit) do | |
url = HTTP.generate_url("_pit", ES.no_index(), HTTP.no_options()) | |
with {:ok, payload} <- Poison.encode(%{"id" => pit}), | |
%HTTPoison.Response{status_code: 200} <- | |
HTTPoison.request!(:delete, url, payload, HTTP.headers(HTTP.json_encoding())) do | |
:ok | |
else | |
error -> {:error, error} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@spec initial(PIT.pit_t(), ES.query_t()) :: ES.query_t() | |
def initial(pit, query) do | |
%{ | |
query: query, | |
pit: %{id: pit, keep_alive: PIT.expiration()}, | |
size: ES.max_results(), | |
sort: %{"_shard_doc" => "desc"} | |
} | |
end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@spec update(ES.query_t(), ES.response_t()) :: ES.query_t() | |
def update(query, %Req.Response{body: %{"hits" => %{"hits" => hits}}, status: 200}) do | |
Map.put(query, :search_after, List.last(hits)["sort"]) | |
end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@spec streamer(ES.query_t()) :: {:ok, Enumerable.t()} | {:error, any} | |
def streamer(initial) do | |
{:ok, | |
initial | |
|> search() | |
|> Stream.iterate(fn previous -> search(Query.update(initial, previous)) end) | |
} | |
rescue | |
error -> {:error, error} | |
end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@spec stream_many(ES.query_t(), ES.index_t(), ES.consumer_t(), non_neg_integer()) :: {:error, any()} | {:ok, Enumerable.t()} | |
def stream_many(query, index, consumer, count) do | |
case PIT.create(index) do | |
{:ok, pit_id} -> | |
try do | |
case pit_id |> Query.initial(query) |> streamer() do | |
{:ok, stream} -> | |
stream | |
|> Stream.flat_map(& &1.body["hits"]["hits"]) | |
|> Stream.take(count) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@spec stream_one(ES.query_t(), ES.index_t(), ES.consumer_t()) :: {:ok, any()} | {:error, any()} | |
def stream_one(query, index, consumer) do | |
query | |
|> search(index) | |
|> HTTP.on_200(fn body -> consumer.(body["hits"]["hits"]) end) | |
end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@spec stream(ES.query_t(), ES.index_t(), ES.consumer_t()) :: {:ok, any()} | {:error, any()} | |
def stream(query, index, consumer) do | |
case count(query, index) do | |
{:ok, count} -> | |
if count > ES.max_results() do | |
stream_many(query, index, consumer, count) | |
else | |
stream_one(query, index, consumer) | |
end | |
error -> |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@spec chunker(Enumerable.t(), non_neg_integer(), boolean(), (any() -> non_neg_integer()), (Enumerable.t() -> any())) :: Enumerable.t() | |
def chunker( | |
chunks, | |
chunk_size, | |
ordered \\ true, | |
sizer \\ &String.length/1, | |
joiner \\ &Enum.join/1 | |
) do | |
zero = {0, []} | |
convert = |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Primes do | |
@doc """ | |
Generates a stream of primes. | |
iex> Primes.primes() |> Enum.take(5) | |
[2, 3, 5, 7, 11] | |
iex> Primes.primes() |> Enum.at(25) | |
101 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Stream.iterate(0, fn x -> x + 1 end) |