Skip to content

Instantly share code, notes, and snippets.

@monotykamary
Last active December 25, 2024 03:20
Show Gist options
  • Save monotykamary/a0d1a1df9117d053ac0a7661438e407e to your computer and use it in GitHub Desktop.
Save monotykamary/a0d1a1df9117d053ac0a7661438e407e to your computer and use it in GitHub Desktop.
Flow Skiplist in Elixir
defmodule FlowSkiplist do
  @moduledoc """
  A sorted set stored as an ordered list of bounded chunks.

  Each chunk is a `{min, max, set}` tuple in which `set` is a `MapSet` and
  `min`/`max` are its smallest and largest members. Every function in this
  module returns chunks sorted by `min` with non-overlapping ranges, and the
  functions expect their `skiplist` argument to be in that same form (that is,
  to have been produced by this module).

  Values are ordered with the standard term ordering (`<`, `<=`), so any
  mutually comparable terms can be stored, not only numbers.

  Despite the module name, the operations run on plain `Enum` calls: the work
  is a collect-then-sort over in-memory lists, which gains nothing from being
  spread over a process pipeline.
  """

  # Target upper bound for the number of members held by a single chunk.
  @chunk_size 1000

  @type chunk :: {term(), term(), MapSet.t()}
  @type t :: [chunk()]

  @doc """
  Builds a skiplist from `data`.

  Nested lists inside `data` are flattened and duplicate values are stored
  once. An empty input yields `[]`.

  ## Examples

      iex> [3, 1, 2] |> FlowSkiplist.new() |> FlowSkiplist.search(2)
      {:ok, 2}

  """
  @spec new(Enumerable.t()) :: t()
  def new(data), do: data |> flat_values() |> build()

  @doc """
  Inserts a single `value` and returns the updated skiplist.

  If `value` falls inside an existing chunk's range it is added to that chunk,
  which is split in two when it would exceed the chunk size. Otherwise `value`
  lies in a gap between chunks: it extends the neighbouring chunk just below
  it if that chunk has room, else the one just above, else it becomes a new
  single-member chunk. Ranges therefore never overlap.
  """
  @spec insert(t(), term()) :: t()
  def insert(skiplist, value) do
    # `below` holds the chunks that end before `value`; the head of `rest`
    # (if any) is the first chunk that either covers `value` or starts above it.
    {below, rest} = Enum.split_while(skiplist, fn {_min, max, _set} -> max < value end)

    case rest do
      [{min, max, set} | above] when min <= value ->
        below ++ split_if_oversized({min, max, MapSet.put(set, value)}) ++ above

      _ ->
        insert_in_gap(below, rest, value)
    end
  end

  @doc """
  Inserts every element of `values` and returns the updated skiplist.

  The existing members and the new values are repacked together, so the result
  has no duplicated values and no overlapping chunk ranges. Nested lists in
  `values` are flattened, as in `new/1`.
  """
  @spec batch_insert(t(), Enumerable.t()) :: t()
  def batch_insert(skiplist, values) do
    skiplist
    |> Enum.flat_map(fn {_min, _max, set} -> MapSet.to_list(set) end)
    |> Enum.concat(flat_values(values))
    |> build()
  end

  @doc """
  Looks `value` up.

  Returns `{:ok, value}` when it is stored (including falsy values such as
  `nil`), otherwise `:not_found`.
  """
  @spec search(t(), term()) :: {:ok, term()} | :not_found
  def search(skiplist, value) do
    if Enum.any?(skiplist, &holds?(&1, value)) do
      {:ok, value}
    else
      :not_found
    end
  end

  @doc """
  Removes `value` and returns the updated skiplist.

  The bounds of the affected chunk are recomputed, and a chunk that becomes
  empty is dropped. Deleting a value that is not stored returns the skiplist
  unchanged.
  """
  @spec delete(t(), term()) :: t()
  def delete(skiplist, value) do
    Enum.flat_map(skiplist, fn chunk ->
      if holds?(chunk, value), do: drop_member(chunk, value), else: [chunk]
    end)
  end

  # Materialises `data` as a flat list of values.
  defp flat_values(data), do: data |> Enum.to_list() |> List.flatten()

  # Packs a flat list of values into sorted, duplicate-free chunks.
  defp build(values) do
    values
    |> Enum.uniq()
    |> Enum.sort()
    |> Enum.chunk_every(@chunk_size)
    |> Enum.map(&to_chunk/1)
  end

  # Turns a non-empty, sorted list of values into a `{min, max, set}` chunk.
  defp to_chunk([first | _] = sorted) do
    {first, List.last(sorted), MapSet.new(sorted)}
  end

  # Places a value that lies between (or outside) all chunk ranges.
  defp insert_in_gap(below, rest, value) do
    pred = List.last(below)
    succ = List.first(rest)

    cond do
      room?(pred) ->
        # `value` is above everything in `pred`, so it becomes the new max.
        {pred_min, _pred_max, pred_set} = pred
        List.replace_at(below, -1, {pred_min, value, MapSet.put(pred_set, value)}) ++ rest

      room?(succ) ->
        # `value` is below everything in `succ`, so it becomes the new min.
        {_succ_min, succ_max, succ_set} = succ
        below ++ [{value, succ_max, MapSet.put(succ_set, value)} | tl(rest)]

      true ->
        below ++ [{value, value, MapSet.new([value])} | rest]
    end
  end

  # True when the chunk exists and can take one more member.
  defp room?(nil), do: false
  defp room?({_min, _max, set}), do: MapSet.size(set) < @chunk_size

  # Splits a chunk that grew past the size bound into two halves.
  defp split_if_oversized({_min, _max, set} = chunk) do
    size = MapSet.size(set)

    if size > @chunk_size do
      set
      |> Enum.sort()
      |> Enum.chunk_every(div(size + 1, 2))
      |> Enum.map(&to_chunk/1)
    else
      [chunk]
    end
  end

  # True when `value` is a member of the chunk; the range test is a cheap
  # pre-filter before the set lookup.
  defp holds?({min, max, set}, value) do
    min <= value and value <= max and MapSet.member?(set, value)
  end

  # Removes `value` from a chunk, returning zero or one chunks.
  defp drop_member({_min, _max, set}, value) do
    remaining = MapSet.delete(set, value)

    # `Enum.min_max/1` raises on an empty enumerable, so emptiness is checked
    # first and the emptied chunk is dropped.
    if MapSet.size(remaining) == 0 do
      []
    else
      {new_min, new_max} = Enum.min_max(remaining)
      [{new_min, new_max, remaining}]
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment