Last active
August 14, 2024 17:55
-
-
Save tlux/c50cb47ba65980e2d5cad2949b8b48d5 to your computer and use it in GitHub Desktop.
Simple Elixir Observable using GenStage
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Observable do | |
@moduledoc """ | |
A module that provides creating an observible value that emits the values to | |
a stream. | |
""" | |
alias Observable.Server | |
defstruct [:producer, :consumer] | |
@opaque t :: %__MODULE__{ | |
producer: GenServer.server(), | |
consumer: Enumerable.t() | |
} | |
@type value :: any | |
@spec new(value) :: t | |
def new(value \\ nil) do | |
{:ok, producer} = Server.start_link(value) | |
consumer = Server.stream(producer) | |
%__MODULE__{producer: producer, consumer: consumer} | |
end | |
@spec get(t, timeout) :: value | |
def get(observable, timeout \\ 5000) do | |
Server.get(observable.producer, timeout) | |
end | |
@spec put(t, value) :: :ok | |
def put(observable, value) do | |
Server.put(observable.producer, value) | |
end | |
@spec update(t, (value -> value)) :: :ok | |
def update(observable, fun) do | |
Server.update(observable.producer, fun) | |
end | |
@spec consume(t) :: Enumerable.t() | |
def consume(observable) do | |
observable.consumer | |
end | |
@spec close(t, timeout) :: :ok | |
def close(observable, timeout \\ :infinity) do | |
Server.stop(observable.producer, :normal, timeout) | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Observable.Server do | |
@moduledoc """ | |
A module that provides a observable data structure. | |
""" | |
use GenStage, restart: :transient | |
@type t :: GenServer.server() | |
@spec start_link(value :: any, GenServer.options()) :: GenServer.on_start() | |
def start_link(value \\ nil, opts \\ []) do | |
GenStage.start_link(__MODULE__, value, opts) | |
end | |
@spec stop(t, reason :: term, timeout) :: :ok | |
def stop(observable, reason \\ :normal, timeout \\ :infinity) do | |
GenStage.stop(observable, reason, timeout) | |
end | |
@spec get(t, timeout) :: any | |
def get(observable, timeout \\ 5000) do | |
GenStage.call(observable, :get, timeout) | |
end | |
@spec put(t, any) :: :ok | |
def put(observable, value) do | |
GenStage.cast(observable, {:put, value}) | |
end | |
@spec stream(t) :: Enumerable.t() | |
def stream(observable) do | |
GenStage.stream([observable]) | |
end | |
@doc """ | |
Updates a value in the state. | |
""" | |
@spec update(t, (any -> any)) :: :ok | |
def update(observable, fun) do | |
GenStage.cast(observable, {:update, fun}) | |
end | |
## Server | |
@impl true | |
def init(value) do | |
{:producer, value, dispatcher: GenStage.BroadcastDispatcher} | |
end | |
@impl true | |
def handle_demand(_demand, value) do | |
{:noreply, [], value} | |
end | |
@impl true | |
def handle_call(:get, _from, value) do | |
{:reply, value, [], value} | |
end | |
@impl true | |
def handle_cast({:put, value}, value) do | |
{:noreply, [], value} | |
end | |
def handle_cast({:put, value}, _prev_value) do | |
{:noreply, [value], value} | |
end | |
def handle_cast({:update, fun}, prev_value) do | |
case fun.(prev_value) do | |
^prev_value -> | |
{:noreply, [], prev_value} | |
value -> | |
{:noreply, [value], value} | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment