Created
December 18, 2024 11:33
-
-
Save monotykamary/3e4039f868c895bfe75efc76be688548 to your computer and use it in GitHub Desktop.
Validator for multi-sourced Hot Landing Zone
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Nghenhan.MarketDataValidator do
  @moduledoc """
  Validates market data completeness and quality by reading from the landing zone.

  Runs as a separate service that can identify data quality issues without
  affecting raw data collection.

  Every `@validation_interval` milliseconds the server reads the last
  `@recent_window` milliseconds of events for each monitored topic through
  `Nghenhan.LandingZone.read_events/3`, checks every `{source, symbol}` stream
  for sequence-number gaps, flags sources whose newest known event is older
  than `@source_timeout`, and logs each issue found.

  Events are accessed through the fields `source`, `symbol`,
  `sequence_number` and `timestamp`; timestamps are compared against
  `System.system_time(:millisecond)`.
  """

  use GenServer

  require Logger

  @validation_interval 1_000 # Check every second
  @recent_window 2_000 # Last 2 seconds of data
  @source_timeout 5_000 # Alert if source silent for 5s

  defmodule State do
    @moduledoc false

    @type t :: %__MODULE__{
            topics: [term()] | nil,
            source_last_seen: map() | nil,
            sequence_trackers: map() | nil
          }

    defstruct [
      :topics, # List of topics to monitor
      :source_last_seen, # Map of source -> last timestamp
      :sequence_trackers # Map of {source, symbol} -> last sequence
    ]
  end

  defmodule ValidationReport do
    @moduledoc """
    Result of one validation pass.

    `issues` holds `{:sequence_gap, source, symbol, from_seq, to_seq}` and
    `{:stale_source, source, staleness_ms}` tuples; `stats` is the map built
    from the events of the validated window.
    """

    @type t :: %__MODULE__{timestamp: integer() | nil, issues: [tuple()], stats: map()}

    defstruct timestamp: nil,
              issues: [],
              stats: %{}
  end

  # Public API

  @doc """
  Starts the validator for the given list of `topics`.

  `opts` are passed to `GenServer.start_link/3`. Unless the caller supplies
  its own `:name`, the process is registered under this module's name so that
  `get_latest_report/0` can reach it.
  """
  @spec start_link([term()], GenServer.options()) :: GenServer.on_start()
  def start_link(topics, opts \\ []) do
    # Without a registered name, GenServer.call(__MODULE__, ...) exits with :noproc.
    GenServer.start_link(__MODULE__, topics, Keyword.put_new(opts, :name, __MODULE__))
  end

  @doc """
  Runs a validation pass over the most recent window and returns the report.

  The pass is computed on demand against the server's current trackers; it
  does not modify the server state. `server` defaults to the process
  registered under this module's name.
  """
  @spec get_latest_report(GenServer.server()) :: ValidationReport.t()
  def get_latest_report(server \\ __MODULE__) do
    GenServer.call(server, :get_report)
  end

  # Server Implementation

  @impl true
  def init(topics) do
    schedule_validation()

    state = %State{
      topics: topics,
      source_last_seen: %{},
      sequence_trackers: %{}
    }

    {:ok, state}
  end

  @impl true
  def handle_info(:validate, state) do
    {new_state, report} =
      state
      |> read_recent_events()
      |> validate_market_data(state)

    log_validation_issues(report)
    schedule_validation()
    {:noreply, new_state}
  end

  @impl true
  def handle_call(:get_report, _from, state) do
    # The updated trackers are intentionally discarded: an on-demand report
    # must not influence what the periodic validation sees next.
    {_new_state, report} =
      state
      |> read_recent_events()
      |> validate_market_data(state)

    {:reply, report, state}
  end

  # Private Functions

  # Reads the events of the last @recent_window milliseconds for every topic.
  defp read_recent_events(%State{topics: topics}) do
    now = System.system_time(:millisecond)
    window_start = now - @recent_window

    Enum.flat_map(topics, fn topic ->
      Nghenhan.LandingZone.read_events(topic, window_start, now)
    end)
  end

  # Validates `events` against `state` and returns `{new_state, report}`.
  defp validate_market_data(events, state) do
    now = System.system_time(:millisecond)

    # One group per {source, symbol} stream; groups are never empty.
    grouped_events = Enum.group_by(events, &{&1.source, &1.symbol})

    {sequence_trackers, gap_issues} =
      Enum.reduce(grouped_events, {state.sequence_trackers, []}, fn
        {{source, symbol}, stream_events}, {trackers, issues} ->
          check_sequence_gaps(source, symbol, stream_events, trackers, issues)
      end)

    # Carry forward previously seen sources: a source that has gone silent has
    # no events in the current window, so rebuilding the map from the window
    # alone would drop it before it could ever be reported as stale.
    source_last_seen = update_source_last_seen(state.source_last_seen, events)
    stale_issues = find_stale_sources(source_last_seen, now)

    report = %ValidationReport{
      timestamp: now,
      issues: gap_issues ++ stale_issues,
      stats: build_stats(events)
    }

    new_state = %{
      state
      | source_last_seen: source_last_seen,
        sequence_trackers: sequence_trackers
    }

    {new_state, report}
  end

  # Merges the newest timestamp of each source in `events` into `previous`.
  defp update_source_last_seen(previous, events) do
    Enum.reduce(events, previous, fn event, acc ->
      Map.update(acc, event.source, event.timestamp, &max(&1, event.timestamp))
    end)
  end

  # Returns a :stale_source issue for every source silent longer than @source_timeout.
  defp find_stale_sources(source_last_seen, now) do
    for {source, last_seen} <- source_last_seen, now - last_seen > @source_timeout do
      {:stale_source, source, now - last_seen}
    end
  end

  # Checks one {source, symbol} stream for gaps and returns
  # `{updated_trackers, issues ++ new_issues}`.
  #
  # Two kinds of gap are reported: a gap between the last tracked sequence and
  # the first sequence of this batch, and gaps between consecutive events inside
  # the batch. In-batch gaps are reported in every case, including the first
  # time a stream is seen and when a boundary gap is also present.
  defp check_sequence_gaps(_source, _symbol, [], trackers, issues), do: {trackers, issues}

  defp check_sequence_gaps(source, symbol, events, trackers, issues) do
    sorted_events = Enum.sort_by(events, & &1.sequence_number)
    first_seq = hd(sorted_events).sequence_number
    final_seq = List.last(sorted_events).sequence_number
    tracker_key = {source, symbol}

    boundary_gap =
      case Map.get(trackers, tracker_key) do
        last_seq when not is_nil(last_seq) and first_seq > last_seq + 1 ->
          [{:sequence_gap, source, symbol, last_seq, first_seq}]

        _first_sighting_or_contiguous ->
          []
      end

    new_issues = boundary_gap ++ find_sequence_gaps(sorted_events)

    {Map.put(trackers, tracker_key, final_seq), issues ++ new_issues}
  end

  # Expects `events` sorted by sequence number; emits one issue per pair of
  # neighbours whose sequence numbers differ by more than one.
  defp find_sequence_gaps(events) do
    events
    |> Enum.chunk_every(2, 1, :discard)
    |> Enum.flat_map(fn [earlier, later] ->
      if later.sequence_number - earlier.sequence_number > 1 do
        [
          {:sequence_gap, earlier.source, earlier.symbol, earlier.sequence_number,
           later.sequence_number}
        ]
      else
        []
      end
    end)
  end

  # Summary statistics for the validated window. `events_per_source` maps each
  # source to its number of events; `event_rate` is events per second over
  # @recent_window.
  defp build_stats(events) do
    total_events = length(events)

    %{
      total_events: total_events,
      events_per_source: Enum.frequencies_by(events, & &1.source),
      unique_symbols: events |> Enum.uniq_by(& &1.symbol) |> length(),
      event_rate: total_events / (@recent_window / 1000)
    }
  end

  # Logs one warning per issue in the report.
  defp log_validation_issues(%ValidationReport{issues: issues}) do
    Enum.each(issues, &log_issue/1)
  end

  defp log_issue({:sequence_gap, source, symbol, start_seq, end_seq}) do
    Logger.warning("Sequence gap detected for #{source}/#{symbol}: #{start_seq} to #{end_seq}")
  end

  defp log_issue({:stale_source, source, staleness}) do
    Logger.warning("Source #{source} is stale by #{staleness}ms")
  end

  # Arms the timer for the next periodic validation pass.
  defp schedule_validation do
    Process.send_after(self(), :validate, @validation_interval)
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment