Created
August 18, 2017 23:40
-
-
Save ankopainting/cf7898bca393f314dfdd74df73b53faf to your computer and use it in GitHub Desktop.
Example of buffering groups of 400 messages before flushing in a GenStage stage
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This will handle the acks back to RabbitMQ
defmodule RabbitAckker do
  @moduledoc """
  GenStage `:producer_consumer` that batches acknowledgements back to RabbitMQ.

  Incoming events are `{event, meta}` tuples where `meta` exposes the
  `:channel` and `:delivery_tag` passed to `Wabbit.Basic.ack/2`. Events are
  held in the stage state (a plain list) until one of two things happens:

    * the buffer reaches `@messages_before_purge` events, or
    * the periodic `{:flush}` timer message arrives.

  At that point every buffered event is acked and then emitted downstream,
  and the buffer is reset to `[]`.
  """

  use GenStage
  require Logger

  # Buffer size at which handle_events/3 acks and emits everything it holds.
  @messages_before_purge 400

  # Delay (ms) before the first timer flush; deliberately short so the first
  # flush happens fairly soon after startup.
  @initial_flush_delay 500

  # Delay (ms) between subsequent timer flushes.
  @flush_interval 2_000

  @doc """
  Starts the stage with an empty buffer.

  Returns whatever `GenStage.start_link/2` returns.
  """
  def start_link() do
    GenStage.start_link(__MODULE__, :ok)
  end

  @impl true
  def init(:ok) do
    schedule_flush(@initial_flush_delay)
    {:producer_consumer, []}
  end

  # Handle events coming from rabbit: append them to the buffer and, once the
  # buffer is large enough, ack and emit the whole buffer in one go.
  @impl true
  def handle_events(events, _from, buffer) do
    buffer = buffer ++ events

    if length(buffer) >= @messages_before_purge do
      ack_all(buffer)
      # pass all events down to the next stage and start a fresh buffer
      {:noreply, buffer, []}
    else
      # don't pass any events down, just store them in the state
      {:noreply, [], buffer}
    end
  end

  # When the flush message comes, ack whatever is buffered and pass it down.
  # TODO: only flush when we haven't acked anything in @flush_interval ms
  @impl true
  def handle_info({:flush}, buffer) do
    event_count = length(buffer)

    if event_count > 0 do
      Logger.debug("ackker: timer flush called for #{event_count} events")
    end

    ack_all(buffer)

    # re-arm the timer so flushing keeps happening every @flush_interval ms
    schedule_flush(@flush_interval)

    # emit the buffered events and reset the state
    {:noreply, buffer, []}
  end

  # Sends the `{:flush}` message to this process after `delay` milliseconds.
  defp schedule_flush(delay) do
    Process.send_after(self(), {:flush}, delay)
  end

  # Acks every `{event, meta}` pair in `buffer`. The `:ok =` match asserts on
  # the value returned by ack/2, so a non-`:ok` return from the client still
  # crashes the stage, exactly as before.
  defp ack_all(buffer) do
    Enum.each(buffer, fn {_event, meta} ->
      :ok = ack(meta.channel, meta.delivery_tag)
    end)
  end

  # Best-effort ack: anything thrown, raised or exited by the client call is
  # logged and converted to `:ok` so one bad ack does not take the stage down.
  # NOTE(review): presumably this guards against a closed channel - confirm.
  defp ack(channel, delivery_tag) do
    Wabbit.Basic.ack(channel, delivery_tag)
  catch
    kind, reason ->
      Logger.error(
        "ackker: ack failed for delivery_tag #{inspect(delivery_tag)}: " <>
          inspect({kind, reason})
      )

      :ok
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment