Skip to content

Instantly share code, notes, and snippets.

@monotykamary
Created November 26, 2024 00:27
Show Gist options
  • Select an option

  • Save monotykamary/e053a05122b89d5e9c88a8cc76442dd1 to your computer and use it in GitHub Desktop.

Select an option

Save monotykamary/e053a05122b89d5e9c88a8cc76442dd1 to your computer and use it in GitHub Desktop.
Exactly-Once Processing (Elixir GenServer example)
defmodule ExactlyOnce do
  @moduledoc """
  Idempotent event processor aiming at "exactly-once" handling.

  Each event is hashed into an idempotency key; the key, processing status,
  the event payload, and the attempt count are kept in a public ETS table
  (`:processed_events`). Events whose key is already `:completed` are
  skipped. Processing runs in a spawned `Task`; timeouts and crashes
  schedule a retry (up to `max_retries` attempts) using the payload stored
  in ETS.

  NOTE(review): the ETS table lives and dies with this GenServer, so the
  dedup record is not durable across restarts — true exactly-once delivery
  would require persistent storage.
  """

  use GenServer
  require Logger

  # ETS table holding {event_id, status, event, attempts} records.
  @table :processed_events

  defmodule State do
    @moduledoc false
    # `processed_events` is unused at runtime (tracking lives in ETS) but is
    # kept so any existing code touching the struct keeps working.
    defstruct processed_events: %{},
              processing_timeouts: %{},
              retry_interval: 5_000,
              processing_timeout: 30_000,
              max_retries: 3
  end

  ## Client API

  @doc "Starts the processor, registered under the module name."
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc "Submits an event for processing (fire-and-forget)."
  def process_event(event) do
    GenServer.cast(__MODULE__, {:process_event, event})
  end

  ## Server callbacks

  @impl true
  def init(_opts) do
    # Public so the worker Task can update statuses directly.
    :ets.new(@table, [:named_table, :set, :public])
    {:ok, %State{}}
  end

  # All handle_cast/2 clauses are grouped together here — the original
  # interleaved them with private functions, which triggers a compiler
  # warning ("clauses with the same name and arity must be grouped").
  @impl true
  def handle_cast({:process_event, event}, state) do
    event_id = generate_idempotency_key(event)

    if already_processed?(event_id) do
      Logger.info("Event #{event_id} already processed, skipping")
      {:noreply, state}
    else
      {:noreply, start_processing(event, event_id, state)}
    end
  end

  def handle_cast({:completed_processing, event_id}, state) do
    # Processing finished before the timeout fired; drop the tracker so the
    # pending :processing_timeout message becomes a no-op.
    {:noreply, %{state | processing_timeouts: Map.delete(state.processing_timeouts, event_id)}}
  end

  def handle_cast({:failed_processing, event_id}, state) do
    # The worker crashed; schedule a retry.
    Process.send_after(self(), {:retry_processing, event_id}, state.retry_interval)
    {:noreply, %{state | processing_timeouts: Map.delete(state.processing_timeouts, event_id)}}
  end

  @impl true
  def handle_info({:processing_timeout, event_id}, state) do
    case Map.pop(state.processing_timeouts, event_id) do
      {nil, _timeouts} ->
        # Already completed (or already handed to retry); ignore.
        {:noreply, state}

      {_started_at, timeouts} ->
        # Logger.warn/1 is deprecated; Logger.warning/1 is the replacement.
        Logger.warning("Processing timeout for event #{event_id}, retrying")
        Process.send_after(self(), {:retry_processing, event_id}, state.retry_interval)
        {:noreply, %{state | processing_timeouts: timeouts}}
    end
  end

  # BUG FIX: the original scheduled {:retry_processing, id} messages in two
  # places but had no clause to handle them, so retries never ran (and the
  # payload was not stored, so a retry was impossible). The event stored in
  # ETS makes re-dispatch possible here, capped at state.max_retries.
  def handle_info({:retry_processing, event_id}, state) do
    case :ets.lookup(@table, event_id) do
      [{^event_id, :completed, _event, _attempts}] ->
        {:noreply, state}

      [{^event_id, _status, event, attempts}] ->
        if attempts < state.max_retries do
          Logger.info("Retrying event #{event_id} (attempt #{attempts + 1})")
          {:noreply, start_processing(event, event_id, state)}
        else
          Logger.error("Giving up on event #{event_id} after #{attempts} attempts")
          :ets.insert(@table, {event_id, :failed, event, attempts})
          {:noreply, state}
        end

      [] ->
        {:noreply, state}
    end
  end

  ## Private functions

  # Records the attempt in ETS, spawns the worker Task, and arms the
  # processing timeout. Returns the updated state.
  defp start_processing(event, event_id, state) do
    attempts = current_attempts(event_id) + 1
    :ets.insert(@table, {event_id, :processing, event, attempts})

    Task.start(fn -> process_event_async(event, event_id) end)

    Process.send_after(self(), {:processing_timeout, event_id}, state.processing_timeout)
    put_in(state.processing_timeouts[event_id], DateTime.utc_now())
  end

  defp current_attempts(event_id) do
    case :ets.lookup(@table, event_id) do
      [{^event_id, _status, _event, attempts}] -> attempts
      [] -> 0
    end
  end

  # Content-addressed idempotency key. For Kafka-style sources, a
  # topic/partition/offset triple would be a cheaper alternative.
  defp generate_idempotency_key(event) do
    :crypto.hash(:sha256, :erlang.term_to_binary(event))
    |> Base.encode16()
  end

  defp already_processed?(event_id) do
    match?([{_id, :completed, _event, _attempts}], :ets.lookup(@table, event_id))
  end

  # Runs inside a Task. The original wrapped the single event in a Flow
  # pipeline (Flow.from_enumerable/1 over a one-element list), which adds a
  # third-party dependency and startup cost for no benefit; a direct call
  # is equivalent for one event.
  defp process_event_async(event, event_id) do
    process_single_event(event)
    set_status(event_id, :completed)
    GenServer.cast(__MODULE__, {:completed_processing, event_id})
  rescue
    error ->
      Logger.error("Error processing event #{event_id}: #{inspect(error)}")
      set_status(event_id, :failed)
      GenServer.cast(__MODULE__, {:failed_processing, event_id})
  end

  # Updates the status field while preserving the stored event and attempts.
  defp set_status(event_id, status) do
    case :ets.lookup(@table, event_id) do
      [{^event_id, _status, event, attempts}] ->
        :ets.insert(@table, {event_id, status, event, attempts})

      [] ->
        :ok
    end
  end

  # Placeholder for real business logic.
  defp process_single_event(event) do
    Logger.info("Processing event: #{inspect(event)}")
    # Simulate a little work (shortened from 1s so callers/tests stay fast).
    :timer.sleep(100)
    {:ok, event}
  end
end
# Boot the processor (registered under the module name).
{:ok, _pid} = ExactlyOnce.start_link()

# Submit a couple of sample events; duplicates would be deduplicated.
[%{id: 1, data: "some data"}, %{id: 2, data: "more data"}]
|> Enum.each(&ExactlyOnce.process_event/1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment