Created
November 26, 2024 00:27
-
-
Save monotykamary/e053a05122b89d5e9c88a8cc76442dd1 to your computer and use it in GitHub Desktop.
Exactly Once Processing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule ExactlyOnce do
  @moduledoc """
  Deduplicating event processor built on a `GenServer`.

  Each event is identified by a SHA-256 hash of its term encoding. Per-event
  status (`:processing`, `:failed`, `:retry`, `:completed`) is kept in the named
  ETS table `:processed_events`; only the server process writes to it. Every
  delivery attempt runs in its own unlinked task. Attempts that fail (raise,
  throw, exit or return `{:error, reason}`) or exceed `:processing_timeout` are
  retried after `:retry_interval`, with no retry limit, until one succeeds.

  The ETS table is in-memory and owned by the server, so all status is lost if
  the server stops. An attempt that is killed on timeout may already have
  performed some side effects, so the processing function should itself be
  idempotent to get true exactly-once effects.

  ## Options

    * `:processor` - 1-arity function called with each event. Returning
      `{:error, reason}` (or raising) marks the attempt as failed; any other
      return value marks the event completed. Defaults to a demo implementation
      that logs the event and sleeps for one second.
    * `:retry_interval` - milliseconds to wait before a retry (default `5_000`).
    * `:processing_timeout` - milliseconds an attempt may run before it is
      killed and retried (default `30_000`).
  """
  use GenServer
  require Logger

  @table :processed_events

  defmodule State do
    @moduledoc false
    # `processed_events` is never read or written by ExactlyOnce (status lives in
    # ETS); it is kept so existing pattern matches on the struct keep working.
    # `pending_events` holds payloads of not-yet-completed events so they can be
    # retried; `attempts` maps an event id to `{ref, pid}` of its running attempt.
    defstruct processed_events: %{},
              processing_timeouts: %{},
              pending_events: %{},
              attempts: %{},
              processor: nil,
              retry_interval: 5_000,
              processing_timeout: 30_000
  end

  # Client API

  @doc "Starts the processor, registered under the module name. See the module docs for options."
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc "Submits `event` for asynchronous, deduplicated processing."
  def process_event(event) do
    GenServer.cast(__MODULE__, {:process_event, event})
  end

  # Server callbacks

  @impl true
  def init(opts) do
    # In-memory table owned by this process: it is deleted when the server exits.
    :ets.new(@table, [:named_table, :set, :public])

    state =
      struct!(State, Keyword.take(opts, [:processor, :retry_interval, :processing_timeout]))

    {:ok, %{state | processor: state.processor || (&process_single_event/1)}}
  end

  @impl true
  def handle_cast({:process_event, event}, state) do
    event_id = generate_idempotency_key(event)

    case :ets.lookup(@table, event_id) do
      [{^event_id, :completed}] ->
        Logger.info("Event #{event_id} already processed, skipping")
        {:noreply, state}

      [{^event_id, status}] ->
        # An earlier delivery is still running or queued for retry; starting a
        # second attempt now would process the event twice.
        Logger.info("Event #{event_id} is #{status}, skipping duplicate delivery")
        {:noreply, state}

      [] ->
        state = put_in(state.pending_events[event_id], event)
        {:noreply, start_attempt(state, event_id)}
    end
  end

  def handle_cast({:completed_processing, event_id, ref}, state) do
    if Map.has_key?(state.pending_events, event_id) do
      # Any successful attempt settles the event, even one that already timed
      # out, so a newer running attempt is stopped and a queued retry becomes a no-op.
      case Map.get(state.attempts, event_id) do
        {^ref, _pid} -> :ok
        {_newer_ref, pid} -> Process.exit(pid, :kill)
        nil -> :ok
      end

      :ets.insert(@table, {event_id, :completed})
      {:noreply, forget(state, event_id)}
    else
      {:noreply, state}
    end
  end

  def handle_cast({:failed_processing, event_id, ref, reason}, state) do
    case Map.get(state.attempts, event_id) do
      {^ref, _pid} ->
        Logger.error("Error processing event #{event_id}: #{inspect(reason)}")
        :ets.insert(@table, {event_id, :failed})
        {:noreply, schedule_retry(state, event_id)}

      _stale ->
        # Report from an attempt that was already superseded (e.g. timed out).
        {:noreply, state}
    end
  end

  @impl true
  def handle_info({:processing_timeout, event_id, ref}, state) do
    case Map.get(state.attempts, event_id) do
      {^ref, pid} ->
        Logger.warning("Processing timeout for event #{event_id}, retrying")
        # Stop the slow attempt so it cannot run concurrently with the retry.
        Process.exit(pid, :kill)
        :ets.insert(@table, {event_id, :retry})
        {:noreply, schedule_retry(state, event_id)}

      _stale ->
        # Timer of an attempt that already finished or was replaced.
        {:noreply, state}
    end
  end

  def handle_info({:retry_processing, event_id}, state) do
    pending? = Map.has_key?(state.pending_events, event_id)
    running? = Map.has_key?(state.attempts, event_id)

    if pending? and not running? do
      {:noreply, start_attempt(state, event_id)}
    else
      {:noreply, state}
    end
  end

  def handle_info(message, state) do
    # Defining handle_info/2 replaces GenServer's default; keep stray messages
    # from crashing the server.
    Logger.debug("#{inspect(__MODULE__)} ignoring unexpected message: #{inspect(message)}")
    {:noreply, state}
  end

  # Private functions

  # Marks the event as processing, runs one attempt in an unlinked task and arms
  # a timeout tagged with a fresh ref so stale timers can be told apart.
  defp start_attempt(state, event_id) do
    event = Map.fetch!(state.pending_events, event_id)
    processor = state.processor
    server = self()
    ref = make_ref()

    :ets.insert(@table, {event_id, :processing})

    {:ok, pid} =
      Task.start(fn ->
        GenServer.cast(server, run_attempt(processor, event, event_id, ref))
      end)

    Process.send_after(server, {:processing_timeout, event_id, ref}, state.processing_timeout)

    %{
      state
      | attempts: Map.put(state.attempts, event_id, {ref, pid}),
        processing_timeouts: Map.put(state.processing_timeouts, event_id, DateTime.utc_now())
    }
  end

  # Runs the processor inside the task and turns the outcome into the cast the
  # server expects; raises, throws and exits are all reported as failures.
  defp run_attempt(processor, event, event_id, ref) do
    case processor.(event) do
      {:error, reason} -> {:failed_processing, event_id, ref, reason}
      _result -> {:completed_processing, event_id, ref}
    end
  rescue
    error -> {:failed_processing, event_id, ref, error}
  catch
    kind, reason -> {:failed_processing, event_id, ref, {kind, reason}}
  end

  # Clears the current attempt for `event_id` and schedules another one.
  defp schedule_retry(state, event_id) do
    Process.send_after(self(), {:retry_processing, event_id}, state.retry_interval)

    %{
      state
      | attempts: Map.delete(state.attempts, event_id),
        processing_timeouts: Map.delete(state.processing_timeouts, event_id)
    }
  end

  # Drops all in-memory bookkeeping for a settled event.
  defp forget(state, event_id) do
    %{
      state
      | pending_events: Map.delete(state.pending_events, event_id),
        attempts: Map.delete(state.attempts, event_id),
        processing_timeouts: Map.delete(state.processing_timeouts, event_id)
    }
  end

  defp generate_idempotency_key(event) do
    # Content hash: identical event terms are treated as the same event.
    # A natural unique id (e.g. Kafka topic + partition + offset) is preferable
    # when the source provides one.
    :crypto.hash(:sha256, :erlang.term_to_binary(event))
    |> Base.encode16()
  end

  # Default processor used when no `:processor` option is given.
  defp process_single_event(event) do
    Logger.info("Processing event: #{inspect(event)}")
    # Simulate some work.
    Process.sleep(1000)
    {:ok, event}
  end
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Start the processor. The default processor logs each event and sleeps for 1s.
{:ok, _pid} = ExactlyOnce.start_link()

# Submit some events. Processing is asynchronous: these calls return
# immediately, and re-submitting an already-completed event is skipped.
# When run as a script, keep the VM alive (e.g. with Process.sleep/1) long
# enough for processing to finish.
ExactlyOnce.process_event(%{id: 1, data: "some data"})
ExactlyOnce.process_event(%{id: 2, data: "more data"})
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment