Skip to content

Instantly share code, notes, and snippets.

@wrgoldstein
Created March 18, 2022 20:55
Show Gist options
  • Select an option

  • Save wrgoldstein/d1a960164dc59a22590a56be27af2b7c to your computer and use it in GitHub Desktop.

Select an option

Save wrgoldstein/d1a960164dc59a22590a56be27af2b7c to your computer and use it in GitHub Desktop.
# Sessionization experiment built around `Flow.emit_and_reduce/3`.
# Every sample event is a `{key, timestamp}` tuple (think `{uuid, timestamp}`).
data = [
  {:a, 1},
  {:a, 2},
  {:b, 2},
  {:a, 5},
  {:a, 11},
  {:a, 12},
  {:c, 3}
]

# Event-time window of 5 seconds; the event time is the tuple's second element.
# NOTE(review): confirm the unit `Flow.Window.fixed/3` expects from this
# function (its docs describe milliseconds) against the sample timestamps above.
window = Flow.Window.fixed(5, :second, fn {_k, t} -> t end)

# Builds a brand-new session holding exactly one event.
start_session = fn key, time ->
  %{key: key, events: 1, min_t: time, max_t: time}
end

# Reducer for `Flow.emit_and_reduce/3`: takes one event plus the map of open
# sessions (keyed by event key) and returns `{events_to_emit, updated_sessions}`.
sessionize = fn {key, time}, sessions ->
  case Map.fetch(sessions, key) do
    {:ok, open} ->
      if time - open.max_t > 5 do
        # Gap is too large: emit the finished session and open a new one
        # starting at the current event.
        {[%{key => open}], Map.put(sessions, key, start_session.(key, time))}
      else
        # Still within the gap: fold the event into the open session, emit nothing.
        extended = %{open | events: open.events + 1, max_t: time}
        {[], Map.put(sessions, key, extended)}
      end

    :error ->
      # First event for this key: remember it as an open session, emit nothing.
      {[], Map.put(sessions, key, start_session.(key, time))}
  end
end

data
|> Flow.from_enumerable()
|> Flow.partition(key: fn event -> elem(event, 0) end, window: window)
|> Flow.emit_and_reduce(fn -> %{} end, sessionize)
|> Flow.on_trigger(fn sessions ->
  # On trigger, print the sessions that are still open, emit the whole map as a
  # single event, and restart from an empty accumulator.
  {[IO.inspect(sessions, label: "What is it?")], %{}}
end)
|> Enum.to_list()
|> IO.inspect()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment