Skip to content

Instantly share code, notes, and snippets.

@emerleite
Last active November 23, 2017 21:35
Show Gist options
  • Select an option

  • Save emerleite/8b2537fa2f9cd8990a3907cfa6faece8 to your computer and use it in GitHub Desktop.

Select an option

Save emerleite/8b2537fa2f9cd8990a3907cfa6faece8 to your computer and use it in GitHub Desktop.
defmodule UpaEventSourcing.VideoWatchProgress.EventStore do
use GenStage
@event_processing_timeout 11 #Max seconds to process a track video watch progress event
def start_link() do
GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
end
def init(:ok) do
{:producer, {:queue.new, 0, 0}, dispatcher: GenStage.BroadcastDispatcher}
end
def enqueue(event) do
GenStage.cast(__MODULE__, {:enqueue, event})
end
def handle_cast({:enqueue, event}, {queue, demand, queue_size}) do
dispatch_events(:queue.in(event, queue), demand, [], queue_size+1)
end
def handle_demand(incoming_demand, {queue, demand, queue_size}) do
dispatch_events(queue, incoming_demand + demand, [], queue_size)
end
defp dispatch_events(queue, demand, events, queue_size) do
UpaMetrics.count("gen_stage", "queue_size") #Metrics App inside Umbrella
with d when d > 0 <- demand,
{item, queue} = :queue.out(queue),
{:value, event} <- item do
case check_event(event) do
:expired ->
UpaMetrics.count("gen_stage", "discarded") #Metrics App inside Umbrella
dispatch_events(queue, demand, events, queue_size-1)
:valid ->
UpaMetrics.count("gen_stage", "processed") #Metrics App inside Umbrella
dispatch_events(queue, demand - 1, [event | events], queue_size-1)
end
else
_ -> {:noreply, Enum.reverse(events), {queue, demand, queue_size}}
end
end
defp check_event(event) do
if :os.system_time(:millisecond) > (event.last_event_timestamp + :timer.seconds(@event_processing_timeout)) do
:expired
else
:valid
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment