Event Store - Disk-based Event Storage System
defmodule EventStore do
  @moduledoc """
  Disk-based event storage built on Erlang's `:disk_log`, providing:

  - Resilient disk_log handling with automatic repair
  - Batch writing for better performance
  - Automatic log file maintenance
  - Monitoring and metrics
  - Quick recovery after crashes
  """

  use GenServer
  require Logger

  @default_batch_size 1000
  @default_batch_timeout 1000
  # 1 MB per wrap-log file
  @default_max_chunk_size 1024 * 1024
  @default_max_files 4
  @cleanup_interval :timer.hours(1)
  defmodule State do
    defstruct [
      :topic,
      :log_name,
      :base_dir,
      :batch_size,
      :batch_timeout,
      :batch_buffer,
      :batch_timer,
      :last_cleanup,
      :metrics,
      # Track the last assigned position
      :last_position
    ]
  end

  defmodule Event do
    @type t :: %__MODULE__{
            position: non_neg_integer(),
            timestamp: non_neg_integer(),
            data: term()
          }
    defstruct [:position, :timestamp, :data]
  end

  defmodule Metrics do
    defstruct writes: 0,
              reads: 0,
              batches: 0,
              errors: 0,
              repairs: 0,
              last_write_time: nil,
              last_read_time: nil
  end
  # Public API

  def start_link(topic, opts \\ []) do
    GenServer.start_link(__MODULE__, [topic, opts], name: via_tuple(topic))
  end

  def write_event(topic, event) do
    GenServer.cast(via_tuple(topic), {:write, event})
  end

  def write_events(topic, events) when is_list(events) do
    GenServer.cast(via_tuple(topic), {:write_batch, events})
  end

  def read_events(topic, opts \\ []) do
    GenServer.call(via_tuple(topic), {:read, opts})
  end

  def get_metrics(topic) do
    GenServer.call(via_tuple(topic), :get_metrics)
  end

  def get_latest_event(topic) do
    GenServer.call(via_tuple(topic), :get_latest_event)
  end

  def read_last_events(topic, n) when is_integer(n) and n > 0 do
    GenServer.call(via_tuple(topic), {:read_last, n})
  end

  def force_flush(topic) do
    GenServer.call(via_tuple(topic), :flush)
  end
  # Server Implementation

  @impl true
  def init([topic, opts]) do
    Process.flag(:trap_exit, true)

    base_dir = Keyword.get(opts, :base_dir, "data/#{topic}")
    batch_size = Keyword.get(opts, :batch_size, @default_batch_size)
    batch_timeout = Keyword.get(opts, :batch_timeout, @default_batch_timeout)

    File.mkdir_p!(base_dir)

    log_name = String.to_atom("#{topic}_log")
    log_file = Path.join(base_dir, "#{topic}.log")

    # Initialize disk_log with repair capability
    case initialize_disk_log(log_name, log_file) do
      {:ok, _} ->
        # Recover the highest assigned position from the log
        last_position =
          case get_last_position(log_name) do
            {:ok, pos} -> pos
            _ -> -1
          end

        state = %State{
          topic: topic,
          log_name: log_name,
          base_dir: base_dir,
          batch_size: batch_size,
          batch_timeout: batch_timeout,
          batch_buffer: [],
          metrics: %Metrics{},
          last_cleanup: System.system_time(:second),
          last_position: last_position
        }

        schedule_cleanup()
        schedule_batch_timeout(state)
        {:ok, state}

      {:error, reason} ->
        Logger.error("Failed to initialize disk_log: #{inspect(reason)}")
        {:stop, reason}
    end
  end
  @impl true
  def handle_cast({:write, event}, state) do
    {wrapped_event, new_state} = wrap_event(event, state)
    new_buffer = new_state.batch_buffer ++ [wrapped_event]

    if length(new_buffer) >= new_state.batch_size do
      case write_batch(new_buffer, new_state) do
        {:ok, newer_state} ->
          {:noreply, newer_state}

        {:error, reason, newer_state} ->
          Logger.error("Failed to write batch: #{inspect(reason)}")
          {:noreply, newer_state}
      end
    else
      {:noreply, %{new_state | batch_buffer: new_buffer}}
    end
  end

  @impl true
  def handle_cast({:write_batch, events}, state) do
    {wrapped_events, final_state} =
      Enum.reduce(events, {[], state}, fn event, {acc_events, acc_state} ->
        {wrapped, new_state} = wrap_event(event, acc_state)
        {acc_events ++ [wrapped], new_state}
      end)

    case write_batch(wrapped_events, final_state) do
      {:ok, new_state} ->
        {:noreply, new_state}

      {:error, reason, new_state} ->
        Logger.error("Failed to write batch: #{inspect(reason)}")
        {:noreply, new_state}
    end
  end
  @impl true
  def handle_call({:read, opts}, _from, state) do
    start_time = System.monotonic_time()

    case read_all_events(state.log_name, opts) do
      {:ok, events} ->
        metrics = update_read_metrics(state.metrics, length(events), start_time)
        {:reply, {:ok, events}, %{state | metrics: metrics}}

      {:error, reason} ->
        metrics = %{state.metrics | errors: state.metrics.errors + 1}
        {:reply, {:error, reason}, %{state | metrics: metrics}}
    end
  end

  @impl true
  def handle_call({:read_last, n}, _from, state) do
    start_time = System.monotonic_time()

    case read_last_n_events(state.log_name, n) do
      {:ok, events} ->
        metrics = update_read_metrics(state.metrics, length(events), start_time)
        {:reply, {:ok, events}, %{state | metrics: metrics}}

      {:error, reason} ->
        metrics = %{state.metrics | errors: state.metrics.errors + 1}
        {:reply, {:error, reason}, %{state | metrics: metrics}}
    end
  end

  @impl true
  def handle_call(:get_metrics, _from, state) do
    {:reply, state.metrics, state}
  end

  @impl true
  def handle_call(:get_latest_event, _from, state) do
    with {:ok, pos} <- get_position(state.last_position),
         {:ok, events} <- read_all_events(state.log_name, from_position: pos, chunk_size: 1),
         {:ok, event} <- get_single_event(events) do
      {:reply, {:ok, event}, state}
    else
      {:error, reason} -> {:reply, {:error, reason}, state}
      :no_event -> {:reply, {:ok, nil}, state}
    end
  end

  @impl true
  def handle_call(:flush, _from, state) do
    case write_batch(state.batch_buffer, state) do
      {:ok, new_state} -> {:reply, :ok, new_state}
      {:error, reason, new_state} -> {:reply, {:error, reason}, new_state}
    end
  end
  @impl true
  def handle_info(:batch_timeout, %{batch_buffer: []} = state) do
    schedule_batch_timeout(state)
    {:noreply, state}
  end

  @impl true
  def handle_info(:batch_timeout, %{batch_buffer: buffer, batch_timeout: timeout} = state) do
    schedule_batch_timeout(state)

    # Write the buffered events, then sync twice with a short pause in
    # between to give the write a chance to settle on disk
    with {:ok, new_state} <- write_batch(buffer, state),
         :ok <- :disk_log.sync(state.log_name),
         :ok <- Process.sleep(div(timeout, 10)),
         :ok <- :disk_log.sync(state.log_name) do
      {:noreply, new_state}
    else
      {:error, reason, new_state} ->
        Logger.error("Failed to write batch on timeout: #{inspect(reason)}")
        {:noreply, new_state}

      {:error, reason} ->
        Logger.error("Failed to sync disk log: #{inspect(reason)}")
        {:noreply, state}
    end
  end

  @impl true
  def handle_info(:cleanup, state) do
    schedule_cleanup()
    {:noreply, perform_cleanup(state)}
  end
  @impl true
  def terminate(_reason, %{batch_buffer: buffer, log_name: log_name} = state) do
    # Flush any buffered events before shutting down
    if length(buffer) > 0 do
      case write_batch(buffer, state) do
        {:ok, _} ->
          :ok

        {:error, reason, _} ->
          Logger.error("Failed to flush events on termination: #{inspect(reason)}")
      end
    end

    if log_name do
      with :ok <- :disk_log.sync(log_name),
           :ok <- :disk_log.close(log_name) do
        :ok
      else
        {:error, reason} ->
          Logger.error("Failed to sync/close disk log: #{inspect(reason)}")
          :ok
      end
    end

    :ok
  end
  # Private Functions

  defp via_tuple(topic) do
    {:via, Registry, {EventStore.Registry, {__MODULE__, topic}}}
  end

  defp initialize_disk_log(log_name, log_file) do
    case :disk_log.open(
           name: log_name,
           file: String.to_charlist(log_file),
           type: :wrap,
           size: {@default_max_chunk_size, @default_max_files},
           mode: :read_write,
           repair: true
         ) do
      {:ok, _} = result ->
        result

      {:repaired, ^log_name, {:recovered, _}, {:badbytes, _}} ->
        Logger.warning("Repaired corrupted log file")
        {:ok, log_name}

      {:error, :no_such_log} ->
        initialize_disk_log(log_name, log_file)

      {:error, reason} ->
        Logger.error("Failed to open disk_log: #{inspect(reason)}")
        {:error, reason}
    end
  end
  defp write_batch(events, state) when is_list(events) and events != [] do
    start_time = System.monotonic_time()

    # Log the terms, then sync twice with a short pause in between
    with :ok <- :disk_log.log_terms(state.log_name, events),
         :ok <- :disk_log.sync(state.log_name),
         :ok <- Process.sleep(div(state.batch_timeout, 20)),
         :ok <- :disk_log.sync(state.log_name) do
      metrics = update_write_metrics(state.metrics, length(events), start_time)
      {:ok, %{state | batch_buffer: [], metrics: metrics}}
    else
      {:error, reason} ->
        metrics = %{state.metrics | errors: state.metrics.errors + 1}
        Logger.error("Failed to sync disk log: #{inspect(reason)}")
        {:error, reason, %{state | metrics: metrics}}
    end
  end

  defp write_batch([], state), do: {:ok, state}

  defp wrap_event(event, state) do
    next_position = (state.last_position || -1) + 1

    timestamp =
      case event do
        %{timestamp: ts} -> ts
        _ -> System.system_time(:second)
      end

    data =
      case event do
        %Event{} -> event.data
        %{data: data} -> data
        _ -> %{value: event}
      end

    {%Event{
       position: next_position,
       timestamp: timestamp,
       data: data
     }, %{state | last_position: next_position}}
  end
  defp read_all_events(log_name, opts) do
    chunk_size = Keyword.get(opts, :chunk_size, 100)
    # Use position instead of offset
    from_position = Keyword.get(opts, :from_position, 0)
    timestamp = Keyword.get(opts, :timestamp)

    # read_chunk/3 already returns the events flattened in chronological
    # order (oldest first), so they only need filtering here
    case read_chunk(log_name, :start, []) do
      {:ok, events} ->
        {:ok, filter_events(events, timestamp, from_position, chunk_size)}

      error ->
        error
    end
  catch
    :exit, reason ->
      Logger.error("Error reading events: #{inspect(reason)}")
      {:error, reason}
  end

  defp read_last_n_events(log_name, n) do
    # Start from the end of the log
    read_backwards(log_name, n)
  catch
    :exit, reason ->
      Logger.error("Error reading events: #{inspect(reason)}")
      {:error, reason}
  end
  # Get the highest assigned position by reading through to the last chunk.
  # :disk_log has no "read from the end" primitive, so we follow chunk
  # continuations and keep only the final chunk's terms.
  defp get_last_position(log_name) do
    case read_last_chunk(log_name) do
      {:ok, [_ | _] = terms} -> {:ok, List.last(terms).position}
      {:ok, []} -> {:ok, -1}
      {:error, reason} -> {:error, reason}
    end
  end

  # Walk the log chunk by chunk, retaining only the most recent chunk's terms
  defp read_last_chunk(log_name, continuation \\ :start, last_terms \\ []) do
    case :disk_log.chunk(log_name, continuation) do
      :eof ->
        {:ok, last_terms}

      {:error, reason} ->
        {:error, reason}

      {cont, terms} ->
        read_last_chunk(log_name, cont, List.wrap(terms))

      # Three-element tuples report bytes skipped while reading a repaired
      # log; the terms are still usable
      {cont, terms, _badbytes} ->
        read_last_chunk(log_name, cont, List.wrap(terms))
    end
  catch
    _kind, _reason ->
      {:ok, last_terms}
  end
  defp read_backwards(log_name, n) do
    with {:ok, last_pos} <- get_last_position(log_name),
         false <- last_pos == -1,
         start_pos = max(0, last_pos - n + 1),
         {:ok, events} <-
           read_all_events(log_name, from_position: start_pos, chunk_size: n) do
      {:ok, Enum.take(events, -n)}
    else
      # handles last_pos == -1 (empty log)
      true -> {:ok, []}
      # propagates other errors
      error -> error
    end
  catch
    _kind, _reason ->
      {:ok, []}
  end
  defp filter_events(events, timestamp, from_position, chunk_size) do
    events
    |> filter_by_timestamp(timestamp)
    |> filter_by_position(from_position)
    |> Enum.take(chunk_size)
  end

  defp filter_by_timestamp(events, nil), do: events

  defp filter_by_timestamp(events, timestamp) when is_integer(timestamp) do
    Enum.filter(events, fn %Event{timestamp: ts} -> ts >= timestamp end)
  end

  defp filter_by_position(events, 0), do: events

  defp filter_by_position(events, from_position) when is_integer(from_position) do
    Enum.filter(events, fn %Event{position: pos} -> pos >= from_position end)
  end

  # Prepend each chunk's terms to the accumulator; anything that is not a
  # list of terms is ignored
  defp maybe_append_chunk(terms, acc) when is_list(terms), do: [terms | acc]
  defp maybe_append_chunk(_, acc), do: acc

  defp read_chunk(log_name, continuation, acc) do
    case :disk_log.chunk(log_name, continuation) do
      :eof ->
        {:ok, process_chunks(acc)}

      {:error, :no_such_log} ->
        {:ok, []}

      {:error, reason} ->
        {:error, reason}

      {cont, terms} ->
        read_chunk(log_name, cont, maybe_append_chunk(terms, acc))

      # The third element is the number of bad bytes skipped in a repaired log
      {cont, terms, _badbytes} ->
        read_chunk(log_name, cont, maybe_append_chunk(terms, acc))
    end
  end

  defp process_chunks(chunks) do
    chunks
    # Chunks were accumulated newest-first, so reverse to restore log order
    |> Enum.reverse()
    # Flatten the chunk lists into a single list of events
    |> List.flatten()
  end
  defp schedule_batch_timeout(%{batch_timeout: timeout} = _state) do
    Process.send_after(self(), :batch_timeout, timeout)
  end

  defp schedule_cleanup do
    Process.send_after(self(), :cleanup, @cleanup_interval)
  end

  defp perform_cleanup(%{log_name: log_name} = state) do
    with :ok <- :disk_log.sync(log_name),
         info when is_list(info) <- :disk_log.info(log_name),
         bytes =
           Enum.find_value(info, 0, fn
             {:no_current_bytes, size} -> size
             _ -> false
           end),
         :ok <- maybe_truncate(log_name, bytes) do
      %{state | last_cleanup: System.system_time(:second)}
    else
      {:error, reason} ->
        Logger.error("Failed to sync/cleanup log: #{inspect(reason)}")
        %{state | last_cleanup: System.system_time(:second)}

      error ->
        Logger.error("Error during cleanup: #{inspect(error)}")
        %{state | last_cleanup: System.system_time(:second)}
    end
  end

  defp maybe_truncate(log_name, bytes)
       when bytes > @default_max_chunk_size * @default_max_files do
    Logger.info("Truncating log file due to size: #{bytes} bytes")

    case :disk_log.truncate(log_name) do
      :ok ->
        :ok

      {:error, reason} ->
        Logger.error("Failed to truncate log: #{inspect(reason)}")
        {:error, reason}
    end
  end

  defp maybe_truncate(_log_name, _bytes), do: :ok
  defp update_write_metrics(metrics, count, start_time) do
    %{
      metrics
      | writes: metrics.writes + count,
        batches: metrics.batches + 1,
        last_write_time: System.monotonic_time() - start_time
    }
  end

  defp update_read_metrics(metrics, count, start_time) do
    %{
      metrics
      | reads: metrics.reads + count,
        last_read_time: System.monotonic_time() - start_time
    }
  end

  defp get_position(nil), do: :no_event
  defp get_position(-1), do: :no_event
  defp get_position(pos), do: {:ok, pos}

  defp get_single_event([event]), do: {:ok, event}
  defp get_single_event([]), do: :no_event
end
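For orientation, here is a minimal usage sketch (not part of the gist) showing how the module above might be driven. It assumes the host application starts a `Registry` named `EventStore.Registry`, which `via_tuple/1` uses to register one process per topic; the topic name and directory below are illustrative.

{:ok, _} = Registry.start_link(keys: :unique, name: EventStore.Registry)
{:ok, _pid} = EventStore.start_link("orders", base_dir: "data/orders")

# Writes are casts: they land in the batch buffer and are flushed when the
# buffer reaches batch_size or when the batch timeout fires.
:ok = EventStore.write_event("orders", %{data: %{value: "first"}})
:ok = EventStore.write_events("orders", [%{data: %{value: "second"}}])

# Force a synchronous flush before reading back.
:ok = EventStore.force_flush("orders")

{:ok, events} = EventStore.read_events("orders", from_position: 0)
{:ok, latest} = EventStore.get_latest_event("orders")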
defmodule EventStoreBenchmark do
  use ExUnit.Case
  require Logger

  # Applies to the benchmark test defined below
  @tag timeout: :infinity

  @test_topic "benchmark_topic"
  @message_count 10_000
  # Wide enough to accommodate percentage columns
  @cell_width 16
  # Padding inside cells for better readability
  @inner_padding 3

  # Keep the console output readable: log errors only
  Logger.configure_backend(:console, level: :error)
  setup do
    test_dir = "test/tmp/#{@test_topic}_#{System.unique_integer()}"

    # Ensure the test directory is clean
    File.rm_rf!(test_dir)
    File.mkdir_p!(test_dir)

    # Start the Registry if it is not already running
    Registry.start_link(keys: :unique, name: EventStore.Registry, partitions: 1)

    on_exit(fn ->
      try do
        File.rm_rf!(test_dir)
      catch
        _kind, _reason -> :ok
      end
    end)

    %{test_dir: test_dir}
  end
  def generate_message(size) do
    # Generate a random lowercase string of the given size
    1..size
    |> Enum.map(fn _ -> Enum.random(?a..?z) end)
    |> List.to_string()
  end

  def format_bytes(bytes) when bytes >= 1024 * 1024 do
    mb = bytes / (1024 * 1024)
    "#{Float.round(mb, 2)} MB (#{bytes} bytes)"
  end

  def format_bytes(bytes) when bytes >= 1024 do
    kb = bytes / 1024
    "#{Float.round(kb, 2)} KB (#{bytes} bytes)"
  end

  def format_bytes(bytes), do: "#{bytes} bytes"

  def analyze_memory_allocation do
    IO.puts("\n--- Memory Carrier Configuration ---")

    # Allocator info comes back as {Allocator, Version, Features, Settings}
    alloc_info = :erlang.system_info(:allocator)

    # Pull out the temp_alloc settings
    temp_alloc = Enum.find(elem(alloc_info, 3), fn {type, _} -> type == :temp_alloc end)
    {_, settings} = temp_alloc

    # Extract and format the carrier configuration values
    mmbcs = format_bytes(settings[:mmbcs])
    lmbcs = format_bytes(settings[:lmbcs])
    sbct = format_bytes(settings[:sbct])
    smbcs = format_bytes(settings[:smbcs])

    IO.puts("mmbcs: #{mmbcs} # Main multi-block carrier size")
    IO.puts("lmbcs: #{lmbcs} # Largest multi-block carrier size")
    IO.puts("sbct: #{sbct} # Single-block carrier threshold")
    IO.puts("smbcs: #{smbcs} # Smallest multi-block carrier size")
  end
  def measure_write_performance(test_dir, message_size) do
    {:ok, pid} = EventStore.start_link(@test_topic, base_dir: test_dir)

    # Generate messages
    messages =
      for _ <- 1..@message_count do
        %{data: %{value: generate_message(message_size)}}
      end

    # Measure batch write time with natural timeouts
    {batch_write_micros, :ok} =
      :timer.tc(fn ->
        :ok = EventStore.write_events(@test_topic, messages)
      end)

    # Clean up and start fresh for the force-flush measurement
    GenServer.stop(pid)
    {:ok, pid2} = EventStore.start_link(@test_topic, base_dir: test_dir)

    # Measure force-flush write time
    {force_flush_micros, :ok} =
      :timer.tc(fn ->
        :ok = EventStore.write_events(@test_topic, messages)
        :ok = EventStore.force_flush(@test_topic)
      end)

    # Convert to milliseconds
    batch_write_ms = batch_write_micros / 1000
    force_flush_ms = force_flush_micros / 1000

    # Throughput metrics
    batch_msgs_per_sec = @message_count / (batch_write_ms / 1000)
    force_flush_msgs_per_sec = @message_count / (force_flush_ms / 1000)
    batch_mb_per_sec = message_size * @message_count / (batch_write_ms * 1000)
    force_flush_mb_per_sec = message_size * @message_count / (force_flush_ms * 1000)

    # Cleanup
    GenServer.stop(pid2)

    {
      # Time for batch writes
      batch_write_ms,
      # Time including force flush
      force_flush_ms,
      # Messages/sec with batch
      batch_msgs_per_sec,
      # Messages/sec with force flush
      force_flush_msgs_per_sec,
      # MB/sec with batch
      batch_mb_per_sec,
      # MB/sec with force flush
      force_flush_mb_per_sec
    }
  end
  def measure_read_performance(test_dir, message_size) do
    {:ok, pid} = EventStore.start_link(@test_topic, base_dir: test_dir)

    # Write the test data first
    messages =
      for _ <- 1..@message_count do
        %{data: %{value: generate_message(message_size)}}
      end

    :ok = EventStore.write_events(@test_topic, messages)
    :ok = EventStore.force_flush(@test_topic)

    # Measure the total read time
    {elapsed_micros, {:ok, _events}} =
      :timer.tc(fn ->
        EventStore.read_events(@test_topic)
      end)

    # Calculate metrics
    elapsed_ms = elapsed_micros / 1000
    messages_per_second = @message_count / (elapsed_ms / 1000)
    throughput_mb_per_sec = message_size * @message_count / (elapsed_ms * 1000)

    # Cleanup
    GenServer.stop(pid)

    {elapsed_ms, messages_per_second, throughput_mb_per_sec}
  end

  def measure_restart_time(test_dir) do
    # First, write some messages
    {:ok, pid} = EventStore.start_link(@test_topic, base_dir: test_dir)

    messages =
      for i <- 1..@message_count do
        %{data: %{value: "message_#{i}"}}
      end

    :ok = EventStore.write_events(@test_topic, messages)
    :ok = EventStore.force_flush(@test_topic)
    GenServer.stop(pid)

    # Measure the setup work that happens before disk_log initialization
    {pre_init_micros, _} =
      :timer.tc(fn ->
        _log_name = String.to_atom("#{@test_topic}_log")
        _log_file = Path.join(test_dir, "#{@test_topic}.log")
        File.mkdir_p!(test_dir)
      end)

    # Now measure the total restart time
    {total_restart_micros, {:ok, _pid}} =
      :timer.tc(fn ->
        EventStore.start_link(@test_topic, base_dir: test_dir)
      end)

    {pre_init_micros, total_restart_micros}
  end
  def format_number(number, :messages) when is_number(number) do
    number
    |> round()
    |> Integer.to_string()
    |> add_thousand_separators()
  end

  def format_number(number) when is_integer(number), do: format_number(number / 1)

  def format_number(number) when is_float(number) do
    number
    # Always show two decimal places
    |> :erlang.float_to_binary(decimals: 2)
    |> add_thousand_separators()
  end

  def add_thousand_separators(number_string) do
    # Handle the decimal part if it exists
    case String.split(number_string, ".") do
      [int_part, decimal_part] -> "#{group_thousands(int_part)}.#{decimal_part}"
      [int_part] -> group_thousands(int_part)
    end
  end

  # Insert comma separators every three digits, counting from the right
  defp group_thousands(int_part) do
    int_part
    |> String.reverse()
    |> String.replace(~r/(\d{3})(?=\d)/, "\\1,")
    |> String.reverse()
  end
  def align_text(text, width, :right) do
    padding = width - String.length(text) - @inner_padding
    String.duplicate(" ", padding) <> text <> String.duplicate(" ", @inner_padding)
  end

  def align_text(text, width, :left) do
    padding = width - String.length(text) - @inner_padding
    String.duplicate(" ", @inner_padding) <> text <> String.duplicate(" ", padding)
  end

  def format_cell(value, width) when is_number(value) do
    formatted =
      if is_integer(value) do
        # Ensure two decimal places for integers
        "#{value}.00"
      else
        format_number(value)
      end

    align_text(formatted, width, :right)
  end

  def format_cell(value, width) when is_binary(value) do
    cond do
      # Percentage values, e.g. "1,000.00 (33.00%)"
      String.match?(value, ~r/^[\d,]+\.?\d*\s\(\d+\.?\d*%\)$/) ->
        align_text(value, width, :right)

      # Microsecond latency values: truncate to whole microseconds
      String.match?(value, ~r/^[\d,]+\.?\d*μs$/) ->
        {num, _} = Float.parse(String.replace(value, ["μs", ","], ""))
        align_text("#{trunc(num)}μs", width, :right)

      # Millisecond time values
      String.match?(value, ~r/^[\d,]+\.?\d*ms$/) ->
        align_text(value, width, :right)

      # Numeric strings with thousand separators
      String.match?(value, ~r/^[\d,]+\.?\d*$/) ->
        align_text(value, width, :right)

      # Byte-size labels such as "128b"
      String.match?(value, ~r/^\d+b$/) ->
        align_text(value, width, :right)

      true ->
        align_text(value, width, :left)
    end
  end

  def print_table_header(headers) do
    col_width = @cell_width + @inner_padding * 2
    total_width = (col_width + 1) * length(headers) + 1

    # Top border
    border = String.duplicate("-", total_width - 2)
    IO.puts("+" <> border <> "+")

    # Header row, left-aligned
    header_line =
      headers
      |> Enum.map(&align_text(&1, col_width, :left))
      |> Enum.join("|")

    IO.puts("|" <> header_line <> "|")

    # Separator
    IO.puts("+" <> border <> "+")
  end

  def print_table_row(values) do
    col_width = @cell_width + @inner_padding * 2

    row =
      values
      |> Enum.map(&format_cell(&1, col_width))
      |> Enum.join("|")

    IO.puts("|" <> row <> "|")
  end

  def print_table_footer(col_count) do
    total_width = (@cell_width + @inner_padding * 2 + 1) * col_count + 1
    IO.puts("+" <> String.duplicate("-", total_width - 2) <> "+")
  end
| test "benchmark write performance and restarts", %{test_dir: test_dir} do | |
| # Before the benchmark | |
| IO.puts("\nPre-Benchmark Memory Analysis:") | |
| analyze_memory_allocation() | |
| message_sizes = [128, 512, 1024] | |
| # Write Performance Table | |
| IO.puts("\nWrite Performance:") | |
| headers = ["Size", "Batch Time", "Force Time", "Raw M/s", "Force M/s", "Raw MB/s", "Force MB/s"] | |
| print_table_header(headers) | |
| for size <- message_sizes do | |
| {batch_write_ms, force_flush_ms, batch_mps, force_mps, batch_mb, force_mb} = | |
| measure_write_performance(test_dir, size) | |
| values = [ | |
| "#{size}b", | |
| "#{format_number(batch_write_ms)}ms", | |
| "#{format_number(force_flush_ms)}ms", | |
| format_number(batch_mps, :messages), | |
| format_number(force_mps, :messages), | |
| format_number(batch_mb), | |
| format_number(force_mb) | |
| ] | |
| print_table_row(values) | |
| end | |
| print_table_footer(length(headers)) | |
| # Messages per Batch Timeout Table | |
| IO.puts("\nMessages/s per Batch Timeout:") | |
| batch_timeouts = [50, 100, 500, 1000] | |
| headers = ["Size", "Force Time", "Raw M/s" | Enum.map(batch_timeouts, &"#{&1}ms")] | |
| print_table_header(headers) | |
| for size <- message_sizes do | |
| {_batch_write_ms, force_flush_ms, batch_mps, _force_mps, _batch_mb, _force_mb} = | |
| measure_write_performance(test_dir, size) | |
| msgs_per_timeout = Enum.map(batch_timeouts, fn timeout -> | |
| # Convert force flush time to seconds | |
| force_flush_seconds = force_flush_ms / 1000 | |
| cycle_time_seconds = (timeout + force_flush_ms) / 1000 | |
| # Calculate number of complete cycles in 1 second | |
| cycles_per_second = 1 / cycle_time_seconds | |
| # Total time spent force flushing in 1 second | |
| total_force_flush_time = cycles_per_second * force_flush_seconds | |
| # Remaining time for actual message processing | |
| processing_time = 1 - total_force_flush_time | |
| # Calculate actual messages per second considering force flush overhead | |
| actual_msgs_per_second = batch_mps * processing_time | |
| percentage = (actual_msgs_per_second / batch_mps) * 100 | |
| "#{format_number(actual_msgs_per_second, :messages)} (#{format_number(percentage, :messages)}%)" | |
| end) | |
| print_table_row([ | |
| "#{size}b", | |
| "#{format_number(force_flush_ms)}ms", | |
| format_number(batch_mps, :messages) # Format batch M/s as whole number | |
| | msgs_per_timeout | |
| ]) | |
| end | |
| print_table_footer(length(headers)) | |
| # Read Performance Table | |
| IO.puts("\nRead Performance:") | |
| headers = ["Size", "Time", "Msgs/s", "MB/s"] | |
| print_table_header(headers) | |
| for size <- message_sizes do | |
| {read_elapsed_ms, read_mps, read_throughput} = | |
| measure_read_performance(test_dir, size) | |
| values = [ | |
| "#{size}b", | |
| "#{format_number(read_elapsed_ms)}ms", | |
| format_number(read_mps, :messages), | |
| format_number(read_throughput) | |
| ] | |
| print_table_row(values) | |
| end | |
| print_table_footer(length(headers)) | |
| # Restart Performance Table | |
| IO.puts("\nRestart Performance:") | |
| headers = ["Operation", "Time"] | |
| print_table_header(headers) | |
| {pre_init_micros, total_restart_micros} = measure_restart_time(test_dir) | |
| print_table_row(["Pre-Init ", "#{trunc(pre_init_micros)}μs"]) | |
| print_table_row(["Init", "#{trunc(total_restart_micros - pre_init_micros)}μs"]) | |
| print_table_row(["Total", "#{trunc(total_restart_micros)}μs"]) | |
| print_table_footer(length(headers)) | |
| end | |
| end |
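Neither file includes the test helper that both suites rely on. A minimal sketch of a test/test_helper.exs that would let them run under mix test might look like the following; the file name and its contents are assumptions, not part of the gist.

# test/test_helper.exs (hypothetical; not included in the gist)
# Both suites register topic processes through EventStore.Registry, and
# EventStoreTest additionally calls Application.ensure_all_started(:event_store).
# Starting the Registry here covers the case where no application supervises it.
{:ok, _} = Registry.start_link(keys: :unique, name: EventStore.Registry)
ExUnit.start()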
defmodule EventStoreTest do
  use ExUnit.Case
  require Logger

  @test_topic "test_topic"
  # Keep small for most tests
  @batch_size 10
  @batch_timeout 100
  # Used by the high-frequency test
  @large_batch_size 1000

  setup do
    # Start the application
    Application.ensure_all_started(:event_store)

    test_dir = "test/tmp/#{@test_topic}_#{System.unique_integer()}"

    # Ensure the test directory is clean
    File.rm_rf!(test_dir)
    File.mkdir_p!(test_dir)

    # Start EventStore with the test configuration
    {:ok, pid} =
      EventStore.start_link(@test_topic,
        base_dir: test_dir,
        batch_size: @batch_size,
        batch_timeout: @batch_timeout
      )

    on_exit(fn ->
      try do
        # First stop the GenServer process
        if Process.alive?(pid) do
          GenServer.stop(pid)
          # Give it time to clean up
          Process.sleep(100)
        end
      catch
        _kind, _reason -> :ok
      end

      # Stop any other event store process that might be using the test directory
      case Registry.lookup(EventStore.Registry, {EventStore, "empty_topic"}) do
        [{pid, _}] ->
          try do
            GenServer.stop(pid)
            Process.sleep(100)
          catch
            _kind, _reason -> :ok
          end

        _ ->
          :ok
      end

      # Give the disk logs extra time to close
      Process.sleep(200)

      # Finally remove the test directory
      try do
        File.rm_rf!(test_dir)
      catch
        _kind, _reason -> :ok
      end
    end)

    %{pid: pid, test_dir: test_dir}
  end
| describe "basic operations" do | |
| test "reads last n events" do | |
| events = | |
| for i <- 1..20 do | |
| %{data: %{value: i}} | |
| end | |
| assert :ok = EventStore.write_events(@test_topic, events) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Read last 5 events | |
| assert {:ok, last_events} = EventStore.read_last_events(@test_topic, 5) | |
| assert length(last_events) == 5 | |
| assert Enum.map(last_events, & &1.data.value) == [16, 17, 18, 19, 20] | |
| # Read more events than exist | |
| assert {:ok, all_events} = EventStore.read_last_events(@test_topic, 30) | |
| assert length(all_events) == 20 | |
| assert Enum.map(all_events, & &1.data.value) == Enum.to_list(1..20) | |
| # Read from empty log | |
| test_dir = "test/tmp/empty_#{System.unique_integer()}" | |
| File.mkdir_p!(test_dir) | |
| {:ok, pid} = EventStore.start_link("empty_topic", base_dir: test_dir) | |
| assert {:ok, []} = EventStore.read_last_events("empty_topic", 5) | |
| # Clean up the process and wait for it to stop | |
| GenServer.stop(pid) | |
| # Give time for cleanup | |
| Process.sleep(100) | |
| # Now safe to remove the directory | |
| File.rm_rf!(test_dir) | |
| end | |
| test "gets latest event when no events exist" do | |
| assert {:ok, nil} = EventStore.get_latest_event(@test_topic) | |
| end | |
| test "gets latest event" do | |
| events = | |
| for i <- 1..5 do | |
| %{data: %{value: i}} | |
| end | |
| assert :ok = EventStore.write_events(@test_topic, events) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| assert {:ok, latest} = EventStore.get_latest_event(@test_topic) | |
| assert latest.data.value == 5 | |
| # 0-based indexing | |
| assert latest.position == 4 | |
| end | |
| test "gets latest event after multiple writes" do | |
| # Write first batch | |
| events1 = | |
| for i <- 1..3 do | |
| %{data: %{value: i}} | |
| end | |
| assert :ok = EventStore.write_events(@test_topic, events1) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Write second batch | |
| events2 = | |
| for i <- 4..5 do | |
| %{data: %{value: i}} | |
| end | |
| assert :ok = EventStore.write_events(@test_topic, events2) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| assert {:ok, latest} = EventStore.get_latest_event(@test_topic) | |
| assert latest.data.value == 5 | |
| # 0-based indexing | |
| assert latest.position == 4 | |
| end | |
| test "reads events from position" do | |
| events = | |
| for i <- 1..20 do | |
| %{data: %{value: i}} | |
| end | |
| assert :ok = EventStore.write_events(@test_topic, events) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Read from position 5 | |
| assert {:ok, read_events} = EventStore.read_events(@test_topic, from_position: 5) | |
| assert length(read_events) == 15 | |
| assert Enum.at(read_events, 0).data.value == 6 | |
| assert Enum.at(read_events, 0).position == 5 | |
| end | |
| test "reads events from timestamp" do | |
| # Write first batch with explicit timestamps | |
| base_time = System.system_time(:second) | |
| events1 = | |
| for i <- 1..5 do | |
| %{data: %{value: i}, timestamp: base_time} | |
| end | |
| assert :ok = EventStore.write_events(@test_topic, events1) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Set timestamp for filtering | |
| filter_time = base_time + 1 | |
| # Write second batch with later timestamps | |
| events2 = | |
| for i <- 6..10 do | |
| %{data: %{value: i}, timestamp: base_time + 2} | |
| end | |
| assert :ok = EventStore.write_events(@test_topic, events2) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Read from timestamp | |
| assert {:ok, read_events} = EventStore.read_events(@test_topic, timestamp: filter_time) | |
| assert length(read_events) == 5 | |
| assert Enum.all?(read_events, fn event -> event.timestamp >= filter_time end) | |
| assert Enum.all?(read_events, fn event -> event.data.value >= 6 end) | |
| end | |
| test "combines position and timestamp filters" do | |
| # Write first batch with explicit timestamps | |
| base_time = System.system_time(:second) | |
| events1 = | |
| for i <- 1..10 do | |
| %{data: %{value: i}, timestamp: base_time} | |
| end | |
| assert :ok = EventStore.write_events(@test_topic, events1) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Set timestamp for filtering | |
| filter_time = base_time + 1 | |
| # Write second batch with later timestamps | |
| events2 = | |
| for i <- 11..20 do | |
| %{data: %{value: i}, timestamp: base_time + 2} | |
| end | |
| assert :ok = EventStore.write_events(@test_topic, events2) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Read with both position and timestamp | |
| # Should get events 16-20 (5 events after position 15 from the second batch) | |
| assert {:ok, read_events} = | |
| EventStore.read_events(@test_topic, from_position: 15, timestamp: filter_time) | |
| assert length(read_events) == 5 | |
| assert Enum.all?(read_events, fn event -> | |
| event.timestamp >= filter_time && event.data.value >= 16 && event.position >= 15 | |
| end) | |
| end | |
| test "maintains sequential positions" do | |
| events = | |
| for i <- 1..10 do | |
| %{data: %{value: i}} | |
| end | |
| assert :ok = EventStore.write_events(@test_topic, events) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| assert {:ok, read_events} = EventStore.read_events(@test_topic) | |
| positions = Enum.map(read_events, & &1.position) | |
| # Positions should be sequential | |
| assert positions == Enum.to_list(0..9) | |
| end | |
| test "writes and reads a single event" do | |
| event = %{data: %{value: 42}} | |
| assert :ok = EventStore.write_event(@test_topic, event) | |
| # Force flush to ensure event is written | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| assert {:ok, [read_event]} = EventStore.read_events(@test_topic) | |
| assert read_event.data.value == event.data.value | |
| assert is_integer(read_event.position) | |
| assert is_integer(read_event.timestamp) | |
| end | |
| test "handles events with existing timestamps" do | |
| timestamp = System.system_time(:second) | |
| event = %{data: %{value: 42}, timestamp: timestamp} | |
| assert :ok = EventStore.write_event(@test_topic, event) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| assert {:ok, [read_event]} = EventStore.read_events(@test_topic) | |
| assert read_event.timestamp == timestamp | |
| assert read_event.data.value == event.data.value | |
| assert is_integer(read_event.position) | |
| end | |
| end | |
| describe "data integrity" do | |
| test "maintains event order when reading with different chunk sizes" do | |
| events = | |
| for i <- 1..50 do | |
| %{data: %{value: i}} | |
| end | |
| assert :ok = EventStore.write_events(@test_topic, events) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # First get a baseline reading with standard chunk size | |
| {:ok, reference_events} = EventStore.read_events(@test_topic) | |
| # Verify baseline reading has all events in order | |
| assert length(reference_events) == 50, "Should have read all 50 events" | |
| assert Enum.map(reference_events, & &1.data.value) == Enum.to_list(1..50) | |
| # Test reading with smaller chunk sizes | |
| chunk_sizes = [10, 20, 30] | |
| for chunk_size <- chunk_sizes do | |
| {:ok, chunked_events} = EventStore.read_events(@test_topic, chunk_size: chunk_size) | |
| assert length(chunked_events) == chunk_size, | |
| "Reading with chunk_size #{chunk_size} should return #{chunk_size} events" | |
| # Verify the chunked events match the corresponding subset of reference events | |
| expected_events = Enum.take(reference_events, chunk_size) | |
| assert chunked_events == expected_events, | |
| "Events from chunk_size #{chunk_size} don't match reference events" | |
| end | |
| end | |
| test "maintains event order when reading from multiple files" do | |
| # Write events in multiple batches to ensure multiple files | |
| events1 = | |
| for i <- 1..50 do | |
| %{data: %{value: i}, timestamp: System.system_time(:second)} | |
| end | |
| events2 = | |
| for i <- 51..100 do | |
| %{data: %{value: i}, timestamp: System.system_time(:second)} | |
| end | |
| assert :ok = EventStore.write_events(@test_topic, events1) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Brief pause to ensure different timestamps | |
| Process.sleep(10) | |
| assert :ok = EventStore.write_events(@test_topic, events2) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Read all events and verify order | |
| assert {:ok, read_events} = EventStore.read_events(@test_topic) | |
| assert length(read_events) == 100 | |
| # Verify values are in correct sequential order | |
| values = Enum.map(read_events, & &1.data.value) | |
| assert values == Enum.to_list(1..100) | |
| # Verify positions are sequential | |
| positions = Enum.map(read_events, & &1.position) | |
| # 0-based positions | |
| assert positions == Enum.to_list(0..99) | |
| end | |
| test "ensures no duplicate positions are assigned" do | |
| # Write initial batch of events | |
| events1 = | |
| for i <- 1..50 do | |
| %{data: %{value: i}} | |
| end | |
| assert :ok = EventStore.write_events(@test_topic, events1) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Write another batch with overlapping logical sequence | |
| events2 = | |
| for i <- 40..60 do | |
| %{data: %{value: i}} | |
| end | |
| assert :ok = EventStore.write_events(@test_topic, events2) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Read all events | |
| assert {:ok, read_events} = EventStore.read_events(@test_topic) | |
| # Extract positions and check for duplicates | |
| positions = Enum.map(read_events, & &1.position) | |
| unique_positions = Enum.uniq(positions) | |
| assert length(positions) == length(unique_positions), "Duplicate positions found" | |
| # Verify positions are sequential | |
| assert positions == Enum.to_list(0..(length(positions) - 1)) | |
| end | |
| test "maintains data consistency after unexpected process termination", %{test_dir: test_dir} do | |
| # Start a DynamicSupervisor for the test | |
| {:ok, sup_pid} = DynamicSupervisor.start_link(strategy: :one_for_one) | |
| # Enable error trapping in the test process | |
| Process.flag(:trap_exit, true) | |
| # Write initial batch | |
| events1 = | |
| for i <- 1..30 do | |
| %{data: %{value: i}} | |
| end | |
| assert :ok = EventStore.write_events(@test_topic, events1) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Ensure data is written | |
| Process.sleep(100) | |
| # Get the current process and verify it exists | |
| pid = Registry.whereis_name({EventStore.Registry, {EventStore, @test_topic}}) | |
| assert Process.alive?(pid) | |
| # Verify initial data | |
| assert {:ok, initial_events} = EventStore.read_events(@test_topic) | |
| assert length(initial_events) == 30 | |
| # Monitor both the process and the registry | |
| process_ref = Process.monitor(pid) | |
| registry_ref = Process.monitor(Process.whereis(EventStore.Registry)) | |
| # Terminate the process with shutdown signal | |
| Process.exit(pid, :shutdown) | |
| # Wait for process to die and registry to clean up | |
| assert_receive {:DOWN, ^process_ref, :process, ^pid, _}, 1000 | |
| # Wait for cleanup | |
| Process.sleep(200) | |
| # Ensure the process is really dead | |
| refute Process.alive?(pid) | |
| # Wait for any registry messages | |
| receive do | |
| {:DOWN, ^registry_ref, :process, _, _} -> :ok | |
| after | |
| 0 -> :ok | |
| end | |
| # Clean up registry and wait for it to restart | |
| Process.sleep(200) | |
| # Write more events after restart | |
| events2 = | |
| for i <- 31..60 do | |
| %{data: %{value: i}} | |
| end | |
| # Start new process under dynamic supervisor | |
| child_spec = %{ | |
| id: EventStore, | |
| start: | |
| {EventStore, :start_link, | |
| [ | |
| @test_topic, | |
| [ | |
| base_dir: test_dir, | |
| batch_size: @batch_size, | |
| batch_timeout: @batch_timeout | |
| ] | |
| ]}, | |
| restart: :temporary | |
| } | |
| # Start the process under supervision with retry | |
| {:ok, new_pid} = retry_start_supervised(sup_pid, child_spec) | |
| # Verify new process is alive | |
| assert Process.alive?(new_pid) | |
| # Ensure process is fully started | |
| Process.sleep(200) | |
| # Write events in smaller batches with error handling | |
| Enum.chunk_every(events2, 10) | |
| |> Enum.each(fn chunk -> | |
| retry_operation(fn -> | |
| :ok = EventStore.write_events(@test_topic, chunk) | |
| :ok = EventStore.force_flush(@test_topic) | |
| # Brief pause between chunks | |
| Process.sleep(50) | |
| end) | |
| end) | |
| # Final pause to ensure all writes are complete | |
| Process.sleep(100) | |
| # Verify all data is consistent with retry logic | |
| {:ok, read_events} = | |
| retry_operation(fn -> | |
| EventStore.read_events(@test_topic) | |
| end) | |
| assert length(read_events) == 60 | |
| values = Enum.map(read_events, & &1.data.value) | |
| assert values == Enum.to_list(1..60) | |
| positions = Enum.map(read_events, & &1.position) | |
| assert positions == Enum.to_list(0..59) | |
| # Clean up supervisor | |
| DynamicSupervisor.stop(sup_pid, :normal) | |
| end | |
| # Helper function to retry starting a supervised process | |
| defp retry_start_supervised(sup_pid, child_spec, attempts \\ 3) do | |
| case DynamicSupervisor.start_child(sup_pid, child_spec) do | |
| {:ok, pid} -> | |
| {:ok, pid} | |
| {:error, _} when attempts > 1 -> | |
| Process.sleep(100) | |
| retry_start_supervised(sup_pid, child_spec, attempts - 1) | |
| error -> | |
| error | |
| end | |
| end | |
| # Helper function to retry operations with exponential backoff | |
| defp retry_operation(fun, attempts \\ 3, delay \\ 100) do | |
| try do | |
| fun.() | |
| rescue | |
| e -> | |
| if attempts > 1 do | |
| Process.sleep(delay) | |
| retry_operation(fun, attempts - 1, delay * 2) | |
| else | |
| reraise e, __STACKTRACE__ | |
| end | |
| catch | |
| :exit, _ when attempts > 1 -> | |
| Process.sleep(delay) | |
| retry_operation(fun, attempts - 1, delay * 2) | |
| kind, reason -> | |
| :erlang.raise(kind, reason, __STACKTRACE__) | |
| end | |
| end | |
| test "maintains correct order with same timestamp events" do | |
| timestamp = System.system_time(:second) | |
| # Create multiple events with the same timestamp | |
| events = | |
| for i <- 1..20 do | |
| %{data: %{value: i}, timestamp: timestamp} | |
| end | |
| assert :ok = EventStore.write_events(@test_topic, events) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Read events back | |
| assert {:ok, read_events} = EventStore.read_events(@test_topic) | |
| # Verify all events have the same timestamp | |
| timestamps = Enum.map(read_events, & &1.timestamp) | |
| assert Enum.all?(timestamps, &(&1 == timestamp)) | |
| # Verify events maintained their order despite same timestamp | |
| values = Enum.map(read_events, & &1.data.value) | |
| assert values == Enum.to_list(1..20) | |
| # Verify positions are sequential | |
| positions = Enum.map(read_events, & &1.position) | |
| assert positions == Enum.to_list(0..19) | |
| end | |
| end | |
| describe "batch operations" do | |
| test "writes and reads multiple events in batch" do | |
| events = | |
| for i <- 1..20 do | |
| %{data: %{value: i}} | |
| end | |
| assert :ok = EventStore.write_events(@test_topic, events) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| assert {:ok, read_events} = EventStore.read_events(@test_topic) | |
| assert length(read_events) == length(events) | |
| Enum.zip(events, read_events) | |
| |> Enum.each(fn {original, read} -> | |
| assert read.data.value == original.data.value | |
| assert is_integer(read.position) | |
| assert is_integer(read.timestamp) | |
| end) | |
| # Verify positions are sequential | |
| positions = Enum.map(read_events, & &1.position) | |
| assert positions == Enum.to_list(0..19) | |
| end | |
| test "automatically flushes batch when size limit reached" do | |
| events = | |
| for i <- 1..(@batch_size + 1) do | |
| %{data: %{value: i}} | |
| end | |
| # Write events one by one | |
| Enum.each(events, fn event -> | |
| EventStore.write_event(@test_topic, event) | |
| end) | |
| # Wait briefly to ensure batch processing | |
| Process.sleep(50) | |
| assert {:ok, read_events} = EventStore.read_events(@test_topic) | |
| assert length(read_events) >= @batch_size | |
| end | |
| test "automatically flushes batch after timeout" do | |
| event = %{data: %{value: 1}} | |
| EventStore.write_event(@test_topic, event) | |
| # Wait for batch timeout | |
| Process.sleep(@batch_timeout + 50) | |
| assert {:ok, [read_event]} = EventStore.read_events(@test_topic) | |
| assert read_event.data == event.data | |
| end | |
| end | |
| describe "resilience" do | |
| test "survives process restarts", %{test_dir: test_dir} do | |
| events = | |
| for i <- 1..5 do | |
| %{data: %{value: i}} | |
| end | |
| assert :ok = EventStore.write_events(@test_topic, events) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Get the current process | |
| pid = Registry.whereis_name({EventStore.Registry, {EventStore, @test_topic}}) | |
| ref = Process.monitor(pid) | |
| # Stop the process normally | |
| GenServer.stop(pid, :normal) | |
| # Wait for process to stop | |
| assert_receive {:DOWN, ^ref, :process, ^pid, :normal}, 1000 | |
| # Restart the process | |
| {:ok, _} = EventStore.start_link(@test_topic, base_dir: test_dir) | |
| # Verify data persisted | |
| assert {:ok, read_events} = EventStore.read_events(@test_topic) | |
| assert length(read_events) == length(events) | |
| end | |
| test "maintains position sequence across restarts", %{test_dir: test_dir} do | |
| # Write initial events | |
| events1 = | |
| for i <- 1..5 do | |
| %{data: %{value: i}} | |
| end | |
| assert :ok = EventStore.write_events(@test_topic, events1) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Verify initial positions | |
| assert {:ok, initial_events} = EventStore.read_events(@test_topic) | |
| assert Enum.map(initial_events, & &1.position) == Enum.to_list(0..4) | |
| # Get the current process | |
| pid = Registry.whereis_name({EventStore.Registry, {EventStore, @test_topic}}) | |
| ref = Process.monitor(pid) | |
| # Stop the process normally | |
| GenServer.stop(pid, :normal) | |
| # Wait for process to stop | |
| assert_receive {:DOWN, ^ref, :process, ^pid, :normal}, 1000 | |
| # Restart the process | |
| {:ok, _} = EventStore.start_link(@test_topic, base_dir: test_dir) | |
| # Write more events after restart | |
| events2 = | |
| for i <- 6..10 do | |
| %{data: %{value: i}} | |
| end | |
| assert :ok = EventStore.write_events(@test_topic, events2) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Verify all positions are sequential | |
| assert {:ok, all_events} = EventStore.read_events(@test_topic) | |
| assert length(all_events) == 10 | |
| assert Enum.map(all_events, & &1.position) == Enum.to_list(0..9) | |
| # Verify values are in correct order | |
| assert Enum.map(all_events, & &1.data.value) == Enum.to_list(1..10) | |
| end | |
| test "handles concurrent writes" do | |
| events = | |
| for i <- 1..100 do | |
| %{data: %{value: i}} | |
| end | |
| # Spawn multiple processes to write concurrently | |
| tasks = | |
| Enum.map(events, fn event -> | |
| Task.async(fn -> | |
| EventStore.write_event(@test_topic, event) | |
| end) | |
| end) | |
| # Wait for all writes to complete | |
| results = Task.await_many(tasks) | |
| assert Enum.all?(results, &(&1 == :ok)) | |
| # Force flush to ensure all events are written | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Verify we can read the events | |
| assert {:ok, read_events} = EventStore.read_events(@test_topic) | |
| assert length(read_events) == length(events) | |
| end | |
| end | |
| describe "metrics" do | |
| test "tracks write metrics" do | |
| events = | |
| for i <- 1..5 do | |
| %{data: %{value: i}} | |
| end | |
| assert :ok = EventStore.write_events(@test_topic, events) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| metrics = EventStore.get_metrics(@test_topic) | |
| assert metrics.writes == length(events) | |
| assert metrics.batches > 0 | |
| assert metrics.last_write_time != nil | |
| end | |
| test "tracks read metrics" do | |
| events = | |
| for i <- 1..5 do | |
| %{data: %{value: i}} | |
| end | |
| assert :ok = EventStore.write_events(@test_topic, events) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| assert {:ok, read_events} = EventStore.read_events(@test_topic) | |
| metrics = EventStore.get_metrics(@test_topic) | |
| assert metrics.reads == length(read_events) | |
| assert metrics.last_read_time != nil | |
| end | |
| end | |
| describe "cleanup" do | |
| test "performs cleanup operations", %{test_dir: _test_dir} do | |
| # Write enough events to trigger cleanup | |
| events = | |
| for i <- 1..10_000 do | |
| %{data: %{value: i}} | |
| end | |
| assert :ok = EventStore.write_events(@test_topic, events) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Force cleanup by sending cleanup message | |
| pid = Registry.whereis_name({EventStore.Registry, {EventStore, @test_topic}}) | |
| send(pid, :cleanup) | |
| # Wait for cleanup to complete | |
| Process.sleep(100) | |
| # Verify we can still read events after cleanup | |
| assert {:ok, read_events} = EventStore.read_events(@test_topic) | |
| assert length(read_events) > 0 | |
| end | |
| end | |
| describe "error handling" do | |
| test "handles disk full scenario", %{test_dir: test_dir} do | |
| # Mock disk full by making directory read-only | |
| File.chmod!(test_dir, 0o444) | |
| event = %{data: %{value: 1}} | |
| result = EventStore.write_event(@test_topic, event) | |
| # Reset permissions | |
| File.chmod!(test_dir, 0o755) | |
| # Should return error or crash gracefully | |
| assert result in [:ok, {:error, :no_such_log}, {:error, :full}] | |
| end | |
| test "handles corrupted log file", %{test_dir: test_dir} do | |
| # Write some initial data | |
| events = [%{data: %{value: 1}}] | |
| :ok = EventStore.write_events(@test_topic, events) | |
| :ok = EventStore.force_flush(@test_topic) | |
| # Corrupt the log file | |
| log_file = Path.join(test_dir, "#{@test_topic}.LOG") | |
| File.write!(log_file, "corrupted data", [:append]) | |
| # Stop the current process | |
| pid = Registry.whereis_name({EventStore.Registry, {EventStore, @test_topic}}) | |
| GenServer.stop(pid) | |
| # Restart should handle corruption | |
| {:ok, _} = EventStore.start_link(@test_topic, base_dir: test_dir) | |
| # Should still be able to write and read | |
| new_event = %{data: %{value: 2}} | |
| assert :ok = EventStore.write_event(@test_topic, new_event) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| assert {:ok, _events} = EventStore.read_events(@test_topic) | |
| end | |
| test "handles invalid input data" do | |
| # Test with empty/invalid events | |
| assert :ok = EventStore.write_event(@test_topic, nil) | |
| assert :ok = EventStore.write_event(@test_topic, "not a map") | |
| # Test with invalid event structure | |
| invalid_event = %{invalid: "structure"} | |
| assert :ok = EventStore.write_event(@test_topic, invalid_event) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Should still be readable | |
| assert {:ok, events} = EventStore.read_events(@test_topic) | |
| assert length(events) == 3 | |
| # Verify events were wrapped properly | |
| [nil_event, string_event, invalid_struct_event] = events | |
| assert nil_event.data == %{value: nil} | |
| assert string_event.data == %{value: "not a map"} | |
| assert invalid_struct_event.data == %{value: %{invalid: "structure"}} | |
| end | |
| test "handles non-existent topic" do | |
| non_existent = "non_existent_topic" | |
| # Try operations without starting the topic process | |
| assert {:error, {:noproc, _}} = try_read_events(non_existent) | |
| assert {:error, {:noproc, _}} = try_get_latest_event(non_existent) | |
| assert {:error, {:noproc, _}} = try_read_last_events(non_existent, 5) | |
| # Write event should fail since topic doesn't exist | |
| assert {:error, {:noproc, _}} = try_write_event(non_existent, %{data: %{value: 1}}) | |
| end | |
| test "handles invalid base_dir" do | |
| invalid_dir = "/root/invalid_event_store_dir" | |
| assert {:error, _reason} = EventStore.start_link(@test_topic, base_dir: invalid_dir) | |
| end | |
| # Helper functions for safely testing non-existent topics | |
| defp try_read_events(topic) do | |
| try do | |
| EventStore.read_events(topic) | |
| catch | |
| :exit, reason -> {:error, {:noproc, reason}} | |
| end | |
| end | |
| defp try_get_latest_event(topic) do | |
| try do | |
| EventStore.get_latest_event(topic) | |
| catch | |
| :exit, reason -> {:error, {:noproc, reason}} | |
| end | |
| end | |
| defp try_read_last_events(topic, n) do | |
| try do | |
| EventStore.read_last_events(topic, n) | |
| catch | |
| :exit, reason -> {:error, {:noproc, reason}} | |
| end | |
| end | |
| defp try_write_event(topic, event) do | |
| try do | |
| # Use call instead of cast to ensure we get the error | |
| GenServer.call(via_tuple(topic), {:write, event}) | |
| catch | |
| :exit, reason -> {:error, {:noproc, reason}} | |
| end | |
| end | |
| defp via_tuple(topic) do | |
| {:via, Registry, {EventStore.Registry, {EventStore, topic}}} | |
| end | |
| end | |
| describe "edge cases" do | |
| test "handles empty events" do | |
| # Write empty event list | |
| assert :ok = EventStore.write_events(@test_topic, []) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| assert {:ok, []} = EventStore.read_events(@test_topic) | |
| # Write single empty event | |
| assert :ok = EventStore.write_event(@test_topic, %{data: %{value: ""}}) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| assert {:ok, [event]} = EventStore.read_events(@test_topic) | |
| assert event.data.value == "" | |
| assert is_integer(event.position) | |
| end | |
| test "handles maximum integer values for positions" do | |
| # Elixir integers are arbitrary precision, so nothing overflows; use | |
| # positions near the JavaScript max safe integer to check that very | |
| # large values round-trip intact | |
| max_safe_integer = 9_007_199_254_740_991 | |
| # Seed events with positions near the max value | |
| events = | |
| for i <- (max_safe_integer - 10)..max_safe_integer do | |
| %{data: %{value: i}, position: i} | |
| end | |
| assert :ok = EventStore.write_events(@test_topic, events) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| assert {:ok, read_events} = EventStore.read_events(@test_topic) | |
| positions = Enum.map(read_events, & &1.position) | |
| assert length(positions) == length(events) | |
| assert Enum.all?(positions, &is_integer/1) | |
| end | |
| test "handles minimum/maximum timestamp values" do | |
| min_timestamp = 0 | |
| # 32-bit unsigned max (2^32 - 1) | |
| max_timestamp = (:math.pow(2, 32) - 1) |> round() | |
| events = [ | |
| %{data: %{value: "min"}, timestamp: min_timestamp}, | |
| %{data: %{value: "max"}, timestamp: max_timestamp} | |
| ] | |
| assert :ok = EventStore.write_events(@test_topic, events) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| assert {:ok, read_events} = EventStore.read_events(@test_topic) | |
| assert length(read_events) == 2 | |
| [event1, event2] = read_events | |
| assert event1.timestamp == min_timestamp | |
| assert event2.timestamp == max_timestamp | |
| end | |
| test "handles batch buffer partially full at shutdown", %{test_dir: test_dir} do | |
| # Write some events but don't flush | |
| events = | |
| for i <- 1..(@batch_size - 1) do | |
| %{data: %{value: i}} | |
| end | |
| Enum.each(events, fn event -> | |
| assert :ok = EventStore.write_event(@test_topic, event) | |
| end) | |
| # Get the current process | |
| pid = Registry.whereis_name({EventStore.Registry, {EventStore, @test_topic}}) | |
| # Stop the process normally (should trigger terminate callback) | |
| GenServer.stop(pid, :normal) | |
| Process.sleep(100) | |
| # Restart and verify data was saved | |
| {:ok, _} = EventStore.start_link(@test_topic, base_dir: test_dir) | |
| assert {:ok, read_events} = EventStore.read_events(@test_topic) | |
| # Verify all events were saved | |
| assert length(read_events) == length(events) | |
| values = Enum.map(read_events, & &1.data.value) | |
| assert values == Enum.to_list(1..(@batch_size - 1)) | |
| end | |
| end | |
| describe "concurrent access" do | |
| test "handles multiple readers while writing" do | |
| # Start multiple reader processes | |
| reader_count = 5 | |
| read_iterations = 20 | |
| # Write initial events | |
| initial_events = | |
| for i <- 1..50 do | |
| %{data: %{value: i}} | |
| end | |
| assert :ok = EventStore.write_events(@test_topic, initial_events) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Start reader tasks | |
| reader_tasks = | |
| for _i <- 1..reader_count do | |
| Task.async(fn -> | |
| for _j <- 1..read_iterations do | |
| {:ok, events} = EventStore.read_events(@test_topic) | |
| assert length(events) >= length(initial_events) | |
| # Random delay | |
| Process.sleep(Enum.random(1..10)) | |
| end | |
| :ok | |
| end) | |
| end | |
| # Write more events while readers are running | |
| writer_task = | |
| Task.async(fn -> | |
| for i <- 51..100 do | |
| :ok = EventStore.write_event(@test_topic, %{data: %{value: i}}) | |
| # Random delay | |
| Process.sleep(Enum.random(1..10)) | |
| end | |
| :ok = EventStore.force_flush(@test_topic) | |
| :ok | |
| end) | |
| # Wait for all tasks to complete | |
| assert :ok = Task.await(writer_task) | |
| assert Enum.all?(Task.await_many(reader_tasks), &(&1 == :ok)) | |
| # Verify final state | |
| assert {:ok, final_events} = EventStore.read_events(@test_topic) | |
| assert length(final_events) == 100 | |
| end | |
| test "handles multiple writers to same topic" do | |
| writer_count = 5 | |
| events_per_writer = 20 | |
| # Start multiple writer tasks | |
| writer_tasks = | |
| for writer_id <- 1..writer_count do | |
| Task.async(fn -> | |
| for i <- 1..events_per_writer do | |
| event = %{data: %{writer: writer_id, value: i}} | |
| :ok = EventStore.write_event(@test_topic, event) | |
| # Random delay | |
| Process.sleep(Enum.random(1..10)) | |
| end | |
| :ok | |
| end) | |
| end | |
| # Wait for all writers to complete | |
| assert Enum.all?(Task.await_many(writer_tasks), &(&1 == :ok)) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Verify results | |
| assert {:ok, events} = EventStore.read_events(@test_topic) | |
| assert length(events) == writer_count * events_per_writer | |
| # Verify all events from all writers are present | |
| events_by_writer = Enum.group_by(events, & &1.data.writer) | |
| assert map_size(events_by_writer) == writer_count | |
| Enum.each(events_by_writer, fn {_writer_id, writer_events} -> | |
| values = Enum.map(writer_events, & &1.data.value) | |
| assert length(values) == events_per_writer | |
| assert Enum.sort(values) == Enum.to_list(1..events_per_writer) | |
| end) | |
| end | |
| test "handles multiple topics simultaneously" do | |
| topic_count = 5 | |
| events_per_topic = 20 | |
| # Create multiple topics | |
| topics = | |
| for i <- 1..topic_count do | |
| topic = "test_topic_#{i}" | |
| test_dir = "test/tmp/#{topic}" | |
| File.mkdir_p!(test_dir) | |
| {:ok, _} = EventStore.start_link(topic, base_dir: test_dir) | |
| topic | |
| end | |
| # Write to all topics concurrently | |
| writer_tasks = | |
| for topic <- topics do | |
| Task.async(fn -> | |
| for i <- 1..events_per_topic do | |
| event = %{data: %{value: i}} | |
| :ok = EventStore.write_event(topic, event) | |
| end | |
| :ok = EventStore.force_flush(topic) | |
| :ok | |
| end) | |
| end | |
| # Wait for all writers | |
| assert Enum.all?(Task.await_many(writer_tasks), &(&1 == :ok)) | |
| # Verify each topic | |
| for topic <- topics do | |
| assert {:ok, events} = EventStore.read_events(topic) | |
| assert length(events) == events_per_topic | |
| values = Enum.map(events, & &1.data.value) | |
| assert Enum.sort(values) == Enum.to_list(1..events_per_topic) | |
| end | |
| # Cleanup | |
| for topic <- topics do | |
| pid = Registry.whereis_name({EventStore.Registry, {EventStore, topic}}) | |
| GenServer.stop(pid) | |
| File.rm_rf!("test/tmp/#{topic}") | |
| end | |
| end | |
| test "handles race conditions during cleanup operations" do | |
| # Write initial events in smaller batches | |
| initial_events = | |
| for i <- 1..100 do | |
| %{data: %{value: i}} | |
| end | |
| # Write in chunks of 10 | |
| Enum.chunk_every(initial_events, 10) | |
| |> Enum.each(fn chunk -> | |
| assert :ok = EventStore.write_events(@test_topic, chunk) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Brief pause between chunks | |
| Process.sleep(10) | |
| end) | |
| # Verify initial state with chunk_size | |
| assert {:ok, events_before} = EventStore.read_events(@test_topic, chunk_size: 1000) | |
| assert length(events_before) == 100 | |
| # Agent counts completed writes (kept as a progress marker; not asserted on) | |
| {:ok, write_agent} = Agent.start_link(fn -> 0 end) | |
| # Start concurrent readers | |
| reader_task = | |
| Task.async(fn -> | |
| for _i <- 1..20 do | |
| {:ok, events} = EventStore.read_events(@test_topic, chunk_size: 1000) | |
| # Should at least have initial events | |
| assert length(events) >= 100 | |
| Process.sleep(Enum.random(5..15)) | |
| end | |
| :ok | |
| end) | |
| # Start concurrent writers | |
| writer_task = | |
| Task.async(fn -> | |
| # Write additional events in smaller batches | |
| additional_events = | |
| for i <- 101..150 do | |
| %{data: %{value: i}} | |
| end | |
| Enum.chunk_every(additional_events, 5) | |
| |> Enum.each(fn chunk -> | |
| :ok = EventStore.write_events(@test_topic, chunk) | |
| :ok = EventStore.force_flush(@test_topic) | |
| Agent.update(write_agent, &(&1 + length(chunk))) | |
| Process.sleep(Enum.random(5..15)) | |
| end) | |
| :ok | |
| end) | |
| # Wait for writers to make progress | |
| Process.sleep(200) | |
| # Trigger cleanup and monitor its completion | |
| pid = Registry.whereis_name({EventStore.Registry, {EventStore, @test_topic}}) | |
| cleanup_ref = Process.monitor(pid) | |
| send(pid, :cleanup) | |
| # Wait for cleanup message to be processed | |
| Process.sleep(200) | |
| # Wait for operations to complete | |
| assert :ok = Task.await(reader_task, 5000) | |
| assert :ok = Task.await(writer_task, 5000) | |
| # Ensure all writes are flushed | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Wait for final flush | |
| Process.sleep(200) | |
| # A :DOWN message only arrives if the server crashed during cleanup; | |
| # otherwise fall through after the timeout | |
| receive do | |
| {:DOWN, ^cleanup_ref, :process, ^pid, _} -> :ok | |
| after | |
| 1000 -> :ok | |
| end | |
| # Retry reads with a fixed 200ms delay until all 150 events are visible | |
| retry_read = fn retry_fn, attempts -> | |
| case EventStore.read_events(@test_topic, chunk_size: 1000) do | |
| {:ok, events} when length(events) >= 150 -> | |
| {:ok, events} | |
| _ when attempts > 1 -> | |
| Process.sleep(200) | |
| retry_fn.(retry_fn, attempts - 1) | |
| # Out of retries: return the last result, even if incomplete or an error | |
| other -> | |
| other | |
| end | |
| end | |
| # Verify final state, allowing up to 10 retries | |
| assert {:ok, final_events} = retry_read.(retry_read, 10) | |
| assert length(final_events) == 150 | |
| # Verify all events are present and in order | |
| values = Enum.map(final_events, & &1.data.value) | |
| sorted_values = Enum.sort(values) | |
| assert sorted_values == Enum.to_list(1..150) | |
| # Clean up agent | |
| Agent.stop(write_agent) | |
| end | |
| end | |
| describe "performance boundaries" do | |
| test "handles large binary data chunks" do | |
| # Create events with large binary data (1MB each) | |
| binary_data = String.duplicate("a", 1024 * 1024) | |
| events = | |
| for _i <- 1..4 do | |
| %{data: %{value: binary_data}} | |
| end | |
| # Write events and force flush | |
| assert :ok = EventStore.write_events(@test_topic, events) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Read events back | |
| assert {:ok, read_events} = EventStore.read_events(@test_topic) | |
| # Verify data integrity | |
| assert length(read_events) == 4 | |
| Enum.each(read_events, fn event -> | |
| assert byte_size(event.data.value) == 1024 * 1024 | |
| assert event.data.value == binary_data | |
| end) | |
| # Verify events are in correct order | |
| positions = Enum.map(read_events, & &1.position) | |
| assert positions == Enum.to_list(0..3) | |
| end | |
| test "handles extremely large events" do | |
| # Create a large event with 100KB of data | |
| large_data = String.duplicate("a", 100 * 1024) | |
| event = %{data: %{value: large_data}} | |
| assert :ok = EventStore.write_event(@test_topic, event) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| assert {:ok, [read_event]} = EventStore.read_events(@test_topic) | |
| assert read_event.data.value == large_data | |
| end | |
| test "handles high write frequency" do | |
| # Use a unique topic name for this test | |
| test_topic = "high_freq_test_#{System.unique_integer()}" | |
| test_dir = "test/tmp/#{test_topic}_#{System.unique_integer()}" | |
| File.mkdir_p!(test_dir) | |
| # Start with larger batch size for this test | |
| {:ok, pid} = | |
| EventStore.start_link(test_topic, | |
| base_dir: test_dir, | |
| batch_size: @large_batch_size, | |
| batch_timeout: @batch_timeout | |
| ) | |
| # Write 1000 events as fast as possible | |
| events = | |
| for i <- 1..1000 do | |
| %{data: %{value: i}} | |
| end | |
| # Write all events at once | |
| assert :ok = EventStore.write_events(test_topic, events) | |
| on_exit(fn -> | |
| try do | |
| if Process.alive?(pid) do | |
| GenServer.stop(pid) | |
| end | |
| catch | |
| _kind, _reason -> :ok | |
| end | |
| try do | |
| File.rm_rf!(test_dir) | |
| catch | |
| _kind, _reason -> :ok | |
| end | |
| end) | |
| assert :ok = EventStore.force_flush(test_topic) | |
| # Wait for all writes to complete | |
| Process.sleep(500) | |
| # Verify in 100-event slices; each pass re-reads the full log and checks one slice | |
| chunks = Enum.chunk_every(1..1000, 100) | |
| Enum.each(chunks, fn range -> | |
| {:ok, current_events} = EventStore.read_events(test_topic, chunk_size: 1000) | |
| # Log progress | |
| Logger.debug("Verifying events #{Enum.min(range)}..#{Enum.max(range)}") | |
| # Verify all events are present and in order | |
| values = Enum.map(current_events, & &1.data.value) | |
| assert length(values) == 1000, | |
| "Expected 1000 events, got #{length(values)}" | |
| # Verify this chunk's values are present and in order | |
| chunk_values = Enum.slice(values, Enum.min(range) - 1, length(range)) | |
| assert chunk_values == Enum.to_list(range), | |
| "Events #{Enum.min(range)}..#{Enum.max(range)} are not in correct order" | |
| # Brief pause between verifications | |
| Process.sleep(100) | |
| end) | |
| # Verify all events were written | |
| assert {:ok, read_events} = EventStore.read_events(test_topic, chunk_size: 1000) | |
| # Verify we got all events and they're in order | |
| values = Enum.map(read_events, & &1.data.value) | |
| assert values == Enum.to_list(1..1000) | |
| assert length(read_events) == 1000 | |
| end | |
| test "handles memory usage under load", %{test_dir: test_dir} do | |
| # Write a large number of medium-sized events | |
| events = | |
| for i <- 1..5000 do | |
| # Each event around 1KB | |
| data = String.duplicate("data-#{i}", 100) | |
| %{data: %{value: data}} | |
| end | |
| # Write events in chunks to avoid overwhelming the system | |
| Enum.chunk_every(events, 500) | |
| |> Enum.each(fn chunk -> | |
| assert :ok = EventStore.write_events(@test_topic, chunk) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Brief pause to allow for cleanup | |
| Process.sleep(10) | |
| end) | |
| # Verify we can still read all events | |
| assert {:ok, read_events} = EventStore.read_events(@test_topic) | |
| assert length(read_events) > 0 | |
| # Check disk usage hasn't exceeded limits | |
| files = File.ls!(test_dir) | |
| log_file = | |
| files | |
| |> Enum.find(fn file -> String.starts_with?(file, @test_topic) end) | |
| |> case do | |
| nil -> raise "Log file not found in #{test_dir}" | |
| file -> Path.join(test_dir, file) | |
| end | |
| {:ok, %{size: size}} = File.stat(log_file) | |
| # Should be less than 10MB | |
| assert size < 1024 * 1024 * 10 | |
| end | |
| test "handles behavior near max file size limits", %{test_dir: test_dir} do | |
| # Write events until we exceed the default max chunk size | |
| # 1MB | |
| chunk_size = 1024 * 1024 | |
| # 10KB per event | |
| data_size = 10 * 1024 | |
| events_per_chunk = div(chunk_size, data_size) | |
| # Generate events that will fill multiple chunks | |
| events = | |
| for i <- 1..(events_per_chunk * 2) do | |
| data = String.duplicate("#{i}", data_size) | |
| %{data: %{value: data}} | |
| end | |
| # Write events and force cleanup | |
| assert :ok = EventStore.write_events(@test_topic, events) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Force cleanup | |
| pid = Registry.whereis_name({EventStore.Registry, {EventStore, @test_topic}}) | |
| send(pid, :cleanup) | |
| # Wait for cleanup | |
| Process.sleep(100) | |
| # Write more events after cleanup | |
| more_events = | |
| for i <- 1..10 do | |
| %{data: %{value: i}} | |
| end | |
| assert :ok = EventStore.write_events(@test_topic, more_events) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Verify we can still read events after hitting size limits | |
| assert {:ok, read_events} = EventStore.read_events(@test_topic) | |
| assert length(read_events) > 0 | |
| # Verify log file size is managed | |
| # List files in directory to see what was created | |
| files = File.ls!(test_dir) | |
| log_file = | |
| files | |
| |> Enum.find(fn file -> String.starts_with?(file, @test_topic) end) | |
| |> case do | |
| nil -> raise "Log file not found in #{test_dir}" | |
| file -> Path.join(test_dir, file) | |
| end | |
| {:ok, %{size: size}} = File.stat(log_file) | |
| # Should not exceed max total size | |
| assert size < chunk_size * 10 | |
| end | |
| end | |
| describe "data format handling" do | |
| test "handles deeply nested data structures" do | |
| # Create a deeply nested map structure | |
| deep_nest = fn depth -> | |
| Enum.reduce(1..depth, %{value: "leaf"}, fn i, acc -> | |
| %{"level_#{i}" => acc} | |
| end) | |
| end | |
| # Create events with increasingly deep nesting | |
| nested_events = | |
| for depth <- 1..10 do | |
| %{ | |
| data: %{ | |
| depth: depth, | |
| nested: deep_nest.(depth), | |
| arrays: List.duplicate([1, 2, [3, 4, [5]]], depth) | |
| } | |
| } | |
| end | |
| # Write all nested events | |
| assert :ok = EventStore.write_events(@test_topic, nested_events) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Read back and verify | |
| assert {:ok, read_events} = EventStore.read_events(@test_topic) | |
| assert length(read_events) == length(nested_events) | |
| # Compare original and read data | |
| Enum.zip(nested_events, read_events) | |
| |> Enum.each(fn {original, read} -> | |
| assert read.data == original.data | |
| assert is_integer(read.position) | |
| assert is_integer(read.timestamp) | |
| # Verify the depth matches | |
| assert read.data.depth == original.data.depth | |
| # Verify nested structure is preserved | |
| assert read.data.nested == original.data.nested | |
| # Verify nested arrays are preserved | |
| assert read.data.arrays == original.data.arrays | |
| end) | |
| # Verify positions are sequential | |
| positions = Enum.map(read_events, & &1.position) | |
| assert positions == Enum.to_list(0..(length(nested_events) - 1)) | |
| end | |
| test "handles events with special characters and unusual formats" do | |
| special_events = [ | |
| # Unicode emoji | |
| %{data: %{value: "Hello 🌍 World!"}}, | |
| # Control characters | |
| %{data: %{value: "\t\n\r"}}, | |
| # Mixed quotes | |
| %{data: %{value: "'''\"\"'''"}}, | |
| # Long Unicode string | |
| %{data: %{value: String.duplicate("长", 100)}}, | |
| # Complex nested structure | |
| %{data: %{value: %{nested: %{deep: "🎉", array: [1, "2", :three]}}}}, | |
| # Escaped characters | |
| %{data: %{value: "\\special\\chars"}}, | |
| # Empty array | |
| %{data: %{value: []}}, | |
| # Empty map | |
| %{data: %{value: %{}}}, | |
| # Mixed types array | |
| %{data: %{value: [nil, true, false, 42, 3.14]}}, | |
| # HTML-like content | |
| %{data: %{value: "<script>alert('test')</script>"}} | |
| ] | |
| # Write all special events | |
| assert :ok = EventStore.write_events(@test_topic, special_events) | |
| assert :ok = EventStore.force_flush(@test_topic) | |
| # Read back and verify | |
| assert {:ok, read_events} = EventStore.read_events(@test_topic) | |
| assert length(read_events) == length(special_events) | |
| # Compare original and read data | |
| Enum.zip(special_events, read_events) | |
| |> Enum.each(fn {original, read} -> | |
| assert read.data == original.data | |
| assert is_integer(read.position) | |
| assert is_integer(read.timestamp) | |
| end) | |
| # Verify positions are sequential | |
| positions = Enum.map(read_events, & &1.position) | |
| assert positions == Enum.to_list(0..(length(special_events) - 1)) | |
| end | |
| end | |
| end |
| defmodule SimpleEventStore do | |
| @moduledoc """ | |
| A simplified implementation of EventStore using only disk_log. | |
| This version is primarily for testing and understanding disk_log behavior. | |
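| ## Example | |
| A minimal usage sketch; it assumes `EventStore.Registry` (used by | |
| `via_tuple/1` below) is already running, e.g. started with | |
| `Registry.start_link(keys: :unique, name: EventStore.Registry)`: | |
| {:ok, _pid} = SimpleEventStore.start_link("orders", base_dir: "tmp/orders") | |
| :ok = SimpleEventStore.write_event("orders", %{data: %{value: 1}}) | |
| {:ok, events} = SimpleEventStore.read_events("orders") | |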
| """ | |
| use GenServer | |
| require Logger | |
| defmodule State do | |
| defstruct [:topic, :log_name, :base_dir] | |
| end | |
| # Public API | |
| def start_link(topic, opts \\ []) do | |
| GenServer.start_link(__MODULE__, [topic, opts], name: via_tuple(topic)) | |
| end | |
| def write_event(topic, event) do | |
| GenServer.call(via_tuple(topic), {:write, event}) | |
| end | |
| def read_events(topic) do | |
| GenServer.call(via_tuple(topic), :read) | |
| end | |
| # Server Implementation | |
| @impl true | |
| def init([topic, opts]) do | |
| base_dir = Keyword.get(opts, :base_dir, "data/#{topic}") | |
| File.mkdir_p!(base_dir) | |
| log_name = String.to_atom("#{topic}_log") | |
| log_file = Path.join(base_dir, "#{topic}.log") | |
| # Open or create disk_log | |
| case :disk_log.open( | |
| name: log_name, | |
| file: String.to_charlist(log_file), | |
| type: :wrap, | |
| # 1MB file, 10 files max | |
| size: {1024 * 1024, 10}, | |
| mode: :read_write | |
| ) do | |
| {:ok, _} -> | |
| {:ok, %State{topic: topic, log_name: log_name, base_dir: base_dir}} | |
| {:repaired, _, _, _} -> | |
| Logger.info("Repaired corrupted log file") | |
| {:ok, %State{topic: topic, log_name: log_name, base_dir: base_dir}} | |
| {:error, reason} -> | |
| Logger.error("Failed to open disk_log: #{inspect(reason)}") | |
| {:stop, reason} | |
| end | |
| end | |
| @impl true | |
| def handle_call({:write, event}, _from, state) do | |
| wrapped_event = wrap_event(event) | |
| binary_event = :erlang.term_to_binary(wrapped_event) | |
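| # term_to_binary makes the payload a single opaque term for disk_log; | |
| # process_terms/1 reverses it with binary_to_term on the read path | |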
| case :disk_log.log_terms(state.log_name, [binary_event]) do | |
| :ok -> | |
| {:reply, :ok, state} | |
| {:error, reason} -> | |
| Logger.error("Failed to write event: #{inspect(reason)}") | |
| {:reply, {:error, reason}, state} | |
| end | |
| end | |
| @impl true | |
| def handle_call(:read, _from, state) do | |
| case read_all_events(state.log_name) do | |
| {:ok, events} -> | |
| {:reply, {:ok, events}, state} | |
| {:error, reason} -> | |
| Logger.error("Failed to read events: #{inspect(reason)}") | |
| {:reply, {:error, reason}, state} | |
| end | |
| end | |
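| # Flush and close the log on shutdown so buffered terms are not lost | |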
| @impl true | |
| def terminate(_reason, %{log_name: log_name} = state) when not is_nil(log_name) do | |
| with :ok <- :disk_log.sync(state.log_name), | |
| :ok <- :disk_log.close(state.log_name) do | |
| :ok | |
| else | |
| {:error, _reason} -> :ok | |
| end | |
| end | |
| def terminate(_reason, _state) do | |
| :ok | |
| end | |
| # Private Functions | |
| defp via_tuple(topic) do | |
| {:via, Registry, {EventStore.Registry, {__MODULE__, topic}}} | |
| end | |
| defp wrap_event(event) do | |
| case event do | |
| %{timestamp: _} -> event | |
| _ -> Map.put(event, :timestamp, System.system_time(:second)) | |
| end | |
| end | |
| defp read_all_events(log_name) do | |
| try do | |
| chunk_loop(log_name, :start, []) | |
| catch | |
| :exit, reason -> | |
| {:error, reason} | |
| end | |
| end | |
| defp chunk_loop(log_name, continuation, acc) do | |
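| # :disk_log.chunk/2 starts from :start and yields {continuation, terms} | |
| # until :eof; decoded terms are accumulated across chunks | |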
| case :disk_log.chunk(log_name, continuation) do | |
| :eof -> | |
| {:ok, acc} | |
| {:error, :no_such_log} -> | |
| {:ok, []} | |
| {:error, reason} -> | |
| {:error, reason} | |
| {next_continuation, terms} when is_list(terms) -> | |
| chunk_loop(log_name, next_continuation, acc ++ process_terms(terms)) | |
| end | |
| end | |
| defp process_terms(terms) when is_list(terms) do | |
| Enum.map(terms, fn | |
| term when is_binary(term) -> :erlang.binary_to_term(term) | |
| term -> term | |
| end) | |
| end | |
| end |