Skip to content

Instantly share code, notes, and snippets.

@keithchambers
Last active April 14, 2025 16:02
Show Gist options
  • Save keithchambers/1feb7f2d0e2b91fb870d1bdb372beae0 to your computer and use it in GitHub Desktop.
OTEL Agentic Event Gen
// Package main implements a load tester for sending OpenTelemetry (OTLP) data
// via HTTP JSON or gRPC Protobuf, with an option to output generated data to a
// file instead of sending it over the network.
package main
import (
	"bytes"
	"context"
	"crypto/tls"
	"encoding/hex"
	"encoding/json" // Used for both HTTP payload and file output
	"flag"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net"
	"net/http"
	"net/url"
	"os"
	"os/signal"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	// OTLP Collector Service definitions (used for gRPC client creation)
	collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
	colmetricspb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
	coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
	// OTLP Protobuf message definitions (used for constructing gRPC requests)
	commonpb "go.opentelemetry.io/proto/otlp/common/v1"
	logspb "go.opentelemetry.io/proto/otlp/logs/v1"
	metricspb "go.opentelemetry.io/proto/otlp/metrics/v1"
	resourcepb "go.opentelemetry.io/proto/otlp/resource/v1"
	tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
	// gRPC libraries for network communication
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/insecure"
	// Protobuf library helper for calculating message size
	"google.golang.org/protobuf/proto"
)
// TelemetryPayload represents the internal structure for a batch of generated
// telemetry data (traces, metrics, logs) before potential marshalling or conversion.
// It is the common in-memory form used by HTTP JSON export, gRPC protobuf
// conversion, and file output alike.
type TelemetryPayload struct {
	Resource   ResourceData `json:"resource"`
	ScopeSpans []ScopeSpans `json:"scopeSpans"`
	Metrics    []Metric     `json:"metrics"`
	Logs       []LogRecord  `json:"logs"`
}

// ResourceData holds attributes describing the source of the telemetry.
type ResourceData struct {
	Attributes []KeyValue `json:"attributes"`
}

// KeyValue is a generic key-value pair used for attributes.
type KeyValue struct {
	Key   string     `json:"key"`
	Value ValueUnion `json:"value"`
}

// ValueUnion holds a value of varying type (string, int, double, bool).
// Uses pointers to facilitate JSON omission of unset fields (omitempty).
// At most one field is expected to be set at a time.
type ValueUnion struct {
	StringValue *string  `json:"stringValue,omitempty"`
	IntValue    *int64   `json:"intValue,omitempty"`
	DoubleValue *float64 `json:"doubleValue,omitempty"`
	BoolValue   *bool    `json:"boolValue,omitempty"`
}
// ScopeSpans groups spans originating from a specific instrumentation scope.
type ScopeSpans struct {
	Scope Scope  `json:"scope"`
	Spans []Span `json:"spans"`
}

// Scope identifies the instrumentation library that produced telemetry.
type Scope struct {
	Name    string `json:"name"`
	Version string `json:"version"`
}

// Span represents a single operation within a trace.
// TraceID/SpanID are hex-encoded strings (generated via randomString and
// hex-decoded to raw bytes when converting to protobuf); timestamps are
// decimal strings of nanoseconds since the Unix epoch.
type Span struct {
	TraceID           string     `json:"traceId"`
	SpanID            string     `json:"spanId"`
	ParentSpanID      string     `json:"parentSpanId,omitempty"`
	Name              string     `json:"name"`
	StartTimeUnixNano string     `json:"startTimeUnixNano"`
	EndTimeUnixNano   string     `json:"endTimeUnixNano"`
	Attributes        []KeyValue `json:"attributes"`
	Events            []Event    `json:"events"`
	Status            Status     `json:"status"`
	Links             []Link     `json:"links,omitempty"`
}

// Event represents a time-stamped event within a span.
type Event struct {
	TimeUnixNano string     `json:"timeUnixNano"`
	Name         string     `json:"name"`
	Attributes   []KeyValue `json:"attributes"`
}

// Status represents the status of a finished span.
// Code holds the OTLP status-code name (e.g. "STATUS_CODE_OK").
type Status struct {
	Code    string `json:"code"`
	Message string `json:"message"`
}

// Link represents a connection to another span.
type Link struct {
	TraceID    string     `json:"traceId"`
	Attributes []KeyValue `json:"attributes"`
}
// Metric represents a single metric instrument reading.
type Metric struct {
	Name        string     `json:"name"`
	Description string     `json:"description"`
	Unit        string     `json:"unit"`
	Data        MetricData `json:"data"`
}

// MetricData holds the actual data points for a metric (gauge or sum).
// Exactly one of the two fields is expected to be non-nil.
type MetricData struct {
	DoubleGauge *DoubleGauge `json:"doubleGauge,omitempty"`
	IntSum      *IntSum      `json:"intSum,omitempty"`
}

// DoubleGauge represents a gauge metric with double-precision data points.
type DoubleGauge struct {
	DataPoints []DoubleDataPoint `json:"dataPoints"`
}

// DoubleDataPoint represents a single data point for a double gauge.
type DoubleDataPoint struct {
	Attributes   []KeyValue `json:"attributes"`
	TimeUnixNano string     `json:"timeUnixNano"`
	Value        float64    `json:"value"`
}

// IntSum represents a sum metric with integer data points.
// AggregationTemporality holds the OTLP temporality name
// (e.g. "AGGREGATION_TEMPORALITY_CUMULATIVE").
type IntSum struct {
	IsMonotonic            bool           `json:"isMonotonic"`
	AggregationTemporality string         `json:"aggregationTemporality"`
	DataPoints             []IntDataPoint `json:"dataPoints"`
}

// IntDataPoint represents a single data point for an integer sum.
type IntDataPoint struct {
	Attributes   []KeyValue `json:"attributes"`
	TimeUnixNano string     `json:"timeUnixNano"`
	Value        int64      `json:"value"`
}
// LogRecord represents a single log entry.
// SeverityNumber follows the OTLP severity scale (9 == INFO in the
// generators below); Body carries the log message as a ValueUnion.
type LogRecord struct {
	TimeUnixNano   string     `json:"timeUnixNano"`
	SeverityNumber int32      `json:"severityNumber"`
	SeverityText   string     `json:"severityText"`
	Name           string     `json:"name,omitempty"`
	Body           ValueUnion `json:"body"`
	Attributes     []KeyValue `json:"attributes"`
}
// Command-line flags controlling the tool's behavior.
var (
	// targetHost specifies a label for the target system being tested.
	// It's added as a resource attribute and is REQUIRED only in network mode.
	targetHost = flag.String("target-host", "", "Target host identifier (e.g., service name, cluster name). **Required unless -out-file is specified.**")
	// otelHTTPEndpoint defines the base URL for OTLP/HTTP export. Ignored in file-only mode.
	otelHTTPEndpoint = flag.String("http-endpoint", "http://localhost:4318", "OTEL collector HTTP base endpoint (e.g., http://host:port). Ignored if -out-file is set.")
	// otelGRPCEndpoint defines the host:port for OTLP/gRPC export. Ignored in file-only mode.
	otelGRPCEndpoint = flag.String("grpc-endpoint", "localhost:4317", "OTEL collector gRPC endpoint (host:port). Ignored if -out-file is set.")
	// format determines the network protocol (http or grpc). Ignored in file-only mode.
	format = flag.String("format", "http", "Transport format: http or grpc. Ignored if -out-file is set.")
	// apiKey provides an API key, used for HTTP 'Authorization: Bearer' header
	// or potentially as a resource attribute in gRPC/file mode.
	apiKey = flag.String("api-key", "test-key-123", "API Key for Authorization header (HTTP) or api.key resource attribute.")
	// grpcInsecure controls TLS usage for gRPC connections. Ignored in file-only mode.
	grpcInsecure = flag.Bool("grpc-insecure", true, "Use insecure gRPC connection (no TLS). Ignored if -out-file is set.")
	// totalEvents sets the total number of payloads to generate (-1 for infinite;
	// each "payload" is one generated TelemetryPayload batch).
	totalEvents = flag.Int("total", 1000, "Total number of telemetry payloads to generate. -1 means infinite.")
	// workers defines the number of concurrent goroutines generating/sending data.
	workers = flag.Int("workers", runtime.NumCPU(), "Number of concurrent worker goroutines.")
	// rateLimit throttles the payload generation rate (payloads per second, 0 for no limit).
	rateLimit = flag.Float64("rate", 0, "Target rate limit (payloads per second). 0 means no limit.")
	// outFile enables file-only mode; if set, generated data is written as JSON lines
	// (with indentation) to this file, and network connections are skipped.
	outFile = flag.String("out-file", "", "If specified, run in file-only mode: generate payloads and write as indented JSON lines to this file, skipping network connection.")
	// verbose enables more detailed logging, especially for sent/failed requests.
	verbose = flag.Bool("verbose", false, "Enable verbose logging.")
)
// Global state variables used across the application.
var (
	// httpClient is the shared HTTP client, initialized only in HTTP network mode.
	httpClient *http.Client
	// grpcConn is the shared gRPC connection, initialized only in gRPC network mode.
	grpcConn *grpc.ClientConn
	// gRPC service clients, initialized only in gRPC network mode.
	traceClient   coltracepb.TraceServiceClient
	metricsClient colmetricspb.MetricsServiceClient
	logsClient    collogspb.LogsServiceClient
	// wg synchronizes the main goroutine with worker goroutines.
	wg sync.WaitGroup
	// startTime records the application start time for calculating rates.
	startTime time.Time
	// Atomic counters for tracking statistics.
	successCounter atomic.Uint64 // Payloads successfully sent or written.
	errorCounter   atomic.Uint64 // Payloads failed to send or write.
	bytesCounter   atomic.Uint64 // Total bytes successfully sent (protobuf size) or written (indented JSON size).
	// outputFile is the file handle used in file-only mode.
	outputFile *os.File
	// outputFileMutex protects concurrent writes to the output file.
	outputFileMutex sync.Mutex
	// Shared random number generation source and generator.
	// NOTE(review): *rand.Rand is NOT safe for concurrent use. With -workers > 1
	// the generate* helpers appear to call rng from multiple goroutines, which
	// would be a data race — confirm worker call sites, and either guard rng
	// with a mutex or give each worker its own generator.
	randSource = rand.NewSource(time.Now().UnixNano())
	rng        = rand.New(randSource)
)
// Constants used in the application.
const (
	// reportInterval defines how often statistics are printed to the console.
	reportInterval = 5 * time.Second
	// userAgent identifies this tool in network requests (HTTP User-Agent header).
	userAgent = "otel-load-tester/1.0"
	// apiKeyAttrName is the standard OTLP resource attribute name for an API key.
	apiKeyAttrName = "api.key"
)
// randomString generates a pseudo-random lowercase hexadecimal string of
// length n, drawing digits from the shared package-level generator rng.
func randomString(n int) string {
	const digits = "0123456789abcdef"
	var sb strings.Builder
	sb.Grow(n)
	for i := 0; i < n; i++ {
		sb.WriteByte(digits[rng.Intn(len(digits))])
	}
	return sb.String()
}
// nowNanoStr returns the current wall-clock time as nanoseconds since the
// Unix epoch, rendered as a base-10 string.
func nowNanoStr() string {
	nanos := time.Now().UnixNano()
	return strconv.FormatInt(nanos, 10)
}
// futureNanoStr returns a time ms milliseconds after now, expressed as
// nanoseconds since the Unix epoch and rendered as a base-10 string.
func futureNanoStr(ms int64) string {
	at := time.Now().Add(time.Duration(ms) * time.Millisecond)
	return strconv.FormatInt(at.UnixNano(), 10)
}
// strVal wraps a string in a ValueUnion.
func strVal(s string) ValueUnion {
	v := s
	return ValueUnion{StringValue: &v}
}

// intVal wraps an int64 in a ValueUnion.
func intVal(i int64) ValueUnion {
	v := i
	return ValueUnion{IntValue: &v}
}

// doubleVal wraps a float64 in a ValueUnion.
func doubleVal(f float64) ValueUnion {
	v := f
	return ValueUnion{DoubleValue: &v}
}

// boolVal wraps a bool in a ValueUnion.
func boolVal(b bool) ValueUnion {
	v := b
	return ValueUnion{BoolValue: &v}
}
// generateRandomTelemetry creates a TelemetryPayload struct populated with
// semi-realistic, randomly generated OTLP data (resource, spans, metrics, logs).
// The -target-host and -api-key flag values, when set, are attached as
// resource attributes.
func generateRandomTelemetry() *TelemetryPayload {
	// Base resource attributes present on every payload.
	attrs := []KeyValue{
		{Key: "service.name", Value: strVal("agent-federation-service")},
		{Key: "service.instance.id", Value: strVal(fmt.Sprintf("load-tester-%s", randomString(8)))},
		{Key: "deployment.environment", Value: strVal("production")},
	}
	if *targetHost != "" {
		attrs = append(attrs, KeyValue{Key: "target.host", Value: strVal(*targetHost)})
	}
	if *apiKey != "" {
		attrs = append(attrs, KeyValue{Key: apiKeyAttrName, Value: strVal(*apiKey)})
	}
	// One scope per simulated agent; between one and three agents per payload.
	agentCount := rng.Intn(3) + 1
	scopes := make([]ScopeSpans, 0, agentCount)
	for i := 0; i < agentCount; i++ {
		scopes = append(scopes, generateAgentScope(i))
	}
	// Derive the metric set from the scopes just generated; the error-count
	// metric is only emitted occasionally (see randomErrorCount).
	metrics := append([]Metric{}, randomLatencyMetrics(scopes)...)
	metrics = append(metrics, randomSuccessCount(scopes)...)
	metrics = append(metrics, randomTokenUsage(scopes)...)
	if errMetric := randomErrorCount(scopes); errMetric != nil {
		metrics = append(metrics, *errMetric)
	}
	return &TelemetryPayload{
		Resource:   ResourceData{Attributes: attrs},
		ScopeSpans: scopes,
		Metrics:    metrics,
		Logs:       randomLogs(scopes),
	}
}
// generateAgentScope creates a ScopeSpans structure representing telemetry
// from a single simulated agent, including a root span and a sub-span
// (a simulated tool call) parented to it.
func generateAgentScope(agentIndex int) ScopeSpans {
	// Static catalog of simulated agent profiles; selection wraps around
	// so any agentIndex is valid.
	type profile struct {
		scopeName     string
		scopeVersion  string
		modelName     string
		modelProvider string
		agentIDPrefix string
		agentName     string
		userRole      string
	}
	profiles := []profile{
		{"openai-agent", "v2.1.0", "gpt-4", "openai", "openai-run", "email-assistant", "automated-assistant"},
		{"llama-agent", "v1.4.0", "LLaMa", "facebook", "llama-run", "data-extractor", "data-analyst"},
		{"anthropic-agent", "v1.5.0", "claude-v1.3", "anthropic", "anthropic-run", "summary-generator", "content-curator"},
	}
	p := profiles[agentIndex%len(profiles)]
	traceID := randomString(32)
	rootSpanID := randomString(16)
	// Root span: one simulated LLM invocation with prompt/response events.
	root := Span{
		TraceID:           traceID,
		SpanID:            rootSpanID,
		Name:              "agent.root." + strings.Split(p.scopeName, "-")[0],
		StartTimeUnixNano: nowNanoStr(),
		EndTimeUnixNano:   futureNanoStr(int64(rng.Intn(300) + 300)),
		Attributes: []KeyValue{
			{Key: "ai.agent.id", Value: strVal(fmt.Sprintf("%s-%d", p.agentIDPrefix, rng.Intn(1000)))},
			{Key: "ai.model.name", Value: strVal(p.modelName)},
			{Key: "user.id", Value: strVal(fmt.Sprintf("user-%d", rng.Intn(5000)+1000))},
		},
		Events: []Event{
			{TimeUnixNano: nowNanoStr(), Name: "llm.prompt", Attributes: []KeyValue{{Key: "correlation.id", Value: strVal("corr-" + p.agentIDPrefix)}}},
			{TimeUnixNano: nowNanoStr(), Name: "llm.response", Attributes: []KeyValue{{Key: "llm.total_tokens", Value: intVal(int64(rng.Intn(200) + 70))}}},
		},
		Status: Status{Code: "STATUS_CODE_OK", Message: "Success"},
	}
	// Child span: a simulated MCP tool call within the same trace.
	child := Span{
		TraceID:           traceID,
		SpanID:            randomString(16),
		ParentSpanID:      rootSpanID,
		Name:              "agent.subtask.mcp_tool_call",
		StartTimeUnixNano: nowNanoStr(),
		EndTimeUnixNano:   futureNanoStr(int64(rng.Intn(150) + 100)),
		Attributes: []KeyValue{
			{Key: "ai.tool.name", Value: strVal(fmt.Sprintf("MCP Tool %d", rng.Intn(999)))},
			{Key: "correlation.id", Value: strVal("corr-" + p.agentIDPrefix)},
		},
		Events: []Event{
			{TimeUnixNano: nowNanoStr(), Name: "tool.call.start"},
			{TimeUnixNano: nowNanoStr(), Name: "tool.call.end", Attributes: []KeyValue{{Key: "tool.response_code", Value: intVal(200)}}},
		},
		Status: Status{Code: "STATUS_CODE_OK", Message: "Tool success"},
	}
	return ScopeSpans{
		Scope: Scope{Name: p.scopeName, Version: p.scopeVersion},
		Spans: []Span{root, child},
	}
}
// randomLatencyMetrics generates one agent-latency gauge metric with a data
// point per scope (agent); returns nil when there are no scopes.
func randomLatencyMetrics(scopes []ScopeSpans) []Metric {
	points := make([]DoubleDataPoint, 0, len(scopes))
	for _, sc := range scopes {
		agentName, modelName := parseAgentAndModel(sc)
		points = append(points, DoubleDataPoint{
			Attributes:   []KeyValue{{Key: "agent.name", Value: strVal(agentName)}, {Key: "model.name", Value: strVal(modelName)}},
			TimeUnixNano: nowNanoStr(),
			Value:        rng.Float64()*400 + 100, // 100-500 ms
		})
	}
	if len(points) == 0 {
		return nil
	}
	gauge := &DoubleGauge{DataPoints: points}
	return []Metric{
		{Name: "ai.agent.latency", Description: "Latency", Unit: "ms", Data: MetricData{DoubleGauge: gauge}},
	}
}
// randomSuccessCount generates one cumulative success-count sum metric with a
// data point (value 1) per scope; returns nil when there are no scopes.
func randomSuccessCount(scopes []ScopeSpans) []Metric {
	points := make([]IntDataPoint, 0, len(scopes))
	for _, sc := range scopes {
		agentName, _ := parseAgentAndModel(sc)
		points = append(points, IntDataPoint{
			Attributes:   []KeyValue{{Key: "agent.name", Value: strVal(agentName)}},
			TimeUnixNano: nowNanoStr(),
			Value:        1,
		})
	}
	if len(points) == 0 {
		return nil
	}
	sum := &IntSum{IsMonotonic: true, AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE", DataPoints: points}
	return []Metric{
		{Name: "ai.agent.success_count", Description: "Successes", Unit: "1", Data: MetricData{IntSum: sum}},
	}
}
// randomTokenUsage generates one cumulative token-usage sum metric with a
// random data point per scope; returns nil when there are no scopes.
func randomTokenUsage(scopes []ScopeSpans) []Metric {
	points := make([]IntDataPoint, 0, len(scopes))
	for _, sc := range scopes {
		_, modelName := parseAgentAndModel(sc)
		points = append(points, IntDataPoint{
			Attributes:   []KeyValue{{Key: "model.name", Value: strVal(modelName)}},
			TimeUnixNano: nowNanoStr(),
			Value:        rng.Int63n(300) + 50, // 50-349 tokens
		})
	}
	if len(points) == 0 {
		return nil
	}
	sum := &IntSum{IsMonotonic: true, AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE", DataPoints: points}
	return []Metric{
		{Name: "ai.model.token_usage", Description: "Tokens", Unit: "tokens", Data: MetricData{IntSum: sum}},
	}
}
// randomErrorCount occasionally (roughly 1 in 5 calls) generates an
// error-count sum metric for one randomly chosen scope; otherwise nil.
func randomErrorCount(scopes []ScopeSpans) *Metric {
	// Note: the rng draw happens first so the random sequence is stable
	// regardless of whether scopes is empty.
	if rng.Intn(5) > 0 || len(scopes) == 0 {
		return nil
	}
	chosen := scopes[rng.Intn(len(scopes))]
	agentName, _ := parseAgentAndModel(chosen)
	point := IntDataPoint{
		Attributes:   []KeyValue{{Key: "agent.name", Value: strVal(agentName)}},
		TimeUnixNano: nowNanoStr(),
		Value:        1,
	}
	sum := &IntSum{IsMonotonic: true, AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE", DataPoints: []IntDataPoint{point}}
	return &Metric{Name: "ai.error.count", Description: "Errors", Unit: "1", Data: MetricData{IntSum: sum}}
}
// parseAgentAndModel extracts agent and model names from a ScopeSpans
// structure by inspecting the first span's attributes, falling back to
// defaults (or the scope name for the agent) when attributes are missing.
func parseAgentAndModel(sc ScopeSpans) (string, string) {
	agent, model := "unknown-agent", "unknown-model"
	if len(sc.Spans) == 0 || len(sc.Spans[0].Attributes) == 0 {
		return agent, model
	}
	for _, attr := range sc.Spans[0].Attributes {
		if attr.Value.StringValue == nil {
			continue
		}
		switch attr.Key {
		case "ai.agent.id":
			agent = *attr.Value.StringValue
		case "ai.model.name":
			model = *attr.Value.StringValue
		}
	}
	// Prefer the scope name over the generic placeholder when no agent id
	// attribute was present.
	if agent == "unknown-agent" && sc.Scope.Name != "" {
		agent = sc.Scope.Name
	}
	return agent, model
}
// randomLogs generates a started/completed pair of INFO log records for each
// agent scope, sharing a random correlation id per agent.
func randomLogs(scopes []ScopeSpans) []LogRecord {
	var records []LogRecord
	for _, sc := range scopes {
		agentName, _ := parseAgentAndModel(sc)
		corrID := "corr-" + agentName + fmt.Sprintf("-%03d", rng.Intn(1000))
		// mkRecord builds one INFO record with the shared step attributes.
		mkRecord := func(event, body string) LogRecord {
			return LogRecord{
				TimeUnixNano:   nowNanoStr(),
				SeverityNumber: 9, // INFO
				SeverityText:   "INFO",
				Name:           event,
				Body:           strVal(body),
				Attributes: []KeyValue{
					{Key: "step.name", Value: strVal("some-step")},
					{Key: "correlation.id", Value: strVal(corrID)},
				},
			}
		}
		records = append(records,
			mkRecord("agent.step.started", fmt.Sprintf("%s: Step started.", agentName)),
			mkRecord("agent.step.completed", fmt.Sprintf("%s: Step completed.", agentName)),
		)
	}
	return records
}
// convertValueUnion converts the internal ValueUnion struct to the OTLP
// Protobuf AnyValue type. Fields are checked in a fixed order (string, int,
// double, bool); an entirely unset union becomes an empty string value.
// This is only required when preparing data for gRPC export.
func convertValueUnion(v ValueUnion) *commonpb.AnyValue {
	switch {
	case v.StringValue != nil:
		return &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: *v.StringValue}}
	case v.IntValue != nil:
		return &commonpb.AnyValue{Value: &commonpb.AnyValue_IntValue{IntValue: *v.IntValue}}
	case v.DoubleValue != nil:
		return &commonpb.AnyValue{Value: &commonpb.AnyValue_DoubleValue{DoubleValue: *v.DoubleValue}}
	case v.BoolValue != nil:
		return &commonpb.AnyValue{Value: &commonpb.AnyValue_BoolValue{BoolValue: *v.BoolValue}}
	default:
		return &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: ""}}
	}
}
// convertKeyValues converts a slice of internal KeyValue structs to the OTLP
// Protobuf KeyValue type, preserving order. Always returns a non-nil slice.
// This is only required when preparing data for gRPC export.
func convertKeyValues(kvs []KeyValue) []*commonpb.KeyValue {
	converted := make([]*commonpb.KeyValue, len(kvs))
	for i, kv := range kvs {
		converted[i] = &commonpb.KeyValue{
			Key:   kv.Key,
			Value: convertValueUnion(kv.Value),
		}
	}
	return converted
}
// MustParseUnixNano converts a string timestamp (nanoseconds since epoch)
// to uint64. Despite the "Must" prefix it never panics: it logs a warning
// and falls back to the current time if parsing fails.
func MustParseUnixNano(ts string) uint64 {
	parsed, err := strconv.ParseUint(ts, 10, 64)
	if err == nil {
		return parsed
	}
	log.Printf("WARN: Failed to parse timestamp '%s': %v. Using current time.", ts, err)
	return uint64(time.Now().UnixNano())
}
// convertPayloadToProto transforms the internal TelemetryPayload struct into
// the corresponding OTLP Protobuf Export service request messages.
// This function is only called when using the gRPC network mode.
//
// All three requests are always returned non-nil; a signal with no data
// simply carries an empty resource list.
func convertPayloadToProto(payload *TelemetryPayload) (
	traceReq *coltracepb.ExportTraceServiceRequest,
	metricsReq *colmetricspb.ExportMetricsServiceRequest,
	logsReq *collogspb.ExportLogsServiceRequest) {
	// A single resource proto is shared by the trace, metric and log requests.
	resourceProto := &resourcepb.Resource{
		Attributes: convertKeyValues(payload.Resource.Attributes),
	}
	// --- Convert Traces ---
	resourceSpans := make([]*tracepb.ResourceSpans, 0, 1)
	if len(payload.ScopeSpans) > 0 {
		scopeSpansProto := make([]*tracepb.ScopeSpans, 0, len(payload.ScopeSpans))
		for _, sc := range payload.ScopeSpans {
			spansProto := make([]*tracepb.Span, 0, len(sc.Spans))
			for _, span := range sc.Spans {
				// IDs are hex strings internally; OTLP wants raw bytes.
				// NOTE(review): decode errors are deliberately ignored, so a
				// malformed ID silently becomes an empty/partial byte slice.
				traceIDBytes, _ := hex.DecodeString(span.TraceID)
				spanIDBytes, _ := hex.DecodeString(span.SpanID)
				var parentSpanIDBytes []byte
				// An all-zero parent span id means "no parent" in OTLP.
				if span.ParentSpanID != "" && span.ParentSpanID != "0000000000000000" {
					parentSpanIDBytes, _ = hex.DecodeString(span.ParentSpanID)
				}
				eventsProto := make([]*tracepb.Span_Event, 0, len(span.Events))
				for _, ev := range span.Events {
					eventsProto = append(eventsProto, &tracepb.Span_Event{
						TimeUnixNano: MustParseUnixNano(ev.TimeUnixNano),
						Name:         ev.Name,
						Attributes:   convertKeyValues(ev.Attributes),
					})
				}
				linksProto := make([]*tracepb.Span_Link, 0, len(span.Links))
				for _, link := range span.Links {
					linkTraceIDBytes, _ := hex.DecodeString(link.TraceID)
					linksProto = append(linksProto, &tracepb.Span_Link{
						TraceId:    linkTraceIDBytes,
						Attributes: convertKeyValues(link.Attributes),
					})
				}
				// Map the textual status code to the protobuf enum; anything
				// unrecognized maps to UNSET.
				var statusCode tracepb.Status_StatusCode
				switch span.Status.Code {
				case "STATUS_CODE_OK":
					statusCode = tracepb.Status_STATUS_CODE_OK
				case "STATUS_CODE_ERROR":
					statusCode = tracepb.Status_STATUS_CODE_ERROR
				default:
					statusCode = tracepb.Status_STATUS_CODE_UNSET
				}
				spansProto = append(spansProto, &tracepb.Span{
					TraceId:           traceIDBytes,
					SpanId:            spanIDBytes,
					ParentSpanId:      parentSpanIDBytes,
					Name:              span.Name,
					Kind:              tracepb.Span_SPAN_KIND_INTERNAL,
					StartTimeUnixNano: MustParseUnixNano(span.StartTimeUnixNano),
					EndTimeUnixNano:   MustParseUnixNano(span.EndTimeUnixNano),
					Attributes:        convertKeyValues(span.Attributes),
					Events:            eventsProto,
					Links:             linksProto,
					Status:            &tracepb.Status{Code: statusCode, Message: span.Status.Message},
				})
			}
			scopeSpansProto = append(scopeSpansProto, &tracepb.ScopeSpans{
				Scope: &commonpb.InstrumentationScope{Name: sc.Scope.Name, Version: sc.Scope.Version},
				Spans: spansProto,
			})
		}
		resourceSpans = append(resourceSpans, &tracepb.ResourceSpans{
			Resource:   resourceProto,
			ScopeSpans: scopeSpansProto,
		})
	}
	traceReq = &coltracepb.ExportTraceServiceRequest{ResourceSpans: resourceSpans}
	// --- Convert Metrics ---
	// All metrics are grouped under a single synthetic scope named
	// "otel-load-tester" rather than the per-agent scopes used for spans.
	resourceMetrics := make([]*metricspb.ResourceMetrics, 0, 1)
	if len(payload.Metrics) > 0 {
		scopeMetricsProto := make([]*metricspb.ScopeMetrics, 0, 1)
		metricsProto := make([]*metricspb.Metric, 0, len(payload.Metrics))
		for _, m := range payload.Metrics {
			metricPb := &metricspb.Metric{
				Name:        m.Name,
				Description: m.Description,
				Unit:        m.Unit,
			}
			if m.Data.DoubleGauge != nil {
				dps := make([]*metricspb.NumberDataPoint, 0, len(m.Data.DoubleGauge.DataPoints))
				for _, dp := range m.Data.DoubleGauge.DataPoints {
					// NOTE(review): StartTimeUnixNano is set equal to
					// TimeUnixNano for gauges — confirm this is intended,
					// since gauges usually omit a start time.
					dps = append(dps, &metricspb.NumberDataPoint{
						Attributes:        convertKeyValues(dp.Attributes),
						TimeUnixNano:      MustParseUnixNano(dp.TimeUnixNano),
						StartTimeUnixNano: MustParseUnixNano(dp.TimeUnixNano),
						Value:             &metricspb.NumberDataPoint_AsDouble{AsDouble: dp.Value},
					})
				}
				metricPb.Data = &metricspb.Metric_Gauge{Gauge: &metricspb.Gauge{DataPoints: dps}}
			} else if m.Data.IntSum != nil {
				dps := make([]*metricspb.NumberDataPoint, 0, len(m.Data.IntSum.DataPoints))
				for _, dp := range m.Data.IntSum.DataPoints {
					// NOTE(review): StartTimeUnixNano is hard-coded to 0 for
					// sums; cumulative sums normally carry a real start time —
					// confirm downstream consumers accept this.
					dps = append(dps, &metricspb.NumberDataPoint{
						Attributes:        convertKeyValues(dp.Attributes),
						TimeUnixNano:      MustParseUnixNano(dp.TimeUnixNano),
						StartTimeUnixNano: 0,
						Value:             &metricspb.NumberDataPoint_AsInt{AsInt: dp.Value},
					})
				}
				var aggTemporality metricspb.AggregationTemporality
				switch m.Data.IntSum.AggregationTemporality {
				case "AGGREGATION_TEMPORALITY_CUMULATIVE":
					aggTemporality = metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE
				case "AGGREGATION_TEMPORALITY_DELTA":
					aggTemporality = metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA
				default:
					aggTemporality = metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_UNSPECIFIED
				}
				metricPb.Data = &metricspb.Metric_Sum{Sum: &metricspb.Sum{
					IsMonotonic:            m.Data.IntSum.IsMonotonic,
					AggregationTemporality: aggTemporality,
					DataPoints:             dps,
				}}
			}
			// Metrics with neither gauge nor sum data are dropped.
			if metricPb.Data != nil {
				metricsProto = append(metricsProto, metricPb)
			}
		}
		if len(metricsProto) > 0 {
			scopeMetricsProto = append(scopeMetricsProto, &metricspb.ScopeMetrics{
				Scope:   &commonpb.InstrumentationScope{Name: "otel-load-tester"},
				Metrics: metricsProto,
			})
			resourceMetrics = append(resourceMetrics, &metricspb.ResourceMetrics{
				Resource:     resourceProto,
				ScopeMetrics: scopeMetricsProto,
			})
		}
	}
	metricsReq = &colmetricspb.ExportMetricsServiceRequest{ResourceMetrics: resourceMetrics}
	// --- Convert Logs ---
	// Logs are likewise grouped under the single "otel-load-tester" scope.
	resourceLogs := make([]*logspb.ResourceLogs, 0, 1)
	if len(payload.Logs) > 0 {
		scopeLogsProto := make([]*logspb.ScopeLogs, 0, 1)
		logsRecordsProto := make([]*logspb.LogRecord, 0, len(payload.Logs))
		for _, logr := range payload.Logs {
			// ObservedTimeUnixNano is stamped at conversion time, not taken
			// from the record itself. The internal LogRecord.Name field is
			// not carried over (OTLP LogRecord has no name field here).
			logsRecordsProto = append(logsRecordsProto, &logspb.LogRecord{
				TimeUnixNano:         MustParseUnixNano(logr.TimeUnixNano),
				ObservedTimeUnixNano: uint64(time.Now().UnixNano()),
				SeverityNumber:       logspb.SeverityNumber(logr.SeverityNumber),
				SeverityText:         logr.SeverityText,
				Body:                 convertValueUnion(logr.Body),
				Attributes:           convertKeyValues(logr.Attributes),
			})
		}
		if len(logsRecordsProto) > 0 {
			scopeLogsProto = append(scopeLogsProto, &logspb.ScopeLogs{
				Scope:      &commonpb.InstrumentationScope{Name: "otel-load-tester"},
				LogRecords: logsRecordsProto,
			})
			resourceLogs = append(resourceLogs, &logspb.ResourceLogs{
				Resource:  resourceProto,
				ScopeLogs: scopeLogsProto,
			})
		}
	}
	logsReq = &collogspb.ExportLogsServiceRequest{ResourceLogs: resourceLogs}
	return traceReq, metricsReq, logsReq
}
// sendHTTP attempts to send the generated telemetry payload to the configured
// OTLP/HTTP endpoint as three separate JSON POSTs (traces, metrics, logs).
// Only the first error encountered is returned; later signals are still
// attempted. On full success the total JSON bytes sent are added to the
// global bytesCounter (the success counter is updated by the caller).
// This function is only called when in HTTP network mode.
func sendHTTP(ctx context.Context, payload *TelemetryPayload) error {
	base := strings.TrimSuffix(*otelHTTPEndpoint, "/")
	var firstError error
	var totalBytes int
	// dispatch marshals body, POSTs it to base+path, accumulates byte counts
	// on success, and records the first failure seen.
	dispatch := func(path, errLabel, okMsg string, body interface{}) {
		jsonData, err := json.Marshal(body)
		if err == nil {
			if err = postJSON(ctx, base+path, jsonData); err == nil {
				totalBytes += len(jsonData)
			}
		}
		if err != nil {
			if firstError == nil {
				firstError = fmt.Errorf(errLabel+": %w", err)
			}
			return
		}
		if *verbose {
			log.Print(okMsg)
		}
	}
	// Send Traces
	if len(payload.ScopeSpans) > 0 {
		body := map[string]interface{}{"resourceSpans": []map[string]interface{}{{"resource": payload.Resource, "scopeSpans": payload.ScopeSpans}}}
		dispatch("/v1/traces", "traces", "HTTP Trace Sent OK", body)
	}
	// Send Metrics
	if len(payload.Metrics) > 0 {
		scopeMetrics := map[string]interface{}{"scope": map[string]interface{}{}, "metrics": payload.Metrics}
		body := map[string]interface{}{"resourceMetrics": []map[string]interface{}{{"resource": payload.Resource, "scopeMetrics": []map[string]interface{}{scopeMetrics}}}}
		dispatch("/v1/metrics", "metrics", "HTTP Metrics Sent OK", body)
	}
	// Send Logs
	if len(payload.Logs) > 0 {
		scopeLogs := map[string]interface{}{"scope": map[string]interface{}{}, "logRecords": payload.Logs}
		body := map[string]interface{}{"resourceLogs": []map[string]interface{}{{"resource": payload.Resource, "scopeLogs": []map[string]interface{}{scopeLogs}}}}
		dispatch("/v1/logs", "logs", "HTTP Logs Sent OK", body)
	}
	// Update global counters (success counter updated by caller worker on no error).
	if firstError == nil {
		bytesCounter.Add(uint64(totalBytes))
	} else if *verbose {
		log.Printf("HTTP Send Batch FAILED (first error): %v", firstError)
	}
	return firstError
}
// postJSON performs an HTTP POST of data (as application/json) to url,
// attaching the tool's User-Agent and, when -api-key is set, a Bearer
// Authorization header. Any 2xx status counts as success.
//
// Fix: the response body is now drained before being closed. Without the
// drain, the net/http transport cannot reuse the keep-alive connection, so a
// high-rate load test opens a fresh TCP connection per request.
func postJSON(ctx context.Context, url string, data []byte) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(data))
	if err != nil {
		return fmt.Errorf("create request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("User-Agent", userAgent)
	if *apiKey != "" {
		req.Header.Set("Authorization", "Bearer "+*apiKey)
	}
	resp, err := httpClient.Do(req)
	if err != nil {
		// Prefer reporting context cancellation so callers can distinguish
		// shutdown from a genuine transport failure.
		if ctx.Err() != nil {
			return ctx.Err()
		}
		return fmt.Errorf("http do: %w", err)
	}
	defer resp.Body.Close()
	// Drain the body (LIFO: runs before Close) so the connection is reusable.
	defer func() { _, _ = io.Copy(io.Discard, resp.Body) }()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return fmt.Errorf("http status %d %s", resp.StatusCode, resp.Status)
	}
	return nil
}
// sendGRPC attempts to send the generated telemetry payload to the configured
// OTLP/gRPC endpoint, converting it to protobuf first. All three exports
// share one 10-second deadline; only the first error is returned, later
// exports are still attempted. On full success the total protobuf bytes are
// added to bytesCounter (the success counter is updated by the caller).
// This function is only called when in gRPC network mode.
func sendGRPC(ctx context.Context, payload *TelemetryPayload) error {
	traceReq, metricsReq, logsReq := convertPayloadToProto(payload)
	var firstError error
	var totalBytes int
	callCtx, callCancel := context.WithTimeout(ctx, 10*time.Second)
	defer callCancel()
	// record folds one export attempt into the batch outcome: size is
	// counted only on success, and only the first failure is kept.
	record := func(err error, errLabel string, size int, okMsg string) {
		if err != nil {
			if firstError == nil {
				firstError = fmt.Errorf(errLabel+" export: %w", err)
			}
			return
		}
		totalBytes += size
		if *verbose {
			log.Print(okMsg)
		}
	}
	// Send Traces
	if len(traceReq.ResourceSpans) > 0 {
		_, err := traceClient.Export(callCtx, traceReq)
		record(err, "traces", proto.Size(traceReq), "gRPC Trace Sent OK")
	}
	// Send Metrics
	if len(metricsReq.ResourceMetrics) > 0 {
		_, err := metricsClient.Export(callCtx, metricsReq)
		record(err, "metrics", proto.Size(metricsReq), "gRPC Metrics Sent OK")
	}
	// Send Logs
	if len(logsReq.ResourceLogs) > 0 {
		_, err := logsClient.Export(callCtx, logsReq)
		record(err, "logs", proto.Size(logsReq), "gRPC Logs Sent OK")
	}
	// A cancelled parent context takes precedence over any export error.
	if ctx.Err() != nil {
		return ctx.Err()
	}
	// Update global counters (success counter updated by caller worker on no error).
	if firstError == nil {
		bytesCounter.Add(uint64(totalBytes))
	} else if *verbose {
		log.Printf("gRPC Send Batch FAILED (first error): %v", firstError)
	}
	return firstError
}
// printStats calculates and prints the current performance statistics from
// the global atomic counters. final selects the "[Final]" label used for the
// last report at shutdown.
func printStats(final bool) {
	elapsed := time.Since(startTime)
	if elapsed <= 0 {
		return
	}
	okCount := successCounter.Load()
	failCount := errorCounter.Load()
	sentBytes := bytesCounter.Load()
	total := okCount + failCount
	secs := elapsed.Seconds()
	status := "Running"
	if final {
		status = "Final"
	}
	mode := "Network"
	if *outFile != "" {
		mode = "File-Only"
	}
	fmt.Printf("[%s] Mode: %s | Time: %s | Total Generated: %d | Success (Sent/Written): %d | Errors: %d | Rate: %.2f req/s | Throughput: %.2f KB/s\n",
		status,
		mode,
		elapsed.Round(time.Second),
		total,
		okCount,
		failCount,
		float64(total)/secs,
		float64(sentBytes)/secs/1024.0,
	)
}
// main is the application entry point. It parses flags, selects file-only or
// network (HTTP JSON / gRPC Protobuf) output, starts the optional rate
// limiter, the periodic stats reporter, a producer goroutine, and a pool of
// worker goroutines, then blocks until the target payload count is reached or
// a shutdown signal arrives.
func main() {
	flag.Parse()
	startTime = time.Now()
	// Determine operating mode based on -out-file flag.
	isFileOnlyMode := (*outFile != "")
	// Enforce -target-host only required if *not* in file-only mode.
	if !isFileOnlyMode && *targetHost == "" {
		fmt.Fprintf(os.Stderr, "Usage of %s:\n", os.Args[0])
		flag.PrintDefaults()
		log.Fatal("Error: -target-host flag is required when not using -out-file.")
	}
	// Setup context for graceful shutdown on signals.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigCh
		fmt.Println("\nReceived shutdown signal...")
		cancel()
	}()
	// --- Mode-Specific Setup ---
	isHTTP := false // Only relevant for network mode.
	if isFileOnlyMode {
		// Configure for writing generated payloads to a file.
		log.Printf(">>> Running in FILE-ONLY mode. Outputting generated JSON payloads to: %s <<<", *outFile)
		if *targetHost != "" {
			log.Printf("Target Host specified (for resource attributes): %s", *targetHost)
		} else {
			log.Printf("Target Host not specified (optional in file-only mode).")
		}
		var err error
		outputFile, err = os.Create(*outFile) // Create or overwrite the output file.
		if err != nil {
			log.Fatalf("Failed to open out-file '%s': %v", *outFile, err)
		}
		defer outputFile.Close() // Ensure file closure on exit.
		// Warn if network flags were provided, as they are ignored.
		if *otelHTTPEndpoint != "http://localhost:4318" {
			log.Printf("WARN: -http-endpoint provided but ignored in file-only mode.")
		}
		if *otelGRPCEndpoint != "localhost:4317" {
			log.Printf("WARN: -grpc-endpoint provided but ignored in file-only mode.")
		}
		if *format != "http" {
			log.Printf("WARN: -format provided but ignored in file-only mode.")
		}
		if !*grpcInsecure { // non-default value means the user set the flag
			log.Printf("WARN: -grpc-insecure provided but ignored in file-only mode.")
		}
	} else {
		// Configure for sending payloads over the network.
		log.Printf(">>> Running in NETWORK mode. Target Host: %s <<<", *targetHost)
		isHTTP = strings.ToLower(*format) == "http"
		if isHTTP {
			// Initialize shared HTTP client for sending OTLP/JSON.
			log.Printf("Network Format: HTTP. Sending telemetry to %s", *otelHTTPEndpoint)
			transport := &http.Transport{
				Proxy: http.ProxyFromEnvironment,
				DialContext: (&net.Dialer{
					Timeout:   30 * time.Second,
					KeepAlive: 30 * time.Second,
				}).DialContext,
				MaxIdleConns:        *workers * 2,
				MaxIdleConnsPerHost: *workers,
				IdleConnTimeout:     90 * time.Second,
				TLSHandshakeTimeout: 10 * time.Second,
			}
			httpClient = &http.Client{
				Transport: transport,
				Timeout:   15 * time.Second,
			}
		} else {
			// Initialize shared gRPC client for sending OTLP/Protobuf.
			log.Printf("Network Format: gRPC.")
			var dialOpts []grpc.DialOption
			if *grpcInsecure {
				dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
				log.Printf("Using insecure gRPC connection.")
			} else {
				tlsConfig := &tls.Config{MinVersion: tls.VersionTLS12}
				dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
				log.Printf("Using secure gRPC connection (TLS).")
			}
			dialOpts = append(dialOpts, grpc.WithUserAgent(userAgent))
			// Block until the connection is established (or the 10s dial
			// context expires). Without WithBlock, DialContext returns
			// immediately and connection failures would never reach the
			// Fatalf below.
			dialOpts = append(dialOpts, grpc.WithBlock())
			// Prepare target address for gRPC dialing: strip an http(s)
			// scheme if present, otherwise use the endpoint verbatim.
			var grpcTarget string
			parsedURL, err := url.Parse(*otelGRPCEndpoint)
			if err == nil && (parsedURL.Scheme == "http" || parsedURL.Scheme == "https") {
				grpcTarget = parsedURL.Host
			} else {
				grpcTarget = *otelGRPCEndpoint
			}
			if !strings.Contains(grpcTarget, ":") && !strings.Contains(grpcTarget, "/") {
				grpcTarget += ":4317" // Append default port if missing
			}
			log.Printf("Dialing gRPC endpoint: %s (Insecure Flag: %v)", grpcTarget, *grpcInsecure)
			dialCtx, dialCancel := context.WithTimeout(ctx, 10*time.Second)
			var grpcErr error
			grpcConn, grpcErr = grpc.DialContext(dialCtx, grpcTarget, dialOpts...)
			dialCancel()
			if grpcErr != nil {
				log.Fatalf("Failed to dial gRPC server at %s: %v", grpcTarget, grpcErr)
			}
			// Setup graceful closure of the connection on context cancellation.
			go func() {
				<-ctx.Done()
				log.Println("Shutting down gRPC connection...")
				if grpcConn != nil {
					grpcConn.Close()
				}
			}()
			// Create OTLP service clients from the connection.
			traceClient = coltracepb.NewTraceServiceClient(grpcConn)
			metricsClient = colmetricspb.NewMetricsServiceClient(grpcConn)
			logsClient = collogspb.NewLogsServiceClient(grpcConn)
			log.Printf("Sending telemetry via gRPC to %s", grpcTarget)
		}
	}
	// --- Setup Rate Limiter ---
	// A nil limiter channel means "unlimited"; select on a nil channel
	// blocks forever, so the producer simply skips the wait.
	var limiter <-chan time.Time
	if *rateLimit > 0 {
		if *rateLimit < 1e-9 {
			log.Printf("WARN: Rate limit %.2f req/s is too low, running unlimited.", *rateLimit)
		} else {
			interval := time.Duration(1e9 / *rateLimit) * time.Nanosecond
			if interval <= 0 {
				log.Printf("WARN: Rate limit %.2f req/s is too high, running unlimited.", *rateLimit)
			} else {
				ticker := time.NewTicker(interval)
				limiter = ticker.C
				defer ticker.Stop() // Ensure ticker resources are released
				log.Printf("Rate limiting enabled: interval %s (~%.2f req/s)", interval, *rateLimit)
			}
		}
	}
	// --- Start Reporting Goroutine ---
	reportTicker := time.NewTicker(reportInterval)
	defer reportTicker.Stop() // Ensure ticker resources are released
	go func() {
		for {
			select {
			case <-reportTicker.C:
				printStats(false) // Periodically print stats
			case <-ctx.Done():
				log.Println("Stopping reporter.")
				return // Exit on main context cancellation
			}
		}
	}()
	// --- Start Workers ---
	// eventsCh acts as a work queue and semaphore for workers.
	eventsCh := make(chan struct{}, *workers)
	log.Printf("Starting %d workers, Target Payloads: %d", *workers, *totalEvents)
	// --- Producer Goroutine ---
	// Pushes one work signal per payload onto eventsCh, honoring the rate
	// limiter and the -total-events cap, and stopping on cancellation.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(eventsCh) // Signal workers that no more work is coming
		count := 0
		for {
			// Check for shutdown signal *before* potentially blocking operations.
			select {
			case <-ctx.Done():
				log.Println("Producer stopping due to context cancellation.")
				return
			default: // Non-blocking check
			}
			// Check if total event limit reached (-1 means unlimited).
			if *totalEvents != -1 && count >= *totalEvents {
				log.Println("Producer reached total events limit.")
				return
			}
			// Wait for rate limiter if enabled (nil channel blocks never fires).
			if limiter != nil {
				select {
				case <-limiter: // Wait for tick
				case <-ctx.Done():
					log.Println("Producer stopping during rate limit wait.")
					return
				}
			}
			// Send work signal to a worker; blocks if channel buffer is full.
			select {
			case eventsCh <- struct{}{}:
				count++
			case <-ctx.Done():
				log.Println("Producer stopping while trying to send to worker.")
				return
			}
		}
	}()
	// --- Consumer (Worker) Goroutines ---
	// Each worker pulls work signals from eventsCh, generates a payload, and
	// either writes it to the output file or sends it over the network.
	for w := 0; w < *workers; w++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done() // Signal completion when exiting
			// Process work items from the channel until it's closed.
			for range eventsCh {
				// Check for shutdown signal *before* doing work.
				select {
				case <-ctx.Done():
					return // Context cancelled, exit worker
				default: // Non-blocking check
				}
				// Generate the telemetry data payload.
				payload := generateRandomTelemetry()
				var opErr error // Store error from send/write operation
				// Execute logic based on the operating mode (File-Only or Network).
				if isFileOnlyMode {
					// Marshal payload to indented JSON for readability.
					jsonData, err := json.MarshalIndent(payload, "", "  ")
					if err != nil {
						opErr = fmt.Errorf("failed to marshal payload to JSON: %w", err)
					} else {
						// Write JSON data to the file; the mutex serializes
						// writes from concurrent workers.
						outputFileMutex.Lock()
						_, writeErr := outputFile.Write(jsonData)
						if writeErr == nil {
							_, writeErr = outputFile.WriteString("\n") // Add newline separator
						}
						outputFileMutex.Unlock()
						opErr = writeErr // Store potential write error
					}
					// Update counters if the write was successful.
					if opErr == nil {
						successCounter.Add(1)
						bytesCounter.Add(uint64(len(jsonData))) // Track indented JSON size
						// Avoid excessive logging in verbose mode for file writes.
					}
				} else {
					// Send payload over the network (HTTP or gRPC); success
					// and byte counters are updated inside the send functions.
					if isHTTP {
						opErr = sendHTTP(ctx, payload)
					} else {
						opErr = sendGRPC(ctx, payload)
					}
				}
				// Handle errors that occurred during the operation.
				if opErr != nil {
					// If the context was cancelled *during* the operation,
					// don't count it as a failure.
					if ctx.Err() != nil {
						return // Exit worker cleanly on cancellation
					}
					// Otherwise, increment the error counter.
					errorCounter.Add(1)
					if *verbose {
						log.Printf("Worker %d ERROR: %v", workerID, opErr)
					}
				}
			} // Worker loop ends when eventsCh is closed
		}(w)
	}
	// Wait for the producer and all workers to finish their execution.
	wg.Wait()
	// Print the final summary statistics.
	log.Println("All workers finished.")
	printStats(true)
	log.Println("Shutdown complete.")
}
@keithchambers
Copy link
Author

First commit

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment