Skip to content

Instantly share code, notes, and snippets.

@monotykamary
Created December 18, 2024 11:33
Show Gist options
  • Save monotykamary/3e4039f868c895bfe75efc76be688548 to your computer and use it in GitHub Desktop.
Save monotykamary/3e4039f868c895bfe75efc76be688548 to your computer and use it in GitHub Desktop.
Validator for multi-sourced Hot Landing Zone
defmodule Nghenhan.MarketDataValidator do
  @moduledoc """
  Validates market data completeness and quality by reading from the landing zone.

  Runs as a separate service that can identify data quality issues without
  affecting raw data collection.

  Every `@validation_interval` milliseconds the server reads, for each monitored
  topic, the events of the last `@recent_window` milliseconds through
  `Nghenhan.LandingZone.read_events/3`, then:

    * looks for sequence-number gaps per `{source, symbol}` pair, both inside
      the batch and between the previous batch and this one;
    * flags sources whose most recent event is older than `@source_timeout`;
    * logs every issue found as a warning.

  Events are accessed as `event.source`, `event.symbol`,
  `event.sequence_number` and `event.timestamp`.
  NOTE(review): timestamps are compared against
  `System.system_time(:millisecond)`, so they are presumably epoch
  milliseconds - confirm against the landing zone producer.
  """

  use GenServer
  require Logger

  @validation_interval 1_000 # Check every second
  @recent_window 2_000 # Last 2 seconds of data
  @source_timeout 5_000 # Alert if source silent for 5s

  defmodule State do
    @moduledoc false

    defstruct [
      :topics, # List of topics to monitor
      :source_last_seen, # Map of source -> last timestamp
      :sequence_trackers # Map of {source, symbol} -> last sequence
    ]
  end

  defmodule ValidationReport do
    @moduledoc """
    Result of one validation pass.

      * `:timestamp` - wall-clock time (ms) at which the pass ran
      * `:issues` - list of `{:sequence_gap, source, symbol, from_seq, to_seq}`
        and `{:stale_source, source, silent_ms}` tuples
      * `:stats` - map with `:total_events`, `:events_per_source`,
        `:unique_symbols` and `:event_rate`
    """

    defstruct timestamp: nil,
              issues: [],
              stats: %{}
  end

  # Public API

  @doc """
  Starts the validator for the given `topics`.

  `opts` are passed to `GenServer.start_link/3`. Unless the caller supplies
  its own `:name`, the process is registered under `#{inspect(__MODULE__)}` so
  that `get_latest_report/0` can reach it.
  """
  def start_link(topics, opts \\ []) do
    GenServer.start_link(__MODULE__, topics, Keyword.put_new(opts, :name, __MODULE__))
  end

  @doc """
  Returns a `ValidationReport` computed on demand from the current window.

  The report is built against the server's current trackers, but the call does
  not modify them. `server` defaults to the name registered by `start_link/2`;
  pass a pid or another name when the validator was started with a custom
  `:name`.
  """
  def get_latest_report(server \\ __MODULE__) do
    GenServer.call(server, :get_report)
  end

  # Server Implementation

  @impl true
  def init(topics) do
    schedule_validation()

    {:ok,
     %State{
       topics: topics,
       source_last_seen: %{},
       sequence_trackers: %{}
     }}
  end

  @impl true
  def handle_info(:validate, state) do
    now = System.system_time(:millisecond)

    {new_state, report} =
      state.topics
      |> read_recent_events(now)
      |> validate_market_data(state, now)

    log_validation_issues(report)
    schedule_validation()
    {:noreply, new_state}
  end

  @impl true
  def handle_call(:get_report, _from, state) do
    now = System.system_time(:millisecond)

    # Read-only: the updated trackers are discarded on purpose so an ad-hoc
    # report never interferes with the periodic validation pass.
    {_new_state, report} =
      state.topics
      |> read_recent_events(now)
      |> validate_market_data(state, now)

    {:reply, report, state}
  end

  # Private Functions

  # Reads the events of the last @recent_window ms for every topic.
  defp read_recent_events(topics, now) do
    window_start = now - @recent_window

    Enum.flat_map(topics, fn topic ->
      Nghenhan.LandingZone.read_events(topic, window_start, now)
    end)
  end

  # Runs all checks over `events` and returns `{new_state, report}`.
  defp validate_market_data(events, state, now) do
    grouped_events = Enum.group_by(events, fn e -> {e.source, e.symbol} end)

    # flat_map_reduce threads the trackers through while collecting issues,
    # avoiding repeated list appends.
    {gap_issues, sequence_trackers} =
      Enum.flat_map_reduce(grouped_events, state.sequence_trackers, fn
        {{source, symbol}, group}, trackers ->
          check_sequence_gaps(source, symbol, group, trackers)
      end)

    latest_in_batch =
      events
      |> Enum.group_by(& &1.source, & &1.timestamp)
      |> Map.new(fn {source, timestamps} -> {source, Enum.max(timestamps)} end)

    # Merge with what we already knew: a source that went silent has no events
    # in the current window, and dropping it here would make staleness
    # undetectable.
    source_last_seen =
      Map.merge(state.source_last_seen, latest_in_batch, fn _source, previous, current ->
        max(previous, current)
      end)

    stale_issues =
      for {source, last_seen} <- source_last_seen, now - last_seen > @source_timeout do
        {:stale_source, source, now - last_seen}
      end

    report = %ValidationReport{
      timestamp: now,
      issues: gap_issues ++ stale_issues,
      stats: build_stats(events)
    }

    new_state = %{
      state
      | source_last_seen: source_last_seen,
        sequence_trackers: sequence_trackers
    }

    {new_state, report}
  end

  # Checks one {source, symbol} group for gaps and returns
  # `{new_issues, updated_trackers}`. `events` is never empty because it comes
  # from Enum.group_by/2.
  defp check_sequence_gaps(source, symbol, events, trackers) do
    sorted_events = Enum.sort_by(events, & &1.sequence_number)
    tracker_key = {source, symbol}
    first_seq = hd(sorted_events).sequence_number
    final_seq = List.last(sorted_events).sequence_number

    # Gap between the last sequence seen in a previous pass and this batch.
    boundary_gap =
      case Map.get(trackers, tracker_key) do
        nil ->
          # First time seeing this source/symbol: nothing to compare against.
          []

        last_seq when first_seq > last_seq + 1 ->
          [{:sequence_gap, source, symbol, last_seq, first_seq}]

        _last_seq ->
          []
      end

    # In-batch gaps are always checked, including on first sight and when a
    # boundary gap was found.
    new_issues = boundary_gap ++ find_sequence_gaps(sorted_events)

    {new_issues, Map.put(trackers, tracker_key, final_seq)}
  end

  # Returns a :sequence_gap issue for every pair of consecutive (already
  # sorted) events whose sequence numbers differ by more than one.
  defp find_sequence_gaps(events) do
    events
    |> Enum.chunk_every(2, 1, :discard)
    |> Enum.flat_map(fn [e1, e2] ->
      if e2.sequence_number - e1.sequence_number > 1 do
        [{:sequence_gap, e1.source, e1.symbol, e1.sequence_number, e2.sequence_number}]
      else
        []
      end
    end)
  end

  # Summary statistics for the batch. `event_rate` is events per second over
  # the @recent_window.
  defp build_stats(events) do
    total = length(events)

    events_per_source =
      Enum.reduce(events, %{}, fn e, counts ->
        Map.update(counts, e.source, 1, &(&1 + 1))
      end)

    %{
      total_events: total,
      events_per_source: events_per_source,
      unique_symbols: events |> Enum.map(& &1.symbol) |> Enum.uniq() |> length(),
      event_rate: total / (@recent_window / 1000)
    }
  end

  defp log_validation_issues(%ValidationReport{issues: []}), do: :ok

  defp log_validation_issues(%ValidationReport{issues: issues}) do
    Enum.each(issues, fn
      {:sequence_gap, source, symbol, start_seq, end_seq} ->
        Logger.warning("Sequence gap detected for #{source}/#{symbol}: #{start_seq} to #{end_seq}")

      {:stale_source, source, staleness} ->
        Logger.warning("Source #{source} is stale by #{staleness}ms")
    end)
  end

  # Arms the timer for the next :validate tick.
  defp schedule_validation do
    Process.send_after(self(), :validate, @validation_interval)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment