Created
June 4, 2019 13:29
-
-
Save frekw/91c0ccbc9ec9c2952b8a1c1bfa1e2784 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Producer do
  @moduledoc """
  A `GenStage` producer that emits one `:event` per timer tick.

  The stage state is `{queue, pending_demand}`:

    * `queue` is an Erlang `:queue` holding events that have been generated
      but not yet handed to a consumer;
    * `pending_demand` is the number of events consumers have asked for that
      have not been delivered yet.

  An event is only generated on a tick while `pending_demand` is positive, and
  it is dispatched in the same callback. As a consequence every dispatch
  carries at most the events currently in the queue (normally just one).
  """

  use GenStage
  require Logger

  # Delay before the first `:notify` tick after the stage starts.
  @initial_delay_ms 100
  # Interval between subsequent `:notify` ticks.
  @tick_interval_ms 5

  @doc """
  Starts the producer and registers it under the module name, so that
  consumers can subscribe to it as `Producer`.
  """
  def start_link([]) do
    Logger.info("starting producer")
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    schedule_notify(@initial_delay_ms)
    {:producer, {:queue.new(), 0}}
  end

  # Timer tick with outstanding demand: generate one event, enqueue it and
  # dispatch as much of the queue as the demand allows.
  @impl true
  def handle_info(:notify, {queue, pending_demand}) when pending_demand > 0 do
    schedule_notify(@tick_interval_ms)
    Logger.info("generated event")
    dispatch_events(:queue.in(:event, queue), pending_demand, [])
  end

  # Timer tick without demand: nothing is generated, only the timer is re-armed.
  def handle_info(:notify, {_queue, _pending_demand} = state) do
    schedule_notify(@tick_interval_ms)
    {:noreply, [], state}
  end

  # New demand is added to whatever is still pending, then served from the
  # queue as far as possible; the remainder is kept in the state.
  @impl true
  def handle_demand(incoming_demand, {queue, pending_demand}) do
    Logger.info("got demand: #{incoming_demand} demand")
    dispatch_events(queue, incoming_demand + pending_demand, [])
  end

  # Arms a one-shot timer that delivers `:notify` to this process.
  defp schedule_notify(delay_ms) do
    Process.send_after(self(), :notify, delay_ms)
  end

  # Pops events off the queue until either the demand is satisfied or the
  # queue runs dry. Events are accumulated in reverse (O(1) prepend) and
  # reversed once when the reply tuple is built.
  defp dispatch_events(queue, 0, events) do
    {:noreply, Enum.reverse(events), {queue, 0}}
  end

  defp dispatch_events(queue, demand, events) do
    case :queue.out(queue) do
      {{:value, event}, queue} ->
        dispatch_events(queue, demand - 1, [event | events])

      {:empty, queue} ->
        # Queue exhausted: remember the unmet demand for later ticks.
        {:noreply, Enum.reverse(events), {queue, demand}}
    end
  end
end
defmodule Consumer do
  @moduledoc """
  A `GenStage` consumer that subscribes to `Producer` and simulates slow
  processing by sleeping for a fixed time on every batch it receives.
  """

  use GenStage
  require Logger

  # Simulated processing time per batch, in milliseconds.
  @processing_time_ms 1000

  @doc """
  Starts the consumer process (unnamed).
  """
  def start_link([]) do
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    Logger.info("starting consumer")

    subscription = {Producer, max_demand: 200}
    {:consumer, :ok, subscribe_to: [subscription]}
  end

  def handle_events(events, _from, state) do
    # Pretend the batch takes a while to handle.
    Process.sleep(@processing_time_ms)

    # Report how many events arrived in this batch.
    batch_size = Enum.count(events)
    Logger.info("processed #{batch_size} events")

    # A consumer never emits events of its own.
    {:noreply, [], state}
  end
end
defmodule Buffering.Application do
  # OTP application callback module; background on OTP applications:
  # https://hexdocs.pm/elixir/Application.html
  @moduledoc false

  use Application

  def start(_type, _args) do
    # Supervised children, listed in start order: `Producer` comes first
    # because `Consumer` subscribes to it by its registered name.
    # Other strategies and options: https://hexdocs.pm/elixir/Supervisor.html
    Supervisor.start_link(
      [Producer, Consumer],
      strategy: :one_for_one,
      name: Buffering.Supervisor
    )
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
15:27:31.858 [info] starting producer
15:27:31.861 [info] starting consumer
15:27:31.861 [info] got demand: 200 demand
15:27:31.960 [info] generated event
15:27:32.961 [info] processed 1 events
15:27:32.962 [info] generated event
....
15:27:33.147 [info] generated event
15:27:33.154 [info] generated event
15:27:33.961 [info] processed 1 events
15:27:34.962 [info] processed 1 events
15:27:35.963 [info] processed 1 events
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment