Skip to content

Instantly share code, notes, and snippets.

@tlux
Last active August 14, 2024 17:55
Show Gist options
  • Save tlux/c50cb47ba65980e2d5cad2949b8b48d5 to your computer and use it in GitHub Desktop.
Save tlux/c50cb47ba65980e2d5cad2949b8b48d5 to your computer and use it in GitHub Desktop.
Simple Elixir Observable using GenStage
defmodule Observable do
@moduledoc """
A module that provides creating an observible value that emits the values to
a stream.
"""
alias Observable.Server
defstruct [:producer, :consumer]
@opaque t :: %__MODULE__{
producer: GenServer.server(),
consumer: Enumerable.t()
}
@type value :: any
@spec new(value) :: t
def new(value \\ nil) do
{:ok, producer} = Server.start_link(value)
consumer = Server.stream(producer)
%__MODULE__{producer: producer, consumer: consumer}
end
@spec get(t, timeout) :: value
def get(observable, timeout \\ 5000) do
Server.get(observable.producer, timeout)
end
@spec put(t, value) :: :ok
def put(observable, value) do
Server.put(observable.producer, value)
end
@spec update(t, (value -> value)) :: :ok
def update(observable, fun) do
Server.update(observable.producer, fun)
end
@spec consume(t) :: Enumerable.t()
def consume(observable) do
observable.consumer
end
@spec close(t, timeout) :: :ok
def close(observable, timeout \\ :infinity) do
Server.stop(observable.producer, :normal, timeout)
end
end
defmodule Observable.Server do
@moduledoc """
A module that provides a observable data structure.
"""
use GenStage, restart: :transient
@type t :: GenServer.server()
@spec start_link(value :: any, GenServer.options()) :: GenServer.on_start()
def start_link(value \\ nil, opts \\ []) do
GenStage.start_link(__MODULE__, value, opts)
end
@spec stop(t, reason :: term, timeout) :: :ok
def stop(observable, reason \\ :normal, timeout \\ :infinity) do
GenStage.stop(observable, reason, timeout)
end
@spec get(t, timeout) :: any
def get(observable, timeout \\ 5000) do
GenStage.call(observable, :get, timeout)
end
@spec put(t, any) :: :ok
def put(observable, value) do
GenStage.cast(observable, {:put, value})
end
@spec stream(t) :: Enumerable.t()
def stream(observable) do
GenStage.stream([observable])
end
@doc """
Updates a value in the state.
"""
@spec update(t, (any -> any)) :: :ok
def update(observable, fun) do
GenStage.cast(observable, {:update, fun})
end
## Server
@impl true
def init(value) do
{:producer, value, dispatcher: GenStage.BroadcastDispatcher}
end
@impl true
def handle_demand(_demand, value) do
{:noreply, [], value}
end
@impl true
def handle_call(:get, _from, value) do
{:reply, value, [], value}
end
@impl true
def handle_cast({:put, value}, value) do
{:noreply, [], value}
end
def handle_cast({:put, value}, _prev_value) do
{:noreply, [value], value}
end
def handle_cast({:update, fun}, prev_value) do
case fun.(prev_value) do
^prev_value ->
{:noreply, [], prev_value}
value ->
{:noreply, [value], value}
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment