@monotykamary
Created July 24, 2025 15:42
Event Store - Disk-based Event Storage System
defmodule EventStore do
@moduledoc """
- Resilient disk_log handling with automatic repair
- Batch writing for better performance
- Automatic log file maintenance
- Monitoring and metrics
- Quick recovery after crashes
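
## Example

A minimal usage sketch (assumes `EventStore.Registry` is already running;
the topic name is illustrative):

    {:ok, _pid} = EventStore.start_link("orders")
    :ok = EventStore.write_event("orders", %{data: %{value: 42}})
    :ok = EventStore.force_flush("orders")
    {:ok, events} = EventStore.read_events("orders")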
"""
use GenServer
require Logger
@default_batch_size 1000
@default_batch_timeout 1000
# 1MB
@default_max_chunk_size 1024 * 1024
@default_max_files 4
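# With these defaults the wrap log cycles through 4 files of 1 MB each,
# bounding on-disk size at roughly 4 MB per topic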
@cleanup_interval :timer.hours(1)
defmodule State do
defstruct [
:topic,
:log_name,
:base_dir,
:batch_size,
:batch_timeout,
:batch_buffer,
:batch_timer,
:last_cleanup,
:metrics,
# Track the last assigned position
:last_position
]
end
defmodule Event do
@type t :: %__MODULE__{
position: non_neg_integer(),
timestamp: non_neg_integer(),
data: term()
}
defstruct [:position, :timestamp, :data]
end
defmodule Metrics do
defstruct writes: 0,
reads: 0,
batches: 0,
errors: 0,
repairs: 0,
last_write_time: nil,
last_read_time: nil
end
# Public API
def start_link(topic, opts \\ []) do
GenServer.start_link(__MODULE__, [topic, opts], name: via_tuple(topic))
end
def write_event(topic, event) do
GenServer.cast(via_tuple(topic), {:write, event})
end
def write_events(topic, events) when is_list(events) do
GenServer.cast(via_tuple(topic), {:write_batch, events})
end
def read_events(topic, opts \\ []) do
GenServer.call(via_tuple(topic), {:read, opts})
end
def get_metrics(topic) do
GenServer.call(via_tuple(topic), :get_metrics)
end
def get_latest_event(topic) do
GenServer.call(via_tuple(topic), :get_latest_event)
end
def read_last_events(topic, n) when is_integer(n) and n > 0 do
GenServer.call(via_tuple(topic), {:read_last, n})
end
def force_flush(topic) do
GenServer.call(via_tuple(topic), :flush)
end
# Server Implementation
@impl true
def init([topic, opts]) do
Process.flag(:trap_exit, true)
base_dir = Keyword.get(opts, :base_dir, "data/#{topic}")
batch_size = Keyword.get(opts, :batch_size, @default_batch_size)
batch_timeout = Keyword.get(opts, :batch_timeout, @default_batch_timeout)
File.mkdir_p!(base_dir)
log_name = String.to_atom("#{topic}_log")
log_file = Path.join(base_dir, "#{topic}.log")
# Initialize disk_log with repair capability
case initialize_disk_log(log_name, log_file) do
{:ok, _} ->
# Find the highest position efficiently by reading the last chunk
last_position =
case get_last_position(log_name) do
{:ok, pos} -> pos
_ -> -1
end
state = %State{
topic: topic,
log_name: log_name,
base_dir: base_dir,
batch_size: batch_size,
batch_timeout: batch_timeout,
batch_buffer: [],
metrics: %Metrics{},
last_cleanup: System.system_time(:second),
last_position: last_position
}
schedule_cleanup()
schedule_batch_timeout(state)
{:ok, state}
{:error, reason} ->
Logger.error("Failed to initialize disk_log: #{inspect(reason)}")
{:stop, reason}
end
end
@impl true
def handle_cast({:write, event}, state) do
{wrapped_event, new_state} = wrap_event(event, state)
new_buffer = new_state.batch_buffer ++ [wrapped_event]
if length(new_buffer) >= new_state.batch_size do
case write_batch(new_buffer, new_state) do
{:ok, newer_state} ->
{:noreply, newer_state}
{:error, reason, newer_state} ->
Logger.error("Failed to write batch: #{inspect(reason)}")
{:noreply, newer_state}
end
else
{:noreply, %{new_state | batch_buffer: new_buffer}}
end
end
@impl true
def handle_cast({:write_batch, events}, state) do
{wrapped_events, final_state} =
Enum.reduce(events, {[], state}, fn event, {acc_events, acc_state} ->
{wrapped, new_state} = wrap_event(event, acc_state)
{acc_events ++ [wrapped], new_state}
end)
case write_batch(wrapped_events, final_state) do
{:ok, new_state} ->
{:noreply, new_state}
{:error, reason, new_state} ->
Logger.error("Failed to write batch: #{inspect(reason)}")
{:noreply, new_state}
end
end
@impl true
def handle_call({:read, opts}, _from, state) do
start_time = System.monotonic_time()
case read_all_events(state.log_name, opts) do
{:ok, events} ->
metrics = update_read_metrics(state.metrics, length(events), start_time)
{:reply, {:ok, events}, %{state | metrics: metrics}}
{:error, reason} ->
metrics = %{state.metrics | errors: state.metrics.errors + 1}
{:reply, {:error, reason}, %{state | metrics: metrics}}
end
end
@impl true
def handle_call({:read_last, n}, _from, state) do
start_time = System.monotonic_time()
case read_last_n_events(state.log_name, n) do
{:ok, events} ->
metrics = update_read_metrics(state.metrics, length(events), start_time)
{:reply, {:ok, events}, %{state | metrics: metrics}}
{:error, reason} ->
metrics = %{state.metrics | errors: state.metrics.errors + 1}
{:reply, {:error, reason}, %{state | metrics: metrics}}
end
end
@impl true
def handle_call(:get_metrics, _from, state) do
{:reply, state.metrics, state}
end
@impl true
def handle_call(:get_latest_event, _from, state) do
with {:ok, pos} <- get_position(state.last_position),
{:ok, events} <- read_all_events(state.log_name, from_position: pos, chunk_size: 1),
{:ok, event} <- get_single_event(events) do
{:reply, {:ok, event}, state}
else
{:error, reason} -> {:reply, {:error, reason}, state}
:no_event -> {:reply, {:ok, nil}, state}
end
end
@impl true
def handle_call(:flush, _from, state) do
case write_batch(state.batch_buffer, state) do
{:ok, new_state} -> {:reply, :ok, new_state}
{:error, reason, new_state} -> {:reply, {:error, reason}, new_state}
end
end
@impl true
def handle_info(:batch_timeout, %{batch_buffer: []} = state) do
schedule_batch_timeout(state)
{:noreply, state}
end
@impl true
def handle_info(:batch_timeout, %{batch_buffer: buffer, batch_timeout: timeout} = state) do
schedule_batch_timeout(state)
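# Process.sleep/1 returns :ok, so it can participate in this `with` chain;
# the pause between the two syncs briefly blocks the server process.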
with {:ok, new_state} <- write_batch(buffer, state),
:ok <- :disk_log.sync(state.log_name),
:ok <- Process.sleep(div(timeout, 10)),
:ok <- :disk_log.sync(state.log_name) do
{:noreply, new_state}
else
{:error, reason, new_state} ->
Logger.error("Failed to write batch on timeout: #{inspect(reason)}")
{:noreply, new_state}
{:error, reason} ->
Logger.error("Failed to sync disk log: #{inspect(reason)}")
{:noreply, state}
end
end
@impl true
def handle_info(:cleanup, state) do
schedule_cleanup()
{:noreply, perform_cleanup(state)}
end
@impl true
def terminate(_reason, %{batch_buffer: buffer, log_name: log_name} = state) do
if length(buffer) > 0 do
case write_batch(buffer, state) do
{:ok, _} ->
:ok
{:error, reason, _} ->
Logger.error("Failed to flush events on termination: #{inspect(reason)}")
end
end
if log_name do
with :ok <- :disk_log.sync(log_name),
:ok <- :disk_log.close(log_name) do
:ok
else
{:error, reason} ->
Logger.error("Failed to sync/close disk log: #{inspect(reason)}")
:ok
end
end
:ok
end
# Private Functions
defp via_tuple(topic) do
{:via, Registry, {EventStore.Registry, {__MODULE__, topic}}}
end
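# `EventStore.Registry` must be running before `start_link/2` is called.
# A minimal sketch (supervisor name is hypothetical) of starting it under
# an application supervisor:
#
#     children = [
#       {Registry, keys: :unique, name: EventStore.Registry}
#     ]
#
#     Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)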
defp initialize_disk_log(log_name, log_file) do
case :disk_log.open(
name: log_name,
file: String.to_charlist(log_file),
type: :wrap,
size: {@default_max_chunk_size, @default_max_files},
mode: :read_write,
repair: true
) do
{:ok, _} = result ->
result
{:repaired, ^log_name, {:recovered, _}, {:badbytes, _}} ->
Logger.warning("Repaired corrupted log file")
{:ok, log_name}
{:error, :no_such_log} ->
initialize_disk_log(log_name, log_file)
{:error, reason} ->
Logger.error("Failed to open disk_log: #{inspect(reason)}")
{:error, reason}
end
end
defp write_batch(events, state) when is_list(events) and events != [] do
start_time = System.monotonic_time()
with :ok <- :disk_log.log_terms(state.log_name, events),
:ok <- :disk_log.sync(state.log_name),
:ok <- Process.sleep(div(state.batch_timeout, 20)),
:ok <- :disk_log.sync(state.log_name) do
metrics = update_write_metrics(state.metrics, length(events), start_time)
{:ok, %{state | batch_buffer: [], metrics: metrics}}
else
{:error, reason} ->
metrics = %{state.metrics | errors: state.metrics.errors + 1}
Logger.error("Failed to sync disk log: #{inspect(reason)}")
{:error, reason, %{state | metrics: metrics}}
end
end
defp write_batch([], state), do: {:ok, state}
defp wrap_event(event, state) do
next_position = (state.last_position || -1) + 1
timestamp =
case event do
%{timestamp: ts} -> ts
_ -> System.system_time(:second)
end
data =
case event do
%Event{} -> event.data
%{data: data} -> data
_ -> %{value: event}
end
{%Event{
position: next_position,
timestamp: timestamp,
data: data
}, %{state | last_position: next_position}}
end
defp read_all_events(log_name, opts) do
try do
chunk_size = Keyword.get(opts, :chunk_size, 100)
# Use position instead of offset
from_position = Keyword.get(opts, :from_position, 0)
timestamp = Keyword.get(opts, :timestamp)
case read_chunk(log_name, :start, [], timestamp, :forward) do
{:ok, events} ->
# read_chunk already returns events in chronological order (oldest first)
{:ok, filter_events(events, timestamp, from_position, chunk_size)}
error ->
error
end
catch
:exit, reason ->
Logger.error("Error reading events: #{inspect(reason)}")
{:error, reason}
end
end
defp read_last_n_events(log_name, n) do
try do
# Start from the end of the log
case read_backwards(log_name, n) do
{:ok, events} -> {:ok, events}
error -> error
end
catch
:exit, reason ->
Logger.error("Error reading events: #{inspect(reason)}")
{:error, reason}
end
end
# Get the last assigned position by reading the log's final chunk
defp get_last_position(log_name) do
with info when is_list(info) <- :disk_log.info(log_name),
current_file =
Enum.find_value(info, 0, fn
{:current_file, num} -> num
_ -> false
end),
{:ok, chunk_data} when is_list(chunk_data) and length(chunk_data) > 0 <-
read_last_chunk(log_name, current_file),
last_event = List.last(chunk_data) do
{:ok, last_event.position}
else
{:error, reason} -> {:error, reason}
_ -> {:ok, -1}
end
end
# Walk the log to its end and return the terms of the final chunk
defp read_last_chunk(log_name, file_num) when file_num >= 0 do
try do
do_read_last_chunk(log_name, :start, [])
catch
_kind, _reason ->
{:ok, []}
end
end
defp read_last_chunk(_log_name, _file_num), do: {:ok, []}
defp do_read_last_chunk(log_name, continuation, last_terms) do
case :disk_log.chunk(log_name, continuation) do
:eof ->
{:ok, last_terms}
{:error, reason} ->
{:error, reason}
{cont, terms} ->
do_read_last_chunk(log_name, cont, List.wrap(terms))
{cont, terms, _badbytes} ->
do_read_last_chunk(log_name, cont, List.wrap(terms))
end
end
defp read_backwards(log_name, n) do
try do
with {:ok, last_pos} <- get_last_position(log_name),
false <- last_pos == -1,
start_pos = max(0, last_pos - n + 1),
{:ok, events} <- read_all_events(log_name, from_position: start_pos) do
{:ok, Enum.take(events, -n)}
else
# handles last_pos == -1
true -> {:ok, []}
# handles other errors
error -> error
end
catch
_kind, _reason ->
{:ok, []}
end
end
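# read_backwards/2 example: with last_pos = 99 and n = 5 it reads from
# position 95 and returns the final five events.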
defp filter_events(events, timestamp, from_position, chunk_size) do
events
|> filter_by_timestamp(timestamp)
|> filter_by_position(from_position)
|> Enum.take(chunk_size)
end
defp filter_by_timestamp(events, nil), do: events
defp filter_by_timestamp(events, timestamp) when is_integer(timestamp) do
Enum.filter(events, fn %Event{timestamp: ts} -> ts >= timestamp end)
end
defp filter_by_position(events, 0), do: events
defp filter_by_position(events, from_position) when is_integer(from_position) do
Enum.filter(events, fn %Event{position: pos} -> pos >= from_position end)
end
# Accumulate a chunk's terms; chunks are prepended here and put back in
# order by process_chunks/1
defp maybe_append_chunk(terms, acc) when is_list(terms), do: [terms | acc]
defp maybe_append_chunk(_, acc), do: acc
defp read_chunk(log_name, continuation, acc, timestamp, direction) do
case :disk_log.chunk(log_name, continuation) do
:eof ->
{:ok, process_chunks(acc)}
{:error, :no_such_log} ->
{:ok, []}
{:error, reason} ->
{:error, reason}
{cont, terms} ->
read_chunk(log_name, cont, maybe_append_chunk(terms, acc), timestamp, direction)
{cont, terms, _badbytes} ->
read_chunk(log_name, cont, maybe_append_chunk(terms, acc), timestamp, direction)
end
end
defp process_chunks(chunks) do
chunks
# Reverse the chunks to get them in order
|> Enum.reverse()
# Flatten into a single list
|> List.flatten()
end
defp schedule_batch_timeout(%{batch_timeout: timeout} = _state) do
Process.send_after(self(), :batch_timeout, timeout)
end
defp schedule_cleanup do
Process.send_after(self(), :cleanup, @cleanup_interval)
end
defp perform_cleanup(%{log_name: log_name} = state) do
with :ok <- :disk_log.sync(log_name),
{:ok, info} when is_list(info) <- {:ok, :disk_log.info(log_name)},
bytes =
Enum.find_value(info, 0, fn
{:no_current_bytes, size} -> size
_ -> false
end),
:ok <- maybe_truncate(log_name, bytes) do
%{state | last_cleanup: System.system_time(:second)}
else
{:error, reason} ->
Logger.error("Failed to sync/cleanup log: #{inspect(reason)}")
%{state | last_cleanup: System.system_time(:second)}
error ->
Logger.error("Error during cleanup: #{inspect(error)}")
%{state | last_cleanup: System.system_time(:second)}
end
end
defp maybe_truncate(log_name, bytes)
when bytes > @default_max_chunk_size * @default_max_files do
Logger.info("Truncating log file due to size: #{bytes} bytes")
case :disk_log.truncate(log_name) do
:ok ->
:ok
{:error, reason} ->
Logger.error("Failed to truncate log: #{inspect(reason)}")
{:error, reason}
end
end
defp maybe_truncate(_log_name, _bytes), do: :ok
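# Note: :disk_log.truncate/1 removes all items from the log, so hitting the
# size ceiling above trades stored history for bounded disk usage.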
defp update_write_metrics(metrics, count, start_time) do
%{
metrics
| writes: metrics.writes + count,
batches: metrics.batches + 1,
last_write_time: System.monotonic_time() - start_time
}
end
defp update_read_metrics(metrics, count, start_time) do
%{
metrics
| reads: metrics.reads + count,
last_read_time: System.monotonic_time() - start_time
}
end
defp get_position(nil), do: :no_event
defp get_position(-1), do: :no_event
defp get_position(pos), do: {:ok, pos}
defp get_single_event([event]), do: {:ok, event}
defp get_single_event([]), do: :no_event
end
defmodule EventStoreBenchmark do
use ExUnit.Case
require Logger
@tag timeout: :infinity
@test_topic "benchmark_topic"
@message_count 10_000
@cell_width 16 # Increased from 11 to accommodate percentages
@inner_padding 3 # Padding inside cells for better readability
# Raise the console log level so benchmark output stays readable
Logger.configure_backend(:console, level: :error)
setup do
test_dir = "test/tmp/#{@test_topic}_#{System.unique_integer()}"
File.mkdir_p!(test_dir)
# Ensure test directory is clean
File.rm_rf!(test_dir)
File.mkdir_p!(test_dir)
# Start the Registry if not already started
Registry.start_link(keys: :unique, name: EventStore.Registry, partitions: 1)
on_exit(fn ->
try do
File.rm_rf!(test_dir)
catch
_kind, _reason -> :ok
end
end)
%{test_dir: test_dir}
end
def generate_message(size) do
# Generate a random string of specified size
1..size
|> Enum.map(fn _ -> Enum.random(?a..?z) end)
|> List.to_string()
end
def format_bytes(bytes) when bytes >= 1024 * 1024 do
mb = bytes / (1024 * 1024)
"#{Float.round(mb, 2)} MB (#{bytes} bytes)"
end
def format_bytes(bytes) when bytes >= 1024 do
kb = bytes / 1024
"#{Float.round(kb, 2)} KB (#{bytes} bytes)"
end
def format_bytes(bytes), do: "#{bytes} bytes"
def analyze_memory_allocation do
IO.puts("\n--- Memory Carrier Configuration ---")
# Get allocator settings
alloc_info = :erlang.system_info(:allocator)
# system_info(:allocator) returns {Allocator, Version, Features, Settings};
# find the :temp_alloc entry in the Settings element
temp_alloc = Enum.find(elem(alloc_info, 3), fn {type, _} -> type == :temp_alloc end)
{_, settings} = temp_alloc
# Extract and format carrier configurations
mmbcs = format_bytes(settings[:mmbcs])
lmbcs = format_bytes(settings[:lmbcs])
sbct = format_bytes(settings[:sbct])
smbcs = format_bytes(settings[:smbcs])
# Print the carrier configurations
IO.puts("mmbcs: #{mmbcs} # Minimum multi-block carrier size")
IO.puts("lmbcs: #{lmbcs} # Maximum multi-block carrier size")
IO.puts("sbct: #{sbct} # Single block carrier threshold")
IO.puts("smbcs: #{smbcs} # Small multi-block carrier size")
end
def measure_write_performance(test_dir, message_size) do
{:ok, pid} = EventStore.start_link(@test_topic, base_dir: test_dir)
# Generate messages
messages = for _ <- 1..@message_count do
%{data: %{value: generate_message(message_size)}}
end
# Measure batch write time with natural timeouts
{batch_write_micros, :ok} = :timer.tc(fn ->
:ok = EventStore.write_events(@test_topic, messages)
end)
# Clean up and start fresh for force-flush measurement
GenServer.stop(pid)
{:ok, pid2} = EventStore.start_link(@test_topic, base_dir: test_dir)
# Measure force-flush write time
{force_flush_micros, :ok} = :timer.tc(fn ->
:ok = EventStore.write_events(@test_topic, messages)
:ok = EventStore.force_flush(@test_topic)
end)
# Calculate metrics
batch_write_ms = batch_write_micros / 1000
force_flush_ms = force_flush_micros / 1000
# Calculate throughput metrics
batch_msgs_per_sec = @message_count / (batch_write_ms / 1000)
force_flush_msgs_per_sec = @message_count / (force_flush_ms / 1000)
batch_mb_per_sec = (message_size * @message_count) / (batch_write_ms * 1000)
force_flush_mb_per_sec = (message_size * @message_count) / (force_flush_ms * 1000)
# Cleanup
GenServer.stop(pid2)
{
batch_write_ms, # Time for batch writes
force_flush_ms, # Time including force flush
batch_msgs_per_sec, # Messages/sec with batch
force_flush_msgs_per_sec, # Messages/sec with force flush
batch_mb_per_sec, # MB/sec with batch
force_flush_mb_per_sec # MB/sec with force flush
}
end
def measure_read_performance(test_dir, message_size) do
{:ok, pid} = EventStore.start_link(@test_topic, base_dir: test_dir)
# Write test data first
messages = for _ <- 1..@message_count do
%{data: %{value: generate_message(message_size)}}
end
:ok = EventStore.write_events(@test_topic, messages)
:ok = EventStore.force_flush(@test_topic)
# Measure total read time
{elapsed_micros, {:ok, _events}} = :timer.tc(fn ->
EventStore.read_events(@test_topic)
end)
# Calculate metrics
elapsed_ms = elapsed_micros / 1000
messages_per_second = @message_count / (elapsed_ms / 1000)
throughput_mb_per_sec = (message_size * @message_count) / (elapsed_ms * 1000)
# Cleanup
GenServer.stop(pid)
{elapsed_ms, messages_per_second, throughput_mb_per_sec}
end
def measure_restart_time(test_dir) do
# First, write messages
{:ok, pid} = EventStore.start_link(@test_topic, base_dir: test_dir)
messages = for i <- 1..@message_count do
%{data: %{value: "message_#{i}"}}
end
:ok = EventStore.write_events(@test_topic, messages)
:ok = EventStore.force_flush(@test_topic)
GenServer.stop(pid)
# Measure time before disk log init
{pre_init_micros, _} = :timer.tc(fn ->
_log_name = String.to_atom("#{@test_topic}_log")
_log_file = Path.join(test_dir, "#{@test_topic}.log")
File.mkdir_p!(test_dir)
end)
# Now measure total restart time
{total_restart_micros, {:ok, _pid}} = :timer.tc(fn ->
EventStore.start_link(@test_topic, base_dir: test_dir)
end)
{pre_init_micros, total_restart_micros}
end
def format_number(number, :messages) when is_number(number) do
number
|> round()
|> Integer.to_string()
|> add_thousand_separators()
end
def format_number(number) when is_integer(number), do: format_number(number / 1)
def format_number(number) when is_float(number) do
Float.round(number, 2) # Round to 2 decimal places
|> :erlang.float_to_binary([decimals: 2]) # Always show 2 decimal places
|> add_thousand_separators()
end
def add_thousand_separators(number_string) do
case String.split(number_string, ".") do
[int_part, ""] -> group_thousands(int_part)
[int_part, decimal_part] -> "#{group_thousands(int_part)}.#{decimal_part}"
[int_part] -> group_thousands(int_part)
end
end
defp group_thousands(int_part) do
int_part
|> String.reverse()
|> String.replace(~r/(\d{3})(?=\d)/, "\\1,")
|> String.reverse()
end
def align_text(text, width, :right) do
padding = max(0, width - String.length(text) - @inner_padding)
String.duplicate(" ", padding) <> text <> String.duplicate(" ", @inner_padding)
end
def align_text(text, width, :left) do
padding = max(0, width - String.length(text) - @inner_padding)
String.duplicate(" ", @inner_padding) <> text <> String.duplicate(" ", padding)
end
def format_cell(value, width) when is_number(value) do
formatted = if is_integer(value) do
"#{value}.00" # Ensure two decimal places for integers
else
format_number(value)
end
align_text(formatted, width, :right)
end
def format_cell(value, width) when is_binary(value) do
cond do
# Handle percentage values (e.g., "1,000.00 (33.00%)")
String.match?(value, ~r/^[\d,]+\.?\d*\s\(\d+\.?\d*%\)$/) ->
align_text(value, width, :right)
# Handle microsecond latency values - convert to integer and right align
String.match?(value, ~r/^[\d,]+\.?\d*μs$/) ->
{num, _} = Float.parse(String.replace(value, ["μs", ","], ""))
formatted = "#{trunc(num)}μs"
align_text(formatted, width, :right)
# Handle other time values (ms)
String.match?(value, ~r/^[\d,]+\.?\d*ms$/) ->
align_text(value, width, :right)
# Handle numeric strings with thousand separators
String.match?(value, ~r/^[\d,]+\.?\d*$/) ->
align_text(value, width, :right)
String.match?(value, ~r/^\d+b$/) ->
align_text(value, width, :right)
true ->
align_text(value, width, :left)
end
end
def print_table_header(headers) do
col_width = @cell_width + @inner_padding * 2
total_width = (col_width + 1) * length(headers) + 1
# Print top border
border = String.duplicate("-", total_width - 2)
IO.puts("+" <> border <> "+")
# Print headers with left alignment and proper spacing
header_line = headers
|> Enum.map(&align_text(&1, col_width, :left))
|> Enum.join("|")
IO.puts("|" <> header_line <> "|")
# Print separator
IO.puts("+" <> border <> "+")
end
def print_table_row(values) do
col_width = @cell_width + @inner_padding * 2
row = values
|> Enum.map(&format_cell(&1, col_width))
|> Enum.join("|")
IO.puts("|" <> row <> "|")
end
def print_table_footer(col_count) do
total_width = (@cell_width + @inner_padding * 2 + 1) * col_count + 1
IO.puts("+" <> String.duplicate("-", total_width - 2) <> "+")
end
test "benchmark write performance and restarts", %{test_dir: test_dir} do
# Before the benchmark
IO.puts("\nPre-Benchmark Memory Analysis:")
analyze_memory_allocation()
message_sizes = [128, 512, 1024]
# Write Performance Table
IO.puts("\nWrite Performance:")
headers = ["Size", "Batch Time", "Force Time", "Raw M/s", "Force M/s", "Raw MB/s", "Force MB/s"]
print_table_header(headers)
for size <- message_sizes do
{batch_write_ms, force_flush_ms, batch_mps, force_mps, batch_mb, force_mb} =
measure_write_performance(test_dir, size)
values = [
"#{size}b",
"#{format_number(batch_write_ms)}ms",
"#{format_number(force_flush_ms)}ms",
format_number(batch_mps, :messages),
format_number(force_mps, :messages),
format_number(batch_mb),
format_number(force_mb)
]
print_table_row(values)
end
print_table_footer(length(headers))
# Messages per Batch Timeout Table
IO.puts("\nMessages/s per Batch Timeout:")
batch_timeouts = [50, 100, 500, 1000]
headers = ["Size", "Force Time", "Raw M/s" | Enum.map(batch_timeouts, &"#{&1}ms")]
print_table_header(headers)
for size <- message_sizes do
{_batch_write_ms, force_flush_ms, batch_mps, _force_mps, _batch_mb, _force_mb} =
measure_write_performance(test_dir, size)
msgs_per_timeout = Enum.map(batch_timeouts, fn timeout ->
# Convert force flush time to seconds
force_flush_seconds = force_flush_ms / 1000
cycle_time_seconds = (timeout + force_flush_ms) / 1000
# Calculate number of complete cycles in 1 second
cycles_per_second = 1 / cycle_time_seconds
# Total time spent force flushing in 1 second
total_force_flush_time = cycles_per_second * force_flush_seconds
# Remaining time for actual message processing
processing_time = 1 - total_force_flush_time
# Calculate actual messages per second considering force flush overhead
actual_msgs_per_second = batch_mps * processing_time
percentage = (actual_msgs_per_second / batch_mps) * 100
"#{format_number(actual_msgs_per_second, :messages)} (#{format_number(percentage, :messages)}%)"
end)
print_table_row([
"#{size}b",
"#{format_number(force_flush_ms)}ms",
format_number(batch_mps, :messages) # Format batch M/s as whole number
| msgs_per_timeout
])
end
print_table_footer(length(headers))
# Read Performance Table
IO.puts("\nRead Performance:")
headers = ["Size", "Time", "Msgs/s", "MB/s"]
print_table_header(headers)
for size <- message_sizes do
{read_elapsed_ms, read_mps, read_throughput} =
measure_read_performance(test_dir, size)
values = [
"#{size}b",
"#{format_number(read_elapsed_ms)}ms",
format_number(read_mps, :messages),
format_number(read_throughput)
]
print_table_row(values)
end
print_table_footer(length(headers))
# Restart Performance Table
IO.puts("\nRestart Performance:")
headers = ["Operation", "Time"]
print_table_header(headers)
{pre_init_micros, total_restart_micros} = measure_restart_time(test_dir)
print_table_row(["Pre-Init ", "#{trunc(pre_init_micros)}μs"])
print_table_row(["Init", "#{trunc(total_restart_micros - pre_init_micros)}μs"])
print_table_row(["Total", "#{trunc(total_restart_micros)}μs"])
print_table_footer(length(headers))
end
end
defmodule EventStoreTest do
use ExUnit.Case
require Logger
@test_topic "test_topic"
# Keep small for most tests
@batch_size 10
@batch_timeout 100
# Use for high frequency test
@large_batch_size 1000
setup do
# Start the application
Application.ensure_all_started(:event_store)
test_dir = "test/tmp/#{@test_topic}_#{System.unique_integer()}"
File.mkdir_p!(test_dir)
# Ensure test directory is clean
File.rm_rf!(test_dir)
File.mkdir_p!(test_dir)
# Start EventStore with test configuration
{:ok, pid} =
EventStore.start_link(@test_topic,
base_dir: test_dir,
batch_size: @batch_size,
batch_timeout: @batch_timeout
)
on_exit(fn ->
try do
# First stop the GenServer process
if Process.alive?(pid) do
GenServer.stop(pid)
# Give time for cleanup
Process.sleep(100)
end
catch
_kind, _reason -> :ok
end
# Stop any other event store processes that might be using the test directory
case Registry.lookup(EventStore.Registry, {EventStore, "empty_topic"}) do
[{pid, _}] ->
try do
GenServer.stop(pid)
Process.sleep(100)
catch
_kind, _reason -> :ok
end
_ ->
:ok
end
# Give extra time for disk logs to close
Process.sleep(200)
# Finally remove the test directory
try do
File.rm_rf!(test_dir)
catch
_kind, _reason -> :ok
end
end)
%{pid: pid, test_dir: test_dir}
end
describe "basic operations" do
test "reads last n events" do
events =
for i <- 1..20 do
%{data: %{value: i}}
end
assert :ok = EventStore.write_events(@test_topic, events)
assert :ok = EventStore.force_flush(@test_topic)
# Read last 5 events
assert {:ok, last_events} = EventStore.read_last_events(@test_topic, 5)
assert length(last_events) == 5
assert Enum.map(last_events, & &1.data.value) == [16, 17, 18, 19, 20]
# Read more events than exist
assert {:ok, all_events} = EventStore.read_last_events(@test_topic, 30)
assert length(all_events) == 20
assert Enum.map(all_events, & &1.data.value) == Enum.to_list(1..20)
# Read from empty log
test_dir = "test/tmp/empty_#{System.unique_integer()}"
File.mkdir_p!(test_dir)
{:ok, pid} = EventStore.start_link("empty_topic", base_dir: test_dir)
assert {:ok, []} = EventStore.read_last_events("empty_topic", 5)
# Clean up the process and wait for it to stop
GenServer.stop(pid)
# Give time for cleanup
Process.sleep(100)
# Now safe to remove the directory
File.rm_rf!(test_dir)
end
test "gets latest event when no events exist" do
assert {:ok, nil} = EventStore.get_latest_event(@test_topic)
end
test "gets latest event" do
events =
for i <- 1..5 do
%{data: %{value: i}}
end
assert :ok = EventStore.write_events(@test_topic, events)
assert :ok = EventStore.force_flush(@test_topic)
assert {:ok, latest} = EventStore.get_latest_event(@test_topic)
assert latest.data.value == 5
# 0-based indexing
assert latest.position == 4
end
test "gets latest event after multiple writes" do
# Write first batch
events1 =
for i <- 1..3 do
%{data: %{value: i}}
end
assert :ok = EventStore.write_events(@test_topic, events1)
assert :ok = EventStore.force_flush(@test_topic)
# Write second batch
events2 =
for i <- 4..5 do
%{data: %{value: i}}
end
assert :ok = EventStore.write_events(@test_topic, events2)
assert :ok = EventStore.force_flush(@test_topic)
assert {:ok, latest} = EventStore.get_latest_event(@test_topic)
assert latest.data.value == 5
# 0-based indexing
assert latest.position == 4
end
test "reads events from position" do
events =
for i <- 1..20 do
%{data: %{value: i}}
end
assert :ok = EventStore.write_events(@test_topic, events)
assert :ok = EventStore.force_flush(@test_topic)
# Read from position 5
assert {:ok, read_events} = EventStore.read_events(@test_topic, from_position: 5)
assert length(read_events) == 15
assert Enum.at(read_events, 0).data.value == 6
assert Enum.at(read_events, 0).position == 5
end
test "reads events from timestamp" do
# Write first batch with explicit timestamps
base_time = System.system_time(:second)
events1 =
for i <- 1..5 do
%{data: %{value: i}, timestamp: base_time}
end
assert :ok = EventStore.write_events(@test_topic, events1)
assert :ok = EventStore.force_flush(@test_topic)
# Set timestamp for filtering
filter_time = base_time + 1
# Write second batch with later timestamps
events2 =
for i <- 6..10 do
%{data: %{value: i}, timestamp: base_time + 2}
end
assert :ok = EventStore.write_events(@test_topic, events2)
assert :ok = EventStore.force_flush(@test_topic)
# Read from timestamp
assert {:ok, read_events} = EventStore.read_events(@test_topic, timestamp: filter_time)
assert length(read_events) == 5
assert Enum.all?(read_events, fn event -> event.timestamp >= filter_time end)
assert Enum.all?(read_events, fn event -> event.data.value >= 6 end)
end
test "combines position and timestamp filters" do
# Write first batch with explicit timestamps
base_time = System.system_time(:second)
events1 =
for i <- 1..10 do
%{data: %{value: i}, timestamp: base_time}
end
assert :ok = EventStore.write_events(@test_topic, events1)
assert :ok = EventStore.force_flush(@test_topic)
# Set timestamp for filtering
filter_time = base_time + 1
# Write second batch with later timestamps
events2 =
for i <- 11..20 do
%{data: %{value: i}, timestamp: base_time + 2}
end
assert :ok = EventStore.write_events(@test_topic, events2)
assert :ok = EventStore.force_flush(@test_topic)
# Read with both position and timestamp
# Should get events 16-20 (5 events after position 15 from the second batch)
assert {:ok, read_events} =
EventStore.read_events(@test_topic, from_position: 15, timestamp: filter_time)
assert length(read_events) == 5
assert Enum.all?(read_events, fn event ->
event.timestamp >= filter_time && event.data.value >= 16 && event.position >= 15
end)
end
test "maintains sequential positions" do
events =
for i <- 1..10 do
%{data: %{value: i}}
end
assert :ok = EventStore.write_events(@test_topic, events)
assert :ok = EventStore.force_flush(@test_topic)
assert {:ok, read_events} = EventStore.read_events(@test_topic)
positions = Enum.map(read_events, & &1.position)
# Positions should be sequential
assert positions == Enum.to_list(0..9)
end
test "writes and reads a single event" do
event = %{data: %{value: 42}}
assert :ok = EventStore.write_event(@test_topic, event)
# Force flush to ensure event is written
assert :ok = EventStore.force_flush(@test_topic)
assert {:ok, [read_event]} = EventStore.read_events(@test_topic)
assert read_event.data.value == event.data.value
assert is_integer(read_event.position)
assert is_integer(read_event.timestamp)
end
test "handles events with existing timestamps" do
timestamp = System.system_time(:second)
event = %{data: %{value: 42}, timestamp: timestamp}
assert :ok = EventStore.write_event(@test_topic, event)
assert :ok = EventStore.force_flush(@test_topic)
assert {:ok, [read_event]} = EventStore.read_events(@test_topic)
assert read_event.timestamp == timestamp
assert read_event.data.value == event.data.value
assert is_integer(read_event.position)
end
end
describe "data integrity" do
test "maintains event order when reading with different chunk sizes" do
events =
for i <- 1..50 do
%{data: %{value: i}}
end
assert :ok = EventStore.write_events(@test_topic, events)
assert :ok = EventStore.force_flush(@test_topic)
# First get a baseline reading with standard chunk size
{:ok, reference_events} = EventStore.read_events(@test_topic)
# Verify baseline reading has all events in order
assert length(reference_events) == 50, "Should have read all 50 events"
assert Enum.map(reference_events, & &1.data.value) == Enum.to_list(1..50)
# Test reading with smaller chunk sizes
chunk_sizes = [10, 20, 30]
for chunk_size <- chunk_sizes do
{:ok, chunked_events} = EventStore.read_events(@test_topic, chunk_size: chunk_size)
assert length(chunked_events) == chunk_size,
"Reading with chunk_size #{chunk_size} should return #{chunk_size} events"
# Verify the chunked events match the corresponding subset of reference events
expected_events = Enum.take(reference_events, chunk_size)
assert chunked_events == expected_events,
"Events from chunk_size #{chunk_size} don't match reference events"
end
end
test "maintains event order when reading from multiple files" do
# Write events in multiple batches to ensure multiple files
events1 =
for i <- 1..50 do
%{data: %{value: i}, timestamp: System.system_time(:second)}
end
events2 =
for i <- 51..100 do
%{data: %{value: i}, timestamp: System.system_time(:second)}
end
assert :ok = EventStore.write_events(@test_topic, events1)
assert :ok = EventStore.force_flush(@test_topic)
# Brief pause to ensure different timestamps
Process.sleep(10)
assert :ok = EventStore.write_events(@test_topic, events2)
assert :ok = EventStore.force_flush(@test_topic)
# Read all events and verify order
assert {:ok, read_events} = EventStore.read_events(@test_topic)
assert length(read_events) == 100
# Verify values are in correct sequential order
values = Enum.map(read_events, & &1.data.value)
assert values == Enum.to_list(1..100)
# Verify positions are sequential
positions = Enum.map(read_events, & &1.position)
# 0-based positions
assert positions == Enum.to_list(0..99)
end
test "ensures no duplicate positions are assigned" do
# Write initial batch of events
events1 =
for i <- 1..50 do
%{data: %{value: i}}
end
assert :ok = EventStore.write_events(@test_topic, events1)
assert :ok = EventStore.force_flush(@test_topic)
# Write another batch with overlapping logical sequence
events2 =
for i <- 40..60 do
%{data: %{value: i}}
end
assert :ok = EventStore.write_events(@test_topic, events2)
assert :ok = EventStore.force_flush(@test_topic)
# Read all events
assert {:ok, read_events} = EventStore.read_events(@test_topic)
# Extract positions and check for duplicates
positions = Enum.map(read_events, & &1.position)
unique_positions = Enum.uniq(positions)
assert length(positions) == length(unique_positions), "Duplicate positions found"
# Verify positions are sequential
assert positions == Enum.to_list(0..(length(positions) - 1))
end
test "maintains data consistency after unexpected process termination", %{test_dir: test_dir} do
# Start a DynamicSupervisor for the test
{:ok, sup_pid} = DynamicSupervisor.start_link(strategy: :one_for_one)
# Enable error trapping in the test process
Process.flag(:trap_exit, true)
# Write initial batch
events1 =
for i <- 1..30 do
%{data: %{value: i}}
end
assert :ok = EventStore.write_events(@test_topic, events1)
assert :ok = EventStore.force_flush(@test_topic)
# Ensure data is written
Process.sleep(100)
# Get the current process and verify it exists
pid = Registry.whereis_name({EventStore.Registry, {EventStore, @test_topic}})
assert Process.alive?(pid)
# Verify initial data
assert {:ok, initial_events} = EventStore.read_events(@test_topic)
assert length(initial_events) == 30
# Monitor both the process and the registry
process_ref = Process.monitor(pid)
registry_ref = Process.monitor(Process.whereis(EventStore.Registry))
# Terminate the process with shutdown signal
Process.exit(pid, :shutdown)
# Wait for process to die and registry to clean up
assert_receive {:DOWN, ^process_ref, :process, ^pid, _}, 1000
# Wait for cleanup
Process.sleep(200)
# Ensure the process is really dead
refute Process.alive?(pid)
# Wait for any registry messages
receive do
{:DOWN, ^registry_ref, :process, _, _} -> :ok
after
0 -> :ok
end
# Clean up registry and wait for it to restart
Process.sleep(200)
# Write more events after restart
events2 =
for i <- 31..60 do
%{data: %{value: i}}
end
# Start new process under dynamic supervisor
child_spec = %{
id: EventStore,
start:
{EventStore, :start_link,
[
@test_topic,
[
base_dir: test_dir,
batch_size: @batch_size,
batch_timeout: @batch_timeout
]
]},
restart: :temporary
}
# Start the process under supervision with retry
{:ok, new_pid} = retry_start_supervised(sup_pid, child_spec)
# Verify new process is alive
assert Process.alive?(new_pid)
# Ensure process is fully started
Process.sleep(200)
# Write events in smaller batches with error handling
Enum.chunk_every(events2, 10)
|> Enum.each(fn chunk ->
retry_operation(fn ->
:ok = EventStore.write_events(@test_topic, chunk)
:ok = EventStore.force_flush(@test_topic)
# Brief pause between chunks
Process.sleep(50)
end)
end)
# Final pause to ensure all writes are complete
Process.sleep(100)
# Verify all data is consistent with retry logic
{:ok, read_events} =
retry_operation(fn ->
EventStore.read_events(@test_topic)
end)
assert length(read_events) == 60
values = Enum.map(read_events, & &1.data.value)
assert values == Enum.to_list(1..60)
positions = Enum.map(read_events, & &1.position)
assert positions == Enum.to_list(0..59)
# Clean up supervisor
DynamicSupervisor.stop(sup_pid, :normal)
end
# Helper function to retry starting a supervised process
defp retry_start_supervised(sup_pid, child_spec, attempts \\ 3) do
case DynamicSupervisor.start_child(sup_pid, child_spec) do
{:ok, pid} ->
{:ok, pid}
{:error, _} when attempts > 1 ->
Process.sleep(100)
retry_start_supervised(sup_pid, child_spec, attempts - 1)
error ->
error
end
end
# Helper function to retry operations with exponential backoff
defp retry_operation(fun, attempts \\ 3, delay \\ 100) do
try do
fun.()
rescue
e ->
if attempts > 1 do
Process.sleep(delay)
retry_operation(fun, attempts - 1, delay * 2)
else
reraise e, __STACKTRACE__
end
catch
:exit, _ when attempts > 1 ->
Process.sleep(delay)
retry_operation(fun, attempts - 1, delay * 2)
kind, reason ->
:erlang.raise(kind, reason, __STACKTRACE__)
end
end
test "maintains correct order with same timestamp events" do
timestamp = System.system_time(:second)
# Create multiple events with the same timestamp
events =
for i <- 1..20 do
%{data: %{value: i}, timestamp: timestamp}
end
assert :ok = EventStore.write_events(@test_topic, events)
assert :ok = EventStore.force_flush(@test_topic)
# Read events back
assert {:ok, read_events} = EventStore.read_events(@test_topic)
# Verify all events have the same timestamp
timestamps = Enum.map(read_events, & &1.timestamp)
assert Enum.all?(timestamps, &(&1 == timestamp))
# Verify events maintained their order despite same timestamp
values = Enum.map(read_events, & &1.data.value)
assert values == Enum.to_list(1..20)
# Verify positions are sequential
positions = Enum.map(read_events, & &1.position)
assert positions == Enum.to_list(0..19)
end
end
describe "batch operations" do
test "writes and reads multiple events in batch" do
events =
for i <- 1..20 do
%{data: %{value: i}}
end
assert :ok = EventStore.write_events(@test_topic, events)
assert :ok = EventStore.force_flush(@test_topic)
assert {:ok, read_events} = EventStore.read_events(@test_topic)
assert length(read_events) == length(events)
Enum.zip(events, read_events)
|> Enum.each(fn {original, read} ->
assert read.data.value == original.data.value
assert is_integer(read.position)
assert is_integer(read.timestamp)
end)
# Verify positions are sequential
positions = Enum.map(read_events, & &1.position)
assert positions == Enum.to_list(0..19)
end
test "automatically flushes batch when size limit reached" do
events =
for i <- 1..(@batch_size + 1) do
%{data: %{value: i}}
end
# Write events one by one
Enum.each(events, fn event ->
EventStore.write_event(@test_topic, event)
end)
# Wait briefly to ensure batch processing
Process.sleep(50)
assert {:ok, read_events} = EventStore.read_events(@test_topic)
assert length(read_events) >= @batch_size
end
test "automatically flushes batch after timeout" do
event = %{data: %{value: 1}}
EventStore.write_event(@test_topic, event)
# Wait for batch timeout
Process.sleep(@batch_timeout + 50)
assert {:ok, [read_event]} = EventStore.read_events(@test_topic)
assert read_event.data == event.data
end
end
describe "resilience" do
test "survives process restarts", %{test_dir: test_dir} do
events =
for i <- 1..5 do
%{data: %{value: i}}
end
assert :ok = EventStore.write_events(@test_topic, events)
assert :ok = EventStore.force_flush(@test_topic)
# Get the current process
pid = Registry.whereis_name({EventStore.Registry, {EventStore, @test_topic}})
ref = Process.monitor(pid)
# Stop the process normally
GenServer.stop(pid, :normal)
# Wait for process to stop
assert_receive {:DOWN, ^ref, :process, ^pid, :normal}, 1000
# Restart the process
{:ok, _} = EventStore.start_link(@test_topic, base_dir: test_dir)
# Verify data persisted
assert {:ok, read_events} = EventStore.read_events(@test_topic)
assert length(read_events) == length(events)
end
test "maintains position sequence across restarts", %{test_dir: test_dir} do
# Write initial events
events1 =
for i <- 1..5 do
%{data: %{value: i}}
end
assert :ok = EventStore.write_events(@test_topic, events1)
assert :ok = EventStore.force_flush(@test_topic)
# Verify initial positions
assert {:ok, initial_events} = EventStore.read_events(@test_topic)
assert Enum.map(initial_events, & &1.position) == Enum.to_list(0..4)
# Get the current process
pid = Registry.whereis_name({EventStore.Registry, {EventStore, @test_topic}})
ref = Process.monitor(pid)
# Stop the process normally
GenServer.stop(pid, :normal)
# Wait for process to stop
assert_receive {:DOWN, ^ref, :process, ^pid, :normal}, 1000
# Restart the process
{:ok, _} = EventStore.start_link(@test_topic, base_dir: test_dir)
# Write more events after restart
events2 =
for i <- 6..10 do
%{data: %{value: i}}
end
assert :ok = EventStore.write_events(@test_topic, events2)
assert :ok = EventStore.force_flush(@test_topic)
# Verify all positions are sequential
assert {:ok, all_events} = EventStore.read_events(@test_topic)
assert length(all_events) == 10
assert Enum.map(all_events, & &1.position) == Enum.to_list(0..9)
# Verify values are in correct order
assert Enum.map(all_events, & &1.data.value) == Enum.to_list(1..10)
end
test "handles concurrent writes" do
events =
for i <- 1..100 do
%{data: %{value: i}}
end
# Spawn multiple processes to write concurrently
tasks =
Enum.map(events, fn event ->
Task.async(fn ->
EventStore.write_event(@test_topic, event)
end)
end)
# Wait for all writes to complete
results = Task.await_many(tasks)
assert Enum.all?(results, &(&1 == :ok))
# Force flush to ensure all events are written
assert :ok = EventStore.force_flush(@test_topic)
# Verify we can read the events
assert {:ok, read_events} = EventStore.read_events(@test_topic)
assert length(read_events) == length(events)
end
end
describe "metrics" do
test "tracks write metrics" do
events =
for i <- 1..5 do
%{data: %{value: i}}
end
assert :ok = EventStore.write_events(@test_topic, events)
assert :ok = EventStore.force_flush(@test_topic)
metrics = EventStore.get_metrics(@test_topic)
assert metrics.writes == length(events)
assert metrics.batches > 0
assert metrics.last_write_time != nil
end
test "tracks read metrics" do
events =
for i <- 1..5 do
%{data: %{value: i}}
end
assert :ok = EventStore.write_events(@test_topic, events)
assert :ok = EventStore.force_flush(@test_topic)
assert {:ok, read_events} = EventStore.read_events(@test_topic)
metrics = EventStore.get_metrics(@test_topic)
assert metrics.reads == length(read_events)
assert metrics.last_read_time != nil
end
end
describe "cleanup" do
test "performs cleanup operations", %{test_dir: _test_dir} do
# Write enough events to trigger cleanup
events =
for i <- 1..10_000 do
%{data: %{value: i}}
end
assert :ok = EventStore.write_events(@test_topic, events)
assert :ok = EventStore.force_flush(@test_topic)
# Force cleanup by sending cleanup message
pid = Registry.whereis_name({EventStore.Registry, {EventStore, @test_topic}})
send(pid, :cleanup)
# Wait for cleanup to complete
Process.sleep(100)
# Verify we can still read events after cleanup
assert {:ok, read_events} = EventStore.read_events(@test_topic)
assert length(read_events) > 0
end
end
describe "error handling" do
test "handles disk full scenario", %{test_dir: test_dir} do
# Mock disk full by making directory read-only
File.chmod!(test_dir, 0o444)
event = %{data: %{value: 1}}
result = EventStore.write_event(@test_topic, event)
# Reset permissions
File.chmod!(test_dir, 0o755)
# write_event is a cast, so it returns :ok immediately; any disk error
# surfaces asynchronously inside the server process
assert result in [:ok, {:error, :no_such_log}, {:error, :full}]
end
test "handles corrupted log file", %{test_dir: test_dir} do
# Write some initial data
events = [%{data: %{value: 1}}]
:ok = EventStore.write_events(@test_topic, events)
:ok = EventStore.force_flush(@test_topic)
# Corrupt the log by appending garbage to the first wrap-log segment
log_file = Path.join(test_dir, "#{@test_topic}.log.1")
File.write!(log_file, "corrupted data", [:append])
# Stop the current process
pid = Registry.whereis_name({EventStore.Registry, {EventStore, @test_topic}})
GenServer.stop(pid)
# Restart should handle corruption
{:ok, _} = EventStore.start_link(@test_topic, base_dir: test_dir)
# Should still be able to write and read
new_event = %{data: %{value: 2}}
assert :ok = EventStore.write_event(@test_topic, new_event)
assert :ok = EventStore.force_flush(@test_topic)
assert {:ok, _events} = EventStore.read_events(@test_topic)
end
test "handles invalid input data" do
# Test with empty/invalid events
assert :ok = EventStore.write_event(@test_topic, nil)
assert :ok = EventStore.write_event(@test_topic, "not a map")
# Test with invalid event structure
invalid_event = %{invalid: "structure"}
assert :ok = EventStore.write_event(@test_topic, invalid_event)
assert :ok = EventStore.force_flush(@test_topic)
# Should still be readable
assert {:ok, events} = EventStore.read_events(@test_topic)
assert length(events) == 3
# Verify events were wrapped properly
[nil_event, string_event, invalid_struct_event] = events
assert nil_event.data == %{value: nil}
assert string_event.data == %{value: "not a map"}
assert invalid_struct_event.data == %{value: %{invalid: "structure"}}
end
test "handles non-existent topic" do
non_existent = "non_existent_topic"
# Try operations without starting the topic process
assert {:error, {:noproc, _}} = try_read_events(non_existent)
assert {:error, {:noproc, _}} = try_get_latest_event(non_existent)
assert {:error, {:noproc, _}} = try_read_last_events(non_existent, 5)
# Write event should fail since topic doesn't exist
assert {:error, {:noproc, _}} = try_write_event(non_existent, %{data: %{value: 1}})
end
test "handles invalid base_dir" do
invalid_dir = "/root/invalid_event_store_dir"
assert {:error, _reason} = EventStore.start_link(@test_topic, base_dir: invalid_dir)
end
# Helper functions for safely testing non-existent topics
defp try_read_events(topic) do
try do
EventStore.read_events(topic)
catch
:exit, reason -> {:error, {:noproc, reason}}
end
end
defp try_get_latest_event(topic) do
try do
EventStore.get_latest_event(topic)
catch
:exit, reason -> {:error, {:noproc, reason}}
end
end
defp try_read_last_events(topic, n) do
try do
EventStore.read_last_events(topic, n)
catch
:exit, reason -> {:error, {:noproc, reason}}
end
end
defp try_write_event(topic, event) do
try do
# Use call instead of cast to ensure we get the error
GenServer.call(via_tuple(topic), {:write, event})
catch
:exit, reason -> {:error, {:noproc, reason}}
end
end
defp via_tuple(topic) do
{:via, Registry, {EventStore.Registry, {EventStore, topic}}}
end
end
describe "edge cases" do
test "handles empty events" do
# Write empty event list
assert :ok = EventStore.write_events(@test_topic, [])
assert :ok = EventStore.force_flush(@test_topic)
assert {:ok, []} = EventStore.read_events(@test_topic)
# Write single empty event
assert :ok = EventStore.write_event(@test_topic, %{data: %{value: ""}})
assert :ok = EventStore.force_flush(@test_topic)
assert {:ok, [event]} = EventStore.read_events(@test_topic)
assert event.data.value == ""
assert is_integer(event.position)
end
test "handles maximum integer values for positions" do
# JavaScript max safe integer
max_safe_integer = 9_007_199_254_740_991
# Note: the store assigns its own positions; the :position field on these
# input events is ignored by wrap_event/2
events =
for i <- (max_safe_integer - 10)..max_safe_integer do
%{data: %{value: i}, position: i}
end
assert :ok = EventStore.write_events(@test_topic, events)
assert :ok = EventStore.force_flush(@test_topic)
assert {:ok, read_events} = EventStore.read_events(@test_topic)
positions = Enum.map(read_events, & &1.position)
assert length(positions) == length(events)
assert Enum.all?(positions, &is_integer/1)
end
test "handles minimum/maximum timestamp values" do
min_timestamp = 0
# 32-bit max
max_timestamp = (:math.pow(2, 32) - 1) |> round()
events = [
%{data: %{value: "min"}, timestamp: min_timestamp},
%{data: %{value: "max"}, timestamp: max_timestamp}
]
assert :ok = EventStore.write_events(@test_topic, events)
assert :ok = EventStore.force_flush(@test_topic)
assert {:ok, read_events} = EventStore.read_events(@test_topic)
assert length(read_events) == 2
[event1, event2] = read_events
assert event1.timestamp == min_timestamp
assert event2.timestamp == max_timestamp
end
test "handles batch buffer partially full at shutdown", %{test_dir: test_dir} do
# Write some events but don't flush
events =
for i <- 1..(@batch_size - 1) do
%{data: %{value: i}}
end
Enum.each(events, fn event ->
assert :ok = EventStore.write_event(@test_topic, event)
end)
# Get the current process
pid = Registry.whereis_name({EventStore.Registry, {EventStore, @test_topic}})
# Stop the process normally (should trigger terminate callback)
GenServer.stop(pid, :normal)
Process.sleep(100)
# Restart and verify data was saved
{:ok, _} = EventStore.start_link(@test_topic, base_dir: test_dir)
assert {:ok, read_events} = EventStore.read_events(@test_topic)
# Verify all events were saved
assert length(read_events) == length(events)
values = Enum.map(read_events, & &1.data.value)
assert values == Enum.to_list(1..(@batch_size - 1))
end
end
describe "concurrent access" do
test "handles multiple readers while writing" do
# Start multiple reader processes
reader_count = 5
read_iterations = 20
# Write initial events
initial_events =
for i <- 1..50 do
%{data: %{value: i}}
end
assert :ok = EventStore.write_events(@test_topic, initial_events)
assert :ok = EventStore.force_flush(@test_topic)
# Start reader tasks
reader_tasks =
for _i <- 1..reader_count do
Task.async(fn ->
for _j <- 1..read_iterations do
{:ok, events} = EventStore.read_events(@test_topic)
assert length(events) >= length(initial_events)
# Random delay
Process.sleep(Enum.random(1..10))
end
:ok
end)
end
# Write more events while readers are running
writer_task =
Task.async(fn ->
for i <- 51..100 do
:ok = EventStore.write_event(@test_topic, %{data: %{value: i}})
# Random delay
Process.sleep(Enum.random(1..10))
end
:ok = EventStore.force_flush(@test_topic)
:ok
end)
# Wait for all tasks to complete
assert :ok = Task.await(writer_task)
assert Enum.all?(Task.await_many(reader_tasks), &(&1 == :ok))
# Verify final state
assert {:ok, final_events} = EventStore.read_events(@test_topic)
assert length(final_events) == 100
end
test "handles multiple writers to same topic" do
writer_count = 5
events_per_writer = 20
# Start multiple writer tasks
writer_tasks =
for writer_id <- 1..writer_count do
Task.async(fn ->
for i <- 1..events_per_writer do
event = %{data: %{writer: writer_id, value: i}}
:ok = EventStore.write_event(@test_topic, event)
# Random delay
Process.sleep(Enum.random(1..10))
end
:ok
end)
end
# Wait for all writers to complete
assert Enum.all?(Task.await_many(writer_tasks), &(&1 == :ok))
assert :ok = EventStore.force_flush(@test_topic)
# Verify results
assert {:ok, events} = EventStore.read_events(@test_topic)
assert length(events) == writer_count * events_per_writer
# Verify all events from all writers are present
events_by_writer = Enum.group_by(events, & &1.data.writer)
assert map_size(events_by_writer) == writer_count
Enum.each(events_by_writer, fn {_writer_id, writer_events} ->
values = Enum.map(writer_events, & &1.data.value)
assert length(values) == events_per_writer
assert Enum.sort(values) == Enum.to_list(1..events_per_writer)
end)
end
test "handles multiple topics simultaneously" do
topic_count = 5
events_per_topic = 20
# Create multiple topics
topics =
for i <- 1..topic_count do
topic = "test_topic_#{i}"
test_dir = "test/tmp/#{topic}"
File.mkdir_p!(test_dir)
{:ok, _} = EventStore.start_link(topic, base_dir: test_dir)
topic
end
# Write to all topics concurrently
writer_tasks =
for topic <- topics do
Task.async(fn ->
for i <- 1..events_per_topic do
event = %{data: %{value: i}}
:ok = EventStore.write_event(topic, event)
end
:ok = EventStore.force_flush(topic)
:ok
end)
end
# Wait for all writers
assert Enum.all?(Task.await_many(writer_tasks), &(&1 == :ok))
# Verify each topic
for topic <- topics do
assert {:ok, events} = EventStore.read_events(topic)
assert length(events) == events_per_topic
values = Enum.map(events, & &1.data.value)
assert Enum.sort(values) == Enum.to_list(1..events_per_topic)
end
# Cleanup
for topic <- topics do
pid = Registry.whereis_name({EventStore.Registry, {EventStore, topic}})
GenServer.stop(pid)
File.rm_rf!("test/tmp/#{topic}")
end
end
test "handles race conditions during cleanup operations" do
# Write initial events in smaller batches
initial_events =
for i <- 1..100 do
%{data: %{value: i}}
end
# Write in chunks of 10
Enum.chunk_every(initial_events, 10)
|> Enum.each(fn chunk ->
assert :ok = EventStore.write_events(@test_topic, chunk)
assert :ok = EventStore.force_flush(@test_topic)
# Brief pause between chunks
Process.sleep(10)
end)
# Verify initial state with chunk_size
assert {:ok, events_before} = EventStore.read_events(@test_topic, chunk_size: 1000)
assert length(events_before) == 100
# Create an agent to track write completion
{:ok, write_agent} = Agent.start_link(fn -> 0 end)
# Start concurrent readers
reader_task =
Task.async(fn ->
for _i <- 1..20 do
{:ok, events} = EventStore.read_events(@test_topic, chunk_size: 1000)
# Should at least have initial events
assert length(events) >= 100
Process.sleep(Enum.random(5..15))
end
:ok
end)
# Start concurrent writers
writer_task =
Task.async(fn ->
# Write additional events in smaller batches
additional_events =
for i <- 101..150 do
%{data: %{value: i}}
end
Enum.chunk_every(additional_events, 5)
|> Enum.each(fn chunk ->
:ok = EventStore.write_events(@test_topic, chunk)
:ok = EventStore.force_flush(@test_topic)
Agent.update(write_agent, &(&1 + length(chunk)))
Process.sleep(Enum.random(5..15))
end)
:ok
end)
# Wait for writers to make progress
Process.sleep(200)
# Trigger cleanup and monitor its completion
pid = Registry.whereis_name({EventStore.Registry, {EventStore, @test_topic}})
cleanup_ref = Process.monitor(pid)
send(pid, :cleanup)
# Wait for cleanup message to be processed
Process.sleep(200)
# Wait for operations to complete
assert :ok = Task.await(reader_task, 5000)
assert :ok = Task.await(writer_task, 5000)
# Ensure all writes are flushed
assert :ok = EventStore.force_flush(@test_topic)
# Wait for final flush
Process.sleep(200)
# Wait for cleanup to complete or timeout
receive do
{:DOWN, ^cleanup_ref, :process, ^pid, _} -> :ok
after
1000 -> :ok
end
# Function to retry reads with exponential backoff
retry_read = fn retry_fn, attempts ->
case EventStore.read_events(@test_topic, chunk_size: 1000) do
{:ok, events} when length(events) >= 150 ->
{:ok, events}
_ when attempts > 1 ->
# Longer delay between retries
Process.sleep(200)
retry_fn.(retry_fn, attempts - 1)
{:ok, events} ->
{:ok, events}
end
end
# Verify final state with retries
# More retries
assert {:ok, final_events} = retry_read.(retry_read, 10)
assert length(final_events) == 150
# Verify all events are present and in order
values = Enum.map(final_events, & &1.data.value)
sorted_values = Enum.sort(values)
assert sorted_values == Enum.to_list(1..150)
# Clean up agent
Agent.stop(write_agent)
end
end
describe "performance boundaries" do
test "handles large binary data chunks" do
# Create events with large binary data (1MB each)
binary_data = String.duplicate("a", 1024 * 1024)
events =
for _i <- 1..4 do
%{data: %{value: binary_data}}
end
# Write events and force flush
assert :ok = EventStore.write_events(@test_topic, events)
assert :ok = EventStore.force_flush(@test_topic)
# Read events back
assert {:ok, read_events} = EventStore.read_events(@test_topic)
# Verify data integrity
assert length(read_events) == 4
Enum.each(read_events, fn event ->
# byte_size/1 is O(1); String.length/1 would walk the whole binary
assert byte_size(event.data.value) == 1024 * 1024
assert event.data.value == binary_data
end)
# Verify events are in correct order
positions = Enum.map(read_events, & &1.position)
assert positions == Enum.to_list(0..3)
end
test "handles extremely large events" do
# Create a large event with 100KB of data
large_data = String.duplicate("a", 100 * 1024)
event = %{data: %{value: large_data}}
assert :ok = EventStore.write_event(@test_topic, event)
assert :ok = EventStore.force_flush(@test_topic)
assert {:ok, [read_event]} = EventStore.read_events(@test_topic)
assert read_event.data.value == large_data
end
test "handles high write frequency" do
# Use a unique topic name for this test
test_topic = "high_freq_test_#{System.unique_integer()}"
test_dir = "test/tmp/#{test_topic}_#{System.unique_integer()}"
File.mkdir_p!(test_dir)
# Start with larger batch size for this test
{:ok, pid} =
EventStore.start_link(test_topic,
base_dir: test_dir,
batch_size: @large_batch_size,
batch_timeout: @batch_timeout
)
# Write 1000 events as fast as possible
events =
for i <- 1..1000 do
%{data: %{value: i}}
end
# Write all events at once
assert :ok = EventStore.write_events(test_topic, events)
on_exit(fn ->
try do
if Process.alive?(pid) do
GenServer.stop(pid)
end
catch
_kind, _reason -> :ok
end
try do
File.rm_rf!(test_dir)
catch
_kind, _reason -> :ok
end
end)
assert :ok = EventStore.force_flush(test_topic)
# Wait for all writes to complete
Process.sleep(500)
# Read everything back once (each read returns the full log), then verify in slices
{:ok, current_events} = EventStore.read_events(test_topic, chunk_size: 1000)
values = Enum.map(current_events, & &1.data.value)
assert length(values) == 1000,
"Expected 1000 events, got #{length(values)}"
1..1000
|> Enum.chunk_every(100)
|> Enum.each(fn range ->
# Log progress
Logger.debug("Verifying events #{Enum.min(range)}..#{Enum.max(range)}")
# Verify this slice's values are present and in order
chunk_values = Enum.slice(values, Enum.min(range) - 1, length(range))
assert chunk_values == range,
"Events #{Enum.min(range)}..#{Enum.max(range)} are not in correct order"
end)
# Verify all events were written
assert {:ok, read_events} = EventStore.read_events(test_topic, chunk_size: 1000)
# Verify we got all events and they're in order
values = Enum.map(read_events, & &1.data.value)
assert values == Enum.to_list(1..1000)
assert length(read_events) == 1000
end
test "handles memory usage under load", %{test_dir: test_dir} do
# Write a large number of medium-sized events
events =
for i <- 1..5000 do
# Each event around 1KB
data = String.duplicate("data-#{i}", 100)
%{data: %{value: data}}
end
# Write events in chunks to avoid overwhelming the system
Enum.chunk_every(events, 500)
|> Enum.each(fn chunk ->
assert :ok = EventStore.write_events(@test_topic, chunk)
assert :ok = EventStore.force_flush(@test_topic)
# Brief pause to allow for cleanup
Process.sleep(10)
end)
# Verify reads still work; a wrap log may have rotated out the oldest events,
# so only assert that something is readable rather than an exact count
assert {:ok, read_events} = EventStore.read_events(@test_topic)
assert length(read_events) > 0
# Check disk usage hasn't exceeded limits
files = File.ls!(test_dir)
log_file =
files
|> Enum.find(fn file -> String.starts_with?(file, @test_topic) end)
|> case do
nil -> raise "Log file not found in #{test_dir}"
file -> Path.join(test_dir, file)
end
{:ok, %{size: size}} = File.stat(log_file)
# Should be less than 10MB
assert size < 1024 * 1024 * 10
end
test "handles behavior near max file size limits", %{test_dir: test_dir} do
# Write events until we exceed the default max chunk size
# 1MB
chunk_size = 1024 * 1024
# 10KB per event
data_size = 10 * 1024
events_per_chunk = div(chunk_size, data_size)
# Generate events that will fill multiple chunks
events =
for i <- 1..(events_per_chunk * 2) do
# Pad to exactly data_size bytes so the sizing math above holds
data = String.pad_leading("#{i}", data_size, "0")
%{data: %{value: data}}
end
# Write events and force cleanup
assert :ok = EventStore.write_events(@test_topic, events)
assert :ok = EventStore.force_flush(@test_topic)
# Force cleanup
pid = Registry.whereis_name({EventStore.Registry, {EventStore, @test_topic}})
send(pid, :cleanup)
# Wait for cleanup
Process.sleep(100)
# Write more events after cleanup
more_events =
for i <- 1..10 do
%{data: %{value: i}}
end
assert :ok = EventStore.write_events(@test_topic, more_events)
assert :ok = EventStore.force_flush(@test_topic)
# Verify we can still read events after hitting size limits
assert {:ok, read_events} = EventStore.read_events(@test_topic)
assert length(read_events) > 0
# Verify log file size is managed
# List files in directory to see what was created
files = File.ls!(test_dir)
log_file =
files
|> Enum.find(fn file -> String.starts_with?(file, @test_topic) end)
|> case do
nil -> raise "Log file not found in #{test_dir}"
file -> Path.join(test_dir, file)
end
{:ok, %{size: size}} = File.stat(log_file)
# Should not exceed max total size
assert size < chunk_size * 10
end
end
describe "data format handling" do
test "handles deeply nested data structures" do
# Create a deeply nested map structure
deep_nest = fn depth ->
Enum.reduce(1..depth, %{value: "leaf"}, fn i, acc ->
%{"level_#{i}" => acc}
end)
end
# Create events with increasingly deep nesting
nested_events =
for depth <- 1..10 do
%{
data: %{
depth: depth,
nested: deep_nest.(depth),
arrays: List.duplicate([1, 2, [3, 4, [5]]], depth)
}
}
end
# Write all nested events
assert :ok = EventStore.write_events(@test_topic, nested_events)
assert :ok = EventStore.force_flush(@test_topic)
# Read back and verify
assert {:ok, read_events} = EventStore.read_events(@test_topic)
assert length(read_events) == length(nested_events)
# Compare original and read data
Enum.zip(nested_events, read_events)
|> Enum.each(fn {original, read} ->
assert read.data == original.data
assert is_integer(read.position)
assert is_integer(read.timestamp)
# Verify the depth matches
assert read.data.depth == original.data.depth
# Verify nested structure is preserved
assert read.data.nested == original.data.nested
# Verify nested arrays are preserved
assert read.data.arrays == original.data.arrays
end)
# Verify positions are sequential
positions = Enum.map(read_events, & &1.position)
assert positions == Enum.to_list(0..(length(nested_events) - 1))
end
test "handles events with special characters and unusual formats" do
special_events = [
# Unicode emoji
%{data: %{value: "Hello 🌍 World!"}},
# Control characters
%{data: %{value: "\t\n\r"}},
# Mixed quotes
%{data: %{value: "'''\"\"'''"}},
# Long Unicode string
%{data: %{value: String.duplicate("长", 100)}},
# Complex nested structure
%{data: %{value: %{nested: %{deep: "🎉", array: [1, "2", :three]}}}},
# Escaped characters
%{data: %{value: "\\special\\chars"}},
# Empty array
%{data: %{value: []}},
# Empty map
%{data: %{value: %{}}},
# Mixed types array
%{data: %{value: [nil, true, false, 42, 3.14]}},
# HTML-like content
%{data: %{value: "<script>alert('test')</script>"}}
]
# Write all special events
assert :ok = EventStore.write_events(@test_topic, special_events)
assert :ok = EventStore.force_flush(@test_topic)
# Read back and verify
assert {:ok, read_events} = EventStore.read_events(@test_topic)
assert length(read_events) == length(special_events)
# Compare original and read data
Enum.zip(special_events, read_events)
|> Enum.each(fn {original, read} ->
assert read.data == original.data
assert is_integer(read.position)
assert is_integer(read.timestamp)
end)
# Verify positions are sequential
positions = Enum.map(read_events, & &1.position)
assert positions == Enum.to_list(0..(length(special_events) - 1))
end
end
end
defmodule SimpleEventStore do
@moduledoc """
A simplified implementation of EventStore using only disk_log.
This version is primarily for testing and understanding disk_log behavior.
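
## Example

A minimal usage sketch; it assumes an `EventStore.Registry` Registry is already
running (e.g. `Registry.start_link(keys: :unique, name: EventStore.Registry)`),
and the "example" topic is illustrative only:

    {:ok, _pid} = SimpleEventStore.start_link("example", base_dir: "tmp/example")
    :ok = SimpleEventStore.write_event("example", %{data: %{value: 1}})
    {:ok, [event]} = SimpleEventStore.read_events("example")
    event.data.value
    #=> 1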
"""
use GenServer
require Logger
defmodule State do
defstruct [:topic, :log_name, :base_dir]
end
# Public API
def start_link(topic, opts \\ []) do
GenServer.start_link(__MODULE__, [topic, opts], name: via_tuple(topic))
end
def write_event(topic, event) do
GenServer.call(via_tuple(topic), {:write, event})
end
def read_events(topic) do
GenServer.call(via_tuple(topic), :read)
end
# Server Implementation
@impl true
def init([topic, opts]) do
base_dir = Keyword.get(opts, :base_dir, "data/#{topic}")
File.mkdir_p!(base_dir)
log_name = String.to_atom("#{topic}_log")
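# Note: String.to_atom/1 grows the atom table, which is never garbage
# collected; acceptable here only because topics are assumed to be bounded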
log_file = Path.join(base_dir, "#{topic}.log")
# Open or create disk_log
case :disk_log.open(
name: log_name,
file: String.to_charlist(log_file),
type: :wrap,
# Wrap log: up to 10 files of 1MB each; once full, the oldest file is overwritten
size: {1024 * 1024, 10},
mode: :read_write
) do
{:ok, _} ->
{:ok, %State{topic: topic, log_name: log_name, base_dir: base_dir}}
{:repaired, _log, {:recovered, recovered}, {:badbytes, bad_bytes}} ->
Logger.info("Repaired corrupted log file: recovered #{recovered} terms, skipped #{bad_bytes} bad bytes")
{:ok, %State{topic: topic, log_name: log_name, base_dir: base_dir}}
{:error, reason} ->
Logger.error("Failed to open disk_log: #{inspect(reason)}")
{:stop, reason}
end
end
@impl true
def handle_call({:write, event}, _from, state) do
wrapped_event = wrap_event(event)
binary_event = :erlang.term_to_binary(wrapped_event)
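# log_terms/2 applies term_to_binary/1 again, so the event is double-encoded;
# chunk/2 strips the outer layer on read and process_terms/1 decodes the rest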
case :disk_log.log_terms(state.log_name, [binary_event]) do
:ok ->
{:reply, :ok, state}
{:error, reason} ->
Logger.error("Failed to write event: #{inspect(reason)}")
{:reply, {:error, reason}, state}
end
end
@impl true
def handle_call(:read, _from, state) do
case read_all_events(state.log_name) do
{:ok, events} ->
{:reply, {:ok, events}, state}
{:error, reason} ->
Logger.error("Failed to read events: #{inspect(reason)}")
{:reply, {:error, reason}, state}
end
end
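# Sync buffered writes to disk before closing so a clean shutdown loses nothing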
@impl true
def terminate(_reason, %{log_name: log_name} = state) when not is_nil(log_name) do
with :ok <- :disk_log.sync(state.log_name),
:ok <- :disk_log.close(state.log_name) do
:ok
else
{:error, _reason} -> :ok
end
end
def terminate(_reason, _state) do
:ok
end
# Private Functions
defp via_tuple(topic) do
{:via, Registry, {EventStore.Registry, {__MODULE__, topic}}}
end
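# Unlike the full EventStore, events here get only a timestamp; no monotonic
# position is assigned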
defp wrap_event(event) do
case event do
%{timestamp: _} -> event
_ -> Map.put(event, :timestamp, System.system_time(:second))
end
end
defp read_all_events(log_name) do
try do
chunk_loop(log_name, :start, [])
catch
:exit, reason ->
{:error, reason}
end
end
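# Walk the log using disk_log's continuation tokens until :eof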
defp chunk_loop(log_name, continuation, acc) do
case :disk_log.chunk(log_name, continuation) do
:eof ->
{:ok, Enum.reverse(acc)}
{:error, :no_such_log} ->
{:ok, []}
{:error, reason} ->
{:error, reason}
{next_continuation, terms} when is_list(terms) ->
# Prepend (reversing at :eof) to avoid O(n^2) list appends
chunk_loop(log_name, next_continuation, Enum.reverse(process_terms(terms), acc))
{next_continuation, terms, _bad_bytes} ->
# A repaired log can return a third element counting bad bytes
chunk_loop(log_name, next_continuation, Enum.reverse(process_terms(terms), acc))
end
end
defp process_terms(terms) when is_list(terms) do
Enum.map(terms, fn
term when is_binary(term) -> :erlang.binary_to_term(term)
term -> term
end)
end
end