-
-
Save w0rd-driven/511d19b2557c2185af782656d2103662 to your computer and use it in GitHub Desktop.
A small demo showing how, given a stream, you might "fan out" its elements so that different elements are processed in separate streams. Built on simple primitives such as `Stream.resource` and `spawn_link`. Open in Livebook: https://livebook.dev/run?url=https%3A%2F%2Fgist.github.com%2Fzachdaniel%2Fd5ab06a9d2362fceeb6d27c37b206e28
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- livebook:{"persist_outputs":true} -->
# Distribute
## Section
A small toy showing how you might, given a stream, "fan out" its elements so that different elements are processed in separate streams. It is built on simple primitives such as `Stream.resource` and `spawn_link`.
```elixir
defmodule Distribute do
  @moduledoc """
  Fans one enumerable out into several streams, each processed in its own
  process.

  Built on plain primitives: `Stream.transform/5`, `Stream.resource/3`,
  `spawn_link/1` and message passing.
  """

  @doc """
  Routes each element of `enum` to a per-key worker process and returns a
  stream of the workers' results.

  ## Parameters

    * `enum` - the source enumerable.
    * `key_fun` - called with each element; the returned key selects the
      worker. One worker is spawned (with `spawn_link/1`) the first time each
      key is seen.
    * `stream_funs` - a map or keyword list from key to a one-arity function.
      It receives the worker's input stream and returns the stream the worker
      runs. Keys without an entry use the identity function.
    * `opts`:
      * `:collect?` (default `true`) - when `true`, the returned stream emits
        every worker's output, worker by worker in the iteration order of the
        internal key map, preserving each worker's own output order. When
        `false`, worker output is discarded and the returned stream emits
        nothing, but it only completes once every worker has finished.

  Nothing is emitted until `enum` is exhausted. Workers are linked to the
  process that enumerates the returned stream. When the returned stream
  finishes, is halted early, or raises, any remaining workers are terminated
  and the messages they left behind are discarded.
  """
  def distribute(enum, key_fun, stream_funs, opts \\ []) do
    stream_ref = make_ref()
    collect? = Keyword.get(opts, :collect?, true)

    Stream.transform(
      enum,
      fn -> %{} end,
      fn item, workers ->
        stream_id = key_fun.(item)

        {pid, workers} =
          case Map.fetch(workers, stream_id) do
            {:ok, {pid, _worker_ref}} ->
              {pid, workers}

            :error ->
              worker_ref = make_ref()
              stream_fun = stream_funs[stream_id] || (& &1)
              pid = distributed_worker(self(), worker_ref, stream_ref, stream_fun, collect?)
              {pid, Map.put(workers, stream_id, {pid, worker_ref})}
          end

        send(pid, {:stream_distribute_push, stream_ref, item})
        {[], workers}
      end,
      fn workers ->
        # Close every worker's input up front so they all finish concurrently,
        # instead of each one being stopped only when its output is collected.
        Enum.each(workers, fn {_stream_id, {pid, _worker_ref}} ->
          send(pid, {:stream_distribute_stop, stream_ref})
        end)

        if collect? do
          output =
            Stream.flat_map(workers, fn {_stream_id, {_pid, worker_ref}} ->
              collect_worker(worker_ref, stream_ref)
            end)

          {output, workers}
        else
          # Consume every :done reply so none is left in the mailbox, and so
          # the stream only completes once all workers have drained their input.
          Enum.each(workers, fn {_stream_id, {_pid, worker_ref}} ->
            await_done(worker_ref, stream_ref)
          end)

          {[], workers}
        end
      end,
      fn workers ->
        # Runs on completion, early halt, or error; workers that already
        # finished are simply reaped.
        Enum.each(workers, fn {_stream_id, {pid, worker_ref}} ->
          terminate_worker(pid, worker_ref, stream_ref)
        end)
      end
    )
  end

  # Spawns (and links) the process that runs `stream_fun` over the elements
  # pushed to it, reporting results back to `parent` tagged with `worker_ref`.
  defp distributed_worker(parent, worker_ref, stream_ref, stream_fun, collect?) do
    spawn_link(fn ->
      input =
        Stream.resource(
          fn -> :open end,
          fn
            :closed ->
              {:halt, :closed}

            :open ->
              # Blocks until the parent pushes an element or closes the input;
              # a zero timeout here would busy-spin while waiting.
              receive do
                {:stream_distribute_push, ^stream_ref, item} ->
                  {[item], :open}

                {:stream_distribute_stop, ^stream_ref} ->
                  {receive_all_pushes(stream_ref), :closed}
              end
          end,
          fn _state -> :ok end
        )

      output = stream_fun.(input)

      if collect? do
        Enum.each(output, fn result ->
          send(parent, {:stream_distribute_pull, worker_ref, stream_ref, :item, result})
        end)
      else
        Stream.run(output)
      end

      send(parent, {:stream_distribute_pull, worker_ref, stream_ref, :done})
    end)
  end

  # Lazily emits the items one worker sends back, finishing after its :done.
  defp collect_worker(worker_ref, stream_ref) do
    Stream.resource(
      fn -> :running end,
      fn
        :finished ->
          {:halt, :finished}

        :running ->
          # Blocking receive: a healthy worker always ends with :done, and if
          # it crashes the link takes this process down too (unless it traps
          # exits).
          receive do
            {:stream_distribute_pull, ^worker_ref, ^stream_ref, :item, item} ->
              {[item], :running}

            {:stream_distribute_pull, ^worker_ref, ^stream_ref, :done} ->
              {receive_all_pulls(worker_ref, stream_ref), :finished}
          end
      end,
      fn _state -> :ok end
    )
  end

  # Blocks until the given worker reports that its stream has finished.
  defp await_done(worker_ref, stream_ref) do
    receive do
      {:stream_distribute_pull, ^worker_ref, ^stream_ref, :done} -> :ok
    end
  end

  # Stops a worker without propagating its exit to the caller, then discards
  # any replies it sent so they do not linger in the caller's mailbox.
  defp terminate_worker(pid, worker_ref, stream_ref) do
    monitor_ref = Process.monitor(pid)
    # Unlink first so the :killed exit signal is not propagated back to us.
    Process.unlink(pid)
    Process.exit(pid, :kill)

    # Signals between two processes are delivered in order, so once :DOWN
    # arrives every message the worker sent is already in the mailbox.
    receive do
      {:DOWN, ^monitor_ref, :process, ^pid, _reason} -> :ok
    end

    _ = receive_all_pulls(worker_ref, stream_ref)

    receive do
      {:stream_distribute_pull, ^worker_ref, ^stream_ref, :done} -> :ok
    after
      0 -> :ok
    end
  end

  # Drains, without blocking, every item already queued from one worker and
  # returns them in arrival order. After :done this is expected to be empty,
  # since a worker sends all its items before :done.
  defp receive_all_pulls(worker_ref, stream_ref, acc \\ []) do
    receive do
      {:stream_distribute_pull, ^worker_ref, ^stream_ref, :item, item} ->
        receive_all_pulls(worker_ref, stream_ref, [item | acc])
    after
      0 -> Enum.reverse(acc)
    end
  end

  # Drains, without blocking, every element already pushed for this stream and
  # returns them in arrival order. Pushes are sent before the stop message, so
  # this is expected to be empty; it is kept as a safeguard.
  defp receive_all_pushes(stream_ref, acc \\ []) do
    receive do
      {:stream_distribute_push, ^stream_ref, item} ->
        receive_all_pushes(stream_ref, [item | acc])
    after
      0 -> Enum.reverse(acc)
    end
  end
end
```
<!-- livebook:{"output":true} -->
```
{:module, Distribute, <<70, 79, 82, 49, 0, 0, 26, ...>>, {:receive_all_pushes, 2}}
```
```elixir
# Route {:a, _} and {:b, _} tuples to separate workers; each worker tags its
# output with the key, the number, and the pid of the process that produced it.
tag_with = fn key ->
  fn stream -> Stream.map(stream, &{key, elem(&1, 1), self()}) end
end

processed =
  Distribute.distribute(
    [{:a, 1}, {:a, 2}, {:b, 1}, {:b, 2}],
    &elem(&1, 0),
    %{a: tag_with.(:a), b: tag_with.(:b)}
  )
  |> Enum.to_list()
  |> IO.inspect()

{as, bs} = Enum.split_with(processed, fn {key, _n, _pid} -> key == :a end)

# Distinct producer pids per group; differing lists show the groups ran apart.
unique_pids = fn tagged -> tagged |> Enum.map(&elem(&1, 2)) |> Enum.uniq() end
a_pids = IO.inspect(unique_pids.(as))
b_pids = IO.inspect(unique_pids.(bs))

a_pids != b_pids
```
<!-- livebook:{"output":true} -->
```
[
  {:a, 1, #PID<0.201.0>},
  {:a, 2, #PID<0.201.0>},
  {:b, 1, #PID<0.202.0>},
  {:b, 2, #PID<0.202.0>}
]
[#PID<0.201.0>]
[#PID<0.202.0>]
```
<!-- livebook:{"output":true} -->
```
true
```
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment