Skip to content

Instantly share code, notes, and snippets.

@treble37
Forked from evadne/messages.ex
Created October 10, 2020 17:41
Show Gist options
  • Save treble37/a40889c2778754f6addb2ef8367f18c1 to your computer and use it in GitHub Desktop.
Save treble37/a40889c2778754f6addb2ef8367f18c1 to your computer and use it in GitHub Desktop.
Stream Iterator
defmodule StreamIterator.Messages do
@moduledoc """
The Stream Iterator can be used to consume items from an Elixir Stream incrementally.
## Examples
### Infinite Streams
When used on infinite streams such as the ones returned by `Stream.cycle/1`, the Stream
Iterator will always return the next item.
iex(1)> stream = Stream.cycle([1, 2, 3])
iex(2)> iterator = StreamIterator.start(stream)
iex(3)> StreamIterator.next(iterator)
{:next, 1}
iex(4)> StreamIterator.next(iterator)
{:next, 2}
iex(5)> StreamIterator.next(iterator)
{:next, 3}
iex(6)> StreamIterator.next(iterator)
{:next, 1}
### Finite Streams
When used on finite streams such as the ones returned by `Stream.map/1` against an existing
List, the Stream Iterator will return each item, then return `:eof`, then `{:error, :executed}`
to mark the end of its iteration.
iex(1)> stream = Stream.map([1, 2, 3], & &1 + 1)
iex(2)> iterator = StreamIterator.start(stream)
iex(3)> StreamIterator.next(iterator)
{:next, 2}
iex(4)> StreamIterator.next(iterator)
{:next, 3}
iex(5)> StreamIterator.next(iterator)
{:next, 4}
iex(6)> StreamIterator.next(iterator)
:eof
iex(7)> StreamIterator.next(iterator)
{:error, :executed}
"""
@opaque t :: %__MODULE__{buffer_pid: pid(), reader_pid: pid()}
@enforce_keys ~w(buffer_pid reader_pid)a
defstruct buffer_pid: nil, reader_pid: nil
@spec start(Enumerable.t()) :: t
@spec next(t(), timeout()) :: {:next, item :: term()} | :eof | {:error, reason :: term()}
@spec stop(t(), timeout()) :: :ok
@doc """
Prepares a Stream Iterator. Note that the Iterator must be run through, or `stop/2` should be
called in order to clean up internal state.
"""
def start(stream) do
buffer_pid = spawn(fn -> run_buffer() end)
reader_pid = spawn(fn -> run_reader(stream, buffer_pid) end)
%__MODULE__{buffer_pid: buffer_pid, reader_pid: reader_pid}
end
@doc """
Obtains the upcoming item from the iterated Stream.
If the Stream has not been completely iterated, this returns `{:next, item}`. Otherwise this may
return `:eof` in case the Stream was finite and has been iterated, or `{:error, :executed}` if
the Stream was finite, has been iterated, and `:eof` has previously been returned.
"""
def next(state, timeout \\ 5000) do
cond do
alive?(state) -> run_next(state, timeout)
true -> {:error, :executed}
end
end
@doc """
Cleans up the Stream Iterator. This should be called in case the Iterator has not completely run
through the underlying Stream.
"""
def stop(state, timeout \\ 500) do
:ok = stop_pid(state.buffer_pid, timeout)
:ok = stop_pid(state.reader_pid, timeout)
:ok
end
defp run_next(state, timeout) do
_ = send(state.buffer_pid, {:get, self()})
receive do
{:next, item} -> {:next, item}
:eof -> run_next_eof(state)
after
timeout -> run_next_timeout(state)
end
end
defp run_next_eof(state) do
:ok = stop(state)
:eof
end
defp run_next_timeout(state) do
:ok = stop(state)
{:error, :timeout}
end
defp run_buffer do
receive do
{:put, item, reader_pid} -> run_buffer_put_item(item, reader_pid)
{:put, :eof} -> run_buffer_put_eof()
:exit -> :ok
end
end
defp run_buffer_put_item(item, reader_pid) do
receive do
{:get, caller_pid} ->
_ = send(caller_pid, {:next, item})
_ = send(reader_pid, :next)
run_buffer()
end
end
defp run_buffer_put_eof do
receive do
{:get, caller_pid} ->
send(caller_pid, :eof)
run_buffer()
end
end
defp run_reader(stream, buffer_pid) do
Enum.each(stream, fn item ->
_ = send(buffer_pid, {:put, item, self()})
:ok = run_reader_receive(:next)
end)
_ = send(buffer_pid, {:put, :eof})
:ok = run_reader_receive(:exit)
end
defp run_reader_receive(message) do
receive do
^message -> :ok
end
end
defp stop_pid(pid, timeout) do
with true <- Process.alive?(pid),
ref = Process.monitor(pid),
_ = send(pid, :exit) do
receive do
{:DOWN, _, :process, ^pid, _} -> :ok
after
timeout -> Process.exit(pid, :kill)
end
_ = Process.demonitor(ref)
:ok
else
false -> :ok
end
end
defp alive?(state) do
cond do
not Process.alive?(state.buffer_pid) -> false
not Process.alive?(state.reader_pid) -> false
true -> true
end
end
end
defmodule StreamIterator.Suspension do
@moduledoc """
The Stream Iterator can be used to consume items from an Elixir Stream incrementally.
## Examples
### Infinite Streams
When used on infinite streams such as the ones returned by `Stream.cycle/1`, the Stream
Iterator will always return the next item.
iex(1)> stream = Stream.cycle([1, 2, 3])
iex(2)> iterator = StreamIterator.Suspension.start(stream)
iex(3)> {:next, 1, iterator} = StreamIterator.Suspension.next(iterator)
iex(4)> {:next, 2, iterator} = StreamIterator.Suspension.next(iterator)
iex(5)> {:next, 3, iterator} = StreamIterator.Suspension.next(iterator)
iex(6)> {:next, 1, _iterator} = StreamIterator.Suspension.next(iterator)
### Finite Streams
When used on finite streams such as the ones returned by `Stream.map/1` against an existing
List, the Stream Iterator will return each item, then return `{:eof, state}` to mark the end
of its iteration.
iex(1)> stream = Stream.map([1, 2, 3], & &1 + 1)
iex(2)> iterator = StreamIterator.Suspension.start(stream)
iex(3)> {:next, 2, iterator} = StreamIterator.Suspension.next(iterator)
iex(4)> {:next, 3, iterator} = StreamIterator.Suspension.next(iterator)
iex(5)> {:next, 4, iterator} = StreamIterator.Suspension.next(iterator)
iex(6)> {:eof, _iterator} = StreamIterator.Suspension.next(iterator)
"""
@opaque t :: %__MODULE__{continuation: function | nil}
@enforce_keys ~w(continuation)a
defstruct continuation: nil
@spec start(Enumerable.t()) :: t
@spec next(t()) :: {:next, item :: term(), t} | {:eof, t}
@spec stop(t()) :: {:ok, t}
@doc """
Prepares the Iterator.
"""
def start(stream) do
reduce_fun = fn item, _acc -> {:suspend, item} end
{:suspended, nil, continuation} = Enumerable.reduce(stream, {:suspend, nil}, reduce_fun)
%__MODULE__{continuation: continuation}
end
@doc """
Obtains the upcoming item from the iterated Stream.
If the Stream has not been completely iterated, this returns `{:next, item, state}`. Otherwise, this returns `{:eof, state}`.
"""
def next(state)
def next(%{continuation: nil} = state) do
{:eof, state}
end
def next(state) do
case state.continuation.({:cont, nil}) do
{:suspended, item, continuation} -> {:next, item, %{state | continuation: continuation}}
{:halted, item} -> {item, %{state | continuation: nil}}
{:done, nil} -> {:eof, %{state | continuation: nil}}
end
end
@doc """
Cleans up the Stream Iterator.
"""
def stop(state) do
{:ok, %{state | continuation: nil}}
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment