Chunky flow
alias Experimental.Flow
defmodule Chunky do
  @moduledoc ~S"""
  A chunky stream produces documents, each with a list of items in it.
  The default settings first produce a handful of documents with up to a
  thousand items each and then fall back to trickling in items every few
  seconds. This simulates catching up to an external paged feed.
  """

  require Logger

  defstruct initial_chunks: [1000, 1000, 1000, 167],
            max_chunk_size: 10,
            chunk_probability: 0.2

  defmodule Doc do
    defstruct collected_at: nil, items: []
  end
  # Once the initial chunks are exhausted, sleep briefly and emit a
  # (probably empty) random chunk to simulate a slow trickle of items.
  def emit(%Chunky{initial_chunks: []} = acc) do
    Process.sleep(500)
    chunk = gen_chunk(:rand.uniform(), acc.chunk_probability, acc.max_chunk_size)
    {[document(chunk)], acc}
  end

  def emit(%Chunky{initial_chunks: [0 | rest]} = acc) do
    {[document([])], %{acc | initial_chunks: rest}}
  end

  def emit(%Chunky{initial_chunks: [size | rest]} = acc) do
    {[document(Enum.to_list(1..size))], %{acc | initial_chunks: rest}}
  end
  defp document(chunk) do
    Logger.info("Emitting #{length(chunk)} items in process: #{inspect(self())}")
    %Doc{items: chunk, collected_at: DateTime.utc_now()}
  end

  # With probability `prob`, pick a random n up to `size` and emit n copies
  # of n; otherwise emit an empty chunk.
  defp gen_chunk(rand, prob, _) when rand > prob, do: []
  defp gen_chunk(_, _, size) do
    n = :rand.uniform(size)
    List.duplicate(n, n)
  end
  def stream(acc \\ %Chunky{})

  def stream(init_chunks) when is_list(init_chunks),
    do: stream(%Chunky{initial_chunks: init_chunks})

  def stream(%Chunky{} = acc) do
    Stream.resource(
      fn ->
        Logger.debug("Initializing stream in process: #{inspect(self())}")
        acc
      end,
      &Chunky.emit/1,
      fn(_) -> :ok end
    )
  end
  @doc ~S"""
  Count the number of items seen every 5 seconds.

  Starts quickly because the max demand of 10 only has to accumulate across a single stage.
  """
  def flow_quickly do
    Chunky.stream()
    |> Flow.from_enumerable(stages: 1, max_demand: 10)
    |> Flow.flat_map(fn(%Doc{items: items}) ->
      Logger.debug("exploding document into #{length(items)} items in process: #{inspect(self())}")
      items
    end)
    |> Flow.partition(window: Flow.Window.global() |> Flow.Window.trigger_periodically(5, :second, :reset))
    |> Flow.reduce(fn -> 0 end, fn(_, acc) ->
      Logger.debug("incrementing #{acc} in process: #{inspect(self())}")
      acc + 1
    end)
    |> Flow.departition(fn -> 0 end,
                        fn(partition_state, acc) ->
                          sum = partition_state + acc
                          Logger.info("summing to #{sum} in process: #{inspect(self())}")
                          sum
                        end,
                        &(&1))
    |> Flow.run()
  end
  @doc ~S"""
  Count the number of items seen every 5 seconds.

  Starts slowly because the max demand of 10 has to accumulate across
  `System.schedulers_online` stages, so on an 8-core system the stream must
  emit 80 times before the flow starts processing.
  """
  def flow_accumulate do
    Chunky.stream()
    |> Flow.from_enumerable(window: Flow.Window.global() |> Flow.Window.trigger_periodically(5, :second, :reset), max_demand: 10)
    |> Flow.flat_map(fn(%Doc{items: items}) ->
      Logger.debug("exploding document into #{length(items)} items in process: #{inspect(self())}")
      items
    end)
    |> Flow.reduce(fn -> 0 end, fn(_, acc) ->
      Logger.debug("incrementing #{acc} in process: #{inspect(self())}")
      acc + 1
    end)
    |> Flow.departition(fn -> 0 end,
                        fn(partition_state, acc) ->
                          sum = partition_state + acc
                          Logger.info("summing to #{sum} in process: #{inspect(self())}")
                          sum
                        end,
                        &(&1))
    |> Flow.run()
  end
end
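
# Note: the script below assumes the 2016-era gen_stage package, which shipped
# the Experimental.Flow and Experimental.GenStage modules, plus (presumably) the
# dbg hex package for the Dbg tracing calls; adjust the deps for newer releases.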
# Dbg.call(&Experimental.GenStage.Streamer.handle_demand/2) | |
Dbg.trace(:new, [:call]) | |
Dbg.call(&Experimental.GenStage.Streamer.handle_demand/2) | |
Chunky.flow_quickly | |
# Chunky.flow_accumulate | |
# Chunky.stream([2, 3, 1]) | |
# |> Stream.flat_map(fn(%{items: items}) -> | |
# items | |
# end) | |
# |> Enum.take(6) | |
# |> IO.inspect | |