Skip to content

Instantly share code, notes, and snippets.

View rugyoga's full-sized avatar

Guy Argo rugyoga

View GitHub Profile
@rugyoga
rugyoga / pit_delete.ex
Last active July 21, 2022 08:44
Delete a ES Point In Time ID
@spec delete(pit_t()) :: :ok | {:error, HTTPoison.AsyncResponse | HTTPoison.MaybeRedirect | HTTPoison.Response}
def delete(pit) do
url = HTTP.generate_url("_pit", ES.no_index(), HTTP.no_options())
with {:ok, payload} <- Poison.encode(%{"id" => pit}),
%HTTPoison.Response{status_code: 200} <-
HTTPoison.request!(:delete, url, payload, HTTP.headers(HTTP.json_encoding())) do
:ok
else
error -> {:error, error}
@rugyoga
rugyoga / query_initial.ex
Last active July 21, 2022 08:43
Create the initial ES Query based on PIT ID and a ES Query
@spec initial(PIT.pit_t(), ES.query_t()) :: ES.query_t()
def initial(pit, query) do
%{
query: query,
pit: %{id: pit, keep_alive: PIT.expiration()},
size: ES.max_results(),
sort: %{"_shard_doc" => "desc"}
}
end
@rugyoga
rugyoga / query_update.ex
Created July 19, 2022 22:25
Update the ES query using the most recent result set
@spec update(ES.query_t(), ES.response_t()) :: ES.query_t()
def update(query, %Req.Response{body: %{"hits" => %{"hits" => hits}}, status: 200}) do
Map.put(query, :search_after, List.last(hits)["sort"])
end
@rugyoga
rugyoga / streamer.ex
Last active July 21, 2022 18:42
Use Stream.iterate to process a Stream of ES result sets
@spec streamer(ES.query_t()) :: {:ok, Enumerable.t()} | {:error, any}
def streamer(initial) do
{:ok,
initial
|> search()
|> Stream.iterate(fn previous -> search(Query.update(initial, previous)) end)
}
rescue
error -> {:error, error}
end
@rugyoga
rugyoga / stream_many.ex
Last active July 21, 2022 19:22
Create PIT ID, perform multiple ES searches and then return a Stream of hits.
@spec stream_many(ES.query_t(), ES.index_t(), ES.consumer_t(), non_neg_integer()) :: {:error, any()} | {:ok, Enumerable.t()}
def stream_many(query, index, consumer, count) do
case PIT.create(index) do
{:ok, pit_id} ->
try do
case pit_id |> Query.initial(query) |> streamer() do
{:ok, stream} ->
stream
|> Stream.flat_map(& &1.body["hits"]["hits"])
|> Stream.take(count)
@rugyoga
rugyoga / stream_one.ex
Last active July 21, 2022 19:18
Perform one ES query and return the resulting hits in a Stream
@spec stream_one(ES.query_t(), ES.index_t(), ES.consumer_t()) :: {:ok, any()} | {:error, any()}
def stream_one(query, index, consumer) do
query
|> search(index)
|> HTTP.on_200(fn body -> consumer.(body["hits"]["hits"]) end)
end
@rugyoga
rugyoga / stream.ex
Last active July 21, 2022 19:16
Query the size of the result set and choose between stream_one and stream_many accordingly
@spec stream(ES.query_t(), ES.index_t(), ES.consumer_t()) :: {:ok, any()} | {:error, any()}
def stream(query, index, consumer) do
case count(query, index) do
{:ok, count} ->
if count > ES.max_results() do
stream_many(query, index, consumer, count)
else
stream_one(query, index, consumer)
end
error ->
@rugyoga
rugyoga / chunker.ex
Created July 19, 2022 22:55
Chunk a stream based on orderedness, sizer and joiner.
@spec chunker(Enumerable.t(), non_neg_integer(), boolean(), (any() -> non_neg_integer()), (Enumerable.t() -> any())) :: Enumerable.t()
def chunker(
chunks,
chunk_size,
ordered \\ true,
sizer \\ &String.length/1,
joiner \\ &Enum.join/1
) do
zero = {0, []}
convert =
@rugyoga
rugyoga / primes.ex
Last active July 20, 2022 17:37
Prime number generator using Elixir Streams
defmodule Primes do
@doc """
Generates a stream of primes.
iex> Primes.primes() |> Enum.take(5)
[2, 3, 5, 7, 11]
iex> Primes.primes() |> Enum.at(25)
101
@rugyoga
rugyoga / natural.ex
Last active July 20, 2022 23:55
A Stream of natural numbers
Stream.iterate(0, fn x -> x + 1 end)