-
-
Save treble37/a40889c2778754f6addb2ef8367f18c1 to your computer and use it in GitHub Desktop.
Stream Iterator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule StreamIterator.Messages do | |
@moduledoc """ | |
The Stream Iterator can be used to consume items from an Elixir Stream incrementally. | |
## Examples | |
### Infinite Streams | |
When used on infinite streams such as the ones returned by `Stream.cycle/1`, the Stream | |
Iterator will always return the next item. | |
iex(1)> stream = Stream.cycle([1, 2, 3]) | |
iex(2)> iterator = StreamIterator.start(stream) | |
iex(3)> StreamIterator.next(iterator) | |
{:next, 1} | |
iex(4)> StreamIterator.next(iterator) | |
{:next, 2} | |
iex(5)> StreamIterator.next(iterator) | |
{:next, 3} | |
iex(6)> StreamIterator.next(iterator) | |
{:next, 1} | |
### Finite Streams | |
When used on finite streams such as the ones returned by `Stream.map/1` against an existing | |
List, the Stream Iterator will return each item, then return `:eof`, then `{:error, :executed}` | |
to mark the end of its iteration. | |
iex(1)> stream = Stream.map([1, 2, 3], & &1 + 1) | |
iex(2)> iterator = StreamIterator.start(stream) | |
iex(3)> StreamIterator.next(iterator) | |
{:next, 2} | |
iex(4)> StreamIterator.next(iterator) | |
{:next, 3} | |
iex(5)> StreamIterator.next(iterator) | |
{:next, 4} | |
iex(6)> StreamIterator.next(iterator) | |
:eof | |
iex(7)> StreamIterator.next(iterator) | |
{:error, :executed} | |
""" | |
@opaque t :: %__MODULE__{buffer_pid: pid(), reader_pid: pid()} | |
@enforce_keys ~w(buffer_pid reader_pid)a | |
defstruct buffer_pid: nil, reader_pid: nil | |
@spec start(Enumerable.t()) :: t | |
@spec next(t(), timeout()) :: {:next, item :: term()} | :eof | {:error, reason :: term()} | |
@spec stop(t(), timeout()) :: :ok | |
@doc """ | |
Prepares a Stream Iterator. Note that the Iterator must be run through, or `stop/2` should be | |
called in order to clean up internal state. | |
""" | |
def start(stream) do | |
buffer_pid = spawn(fn -> run_buffer() end) | |
reader_pid = spawn(fn -> run_reader(stream, buffer_pid) end) | |
%__MODULE__{buffer_pid: buffer_pid, reader_pid: reader_pid} | |
end | |
@doc """ | |
Obtains the upcoming item from the iterated Stream. | |
If the Stream has not been completely iterated, this returns `{:next, item}`. Otherwise this may | |
return `:eof` in case the Stream was finite and has been iterated, or `{:error, :executed}` if | |
the Stream was finite, has been iterated, and `:eof` has previously been returned. | |
""" | |
def next(state, timeout \\ 5000) do | |
cond do | |
alive?(state) -> run_next(state, timeout) | |
true -> {:error, :executed} | |
end | |
end | |
@doc """ | |
Cleans up the Stream Iterator. This should be called in case the Iterator has not completely run | |
through the underlying Stream. | |
""" | |
def stop(state, timeout \\ 500) do | |
:ok = stop_pid(state.buffer_pid, timeout) | |
:ok = stop_pid(state.reader_pid, timeout) | |
:ok | |
end | |
defp run_next(state, timeout) do | |
_ = send(state.buffer_pid, {:get, self()}) | |
receive do | |
{:next, item} -> {:next, item} | |
:eof -> run_next_eof(state) | |
after | |
timeout -> run_next_timeout(state) | |
end | |
end | |
defp run_next_eof(state) do | |
:ok = stop(state) | |
:eof | |
end | |
defp run_next_timeout(state) do | |
:ok = stop(state) | |
{:error, :timeout} | |
end | |
defp run_buffer do | |
receive do | |
{:put, item, reader_pid} -> run_buffer_put_item(item, reader_pid) | |
{:put, :eof} -> run_buffer_put_eof() | |
:exit -> :ok | |
end | |
end | |
defp run_buffer_put_item(item, reader_pid) do | |
receive do | |
{:get, caller_pid} -> | |
_ = send(caller_pid, {:next, item}) | |
_ = send(reader_pid, :next) | |
run_buffer() | |
end | |
end | |
defp run_buffer_put_eof do | |
receive do | |
{:get, caller_pid} -> | |
send(caller_pid, :eof) | |
run_buffer() | |
end | |
end | |
defp run_reader(stream, buffer_pid) do | |
Enum.each(stream, fn item -> | |
_ = send(buffer_pid, {:put, item, self()}) | |
:ok = run_reader_receive(:next) | |
end) | |
_ = send(buffer_pid, {:put, :eof}) | |
:ok = run_reader_receive(:exit) | |
end | |
defp run_reader_receive(message) do | |
receive do | |
^message -> :ok | |
end | |
end | |
defp stop_pid(pid, timeout) do | |
with true <- Process.alive?(pid), | |
ref = Process.monitor(pid), | |
_ = send(pid, :exit) do | |
receive do | |
{:DOWN, _, :process, ^pid, _} -> :ok | |
after | |
timeout -> Process.exit(pid, :kill) | |
end | |
_ = Process.demonitor(ref) | |
:ok | |
else | |
false -> :ok | |
end | |
end | |
defp alive?(state) do | |
cond do | |
not Process.alive?(state.buffer_pid) -> false | |
not Process.alive?(state.reader_pid) -> false | |
true -> true | |
end | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule StreamIterator.Suspension do | |
@moduledoc """ | |
The Stream Iterator can be used to consume items from an Elixir Stream incrementally. | |
## Examples | |
### Infinite Streams | |
When used on infinite streams such as the ones returned by `Stream.cycle/1`, the Stream | |
Iterator will always return the next item. | |
iex(1)> stream = Stream.cycle([1, 2, 3]) | |
iex(2)> iterator = StreamIterator.Suspension.start(stream) | |
iex(3)> {:next, 1, iterator} = StreamIterator.Suspension.next(iterator) | |
iex(4)> {:next, 2, iterator} = StreamIterator.Suspension.next(iterator) | |
iex(5)> {:next, 3, iterator} = StreamIterator.Suspension.next(iterator) | |
iex(6)> {:next, 1, _iterator} = StreamIterator.Suspension.next(iterator) | |
### Finite Streams | |
When used on finite streams such as the ones returned by `Stream.map/1` against an existing | |
List, the Stream Iterator will return each item, then return `{:eof, state}` to mark the end | |
of its iteration. | |
iex(1)> stream = Stream.map([1, 2, 3], & &1 + 1) | |
iex(2)> iterator = StreamIterator.Suspension.start(stream) | |
iex(3)> {:next, 2, iterator} = StreamIterator.Suspension.next(iterator) | |
iex(4)> {:next, 3, iterator} = StreamIterator.Suspension.next(iterator) | |
iex(5)> {:next, 4, iterator} = StreamIterator.Suspension.next(iterator) | |
iex(6)> {:eof, _iterator} = StreamIterator.Suspension.next(iterator) | |
""" | |
@opaque t :: %__MODULE__{continuation: function | nil} | |
@enforce_keys ~w(continuation)a | |
defstruct continuation: nil | |
@spec start(Enumerable.t()) :: t | |
@spec next(t()) :: {:next, item :: term(), t} | {:eof, t} | |
@spec stop(t()) :: {:ok, t} | |
@doc """ | |
Prepares the Iterator. | |
""" | |
def start(stream) do | |
reduce_fun = fn item, _acc -> {:suspend, item} end | |
{:suspended, nil, continuation} = Enumerable.reduce(stream, {:suspend, nil}, reduce_fun) | |
%__MODULE__{continuation: continuation} | |
end | |
@doc """ | |
Obtains the upcoming item from the iterated Stream. | |
If the Stream has not been completely iterated, this returns `{:next, item, state}`. Otherwise, this returns `{:eof, state}`. | |
""" | |
def next(state) | |
def next(%{continuation: nil} = state) do | |
{:eof, state} | |
end | |
def next(state) do | |
case state.continuation.({:cont, nil}) do | |
{:suspended, item, continuation} -> {:next, item, %{state | continuation: continuation}} | |
{:halted, item} -> {item, %{state | continuation: nil}} | |
{:done, nil} -> {:eof, %{state | continuation: nil}} | |
end | |
end | |
@doc """ | |
Cleans up the Stream Iterator. | |
""" | |
def stop(state) do | |
{:ok, %{state | continuation: nil}} | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment