Chunky flow

alias Experimental.Flow

defmodule Chunky do
  @moduledoc ~S"""
  A chunky stream produces documents, each with a list of items in it.
  The default settings will produce documents with a few thousand items
  for a bit and then fall back to trickling in items every few seconds.
  This simulates catching up to some external paged feed.
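
  ## Example (illustrative; trickled output depends on the `:rand` seed)

      Chunky.stream([2, 1])
      |> Enum.take(2)
      #=> [%Chunky.Doc{items: [1, 2], ...}, %Chunky.Doc{items: [1], ...}]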
"""

  require Logger

  defstruct initial_chunks: [1000, 1000, 1000, 167],
            max_chunk_size: 10,
            chunk_probability: 0.2

  defmodule Doc do
    defstruct collected_at: nil, items: []
  end
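
  # Initial chunks exhausted: trickle. Sleep half a second, then emit one
  # document whose items come from a random chunk (often empty).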
  def emit(%Chunky{initial_chunks: []} = acc) do
    Process.sleep(500)
    chunk = gen_chunk(:rand.uniform(), acc.chunk_probability, acc.max_chunk_size)
    {[document(chunk)], acc}
  end
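
  # A zero-sized initial chunk emits an empty document.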
  def emit(%Chunky{initial_chunks: [0 | rest]} = acc) do
    {[document([])], %{acc | initial_chunks: rest}}
  end
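
  # Emit the next initial chunk as a document holding the items 1..size.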
  def emit(%Chunky{initial_chunks: [size | rest]} = acc) do
    {[document(Enum.to_list(1..size))], %{acc | initial_chunks: rest}}
  end

  defp document(chunk) do
    Logger.info("Emitting #{length(chunk)} items in process: #{inspect(self())}")
    %Doc{items: chunk, collected_at: DateTime.utc_now()}
  end
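
  # With probability `1 - prob` the chunk is empty; otherwise pick a random
  # n in 1..size and return n copies of n.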
  defp gen_chunk(rand, prob, _size) when rand > prob, do: []

  defp gen_chunk(_rand, _prob, size) do
    n = :rand.uniform(size)
    List.duplicate(n, n)
  end
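
  # Build a lazy stream of `%Doc{}`s, either from a `%Chunky{}` accumulator
  # or from a bare list of initial chunk sizes.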
  def stream(acc \\ %Chunky{})

  def stream(init_chunks) when is_list(init_chunks),
    do: stream(%Chunky{initial_chunks: init_chunks})

  def stream(%Chunky{} = acc) do
    Stream.resource(
      fn ->
        Logger.debug("Initializing stream in process: #{inspect(self())}")
        acc
      end,
      &Chunky.emit/1,
      fn _ -> :ok end
    )
  end

  @doc ~S"""
  Count the number of items seen every 5 seconds.

  Starts quickly because the max demand of 10 only has to accumulate
  across a single stage.
  """
  def flow_quickly do
    Chunky.stream()
    |> Flow.from_enumerable(stages: 1, max_demand: 10)
    |> Flow.flat_map(fn %Doc{items: items} ->
      Logger.debug("exploding document into #{length(items)} items in process: #{inspect(self())}")
      items
    end)
    |> Flow.partition(
      window: Flow.Window.global() |> Flow.Window.trigger_periodically(5, :second, :reset)
    )
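    # Count items per partition; the accumulator resets when the 5s trigger fires.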
    |> Flow.reduce(fn -> 0 end, fn _item, acc ->
      Logger.debug("incrementing #{acc} in process: #{inspect(self())}")
      acc + 1
    end)
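    # Merge the per-partition counts into a single total per trigger.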
    |> Flow.departition(
      fn -> 0 end,
      fn partition_state, acc ->
        sum = partition_state + acc
        Logger.info("summing to #{sum} in process: #{inspect(self())}")
        sum
      end,
      & &1
    )
    |> Flow.run()
  end

  @doc ~S"""
  Count the number of items seen every 5 seconds.

  Starts slowly because the max demand of 10 accumulates across
  `System.schedulers_online/0` stages, so the stream must emit 80 times
  before processing starts on an 8-core system.
  """
  def flow_accumulate do
    Chunky.stream()
    |> Flow.from_enumerable(
      window: Flow.Window.global() |> Flow.Window.trigger_periodically(5, :second, :reset),
      max_demand: 10
    )
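    # The window is defined at the source; unlike flow_quickly/0, there is
    # no Flow.partition/2 step, so the reduce runs in the mapper stages.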
    |> Flow.flat_map(fn %Doc{items: items} ->
      Logger.debug("exploding document into #{length(items)} items in process: #{inspect(self())}")
      items
    end)
    |> Flow.reduce(fn -> 0 end, fn _item, acc ->
      Logger.debug("incrementing #{acc} in process: #{inspect(self())}")
      acc + 1
    end)
    |> Flow.departition(
      fn -> 0 end,
      fn partition_state, acc ->
        sum = partition_state + acc
        Logger.info("summing to #{sum} in process: #{inspect(self())}")
        sum
      end,
      & &1
    )
    |> Flow.run()
  end
end
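
# Trace demand arriving at the GenStage streamer (assumes fishcakez's `dbg`
# package, which provides Dbg.trace/2 and Dbg.call/1).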
# Dbg.call(&Experimental.GenStage.Streamer.handle_demand/2)
Dbg.trace(:new, [:call])
Dbg.call(&Experimental.GenStage.Streamer.handle_demand/2)
Chunky.flow_quickly()
# Chunky.flow_accumulate()
# Chunky.stream([2, 3, 1])
# |> Stream.flat_map(fn %{items: items} -> items end)
# |> Enum.take(6)
# |> IO.inspect()