Hot Landing Zone with Single-Node Disk Log
defmodule Nghenhan.LandingZone do
  @moduledoc """
  Hot Landing Zone implementation with fast recovery using segmented logs,
  separate index files, and checkpoints.
  """

  use GenServer
  require Logger

  @checkpoint_interval 300_000 # 5 minutes
  @cleanup_interval 60_000 # 1 minute
  @segment_max_size 64 * 1024 * 1024 # 64MB per segment
  @index_checkpoint_size 1000 # Number of buffered entries before an index write

  defmodule Segment do
    defstruct [:id, :log, :index, :start_time, :end_time, :size]
  end

  defmodule State do
    defstruct [
      :topic,
      :base_dir,
      :active_segment,
      :segments,          # Map of segment_id -> Segment
      :index_cache,       # ETS cache of index entries (created in init/1; not yet consulted on reads)
      :last_checkpoint,
      :retention_period,  # in seconds
      write_buffer: [],
      write_buffer_size: 0
    ]
  end

  # Public API

  def start_link(topic, opts \\ []) do
    GenServer.start_link(__MODULE__, [topic, opts], name: via_tuple(topic))
  end

  def write_event(topic, event) do
    GenServer.call(via_tuple(topic), {:write, event})
  end

  def read_events(topic, start_time, end_time) do
    GenServer.call(via_tuple(topic), {:read, start_time, end_time})
  end
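
  # Example usage (the payload map and timestamps are illustrative; the
  # Registry that via_tuple/1 relies on must already be running - see the
  # supervision sketch at the end of the file):
  #
  #   {:ok, _pid} = Nghenhan.LandingZone.start_link("ticks", retention: {1, :days})
  #   :ok = Nghenhan.LandingZone.write_event("ticks", %{symbol: "BTC", price: 42_000})
  #   now = System.system_time(:second)
  #   events = Nghenhan.LandingZone.read_events("ticks", now - 3600, now)
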
  # Server Implementation

  @impl true
  def init([topic, opts]) do
    base_dir = Keyword.get(opts, :base_dir, "data/#{topic}")
    retention_period = parse_retention_period(opts)

    # Initialize directories
    File.mkdir_p!(base_dir)
    File.mkdir_p!(Path.join(base_dir, "segments"))
    File.mkdir_p!(Path.join(base_dir, "indexes"))
    File.mkdir_p!(Path.join(base_dir, "checkpoints"))

    # Load or create checkpoint
    state =
      case load_latest_checkpoint(base_dir) do
        {:ok, checkpoint} ->
          # Fast path - restore from checkpoint
          state = restore_from_checkpoint(checkpoint, topic, base_dir)
          %{state | retention_period: retention_period}

        :error ->
          # Slow path - scan segments and rebuild state
          Logger.warning("No valid checkpoint found, rebuilding state from segments")
          state = rebuild_state_from_segments(topic, base_dir)
          %{state | retention_period: retention_period}
      end

    # Schedule periodic tasks
    schedule_checkpoint()
    schedule_cleanup()
    {:ok, state}
  end

  @impl true
  def handle_call({:write, event}, _from, state) do
    wrapped_event = wrap_event(event)

    # Check if we need a new segment
    state = maybe_rotate_segment(state)

    # Write to active segment
    case write_to_segment(state.active_segment, wrapped_event) do
      {:ok, new_segment} ->
        state = %{state |
          active_segment: new_segment,
          write_buffer: [wrapped_event | state.write_buffer],
          write_buffer_size: state.write_buffer_size + 1
        }

        # Update index if buffer threshold reached
        state = maybe_update_index(state)
        {:reply, :ok, state}

      {:error, reason} ->
        Logger.error("Failed to write event: #{inspect(reason)}")
        {:reply, {:error, reason}, state}
    end
  end

  @impl true
  def handle_call({:read, start_time, end_time}, _from, state) do
    # Find relevant segments using index
    segments = find_segments_for_timerange(state, start_time, end_time)

    # Read events from segments
    events = read_events_from_segments(segments, start_time, end_time)
    {:reply, events, state}
  end

  @impl true
  def handle_info(:checkpoint, state) do
    new_state = create_checkpoint(state)
    schedule_checkpoint()
    {:noreply, new_state}
  end

  @impl true
  def handle_info(:cleanup, state) do
    new_state = cleanup_old_segments(state)
    schedule_cleanup()
    {:noreply, new_state}
  end
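
  # Not in the original: a terminate/2 sketch that closes the open disk logs
  # on shutdown. For it to run on supervisor-initiated shutdowns, init/1 would
  # also need Process.flag(:trap_exit, true).
  @impl true
  def terminate(_reason, state) do
    Enum.each([state.active_segment | Map.values(state.segments)], fn segment ->
      :disk_log.close(segment.log)
      :disk_log.close(segment.index)
    end)
  end
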
  # Private Functions

  defp parse_retention_period(opts) do
    case Keyword.get(opts, :retention) do
      {value, :minutes} -> value * 60
      {value, :hours} -> value * 3600
      {value, :days} -> value * 86400
      nil -> 7 * 86400 # Default 7 days
      invalid ->
        Logger.warning("Invalid retention period #{inspect(invalid)}, using default 7 days")
        7 * 86400
    end
  end
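
  # wrap_event/1 is called in handle_call/3 but was not defined in the
  # original gist. A minimal sketch: stamp the payload with the write time,
  # since write_to_segment/2, the index, and retention all rely on a
  # `timestamp` field (in seconds) being present.
  defp wrap_event(event) do
    %{timestamp: System.system_time(:second), payload: event}
  end
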
  defp cleanup_old_segments(state) do
    cutoff_time = System.system_time(:second) - state.retention_period

    {to_remove, to_keep} =
      Enum.split_with(state.segments, fn {_id, segment} ->
        segment.end_time < cutoff_time
      end)

    # Close and remove old segments
    Enum.each(to_remove, fn {id, segment} ->
      Logger.info("Cleaning up old segment #{id}")
      :ok = :disk_log.close(segment.log)
      :ok = :disk_log.close(segment.index)
      File.rm(segment_path(state.base_dir, id))
      File.rm(index_path(state.base_dir, id))
    end)

    %{state | segments: Map.new(to_keep)}
  end

  defp schedule_cleanup do
    Process.send_after(self(), :cleanup, @cleanup_interval)
  end
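
  # open_segment_log/2 and open_segment_index/2 are referenced above but were
  # not defined in the original gist. Minimal sketches: reopen an existing
  # :halt log with the same options create_new_segment/1 uses, accepting the
  # {:repaired, ...} result :disk_log returns after an unclean shutdown.
  defp open_segment_log(base_dir, id) do
    open_existing_log(String.to_atom("segment_#{id}"), segment_path(base_dir, id))
  end

  defp open_segment_index(base_dir, id) do
    open_existing_log(String.to_atom("index_#{id}"), index_path(base_dir, id))
  end

  defp open_existing_log(name, path) do
    opts = [
      name: name,
      file: String.to_charlist(path),
      type: :halt,
      size: :infinity,
      mode: :read_write
    ]

    case :disk_log.open(opts) do
      {:ok, log} -> {:ok, log}
      {:repaired, log, _recovered, _bad_bytes} -> {:ok, log}
      error -> error
    end
  end
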
  defp load_latest_checkpoint(base_dir) do
    checkpoint_dir = Path.join(base_dir, "checkpoints")

    case File.ls(checkpoint_dir) do
      {:ok, files} ->
        files
        |> Enum.filter(&String.ends_with?(&1, ".checkpoint"))
        |> Enum.sort()
        |> List.last()
        |> case do
          nil ->
            :error

          file ->
            path = Path.join(checkpoint_dir, file)

            case File.read(path) do
              {:ok, binary} -> {:ok, :erlang.binary_to_term(binary)}
              _ -> :error
            end
        end

      _ ->
        :error
    end
  end

  defp restore_from_checkpoint(checkpoint, topic, base_dir) do
    Logger.info("Restoring from checkpoint for topic #{topic}")

    # Validate that checkpointed segments still exist, then reopen their logs
    segments =
      checkpoint.segments
      |> Enum.filter(fn {_id, segment} ->
        File.exists?(segment_path(base_dir, segment.id)) &&
          File.exists?(index_path(base_dir, segment.id))
      end)
      |> Enum.map(fn {id, segment} ->
        {:ok, log} = open_segment_log(base_dir, id)
        {:ok, index} = open_segment_index(base_dir, id)
        {id, %{segment | log: log, index: index}}
      end)
      |> Map.new()

    # Reopen the checkpointed active segment, or create a new one if needed
    active_segment =
      case checkpoint.active_segment do
        nil ->
          create_new_segment(base_dir)

        segment when is_map(segment) ->
          if File.exists?(segment_path(base_dir, segment.id)) do
            {:ok, log} = open_segment_log(base_dir, segment.id)
            {:ok, index} = open_segment_index(base_dir, segment.id)
            %{segment | log: log, index: index}
          else
            create_new_segment(base_dir)
          end
      end

    %State{
      topic: topic,
      base_dir: base_dir,
      active_segment: active_segment,
      segments: segments,
      index_cache: :ets.new(:index_cache, [:set, :private]),
      last_checkpoint: System.system_time(:second),
      write_buffer: [],
      write_buffer_size: 0
    }
  end

  defp rebuild_state_from_segments(topic, base_dir) do
    Logger.info("Rebuilding state from segments for topic #{topic}")

    # Scan segments directory
    segment_dir = Path.join(base_dir, "segments")

    segments =
      case File.ls(segment_dir) do
        {:ok, files} ->
          files
          |> Enum.filter(&String.ends_with?(&1, ".segment"))
          |> Enum.map(fn file ->
            id = Path.rootname(file)
            {:ok, log} = open_segment_log(base_dir, id)
            {:ok, index} = open_segment_index(base_dir, id)
            # scan_segment/2 does not know the id, so fill it in here
            segment = %{scan_segment(log, index) | id: id}
            {id, segment}
          end)
          |> Map.new()

        _ ->
          %{}
      end

    # Create new active segment
    active_segment = create_new_segment(base_dir)

    %State{
      topic: topic,
      base_dir: base_dir,
      active_segment: active_segment,
      segments: segments,
      index_cache: :ets.new(:index_cache, [:set, :private]),
      last_checkpoint: System.system_time(:second),
      write_buffer: [],
      write_buffer_size: 0
    }
  end
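
  # scan_segment/2 is referenced above but was not defined in the original
  # gist. A minimal sketch: walk the whole log with :disk_log.chunk/2 to
  # recover the first/last timestamps and the payload byte size (the id field
  # is filled in by the caller).
  defp scan_segment(log, index) do
    {start_time, end_time, size} = scan_chunks(log, :start, {nil, nil, 0})
    %Segment{log: log, index: index, start_time: start_time, end_time: end_time, size: size}
  end

  defp scan_chunks(log, cont, acc) do
    case :disk_log.chunk(log, cont) do
      :eof ->
        acc

      {:error, _reason} ->
        acc

      {next_cont, terms} ->
        acc =
          Enum.reduce(terms, acc, fn binary, {first, _last, size} ->
            event = :erlang.binary_to_term(binary)
            {first || event.timestamp, event.timestamp, size + byte_size(binary)}
          end)

        scan_chunks(log, next_cont, acc)
    end
  end
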
  defp create_new_segment(base_dir) do
    id = "#{System.system_time(:microsecond)}"

    # The original opened :wrap logs, but a wrap log splits into suffixed
    # files (breaking the segment_path-based cleanup and rebuild) and silently
    # overwrites old data once full. A :halt log keeps one file whose name
    # matches segment_path/2; size is unbounded here because
    # maybe_rotate_segment/1 enforces the @segment_max_size bound instead.
    {:ok, log} =
      :disk_log.open(
        name: String.to_atom("segment_#{id}"),
        file: String.to_charlist(segment_path(base_dir, id)),
        type: :halt,
        size: :infinity,
        mode: :read_write
      )

    {:ok, index} =
      :disk_log.open(
        name: String.to_atom("index_#{id}"),
        file: String.to_charlist(index_path(base_dir, id)),
        type: :halt,
        size: :infinity,
        mode: :read_write
      )

    %Segment{
      id: id,
      log: log,
      index: index,
      start_time: nil,
      end_time: nil,
      size: 0
    }
  end

  defp write_to_segment(segment, event) do
    binary_event = :erlang.term_to_binary(event)
    size = byte_size(binary_event)

    case :disk_log.log_terms(segment.log, [binary_event]) do
      :ok ->
        new_segment = %{segment |
          start_time: segment.start_time || event.timestamp,
          end_time: event.timestamp,
          size: segment.size + size
        }

        {:ok, new_segment}

      error ->
        error
    end
  end

  defp maybe_rotate_segment(%{active_segment: segment} = state) do
    if segment.size >= @segment_max_size do
      # Index any events still buffered against the outgoing segment (the
      # original carried the buffer over, so they would have been indexed
      # against the new segment), then flush its index before sealing it
      if state.write_buffer_size > 0 do
        sorted = Enum.sort_by(state.write_buffer, & &1.timestamp)
        :ok = update_segment_index(segment, sorted)
      end

      flush_index(segment)

      # Move current segment to segments map
      segments = Map.put(state.segments, segment.id, segment)

      # Create new active segment with an empty write buffer
      %{state |
        active_segment: create_new_segment(state.base_dir),
        segments: segments,
        write_buffer: [],
        write_buffer_size: 0
      }
    else
      state
    end
  end
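
  # flush_index/1 is referenced above but was not defined in the original
  # gist. A minimal sketch: force any buffered disk_log records for the
  # segment to disk before it is sealed.
  defp flush_index(segment) do
    :disk_log.sync(segment.index)
    :disk_log.sync(segment.log)
    :ok
  end
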
  defp maybe_update_index(state) do
    if state.write_buffer_size >= @index_checkpoint_size do
      # Sort buffer by timestamp
      sorted_buffer = Enum.sort_by(state.write_buffer, & &1.timestamp)

      # Update index
      :ok = update_segment_index(state.active_segment, sorted_buffer)

      %{state | write_buffer: [], write_buffer_size: 0}
    else
      state
    end
  end

  defp update_segment_index(segment, events) do
    # The original called :disk_log.bytes_to_position/1, which does not exist
    # in OTP's :disk_log. Record the written-item count at flush time instead,
    # as a coarse position hint shared by the whole buffered batch.
    position = segment.log |> :disk_log.info() |> Keyword.get(:no_written_items, 0)
    index_entries = Enum.map(events, fn event -> {event.timestamp, position} end)
    :disk_log.log_terms(segment.index, index_entries)
  end
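
  # find_segments_for_timerange/3 and read_events_from_segments/3 are
  # referenced above but were not defined in the original gist. Minimal
  # sketches: select sealed segments whose time span overlaps the query (plus
  # the active segment) and scan them with :disk_log.chunk/2, filtering by
  # timestamp. A production version would seek using the index entries
  # instead of scanning whole segments.
  defp find_segments_for_timerange(state, start_time, end_time) do
    sealed =
      state.segments
      |> Map.values()
      |> Enum.filter(fn segment ->
        segment.start_time != nil and
          segment.start_time <= end_time and
          segment.end_time >= start_time
      end)

    sealed ++ [state.active_segment]
  end

  defp read_events_from_segments(segments, start_time, end_time) do
    segments
    |> Enum.flat_map(fn segment -> read_all_events(segment.log, :start, []) end)
    |> Enum.filter(fn event ->
      event.timestamp >= start_time and event.timestamp <= end_time
    end)
    |> Enum.sort_by(& &1.timestamp)
  end

  defp read_all_events(log, cont, acc) do
    case :disk_log.chunk(log, cont) do
      :eof ->
        Enum.reverse(acc)

      {:error, _reason} ->
        Enum.reverse(acc)

      {next_cont, terms} ->
        events = Enum.map(terms, &:erlang.binary_to_term/1)
        read_all_events(log, next_cont, Enum.reverse(events, acc))
    end
  end
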
  defp create_checkpoint(state) do
    checkpoint = %{
      timestamp: System.system_time(:second),
      segments: state.segments,
      active_segment: state.active_segment
    }

    checkpoint_path =
      Path.join([state.base_dir, "checkpoints", "#{checkpoint.timestamp}.checkpoint"])

    File.write!(checkpoint_path, :erlang.term_to_binary(checkpoint))

    # Cleanup old checkpoints
    cleanup_old_checkpoints(state.base_dir)

    %{state | last_checkpoint: checkpoint.timestamp}
  end

  defp cleanup_old_checkpoints(base_dir) do
    checkpoint_dir = Path.join(base_dir, "checkpoints")
    cutoff_time = System.system_time(:second) - 3600 # Keep at most the last hour

    case File.ls(checkpoint_dir) do
      {:ok, files} ->
        files
        |> Enum.filter(&String.ends_with?(&1, ".checkpoint"))
        |> Enum.sort()
        |> Enum.drop(-5) # Always keep the 5 most recent checkpoints
        |> Enum.each(fn file ->
          {timestamp, _} =
            file
            |> Path.rootname()
            |> Integer.parse()

          if timestamp < cutoff_time do
            File.rm(Path.join(checkpoint_dir, file))
          end
        end)

      _ ->
        :ok
    end
  end

  defp segment_path(base_dir, id), do: Path.join([base_dir, "segments", "#{id}.segment"])
  defp index_path(base_dir, id), do: Path.join([base_dir, "indexes", "#{id}.index"])

  defp schedule_checkpoint do
    Process.send_after(self(), :checkpoint, @checkpoint_interval)
  end

  defp via_tuple(topic) do
    {:via, Registry, {Nghenhan.LandingZone.Registry, topic}}
  end
end
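
# Not part of the original gist: a minimal supervision sketch showing the
# unique-keyed Registry that via_tuple/1 assumes (the child id and the
# "ticks" topic are illustrative):
#
#   children = [
#     {Registry, keys: :unique, name: Nghenhan.LandingZone.Registry},
#     %{
#       id: {:landing_zone, "ticks"},
#       start: {Nghenhan.LandingZone, :start_link, ["ticks", [retention: {7, :days}]]}
#     }
#   ]
#
#   Supervisor.start_link(children, strategy: :one_for_one)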