Skip to content

Instantly share code, notes, and snippets.

@keithchambers
Created May 2, 2025 13:05
Show Gist options
  • Select an option

  • Save keithchambers/5fc1b47a90dbcbe5d29e4c97cab9c71f to your computer and use it in GitHub Desktop.

Select an option

Save keithchambers/5fc1b47a90dbcbe5d29e4c97cab9c71f to your computer and use it in GitHub Desktop.
OTEL Harrier DLQ
// Package genaicleaner provides an OpenTelemetry Collector *processor* that
// sanitises Gen-AI spans in-flight.
//
// The processor performs three operations, all entirely in-memory:
//
// 1. **Validation** – incoming spans are checked against Harrier’s
// *authoritative* allow-lists (derived from OTEP ≥ 1.43).
// 2. **Cleaning / Forking**
// • A *clean* copy is forwarded on the same pipeline and tagged
// `messaging.kafka.destination="input"` so the kafkaexporter can map
// it directly to the `input` topic (no exporter code required).
// • A *dirty* copy (if the span was non-compliant) keeps the illegal
// data, annotates the offending keys in the `reason` attribute, and
// is rerouted to the DLQ topic `input-dlq`.
// 3. **Metrics** – per-batch processing latency is emitted via obsreport so
// operators can observe processing overhead.
//
// The processor is *stateless*: horizontal scaling is trivial and no ordering
// guarantees are required because each span is evaluated in isolation.
package genaicleaner
import (
"context"
"strings"
"time"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"
)
/******** routing constants ********/
// Identifiers used for processor registration, topic routing and telemetry.
const (
	// kind is the type name the processor is registered under.
	kind = "genai_cleaner"

	// topicAttr is the span attribute that carries the destination topic
	// (per the original author, the kafkaexporter inspects it).
	topicAttr = "messaging.kafka.destination"

	// mainTopic is the canonical topic for clean telemetry.
	mainTopic = "input"

	// dlqTopic is the dead-letter topic for rejected spans.
	dlqTopic = "input-dlq"

	// reasonKey is the attribute holding the semicolon-separated list of
	// problems found on a rejected span.
	reasonKey = "reason"

	// latencyMetric is the name of the custom latency metric.
	latencyMetric = "genai_cleaner_latency_ms"

	// metricNS is the obsreport metric namespace.
	metricNS = "genai_cleaner"
)
/******** allow-lists built from Harrier schema ********/
/*
allowRes and allowSpan are the single sources of truth for what is
considered “legal” on resources and spans. Updates to Harrier’s schema
should be reflected *only here* – the processor logic is table-driven and
does not need further modification.
*/
// allowRes is the set of attribute keys permitted on a resource. Any resource
// attribute whose key is absent from this set is removed by the processor.
var allowRes = func() map[string]struct{} {
	keys := []string{
		"service.name",
		"service.namespace",
		"service.version",
		"service.instance.id",
		"deployment.environment",
		"gen_ai.system",
	}
	set := make(map[string]struct{}, len(keys))
	for _, k := range keys {
		set[k] = struct{}{}
	}
	return set
}()
// allowSpan is the set of attribute keys permitted on a span (the union of
// the Gen-AI attribute groups listed below). A span attribute whose key is
// absent from this set marks the span as non-compliant.
var allowSpan = func() map[string]struct{} {
	keys := []string{
		// Agent
		"gen_ai.agent.id",
		"gen_ai.agent.name",
		"gen_ai.agent.version",
		"gen_ai.agent.description",
		// Operation / Output
		"gen_ai.operation.name",
		"gen_ai.output.type",
		// Tool
		"gen_ai.tool.call.id",
		"gen_ai.tool.name",
		"gen_ai.tool.type",
		// Request
		"gen_ai.request.model",
		"gen_ai.request.max_tokens",
		"gen_ai.request.temperature",
		"gen_ai.request.top_p",
		"gen_ai.request.top_k",
		"gen_ai.request.presence_penalty",
		"gen_ai.request.frequency_penalty",
		"gen_ai.request.choice.count",
		"gen_ai.request.encoding_formats",
		"gen_ai.request.seed",
		"gen_ai.request.stop_sequences",
		// Response
		"gen_ai.response.id",
		"gen_ai.response.model",
		"gen_ai.response.finish_reasons",
		// Usage / Billing
		"gen_ai.usage.input_tokens",
		"gen_ai.usage.output_tokens",
		"gen_ai.token.type",
		// OpenAI extension
		"gen_ai.openai.request.service_tier",
		"gen_ai.openai.response.service_tier",
		"gen_ai.openai.response.system_fingerprint",
		// Content
		"gen_ai.user.message",
		"gen_ai.assistant.message",
		// Free-form labels
		"tags",
	}
	set := make(map[string]struct{}, len(keys))
	for _, k := range keys {
		set[k] = struct{}{}
	}
	return set
}()
/******** factory ********/
// Config is the processor's configuration. It intentionally has no fields:
// all behaviour is driven by the compiled-in allow-lists. Toggles (e.g. “drop
// dirty spans instead of DLQ”) could be added later without changing the
// processor wiring.
type Config struct{}

// Validate always returns nil because Config currently has nothing to
// validate.
func (*Config) Validate() error {
	return nil
}
// createDefaultConfig is the factory's default-config hook; it returns a
// pointer to a fresh, empty Config.
func createDefaultConfig() component.Config {
	return new(Config)
}
// NewFactory returns the processor.Factory for this processor. The factory is
// registered under kind, uses createDefaultConfig for its default
// configuration, and builds traces processors via newProcessor at alpha
// stability.
func NewFactory() processor.Factory {
	tracesOpt := processor.WithTraces(newProcessor, component.StabilityLevelAlpha)
	return processor.NewFactory(kind, createDefaultConfig, tracesOpt)
}
/******** processor constructor ********/
// newProcessor builds the traces processor: it creates the obsreport helper
// and wraps cleanAndRoute with processorhelper, wiring the helper's Start and
// Shutdown hooks into the processor lifecycle.
//
// ctx, set, cfg and next are passed straight through to
// processorhelper.NewTracesProcessor. An error from obsreport.NewProcessor is
// returned to the caller instead of being discarded.
func newProcessor(
	ctx context.Context,
	set processor.CreateSettings,
	cfg component.Config,
	next consumer.Traces,
) (processor.Traces, error) {
	obs, err := obsreport.NewProcessor(obsreport.ProcessorSettings{
		ProcessorName:   kind,
		ProcessorID:     set.ID,
		MetricNamespace: metricNS,
		Level:           configtelemetry.LevelNormal,
		Stability:       component.StabilityLevelAlpha,
	})
	if err != nil {
		// obs cannot be relied on when construction failed; surface the
		// failure rather than wiring its hooks into the processor.
		return nil, err
	}

	// NOTE(review): confirm that the pinned collector version exposes these
	// ProcessorSettings fields and the Start/Shutdown methods on obs.
	return processorhelper.NewTracesProcessor(
		ctx,
		set,
		cfg,
		next,
		cleanAndRoute,
		processorhelper.WithStart(obs.Start),
		processorhelper.WithShutdown(obs.Shutdown),
	)
}
/******** core logic ********/
// cleanAndRoute validates every span in td against the allow-lists, forks a
// DLQ copy of each non-compliant span, and sanitises the original in place.
//
// It walks ResourceSpans → ScopeSpans → Spans. Resource attributes are
// filtered against allowRes; each span that was present on entry is handed to
// sanitizeSpan. The elapsed wall-clock time for the whole batch is recorded
// once, in (fractional) milliseconds, under latencyMetric.
//
// The returned Traces is td itself (mutated); the returned error is always
// nil.
func cleanAndRoute(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) {
	start := time.Now()

	resourceSpans := td.ResourceSpans()
	for i := 0; i < resourceSpans.Len(); i++ {
		rs := resourceSpans.At(i)
		filterAttrs(rs.Resource().Attributes(), allowRes)

		scopeSpans := rs.ScopeSpans()
		for j := 0; j < scopeSpans.Len(); j++ {
			spans := scopeSpans.At(j).Spans()

			// Snapshot the length: dirty copies are appended to this same
			// slice and must not be visited. Re-reading Len() on every
			// iteration would treat each DLQ copy as a new span, flag its
			// reasonKey/topicAttr attributes as illegal, fork it again, and
			// never terminate.
			incoming := spans.Len()
			for k := 0; k < incoming; k++ {
				sanitizeSpan(spans, spans.At(k))
			}
		}
	}

	// Divide as floats so sub-millisecond batches are not truncated to 0.
	// NOTE(review): confirm RecordProcessorCustomMetric exists in the pinned
	// obsreport version.
	elapsedMs := float64(time.Since(start)) / float64(time.Millisecond)
	obsreport.RecordProcessorCustomMetric(ctx, latencyMetric, elapsedMs)
	return td, nil
}

// sanitizeSpan checks one span, appends a DLQ copy to spans if it violates
// the allow-list, and then strips the span down to its compliant form.
//
// A span is non-compliant when it has any events or links, or any attribute
// whose key is missing from allowSpan. The DLQ copy is taken before mutation,
// carries reasonKey ("events_or_links" and/or the offending keys, joined with
// ";") and is tagged topicAttr=dlqTopic. The original is always tagged
// topicAttr=mainTopic.
func sanitizeSpan(spans ptrace.SpanSlice, span ptrace.Span) {
	hasEventsOrLinks := span.Events().Len() > 0 || span.Links().Len() > 0

	var illegal []string
	span.Attributes().Range(func(key string, _ pcommon.Value) bool {
		if _, ok := allowSpan[key]; !ok {
			illegal = append(illegal, key)
		}
		return true
	})

	if hasEventsOrLinks || len(illegal) > 0 {
		problems := make([]string, 0, len(illegal)+1)
		if hasEventsOrLinks {
			problems = append(problems, "events_or_links")
		}
		problems = append(problems, illegal...)

		dirty := spans.AppendEmpty()
		span.CopyTo(dirty) // deep copy while the span is still untouched
		dirty.Attributes().PutStr(reasonKey, strings.Join(problems, ";"))
		dirty.Attributes().PutStr(topicAttr, dlqTopic)
	}

	// Drop all events and links; ptrace slices expose RemoveIf, not Resize.
	span.Events().RemoveIf(func(ptrace.SpanEvent) bool { return true })
	span.Links().RemoveIf(func(ptrace.SpanLink) bool { return true })
	for _, key := range illegal {
		span.Attributes().Remove(key)
	}
	span.Attributes().PutStr(topicAttr, mainTopic)
}
/******** helpers ********/
// filterAttrs deletes every entry of attrs whose key is not present in allow.
// attrs is modified in place; allow is only read.
func filterAttrs(attrs pcommon.Map, allow map[string]struct{}) {
	// RemoveIf filters in one pass, so no scratch slice of keys is needed.
	attrs.RemoveIf(func(key string, _ pcommon.Value) bool {
		_, ok := allow[key]
		return !ok
	})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment