Hot Landing Zone with Single-Node Disk Log
defmodule Nghenhan.LandingZone do
  @moduledoc """
  Hot Landing Zone implementation with fast recovery using segmented logs,
  separate index files, and checkpoints.
  """

  use GenServer
  require Logger

  @checkpoint_interval 300_000          # 5 minutes
  @cleanup_interval 60_000              # 1 minute
  @segment_max_size 64 * 1024 * 1024    # 64MB per segment
  @index_checkpoint_size 1000           # Number of entries before index checkpoint

  defmodule Segment do
    defstruct [:id, :log, :index, :start_time, :end_time, :size]
  end

  defmodule State do
    defstruct [
      :topic,
      :base_dir,
      :active_segment,
      :segments,          # Map of segment_id -> Segment
      :index_cache,       # LRU cache of frequently accessed index entries
      :last_checkpoint,
      :retention_period,  # in seconds
      write_buffer: [],
      write_buffer_size: 0
    ]
  end
  # Public API

  def start_link(topic, opts \\ []) do
    GenServer.start_link(__MODULE__, [topic, opts], name: via_tuple(topic))
  end

  def write_event(topic, event) do
    GenServer.call(via_tuple(topic), {:write, event})
  end

  def read_events(topic, start_time, end_time) do
    GenServer.call(via_tuple(topic), {:read, start_time, end_time})
  end

  # Server Implementation

  @impl true
  def init([topic, opts]) do
    base_dir = Keyword.get(opts, :base_dir, "data/#{topic}")
    retention_period = parse_retention_period(opts)

    # Initialize directories
    File.mkdir_p!(base_dir)
    File.mkdir_p!(Path.join(base_dir, "segments"))
    File.mkdir_p!(Path.join(base_dir, "indexes"))
    File.mkdir_p!(Path.join(base_dir, "checkpoints"))

    # Load or create checkpoint
    state =
      case load_latest_checkpoint(base_dir) do
        {:ok, checkpoint} ->
          # Fast path - restore from checkpoint
          state = restore_from_checkpoint(checkpoint, topic, base_dir)
          %{state | retention_period: retention_period}

        :error ->
          # Slow path - scan segments and rebuild state
Logger.warn("No valid checkpoint found, rebuilding state from segments") | |
          state = rebuild_state_from_segments(topic, base_dir)
          %{state | retention_period: retention_period}
      end

    # Schedule periodic tasks
    schedule_checkpoint()
    schedule_cleanup()

    {:ok, state}
  end

  @impl true
  def handle_call({:write, event}, _from, state) do
    wrapped_event = wrap_event(event)

    # Check if we need a new segment
    state = maybe_rotate_segment(state)

    # Write to active segment
    case write_to_segment(state.active_segment, wrapped_event) do
      {:ok, new_segment} ->
        state = %{state |
          active_segment: new_segment,
          write_buffer: [wrapped_event | state.write_buffer],
          write_buffer_size: state.write_buffer_size + 1
        }

        # Update index if buffer threshold reached
        state = maybe_update_index(state)
        {:reply, :ok, state}

      {:error, reason} ->
        Logger.error("Failed to write event: #{inspect(reason)}")
        {:reply, {:error, reason}, state}
    end
  end

  @impl true
  def handle_call({:read, start_time, end_time}, _from, state) do
    # Find relevant segments using index
    segments = find_segments_for_timerange(state, start_time, end_time)

    # Read events from segments
    events = read_events_from_segments(segments, start_time, end_time)
    {:reply, events, state}
  end

  @impl true
  def handle_info(:checkpoint, state) do
    new_state = create_checkpoint(state)
    schedule_checkpoint()
    {:noreply, new_state}
  end

  @impl true
  def handle_info(:cleanup, state) do
    new_state = cleanup_old_segments(state)
    schedule_cleanup()
    {:noreply, new_state}
  end
  # Private Functions

  defp parse_retention_period(opts) do
    case Keyword.get(opts, :retention) do
      {value, :minutes} -> value * 60
      {value, :hours} -> value * 3600
      {value, :days} -> value * 86400
      nil -> 7 * 86400 # Default 7 days

      invalid ->
Logger.warn("Invalid retention period #{inspect(invalid)}, using default 7 days") | |
        7 * 86400
    end
  end
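  # wrap_event/1 is called by the write path but was not defined in the
  # original gist. A minimal sketch, assuming events are maps: stamp an
  # ingestion timestamp (seconds, matching the retention and checkpoint
  # clocks) unless the caller already supplied one.
  defp wrap_event(event) when is_map(event) do
    Map.put_new(event, :timestamp, System.system_time(:second))
  end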
  defp cleanup_old_segments(state) do
    cutoff_time = System.system_time(:second) - state.retention_period

    {to_remove, to_keep} =
      Enum.split_with(state.segments, fn {_id, segment} ->
        segment.end_time < cutoff_time
      end)

    # Close and remove old segments
    Enum.each(to_remove, fn {id, segment} ->
      Logger.info("Cleaning up old segment #{id}")
      :ok = :disk_log.close(segment.log)
      :ok = :disk_log.close(segment.index)
      File.rm(segment_path(state.base_dir, id))
      File.rm(index_path(state.base_dir, id))
    end)

    %{state | segments: Map.new(to_keep)}
  end
  defp schedule_cleanup do
    Process.send_after(self(), :cleanup, @cleanup_interval)
  end

  defp load_latest_checkpoint(base_dir) do
    checkpoint_dir = Path.join(base_dir, "checkpoints")

    case File.ls(checkpoint_dir) do
      {:ok, files} ->
        files
        |> Enum.filter(&String.ends_with?(&1, ".checkpoint"))
        |> Enum.sort()
        |> List.last()
        |> case do
          nil ->
            :error

          file ->
            path = Path.join(checkpoint_dir, file)

            case File.read(path) do
              {:ok, binary} -> {:ok, :erlang.binary_to_term(binary)}
              _ -> :error
            end
        end

      _ ->
        :error
    end
  end
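  # open_segment_log/2 and open_segment_index/2 are used by the recovery
  # paths but were not defined in the original gist. Sketches under the same
  # assumptions as create_new_segment/1: halt logs, one file per segment.
  # :disk_log.open/1 can also return {:repaired, ...} after a crash; a
  # repaired log is treated as successfully opened.
  defp open_segment_log(base_dir, id) do
    open_halt_log(String.to_atom("segment_#{id}"), segment_path(base_dir, id))
  end

  defp open_segment_index(base_dir, id) do
    open_halt_log(String.to_atom("index_#{id}"), index_path(base_dir, id))
  end

  defp open_halt_log(name, path) do
    result =
      :disk_log.open(
        name: name,
        file: String.to_charlist(path),
        type: :halt,
        size: :infinity,
        mode: :read_write
      )

    case result do
      {:ok, log} -> {:ok, log}
      {:repaired, log, _recovered, _badbytes} -> {:ok, log}
      error -> error
    end
  end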
  defp restore_from_checkpoint(checkpoint, topic, base_dir) do
    Logger.info("Restoring from checkpoint for topic #{topic}")

    # Validate checkpoint segments still exist
    segments =
      checkpoint.segments
      |> Enum.filter(fn {_id, segment} ->
        File.exists?(segment_path(base_dir, segment.id)) &&
          File.exists?(index_path(base_dir, segment.id))
      end)
      |> Map.new()

    # Open logs for segments
    segments =
      segments
      |> Enum.map(fn {id, segment} ->
        {:ok, log} = open_segment_log(base_dir, id)
        {:ok, index} = open_segment_index(base_dir, id)
        {id, %{segment | log: log, index: index}}
      end)
      |> Map.new()

    # Create new active segment if needed
    active_segment =
      case checkpoint.active_segment do
        nil ->
          create_new_segment(base_dir)

        segment when is_map(segment) ->
          if File.exists?(segment_path(base_dir, segment.id)) do
            {:ok, log} = open_segment_log(base_dir, segment.id)
            {:ok, index} = open_segment_index(base_dir, segment.id)
            %{segment | log: log, index: index}
          else
            create_new_segment(base_dir)
          end
      end

    %State{
      topic: topic,
      base_dir: base_dir,
      active_segment: active_segment,
      segments: segments,
      index_cache: :ets.new(:index_cache, [:set, :private]),
      last_checkpoint: System.system_time(:second),
      write_buffer: [],
      write_buffer_size: 0
    }
  end
  defp rebuild_state_from_segments(topic, base_dir) do
    Logger.info("Rebuilding state from segments for topic #{topic}")

    # Scan segments directory
    segment_dir = Path.join(base_dir, "segments")

    segments =
      case File.ls(segment_dir) do
        {:ok, files} ->
          files
          |> Enum.filter(&String.ends_with?(&1, ".segment"))
          |> Enum.map(fn file ->
            id = Path.rootname(file)
            {:ok, log} = open_segment_log(base_dir, id)
            {:ok, index} = open_segment_index(base_dir, id)
            # Pass the id through so the rebuilt Segment keeps it; later
            # checkpoint and cleanup paths rely on segment.id being set.
            segment = scan_segment(id, log, index)
            {id, segment}
          end)
          |> Map.new()

        _ ->
          %{}
      end

    # Create new active segment
    active_segment = create_new_segment(base_dir)

    %State{
      topic: topic,
      base_dir: base_dir,
      active_segment: active_segment,
      segments: segments,
      index_cache: :ets.new(:index_cache, [:set, :private]),
      last_checkpoint: System.system_time(:second),
      write_buffer: [],
      write_buffer_size: 0
    }
  end
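  # scan_segment/3 is needed by the rebuild path but was not defined in the
  # original gist. A sketch: replay the segment's events to recover its time
  # range and approximate byte size. do_chunk/3 is a shared helper that folds
  # :disk_log.chunk/2 into a flat list of logged terms.
  defp scan_segment(id, log, index) do
    events = do_chunk(log, :start, [])
    timestamps = Enum.map(events, & &1.timestamp)

    %Segment{
      id: id,
      log: log,
      index: index,
      start_time: Enum.min(timestamps, fn -> nil end),
      end_time: Enum.max(timestamps, fn -> nil end),
      size: events |> Enum.map(&byte_size(:erlang.term_to_binary(&1))) |> Enum.sum()
    }
  end

  defp do_chunk(log, cont, acc) do
    case :disk_log.chunk(log, cont) do
      :eof -> acc
      {:error, _reason} -> acc
      {next_cont, terms} -> do_chunk(log, next_cont, acc ++ terms)
      {next_cont, terms, _badbytes} -> do_chunk(log, next_cont, acc ++ terms)
    end
  end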
  defp create_new_segment(base_dir) do
    id = "#{System.system_time(:microsecond)}"

    # Halt logs keep each segment in exactly one file at segment_path/2, so
    # the File.exists?/File.rm calls elsewhere see the real file. (A wrap log
    # would split the data across suffixed .1/.2/.idx/.siz files and silently
    # overwrite old events.) Size is unbounded here because rotation is
    # handled explicitly by maybe_rotate_segment/1.
    {:ok, log} =
      :disk_log.open(
        name: String.to_atom("segment_#{id}"),
        file: String.to_charlist(segment_path(base_dir, id)),
        type: :halt,
        size: :infinity,
        mode: :read_write
      )

    {:ok, index} =
      :disk_log.open(
        name: String.to_atom("index_#{id}"),
        file: String.to_charlist(index_path(base_dir, id)),
        type: :halt,
        size: :infinity,
        mode: :read_write
      )

    %Segment{
      id: id,
      log: log,
      index: index,
      start_time: nil,
      end_time: nil,
      size: 0
    }
  end
  defp write_to_segment(segment, event) do
    binary_event = :erlang.term_to_binary(event)
    size = byte_size(binary_event)

    # blog_terms/2 logs pre-serialized binaries as-is; log_terms/2 would run
    # term_to_binary again and double-encode the event.
    case :disk_log.blog_terms(segment.log, [binary_event]) do
      :ok ->
        new_segment = %{segment |
          start_time: segment.start_time || event.timestamp,
          end_time: event.timestamp,
          size: segment.size + size
        }

        {:ok, new_segment}

      error ->
        error
    end
  end
  defp maybe_rotate_segment(%{active_segment: segment} = state) do
    if segment.size >= @segment_max_size do
      # Flush current segment's index
      flush_index(segment)

      # Move current segment to segments map
      segments = Map.put(state.segments, segment.id, segment)

      # Create new active segment
      %{state |
        active_segment: create_new_segment(state.base_dir),
        segments: segments
      }
    else
      state
    end
  end
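  # flush_index/1 is called before rotation but was not defined in the
  # original gist. A minimal sketch: :disk_log.sync/1 forces any buffered
  # writes for both files to disk before the segment is sealed.
  defp flush_index(segment) do
    :disk_log.sync(segment.log)
    :disk_log.sync(segment.index)
    :ok
  end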
  defp maybe_update_index(state) do
    if state.write_buffer_size >= @index_checkpoint_size do
      # Sort buffer by timestamp
      sorted_buffer = Enum.sort_by(state.write_buffer, & &1.timestamp)

      # Update index
      :ok = update_segment_index(state.active_segment, sorted_buffer)

      %{state |
        write_buffer: [],
        write_buffer_size: 0
      }
    else
      state
    end
  end
  defp update_segment_index(segment, events) do
    # :disk_log exposes no per-item byte offsets, so the index stores the
    # log's written-item count (from :disk_log.info/1) as a coarse position
    # hint alongside each timestamp.
    {:no_written_items, position} =
      segment.log
      |> :disk_log.info()
      |> List.keyfind(:no_written_items, 0)

    index_entries = Enum.map(events, fn event -> {event.timestamp, position} end)
    :disk_log.log_terms(segment.index, index_entries)
  end
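  # The read-path helpers are referenced by handle_call({:read, ...}) but were
  # not defined in the original gist. Sketches under simple assumptions:
  # segment-level pruning uses only the in-memory start_time/end_time metadata
  # (the on-disk index is not consulted here), and events are filtered after
  # decoding.
  defp find_segments_for_timerange(state, start_time, end_time) do
    closed =
      state.segments
      |> Map.values()
      |> Enum.filter(&overlaps?(&1, start_time, end_time))

    if overlaps?(state.active_segment, start_time, end_time) do
      closed ++ [state.active_segment]
    else
      closed
    end
  end

  defp overlaps?(%Segment{start_time: nil}, _start_time, _end_time), do: false

  defp overlaps?(segment, start_time, end_time) do
    segment.start_time <= end_time and segment.end_time >= start_time
  end

  defp read_events_from_segments(segments, start_time, end_time) do
    segments
    |> Enum.flat_map(fn segment -> do_chunk(segment.log, :start, []) end)
    |> Enum.filter(fn event ->
      event.timestamp >= start_time and event.timestamp <= end_time
    end)
    |> Enum.sort_by(& &1.timestamp)
  end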
  defp create_checkpoint(state) do
    checkpoint = %{
      timestamp: System.system_time(:second),
      segments: state.segments,
      active_segment: state.active_segment
    }

    checkpoint_path =
      Path.join([
        state.base_dir,
        "checkpoints",
        "#{checkpoint.timestamp}.checkpoint"
      ])

    File.write!(checkpoint_path, :erlang.term_to_binary(checkpoint))

    # Cleanup old checkpoints
    cleanup_old_checkpoints(state.base_dir)

    %{state | last_checkpoint: checkpoint.timestamp}
  end

  defp cleanup_old_checkpoints(base_dir) do
    checkpoint_dir = Path.join(base_dir, "checkpoints")
    cutoff_time = System.system_time(:second) - 3600 # Keep last hour

    case File.ls(checkpoint_dir) do
      {:ok, files} ->
        files
        |> Enum.filter(&String.ends_with?(&1, ".checkpoint"))
        |> Enum.sort()
        |> Enum.drop(-5) # Keep last 5 checkpoints minimum
        |> Enum.each(fn file ->
          path = Path.join(checkpoint_dir, file)

          {timestamp, _} =
            file
            |> Path.rootname()
            |> Integer.parse()

          if timestamp < cutoff_time do
            File.rm(path)
          end
        end)

      _ ->
        :ok
    end
  end
  defp segment_path(base_dir, id), do: Path.join([base_dir, "segments", "#{id}.segment"])
  defp index_path(base_dir, id), do: Path.join([base_dir, "indexes", "#{id}.index"])

  defp schedule_checkpoint do
    Process.send_after(self(), :checkpoint, @checkpoint_interval)
  end

  defp via_tuple(topic) do
    {:via, Registry, {Nghenhan.LandingZone.Registry, topic}}
  end
end
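# Example usage, as a sketch: via_tuple/1 assumes a Registry named
# Nghenhan.LandingZone.Registry is already running (normally started under
# your application's supervision tree).
#
#   {:ok, _} = Registry.start_link(keys: :unique, name: Nghenhan.LandingZone.Registry)
#   {:ok, _} = Nghenhan.LandingZone.start_link("ticks", retention: {1, :days})
#
#   :ok = Nghenhan.LandingZone.write_event("ticks", %{symbol: "BTCUSDT", price: 42_000.0})
#
#   now = System.system_time(:second)
#   events = Nghenhan.LandingZone.read_events("ticks", now - 3600, now)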