Created
May 2, 2025 13:05
-
-
Save keithchambers/5fc1b47a90dbcbe5d29e4c97cab9c71f to your computer and use it in GitHub Desktop.
OTEL Harrier DQL
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Package genaicleaner provides an OpenTelemetry Collector *processor* that | |
| // sanitises Gen-AI spans in-flight. | |
| // | |
| // The processor performs three operations, all entirely in-memory: | |
| // | |
| // 1. **Validation** – incoming spans are checked against Harrier’s | |
| // *authoritative* allow-lists (derived from OTEP ≥ 1.43). | |
| // 2. **Cleaning / Forking** | |
| // • A *clean* copy is forwarded on the same pipeline and tagged | |
| // `messaging.kafka.destination="input"` so the kafkaexporter can map | |
| // it directly to the `input` topic (no exporter code required). | |
| // • A *dirty* copy (if the span was non-compliant) keeps the illegal | |
| // data, annotates the offending keys in the `reason` attribute, and | |
| // is rerouted to the DLQ topic `input-dlq`. | |
// 3. **Metrics** – the processing latency of each batch (all spans handled
//    in one call) is emitted via obsreport so operators can observe overhead.
| // | |
| // The processor is *stateless*: horizontal scaling is trivial and no ordering | |
| // guarantees are required because each span is evaluated in isolation. | |
| package genaicleaner | |
| import ( | |
| "context" | |
| "strings" | |
| "time" | |
| "go.opentelemetry.io/collector/component" | |
| "go.opentelemetry.io/collector/config/configtelemetry" | |
| "go.opentelemetry.io/collector/consumer" | |
| "go.opentelemetry.io/collector/obsreport" | |
| "go.opentelemetry.io/collector/pdata/pcommon" | |
| "go.opentelemetry.io/collector/pdata/ptrace" | |
| "go.opentelemetry.io/collector/processor" | |
| "go.opentelemetry.io/collector/processor/processorhelper" | |
| ) | |
/******** routing constants ********/

// Identity and self-telemetry names.
const (
	// kind is the component type string passed to processor.NewFactory.
	kind = "genai_cleaner"
	// metricNS is the metric namespace handed to obsreport.
	metricNS = "genai_cleaner"
	// latencyMetric is the custom metric name used for batch latency.
	latencyMetric = "genai_cleaner_latency_ms"
)

// Routing attributes written onto processed spans.
const (
	// topicAttr is the span attribute that carries the destination topic
	// (presumably read by the Kafka exporter — confirm exporter config).
	topicAttr = "messaging.kafka.destination"
	// mainTopic is the topic value given to sanitised spans.
	mainTopic = "input"
	// dlqTopic is the topic value given to copies of rejected spans.
	dlqTopic = "input-dlq"
	// reasonKey names the attribute holding the semicolon-separated list
	// of violations on rejected copies.
	reasonKey = "reason"
)
/******** allow-lists built from Harrier schema ********/

// allowRes and allowSpan are the single sources of truth for which attribute
// keys are "legal" on resources and spans respectively. When Harrier's schema
// changes, only these tables should need editing; the processing logic is
// table-driven.

// allowRes is the set of resource attribute keys that are kept; every other
// resource attribute is stripped (see filterAttrs).
var allowRes = map[string]struct{}{
	"deployment.environment": {},
	"gen_ai.system":          {},
	"service.instance.id":    {},
	"service.name":           {},
	"service.namespace":      {},
	"service.version":        {},
}
// allowSpan is the set of span attribute keys that may be forwarded on the
// main topic. Any other key marks the span as non-compliant: the key is listed
// in the rejected copy's reason attribute and removed from the forwarded span.
//
// The table is meant to be the union of the Gen-AI semantic-convention
// attribute tables. gen_ai.system (required on Gen-AI spans), error.type,
// server.address and server.port appear in those span tables, so they are
// allowed here; without them every spec-compliant span would be rejected.
//
// NOTE(review): gen_ai.user.message / gen_ai.assistant.message are event
// names in the semantic conventions rather than span attributes, and "tags"
// is a local extension — confirm they are intended as attribute keys.
var allowSpan = map[string]struct{}{
	// ---- System / transport ----
	"gen_ai.system":  {},
	"error.type":     {},
	"server.address": {},
	"server.port":    {},
	// ---- Agent ----
	"gen_ai.agent.id":          {},
	"gen_ai.agent.name":        {},
	"gen_ai.agent.version":     {},
	"gen_ai.agent.description": {},
	// ---- Operation / Output ----
	"gen_ai.operation.name": {},
	"gen_ai.output.type":    {},
	// ---- Tool ----
	"gen_ai.tool.call.id": {},
	"gen_ai.tool.name":    {},
	"gen_ai.tool.type":    {},
	// ---- Request ----
	"gen_ai.request.model":             {},
	"gen_ai.request.max_tokens":        {},
	"gen_ai.request.temperature":       {},
	"gen_ai.request.top_p":             {},
	"gen_ai.request.top_k":             {},
	"gen_ai.request.presence_penalty":  {},
	"gen_ai.request.frequency_penalty": {},
	"gen_ai.request.choice.count":      {},
	"gen_ai.request.encoding_formats":  {},
	"gen_ai.request.seed":              {},
	"gen_ai.request.stop_sequences":    {},
	// ---- Response ----
	"gen_ai.response.id":             {},
	"gen_ai.response.model":          {},
	"gen_ai.response.finish_reasons": {},
	// ---- Usage / Billing ----
	"gen_ai.usage.input_tokens":  {},
	"gen_ai.usage.output_tokens": {},
	"gen_ai.token.type":          {},
	// ---- OpenAI extension ----
	"gen_ai.openai.request.service_tier":        {},
	"gen_ai.openai.response.service_tier":       {},
	"gen_ai.openai.response.system_fingerprint": {},
	// ---- Content ----
	"gen_ai.user.message":      {},
	"gen_ai.assistant.message": {},
	// ---- Free-form labels ----
	"tags": {},
}
/******** factory ********/

// Config is the processor's configuration. It is deliberately empty: all
// behaviour comes from the compiled-in allow-lists. Options such as "drop
// dirty spans instead of routing them to the DLQ" could be added later
// without changing how the processor is wired.
type Config struct{}

// Validate is the collector's config-validation hook. An empty Config is
// always valid, so Validate never returns an error.
func (*Config) Validate() error {
	return nil
}
| // createDefaultConfig is required by the factory; returns an empty Config. | |
| func createDefaultConfig() component.Config { return &Config{} } | |
| // NewFactory registers the processor so it can be enabled in collector.yml. | |
| func NewFactory() processor.Factory { | |
| return processor.NewFactory( | |
| kind, | |
| createDefaultConfig, | |
| processor.WithTraces(newProcessor, component.StabilityLevelAlpha), | |
| ) | |
| } | |
| /******** processor constructor ********/ | |
| /* | |
| newProcessor wires the helper that provides lifecycle hooks (Start/Shutdown) | |
| and wraps our `cleanAndRoute` business logic. | |
| */ | |
| func newProcessor( | |
| ctx context.Context, | |
| set processor.CreateSettings, | |
| cfg component.Config, | |
| next consumer.Traces, | |
| ) (processor.Traces, error) { | |
| // obsreport gives us free metrics (batches, spans, errors, latency, …) | |
| obs, _ := obsreport.NewProcessor(obsreport.ProcessorSettings{ | |
| ProcessorName: kind, | |
| ProcessorID: set.ID, | |
| MetricNamespace: metricNS, | |
| Level: configtelemetry.LevelNormal, | |
| Stability: component.StabilityLevelAlpha, | |
| }) | |
| return processorhelper.NewTracesProcessor( | |
| ctx, | |
| set, | |
| cfg, | |
| next, | |
| cleanAndRoute, | |
| processorhelper.WithStart(obs.Start), | |
| processorhelper.WithShutdown(obs.Shutdown), | |
| ) | |
| } | |
| /******** core logic ********/ | |
| /* | |
| cleanAndRoute iterates through the nested OTLP structure: | |
| ResourceSpans → ScopeSpans → Spans | |
| For each span it: | |
| 1. Detects disallowed *events*, *links*, and attributes. | |
| 2. Clones a “dirty” span **before** mutation so the original payload is | |
| preserved for debugging in the DLQ. | |
| 3. Sanitises the original span in-place and sets the routing attribute. | |
| Complexity: | |
| • O(N) over spans. | |
| • Tiny per-attribute map look-ups only (no allocations except when cloning | |
| dirty spans). | |
| */ | |
| func cleanAndRoute(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { | |
| start := time.Now() | |
| // ---- Resource level ----------------------------------------------------- | |
| for i := 0; i < td.ResourceSpans().Len(); i++ { | |
| rs := td.ResourceSpans().At(i) | |
| filterAttrs(rs.Resource().Attributes(), allowRes) | |
| // ---- Scope level ----------------------------------------------------- | |
| scopeSpans := rs.ScopeSpans() | |
| for j := 0; j < scopeSpans.Len(); j++ { | |
| spans := scopeSpans.At(j).Spans() | |
| // ---- Span level -------------------------------------------------- | |
| for k := 0; k < spans.Len(); k++ { | |
| span := spans.At(k) | |
| var reason strings.Builder // collects semicolon-separated violations | |
| issues := false // flag to track if we need a dirty copy | |
| // 1️⃣ events & links are banned by Gen-AI spec ----------------- | |
| if span.Events().Len() > 0 || span.Links().Len() > 0 { | |
| reason.WriteString("events_or_links;") | |
| issues = true | |
| } | |
| // 2️⃣ walk attributes and note illegal keys -------------------- | |
| var illegal []string | |
| span.Attributes().Range(func(key string, _ pcommon.Value) bool { | |
| if _, ok := allowSpan[key]; !ok { | |
| illegal = append(illegal, key) | |
| reason.WriteString(key + ";") | |
| issues = true | |
| } | |
| return true | |
| }) | |
| // 3️⃣ dirty-span fork (cheap append-at-end pattern) ------------ | |
| if issues { | |
| dirty := spans.AppendEmpty() // allocate slot | |
| span.CopyTo(dirty) // deep copy contents | |
| dirty.Attributes().PutStr( // record *why* it was rejected | |
| reasonKey, | |
| strings.TrimSuffix(reason.String(), ";"), | |
| ) | |
| dirty.Attributes().PutStr(topicAttr, dlqTopic) | |
| } | |
| // 4️⃣ mutate original span to be pristine ---------------------- | |
| span.Events().Resize(0) | |
| span.Links().Resize(0) | |
| for _, key := range illegal { | |
| span.Attributes().Remove(key) | |
| } | |
| span.Attributes().PutStr(topicAttr, mainTopic) | |
| } | |
| } | |
| } | |
| // custom latency metric (ms) for SLO dashboards | |
| obsreport.RecordProcessorCustomMetric( | |
| ctx, | |
| latencyMetric, | |
| float64(time.Since(start).Milliseconds()), | |
| ) | |
| return td, nil | |
| } | |
| /******** helpers ********/ | |
| /* | |
| filterAttrs removes resource attributes that are *not* in the allow-list. | |
| We collect keys first then delete in a second pass to avoid mutation during | |
| iteration (safe but clearer). | |
| */ | |
| func filterAttrs(attrs pcommon.Map, allow map[string]struct{}) { | |
| var drop []string | |
| attrs.Range(func(key string, _ pcommon.Value) bool { | |
| if _, ok := allow[key]; !ok { | |
| drop = append(drop, key) | |
| } | |
| return true | |
| }) | |
| for _, key := range drop { | |
| attrs.Remove(key) | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment