Skip to content

Instantly share code, notes, and snippets.

@frekw
Created June 4, 2019 13:29
Show Gist options
  • Save frekw/91c0ccbc9ec9c2952b8a1c1bfa1e2784 to your computer and use it in GitHub Desktop.
Save frekw/91c0ccbc9ec9c2952b8a1c1bfa1e2784 to your computer and use it in GitHub Desktop.
defmodule Producer do
  @moduledoc """
  A `GenStage` producer that generates one `:event` per timer tick while there
  is outstanding demand.

  The stage state is `{queue, pending_demand}`:

    * `queue` - an Erlang `:queue` holding events that have not been dispatched yet
    * `pending_demand` - how many events have been asked for but not yet emitted
  """

  use GenStage
  require Logger

  # Delay before the first `:notify` tick after start-up, in milliseconds.
  @initial_delay_ms 100
  # Delay between subsequent `:notify` ticks, in milliseconds.
  @tick_interval_ms 5

  @doc """
  Starts the producer and registers it under the module name.

  Only accepts an empty list as its argument.
  """
  @spec start_link([]) :: GenServer.on_start()
  def start_link([]) do
    Logger.info("starting producer")
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    schedule_tick(@initial_delay_ms)
    {:producer, {:queue.new(), 0}}
  end

  @impl true
  def handle_info(:notify, {_queue, _pending_demand} = state) do
    # Re-arm the timer before doing anything else so ticks keep arriving
    # whether or not this tick produces an event.
    schedule_tick(@tick_interval_ms)
    produce(state)
  end

  @impl true
  def handle_demand(incoming_demand, {queue, pending_demand}) do
    Logger.info("got demand: #{incoming_demand} demand")
    # New demand is added to whatever is still outstanding; anything already
    # queued is emitted right away and the remainder stays pending.
    dispatch_events(queue, incoming_demand + pending_demand, [])
  end

  # Sends `:notify` to this process after `delay_ms` milliseconds.
  defp schedule_tick(delay_ms), do: Process.send_after(self(), :notify, delay_ms)

  # Generates a single `:event` and dispatches it when demand is outstanding;
  # otherwise emits nothing and leaves the state untouched.
  defp produce({queue, pending_demand}) when pending_demand > 0 do
    Logger.info("generated event")
    dispatch_events(:queue.in(:event, queue), pending_demand, [])
  end

  defp produce(state), do: {:noreply, [], state}

  # Pops events off `queue` until either the demand reaches zero or the queue
  # runs dry. Events are accumulated in reverse and flipped once at the end so
  # they are emitted in queue order. The remaining queue and the unmet demand
  # become the new state.
  defp dispatch_events(queue, 0, events) do
    {:noreply, Enum.reverse(events), {queue, 0}}
  end

  defp dispatch_events(queue, demand, events) do
    case :queue.out(queue) do
      {{:value, event}, queue} ->
        dispatch_events(queue, demand - 1, [event | events])

      {:empty, queue} ->
        {:noreply, Enum.reverse(events), {queue, demand}}
    end
  end
end
defmodule Consumer do
  @moduledoc """
  A `GenStage` consumer that subscribes to `Producer` with a `max_demand` of
  200, pauses for one second on every batch it receives, and logs the batch
  size.
  """

  use GenStage
  require Logger

  @doc """
  Starts the consumer as an unnamed process. Only accepts an empty list.
  """
  def start_link([]), do: GenStage.start_link(__MODULE__, :ok)

  def init(:ok) do
    Logger.info("starting consumer")

    subscriptions = [{Producer, max_demand: 200}]
    {:consumer, :ok, subscribe_to: subscriptions}
  end

  def handle_events(events, _from, state) do
    # Block for one second before reporting on the batch.
    Process.sleep(1000)

    # Report how many events arrived in this batch.
    batch_size = Enum.count(events)
    Logger.info("processed #{batch_size} events")

    # Consumers are the end of the pipeline, so nothing is emitted downstream.
    {:noreply, [], state}
  end
end
defmodule Buffering.Application do
  # OTP application entry point; see https://hexdocs.pm/elixir/Application.html
  @moduledoc false

  use Application

  # Starts the top-level supervisor, registered as `Buffering.Supervisor`, with
  # `Producer` and `Consumer` as its children (started in that order) under a
  # `:one_for_one` strategy. Other strategies and options are described at
  # https://hexdocs.pm/elixir/Supervisor.html
  def start(_type, _args) do
    Supervisor.start_link(
      [Producer, Consumer],
      strategy: :one_for_one,
      name: Buffering.Supervisor
    )
  end
end
15:27:31.858 [info] starting producer
15:27:31.861 [info] starting consumer
15:27:31.861 [info] got demand: 200 demand
15:27:31.960 [info] generated event
15:27:32.961 [info] processed 1 events
15:27:32.962 [info] generated event
....
15:27:33.147 [info] generated event
15:27:33.154 [info] generated event
15:27:33.961 [info] processed 1 events
15:27:34.962 [info] processed 1 events
15:27:35.963 [info] processed 1 events
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment