Skip to content

Instantly share code, notes, and snippets.

@ankopainting
Created August 18, 2017 23:40
Show Gist options
  • Save ankopainting/cf7898bca393f314dfdd74df73b53faf to your computer and use it in GitHub Desktop.
Save ankopainting/cf7898bca393f314dfdd74df73b53faf to your computer and use it in GitHub Desktop.
Example of buffering groups of 400 messages (with a periodic timed flush) before acking and flushing them in a GenStage
# This will handle the acks back to rabbitmq
defmodule RabbitAckker do
  @moduledoc """
  GenStage `:producer_consumer` that batches RabbitMQ acknowledgements.

  Incoming events are expected to be `{event, meta}` tuples where `meta`
  exposes `channel` and `delivery_tag`. Events are accumulated in the stage
  state and are acked (via `Wabbit.Basic.ack/2`) and passed downstream when
  either:

    * the buffer reaches `@flush_threshold` events, or
    * the periodic `{:flush}` timer fires.

  Buffer entries that are not two-element tuples are not acked, but they are
  still forwarded downstream together with the rest of the buffer.
  """

  use GenStage
  require Logger

  # Number of buffered events that triggers an immediate ack + flush.
  @flush_threshold 400
  # Delay before the very first timed flush, so startup traffic is acked quickly.
  @first_flush_delay 500
  # Interval between subsequent timed flushes, in milliseconds.
  @flush_interval 2_000

  @doc """
  Starts the stage.

  `opts` are forwarded to `GenStage.start_link/3` (for example `name:`).
  Calling it with no arguments behaves exactly like the original
  zero-arity version.
  """
  def start_link(opts \\ []) do
    GenStage.start_link(__MODULE__, :ok, opts)
  end

  @impl true
  def init(:ok) do
    # Schedule the first timed flush fairly soon after startup.
    Process.send_after(self(), {:flush}, @first_flush_delay)
    # The state is simply the list of events buffered so far.
    {:producer_consumer, []}
  end

  # Handle events coming from rabbit: buffer them until the threshold is hit.
  @impl true
  def handle_events(events, _from, buffer) do
    buffer = buffer ++ events

    if length(buffer) >= @flush_threshold do
      ack_all(buffer)
      # Pass all buffered events down to the next stage and reset the state.
      {:noreply, buffer, []}
    else
      # Don't pass any events down yet, just keep them in the state.
      {:noreply, [], buffer}
    end
  end

  # When the flush message arrives, ack whatever is buffered and pass it down.
  # TODO: only flush when we haven't acked anything in @flush_interval ms
  @impl true
  def handle_info({:flush}, buffer) do
    if buffer != [] do
      Logger.debug("ackker: timer flush called for #{length(buffer)} events")
    end

    ack_all(buffer)

    # Re-arm the timer for the next periodic flush.
    Process.send_after(self(), {:flush}, @flush_interval)

    # Move the buffer to the emitted events and reset the state.
    {:noreply, buffer, []}
  end

  # Acks every `{event, meta}` entry in `buffer`. The comprehension skips
  # entries that do not match the tuple pattern, as the original loops did.
  defp ack_all(buffer) do
    for {_event, meta} <- buffer do
      :ok = ack(meta.channel, meta.delivery_tag)
    end

    :ok
  end

  # Best-effort ack: a raise/throw/exit from the AMQP client must not take the
  # stage down, but it is logged instead of being silently discarded.
  defp ack(channel, delivery_tag) do
    Wabbit.Basic.ack(channel, delivery_tag)
  catch
    kind, reason ->
      Logger.error(
        "ackker: ack failed for delivery_tag #{inspect(delivery_tag)}: " <>
          inspect({kind, reason})
      )

      :ok
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment