Last active
April 14, 2025 16:02
-
-
Save keithchambers/1feb7f2d0e2b91fb870d1bdb372beae0 to your computer and use it in GitHub Desktop.
OTEL Agentic Event Gen
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Package main implements a load tester for sending OpenTelemetry (OTLP) data | |
// via HTTP JSON or gRPC Protobuf, with an option to output generated data to a | |
// file instead of sending it over the network. | |
package main | |
import (
	"bytes"
	"context"
	"crypto/tls"
	"encoding/hex"
	"encoding/json" // Used for both HTTP payload and file output
	"flag"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net"
	"net/http"
	"net/url"
	"os"
	"os/signal"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	// OTLP Collector Service definitions (used for gRPC client creation)
	collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
	colmetricspb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
	coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"

	// OTLP Protobuf message definitions (used for constructing gRPC requests)
	commonpb "go.opentelemetry.io/proto/otlp/common/v1"
	logspb "go.opentelemetry.io/proto/otlp/logs/v1"
	metricspb "go.opentelemetry.io/proto/otlp/metrics/v1"
	resourcepb "go.opentelemetry.io/proto/otlp/resource/v1"
	tracepb "go.opentelemetry.io/proto/otlp/trace/v1"

	// gRPC libraries for network communication
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/insecure"

	// Protobuf library helper for calculating message size
	"google.golang.org/protobuf/proto"
)
// TelemetryPayload represents the internal structure for a batch of generated
// telemetry data (traces, metrics, logs) before potential marshalling or conversion.
// The json tags mirror the OTLP/JSON camelCase field names so the struct can be
// marshalled directly for HTTP export or file output.
type TelemetryPayload struct {
	Resource   ResourceData `json:"resource"`
	ScopeSpans []ScopeSpans `json:"scopeSpans"`
	Metrics    []Metric     `json:"metrics"`
	Logs       []LogRecord  `json:"logs"`
}

// ResourceData holds attributes describing the source of the telemetry
// (service name, instance id, environment, and optionally target host / API key).
type ResourceData struct {
	Attributes []KeyValue `json:"attributes"`
}

// KeyValue is a generic key-value pair used for attributes on resources,
// spans, events, data points, and log records.
type KeyValue struct {
	Key   string     `json:"key"`
	Value ValueUnion `json:"value"`
}

// ValueUnion holds a value of varying type (string, int, double, bool).
// Exactly one field is expected to be non-nil at a time; pointers allow
// JSON omission of unset fields via omitempty.
type ValueUnion struct {
	StringValue *string  `json:"stringValue,omitempty"`
	IntValue    *int64   `json:"intValue,omitempty"`
	DoubleValue *float64 `json:"doubleValue,omitempty"`
	BoolValue   *bool    `json:"boolValue,omitempty"`
}

// ScopeSpans groups spans originating from a specific instrumentation scope
// (here: one simulated agent per scope).
type ScopeSpans struct {
	Scope Scope  `json:"scope"`
	Spans []Span `json:"spans"`
}

// Scope identifies the instrumentation library that produced telemetry.
type Scope struct {
	Name    string `json:"name"`
	Version string `json:"version"`
}

// Span represents a single operation within a trace. TraceID/SpanID are
// lowercase hex strings (32 and 16 chars respectively, per OTLP/JSON);
// timestamps are decimal nanosecond-since-epoch strings.
type Span struct {
	TraceID           string     `json:"traceId"`
	SpanID            string     `json:"spanId"`
	ParentSpanID      string     `json:"parentSpanId,omitempty"`
	Name              string     `json:"name"`
	StartTimeUnixNano string     `json:"startTimeUnixNano"`
	EndTimeUnixNano   string     `json:"endTimeUnixNano"`
	Attributes        []KeyValue `json:"attributes"`
	Events            []Event    `json:"events"`
	Status            Status     `json:"status"`
	Links             []Link     `json:"links,omitempty"`
}

// Event represents a time-stamped event within a span (e.g. llm.prompt).
type Event struct {
	TimeUnixNano string     `json:"timeUnixNano"`
	Name         string     `json:"name"`
	Attributes   []KeyValue `json:"attributes"`
}

// Status represents the status of a finished span. Code holds the OTLP
// enum name as a string (e.g. "STATUS_CODE_OK").
type Status struct {
	Code    string `json:"code"`
	Message string `json:"message"`
}

// Link represents a connection to another span in a (possibly different) trace.
type Link struct {
	TraceID    string     `json:"traceId"`
	Attributes []KeyValue `json:"attributes"`
}
// Metric represents a single metric instrument reading.
type Metric struct {
	Name        string     `json:"name"`
	Description string     `json:"description"`
	Unit        string     `json:"unit"`
	Data        MetricData `json:"data"`
}

// MetricData holds the actual data points for a metric. Exactly one of the
// variants (gauge or sum) is expected to be set.
type MetricData struct {
	DoubleGauge *DoubleGauge `json:"doubleGauge,omitempty"`
	IntSum      *IntSum      `json:"intSum,omitempty"`
}

// DoubleGauge represents a gauge metric with double-precision data points.
type DoubleGauge struct {
	DataPoints []DoubleDataPoint `json:"dataPoints"`
}

// DoubleDataPoint represents a single data point for a double gauge.
// TimeUnixNano is a decimal nanosecond-since-epoch string.
type DoubleDataPoint struct {
	Attributes   []KeyValue `json:"attributes"`
	TimeUnixNano string     `json:"timeUnixNano"`
	Value        float64    `json:"value"`
}

// IntSum represents a sum metric with integer data points.
// AggregationTemporality holds the OTLP enum name as a string
// (e.g. "AGGREGATION_TEMPORALITY_CUMULATIVE").
type IntSum struct {
	IsMonotonic            bool           `json:"isMonotonic"`
	AggregationTemporality string         `json:"aggregationTemporality"`
	DataPoints             []IntDataPoint `json:"dataPoints"`
}

// IntDataPoint represents a single data point for an integer sum.
type IntDataPoint struct {
	Attributes   []KeyValue `json:"attributes"`
	TimeUnixNano string     `json:"timeUnixNano"`
	Value        int64      `json:"value"`
}
// LogRecord represents a single log entry. SeverityNumber follows the OTLP
// severity scale (9 == INFO); Body is typically a string ValueUnion.
type LogRecord struct {
	TimeUnixNano   string     `json:"timeUnixNano"`
	SeverityNumber int32      `json:"severityNumber"`
	SeverityText   string     `json:"severityText"`
	Name           string     `json:"name,omitempty"`
	Body           ValueUnion `json:"body"`
	Attributes     []KeyValue `json:"attributes"`
}
// Command-line flags controlling the tool's behavior. Network-related flags
// (-http-endpoint, -grpc-endpoint, -format, -grpc-insecure) are ignored when
// -out-file puts the tool in file-only mode.
var (
	// targetHost specifies a label for the target system being tested.
	// It's added as a resource attribute and is REQUIRED only in network mode.
	targetHost = flag.String("target-host", "", "Target host identifier (e.g., service name, cluster name). **Required unless -out-file is specified.**")
	// otelHTTPEndpoint defines the base URL for OTLP/HTTP export. Ignored in file-only mode.
	otelHTTPEndpoint = flag.String("http-endpoint", "http://localhost:4318", "OTEL collector HTTP base endpoint (e.g., http://host:port). Ignored if -out-file is set.")
	// otelGRPCEndpoint defines the host:port for OTLP/gRPC export. Ignored in file-only mode.
	otelGRPCEndpoint = flag.String("grpc-endpoint", "localhost:4317", "OTEL collector gRPC endpoint (host:port). Ignored if -out-file is set.")
	// format determines the network protocol (http or grpc). Ignored in file-only mode.
	format = flag.String("format", "http", "Transport format: http or grpc. Ignored if -out-file is set.")
	// apiKey provides an API key, used for HTTP 'Authorization: Bearer' header
	// or as an api.key resource attribute in gRPC/file mode.
	apiKey = flag.String("api-key", "test-key-123", "API Key for Authorization header (HTTP) or api.key resource attribute.")
	// grpcInsecure controls TLS usage for gRPC connections. Ignored in file-only mode.
	grpcInsecure = flag.Bool("grpc-insecure", true, "Use insecure gRPC connection (no TLS). Ignored if -out-file is set.")
	// totalEvents sets the total number of payloads to generate (-1 for infinite).
	totalEvents = flag.Int("total", 1000, "Total number of telemetry payloads to generate. -1 means infinite.")
	// workers defines the number of concurrent goroutines generating/sending data.
	workers = flag.Int("workers", runtime.NumCPU(), "Number of concurrent worker goroutines.")
	// rateLimit throttles the payload generation rate (payloads per second, 0 for no limit).
	rateLimit = flag.Float64("rate", 0, "Target rate limit (payloads per second). 0 means no limit.")
	// outFile enables file-only mode; if set, generated data is written as JSON lines
	// (with indentation) to this file, and network connections are skipped.
	outFile = flag.String("out-file", "", "If specified, run in file-only mode: generate payloads and write as indented JSON lines to this file, skipping network connection.")
	// verbose enables more detailed logging, especially for sent/failed requests.
	verbose = flag.Bool("verbose", false, "Enable verbose logging.")
)
// Global state variables used across the application. | |
var ( | |
// httpClient is the shared HTTP client, initialized only in HTTP network mode. | |
httpClient *http.Client | |
// grpcConn is the shared gRPC connection, initialized only in gRPC network mode. | |
grpcConn *grpc.ClientConn | |
// gRPC service clients, initialized only in gRPC network mode. | |
traceClient coltracepb.TraceServiceClient | |
metricsClient colmetricspb.MetricsServiceClient | |
logsClient collogspb.LogsServiceClient | |
// wg synchronizes the main goroutine with worker goroutines. | |
wg sync.WaitGroup | |
// startTime records the application start time for calculating rates. | |
startTime time.Time | |
// Atomic counters for tracking statistics. | |
successCounter atomic.Uint64 // Payloads successfully sent or written. | |
errorCounter atomic.Uint64 // Payloads failed to send or write. | |
bytesCounter atomic.Uint64 // Total bytes successfully sent (protobuf size) or written (indented JSON size). | |
// outputFile is the file handle used in file-only mode. | |
outputFile *os.File | |
// outputFileMutex protects concurrent writes to the output file. | |
outputFileMutex sync.Mutex | |
// Shared random number generation source and generator. | |
randSource = rand.NewSource(time.Now().UnixNano()) | |
rng = rand.New(randSource) | |
) | |
// Constants used in the application.
const (
	// reportInterval defines how often statistics are printed to the console.
	reportInterval = 5 * time.Second
	// userAgent identifies this tool in network requests (HTTP User-Agent header).
	userAgent = "otel-load-tester/1.0"
	// apiKeyAttrName is the resource attribute name under which the API key
	// is attached in gRPC/file mode.
	apiKeyAttrName = "api.key"
)
// randomString generates a random hexadecimal string of length n. | |
func randomString(n int) string { | |
const hexChars = "0123456789abcdef" | |
b := make([]byte, n) | |
for i := range b { | |
b[i] = hexChars[rng.Intn(len(hexChars))] | |
} | |
return string(b) | |
} | |
// nowNanoStr returns the current time as nanoseconds since epoch, formatted as a string. | |
func nowNanoStr() string { | |
return strconv.FormatInt(time.Now().UnixNano(), 10) | |
} | |
// futureNanoStr returns a future time (now + ms milliseconds) as nanoseconds | |
// since epoch, formatted as a string. | |
func futureNanoStr(ms int64) string { | |
return strconv.FormatInt(time.Now().Add(time.Duration(ms)*time.Millisecond).UnixNano(), 10) | |
} | |
// strVal wraps a string in a ValueUnion (taking the address of the copy).
func strVal(s string) ValueUnion { return ValueUnion{StringValue: &s} }

// intVal wraps an int64 in a ValueUnion.
func intVal(i int64) ValueUnion { return ValueUnion{IntValue: &i} }

// doubleVal wraps a float64 in a ValueUnion.
func doubleVal(f float64) ValueUnion { return ValueUnion{DoubleValue: &f} }

// boolVal wraps a bool in a ValueUnion.
func boolVal(b bool) ValueUnion { return ValueUnion{BoolValue: &b} }
// generateRandomTelemetry creates a TelemetryPayload struct populated with | |
// semi-realistic, randomly generated OTLP data (resource, spans, metrics, logs). | |
func generateRandomTelemetry() *TelemetryPayload { | |
payload := &TelemetryPayload{} | |
// Initialize resource attributes, including the target host if provided. | |
payload.Resource.Attributes = []KeyValue{ | |
{Key: "service.name", Value: strVal("agent-federation-service")}, | |
{Key: "service.instance.id", Value: strVal(fmt.Sprintf("load-tester-%s", randomString(8)))}, | |
{Key: "deployment.environment", Value: strVal("production")}, | |
} | |
if *targetHost != "" { | |
payload.Resource.Attributes = append(payload.Resource.Attributes, KeyValue{ | |
Key: "target.host", | |
Value: strVal(*targetHost), | |
}) | |
} | |
if *apiKey != "" { | |
payload.Resource.Attributes = append(payload.Resource.Attributes, KeyValue{ | |
Key: apiKeyAttrName, | |
Value: strVal(*apiKey), | |
}) | |
} | |
// Generate data for a random number of simulated agents (scopes). | |
numAgents := rng.Intn(3) + 1 | |
var scopes []ScopeSpans | |
for i := 0; i < numAgents; i++ { | |
scopes = append(scopes, generateAgentScope(i)) | |
} | |
payload.ScopeSpans = scopes | |
// Generate various metric types based on the generated scopes. | |
var metrics []Metric | |
metrics = append(metrics, randomLatencyMetrics(scopes)...) | |
metrics = append(metrics, randomSuccessCount(scopes)...) | |
metrics = append(metrics, randomTokenUsage(scopes)...) | |
errorCount := randomErrorCount(scopes) | |
if errorCount != nil { | |
metrics = append(metrics, *errorCount) | |
} | |
payload.Metrics = metrics | |
// Generate logs based on the generated scopes. | |
payload.Logs = randomLogs(scopes) | |
return payload | |
} | |
// generateAgentScope creates a ScopeSpans structure representing telemetry
// from a single simulated agent: a root span plus one child "tool call" span
// sharing the same trace ID.
//
// agentIndex selects one of three fixed agent profiles (modulo the table
// size), so indexes 0..2 map to openai/llama/anthropic respectively.
func generateAgentScope(agentIndex int) ScopeSpans {
	// Fixed table of simulated agent profiles.
	agents := []struct {
		scopeName     string
		scopeVersion  string
		modelName     string
		modelProvider string
		agentIDPrefix string
		agentName     string
		userRole      string
	}{
		{"openai-agent", "v2.1.0", "gpt-4", "openai", "openai-run", "email-assistant", "automated-assistant"},
		{"llama-agent", "v1.4.0", "LLaMa", "facebook", "llama-run", "data-extractor", "data-analyst"},
		{"anthropic-agent", "v1.5.0", "claude-v1.3", "anthropic", "anthropic-run", "summary-generator", "content-curator"},
	}
	agentData := agents[agentIndex%len(agents)]
	// 32 hex chars = 16-byte trace ID, 16 hex chars = 8-byte span ID (OTLP sizes).
	traceID := randomString(32)
	rootSpanID := randomString(16)
	rootSpan := Span{
		TraceID: traceID,
		SpanID:  rootSpanID,
		// e.g. "agent.root.openai" — first dash-separated token of the scope name.
		Name:              "agent.root." + strings.Split(agentData.scopeName, "-")[0],
		StartTimeUnixNano: nowNanoStr(),
		// Root span lasts 300-599 ms.
		EndTimeUnixNano: futureNanoStr(int64(rng.Intn(300) + 300)),
		Attributes: []KeyValue{
			{Key: "ai.agent.id", Value: strVal(fmt.Sprintf("%s-%d", agentData.agentIDPrefix, rng.Intn(1000)))},
			{Key: "ai.model.name", Value: strVal(agentData.modelName)},
			{Key: "user.id", Value: strVal(fmt.Sprintf("user-%d", rng.Intn(5000)+1000))},
		},
		Events: []Event{
			{TimeUnixNano: nowNanoStr(), Name: "llm.prompt", Attributes: []KeyValue{{Key: "correlation.id", Value: strVal("corr-" + agentData.agentIDPrefix)}}},
			{TimeUnixNano: nowNanoStr(), Name: "llm.response", Attributes: []KeyValue{{Key: "llm.total_tokens", Value: intVal(int64(rng.Intn(200) + 70))}}},
		},
		Status: Status{Code: "STATUS_CODE_OK", Message: "Success"},
	}
	// Child span simulating an MCP tool call, parented to the root span.
	subSpan := Span{
		TraceID:           traceID,
		SpanID:            randomString(16),
		ParentSpanID:      rootSpanID,
		Name:              "agent.subtask.mcp_tool_call",
		StartTimeUnixNano: nowNanoStr(),
		// Sub-span lasts 100-249 ms.
		EndTimeUnixNano: futureNanoStr(int64(rng.Intn(150) + 100)),
		Attributes: []KeyValue{
			{Key: "ai.tool.name", Value: strVal(fmt.Sprintf("MCP Tool %d", rng.Intn(999)))},
			{Key: "correlation.id", Value: strVal("corr-" + agentData.agentIDPrefix)},
		},
		Events: []Event{
			{TimeUnixNano: nowNanoStr(), Name: "tool.call.start"},
			{TimeUnixNano: nowNanoStr(), Name: "tool.call.end", Attributes: []KeyValue{{Key: "tool.response_code", Value: intVal(200)}}},
		},
		Status: Status{Code: "STATUS_CODE_OK", Message: "Tool success"},
	}
	return ScopeSpans{
		Scope: Scope{Name: agentData.scopeName, Version: agentData.scopeVersion},
		Spans: []Span{rootSpan, subSpan},
	}
}
// randomLatencyMetrics generates agent latency gauge metrics based on the generated spans. | |
func randomLatencyMetrics(scopes []ScopeSpans) []Metric { | |
var dps []DoubleDataPoint | |
for _, sc := range scopes { | |
agentName, modelName := parseAgentAndModel(sc) | |
dps = append(dps, DoubleDataPoint{ | |
Attributes: []KeyValue{{Key: "agent.name", Value: strVal(agentName)}, {Key: "model.name", Value: strVal(modelName)}}, | |
TimeUnixNano: nowNanoStr(), | |
Value: rng.Float64()*400 + 100, | |
}) | |
} | |
if len(dps) == 0 { | |
return nil | |
} | |
return []Metric{ | |
{Name: "ai.agent.latency", Description: "Latency", Unit: "ms", Data: MetricData{DoubleGauge: &DoubleGauge{DataPoints: dps}}}, | |
} | |
} | |
// randomSuccessCount generates agent success count (sum) metrics. | |
func randomSuccessCount(scopes []ScopeSpans) []Metric { | |
var ips []IntDataPoint | |
for _, sc := range scopes { | |
agentName, _ := parseAgentAndModel(sc) | |
ips = append(ips, IntDataPoint{ | |
Attributes: []KeyValue{{Key: "agent.name", Value: strVal(agentName)}}, | |
TimeUnixNano: nowNanoStr(), | |
Value: 1, | |
}) | |
} | |
if len(ips) == 0 { | |
return nil | |
} | |
return []Metric{ | |
{Name: "ai.agent.success_count", Description: "Successes", Unit: "1", Data: MetricData{IntSum: &IntSum{IsMonotonic: true, AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE", DataPoints: ips}}}, | |
} | |
} | |
// randomTokenUsage generates model token usage (sum) metrics. | |
func randomTokenUsage(scopes []ScopeSpans) []Metric { | |
var ips []IntDataPoint | |
for _, sc := range scopes { | |
_, modelName := parseAgentAndModel(sc) | |
ips = append(ips, IntDataPoint{ | |
Attributes: []KeyValue{{Key: "model.name", Value: strVal(modelName)}}, | |
TimeUnixNano: nowNanoStr(), | |
Value: rng.Int63n(300) + 50, | |
}) | |
} | |
if len(ips) == 0 { | |
return nil | |
} | |
return []Metric{ | |
{Name: "ai.model.token_usage", Description: "Tokens", Unit: "tokens", Data: MetricData{IntSum: &IntSum{IsMonotonic: true, AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE", DataPoints: ips}}}, | |
} | |
} | |
// randomErrorCount occasionally generates an agent error count (sum) metric. | |
func randomErrorCount(scopes []ScopeSpans) *Metric { | |
if rng.Intn(5) > 0 { | |
return nil | |
} | |
if len(scopes) == 0 { | |
return nil | |
} | |
sc := scopes[rng.Intn(len(scopes))] | |
agentName, _ := parseAgentAndModel(sc) | |
ips := []IntDataPoint{{Attributes: []KeyValue{{Key: "agent.name", Value: strVal(agentName)}}, TimeUnixNano: nowNanoStr(), Value: 1}} | |
return &Metric{Name: "ai.error.count", Description: "Errors", Unit: "1", Data: MetricData{IntSum: &IntSum{IsMonotonic: true, AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE", DataPoints: ips}}} | |
} | |
// parseAgentAndModel extracts agent and model names from a ScopeSpans structure, | |
// falling back to defaults or scope name if specific attributes are missing. | |
func parseAgentAndModel(sc ScopeSpans) (string, string) { | |
if len(sc.Spans) == 0 || len(sc.Spans[0].Attributes) == 0 { | |
return "unknown-agent", "unknown-model" | |
} | |
agentName, modelName := "unknown-agent", "unknown-model" | |
for _, attr := range sc.Spans[0].Attributes { | |
if attr.Key == "ai.agent.id" && attr.Value.StringValue != nil { | |
agentName = *attr.Value.StringValue | |
} | |
if attr.Key == "ai.model.name" && attr.Value.StringValue != nil { | |
modelName = *attr.Value.StringValue | |
} | |
} | |
if agentName == "unknown-agent" && sc.Scope.Name != "" { | |
agentName = sc.Scope.Name | |
} | |
return agentName, modelName | |
} | |
// randomLogs generates example log records associated with the generated agent scopes. | |
func randomLogs(scopes []ScopeSpans) []LogRecord { | |
var logs []LogRecord | |
for _, sc := range scopes { | |
agentName, _ := parseAgentAndModel(sc) | |
corrID := "corr-" + agentName + fmt.Sprintf("-%03d", rng.Intn(1000)) | |
logs = append(logs, LogRecord{ | |
TimeUnixNano: nowNanoStr(), | |
SeverityNumber: 9, // INFO | |
SeverityText: "INFO", | |
Name: "agent.step.started", | |
Body: strVal(fmt.Sprintf("%s: Step started.", agentName)), | |
Attributes: []KeyValue{{Key: "step.name", Value: strVal("some-step")}, {Key: "correlation.id", Value: strVal(corrID)}}, | |
}) | |
logs = append(logs, LogRecord{ | |
TimeUnixNano: nowNanoStr(), | |
SeverityNumber: 9, // INFO | |
SeverityText: "INFO", | |
Name: "agent.step.completed", | |
Body: strVal(fmt.Sprintf("%s: Step completed.", agentName)), | |
Attributes: []KeyValue{{Key: "step.name", Value: strVal("some-step")}, {Key: "correlation.id", Value: strVal(corrID)}}, | |
}) | |
} | |
return logs | |
} | |
// convertValueUnion converts the internal ValueUnion struct to the OTLP Protobuf AnyValue type. | |
// This is only required when preparing data for gRPC export. | |
func convertValueUnion(v ValueUnion) *commonpb.AnyValue { | |
if v.StringValue != nil { | |
return &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: *v.StringValue}} | |
} | |
if v.IntValue != nil { | |
return &commonpb.AnyValue{Value: &commonpb.AnyValue_IntValue{IntValue: *v.IntValue}} | |
} | |
if v.DoubleValue != nil { | |
return &commonpb.AnyValue{Value: &commonpb.AnyValue_DoubleValue{DoubleValue: *v.DoubleValue}} | |
} | |
if v.BoolValue != nil { | |
return &commonpb.AnyValue{Value: &commonpb.AnyValue_BoolValue{BoolValue: *v.BoolValue}} | |
} | |
return &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: ""}} | |
} | |
// convertKeyValues converts a slice of internal KeyValue structs to the OTLP Protobuf KeyValue type. | |
// This is only required when preparing data for gRPC export. | |
func convertKeyValues(kvs []KeyValue) []*commonpb.KeyValue { | |
attrs := make([]*commonpb.KeyValue, 0, len(kvs)) | |
for _, kv := range kvs { | |
attrs = append(attrs, &commonpb.KeyValue{ | |
Key: kv.Key, | |
Value: convertValueUnion(kv.Value), | |
}) | |
} | |
return attrs | |
} | |
// MustParseUnixNano converts a string timestamp (nanoseconds since epoch) to uint64. | |
// It logs a warning and returns the current time if parsing fails. | |
func MustParseUnixNano(ts string) uint64 { | |
val, err := strconv.ParseUint(ts, 10, 64) | |
if err != nil { | |
log.Printf("WARN: Failed to parse timestamp '%s': %v. Using current time.", ts, err) | |
return uint64(time.Now().UnixNano()) | |
} | |
return val | |
} | |
// convertPayloadToProto transforms the internal TelemetryPayload struct into
// the corresponding OTLP Protobuf Export service request messages (one each
// for traces, metrics, and logs). Requests for empty sections are still
// returned, just with empty resource slices.
// This function is only called when using the gRPC network mode.
func convertPayloadToProto(payload *TelemetryPayload) (
	traceReq *coltracepb.ExportTraceServiceRequest,
	metricsReq *colmetricspb.ExportMetricsServiceRequest,
	logsReq *collogspb.ExportLogsServiceRequest) {
	// The single resource is shared by all three signal types.
	resourceProto := &resourcepb.Resource{
		Attributes: convertKeyValues(payload.Resource.Attributes),
	}
	// --- Convert Traces ---
	resourceSpans := make([]*tracepb.ResourceSpans, 0, 1)
	if len(payload.ScopeSpans) > 0 {
		scopeSpansProto := make([]*tracepb.ScopeSpans, 0, len(payload.ScopeSpans))
		for _, sc := range payload.ScopeSpans {
			spansProto := make([]*tracepb.Span, 0, len(sc.Spans))
			for _, span := range sc.Spans {
				// Hex decode errors are deliberately ignored; IDs come from
				// randomString so they are always valid hex here.
				// NOTE(review): a malformed ID from another source would
				// silently produce truncated/empty ID bytes — confirm acceptable.
				traceIDBytes, _ := hex.DecodeString(span.TraceID)
				spanIDBytes, _ := hex.DecodeString(span.SpanID)
				var parentSpanIDBytes []byte
				// An all-zero parent ID means "no parent" in OTLP; leave it nil.
				if span.ParentSpanID != "" && span.ParentSpanID != "0000000000000000" {
					parentSpanIDBytes, _ = hex.DecodeString(span.ParentSpanID)
				}
				eventsProto := make([]*tracepb.Span_Event, 0, len(span.Events))
				for _, ev := range span.Events {
					eventsProto = append(eventsProto, &tracepb.Span_Event{
						TimeUnixNano: MustParseUnixNano(ev.TimeUnixNano),
						Name:         ev.Name,
						Attributes:   convertKeyValues(ev.Attributes),
					})
				}
				linksProto := make([]*tracepb.Span_Link, 0, len(span.Links))
				for _, link := range span.Links {
					linkTraceIDBytes, _ := hex.DecodeString(link.TraceID)
					linksProto = append(linksProto, &tracepb.Span_Link{
						TraceId:    linkTraceIDBytes,
						Attributes: convertKeyValues(link.Attributes),
					})
				}
				// Map the string status code onto the proto enum; anything
				// unrecognized becomes UNSET.
				var statusCode tracepb.Status_StatusCode
				switch span.Status.Code {
				case "STATUS_CODE_OK":
					statusCode = tracepb.Status_STATUS_CODE_OK
				case "STATUS_CODE_ERROR":
					statusCode = tracepb.Status_STATUS_CODE_ERROR
				default:
					statusCode = tracepb.Status_STATUS_CODE_UNSET
				}
				spansProto = append(spansProto, &tracepb.Span{
					TraceId:      traceIDBytes,
					SpanId:       spanIDBytes,
					ParentSpanId: parentSpanIDBytes,
					Name:         span.Name,
					// All generated spans are tagged INTERNAL.
					Kind:              tracepb.Span_SPAN_KIND_INTERNAL,
					StartTimeUnixNano: MustParseUnixNano(span.StartTimeUnixNano),
					EndTimeUnixNano:   MustParseUnixNano(span.EndTimeUnixNano),
					Attributes:        convertKeyValues(span.Attributes),
					Events:            eventsProto,
					Links:             linksProto,
					Status:            &tracepb.Status{Code: statusCode, Message: span.Status.Message},
				})
			}
			scopeSpansProto = append(scopeSpansProto, &tracepb.ScopeSpans{
				Scope: &commonpb.InstrumentationScope{Name: sc.Scope.Name, Version: sc.Scope.Version},
				Spans: spansProto,
			})
		}
		resourceSpans = append(resourceSpans, &tracepb.ResourceSpans{
			Resource:   resourceProto,
			ScopeSpans: scopeSpansProto,
		})
	}
	traceReq = &coltracepb.ExportTraceServiceRequest{ResourceSpans: resourceSpans}
	// --- Convert Metrics ---
	// All metrics are flattened under a single synthetic "otel-load-tester"
	// scope rather than the per-agent scopes used for spans.
	resourceMetrics := make([]*metricspb.ResourceMetrics, 0, 1)
	if len(payload.Metrics) > 0 {
		scopeMetricsProto := make([]*metricspb.ScopeMetrics, 0, 1)
		metricsProto := make([]*metricspb.Metric, 0, len(payload.Metrics))
		for _, m := range payload.Metrics {
			metricPb := &metricspb.Metric{
				Name:        m.Name,
				Description: m.Description,
				Unit:        m.Unit,
			}
			if m.Data.DoubleGauge != nil {
				dps := make([]*metricspb.NumberDataPoint, 0, len(m.Data.DoubleGauge.DataPoints))
				for _, dp := range m.Data.DoubleGauge.DataPoints {
					dps = append(dps, &metricspb.NumberDataPoint{
						Attributes:   convertKeyValues(dp.Attributes),
						TimeUnixNano: MustParseUnixNano(dp.TimeUnixNano),
						// Gauges reuse the sample time as the start time.
						StartTimeUnixNano: MustParseUnixNano(dp.TimeUnixNano),
						Value:             &metricspb.NumberDataPoint_AsDouble{AsDouble: dp.Value},
					})
				}
				metricPb.Data = &metricspb.Metric_Gauge{Gauge: &metricspb.Gauge{DataPoints: dps}}
			} else if m.Data.IntSum != nil {
				dps := make([]*metricspb.NumberDataPoint, 0, len(m.Data.IntSum.DataPoints))
				for _, dp := range m.Data.IntSum.DataPoints {
					dps = append(dps, &metricspb.NumberDataPoint{
						Attributes:   convertKeyValues(dp.Attributes),
						TimeUnixNano: MustParseUnixNano(dp.TimeUnixNano),
						// NOTE(review): sums carry StartTimeUnixNano = 0 here;
						// cumulative sums usually have a non-zero start time —
						// confirm downstream collectors accept this.
						StartTimeUnixNano: 0,
						Value:             &metricspb.NumberDataPoint_AsInt{AsInt: dp.Value},
					})
				}
				// Map the string temporality onto the proto enum; anything
				// unrecognized becomes UNSPECIFIED.
				var aggTemporality metricspb.AggregationTemporality
				switch m.Data.IntSum.AggregationTemporality {
				case "AGGREGATION_TEMPORALITY_CUMULATIVE":
					aggTemporality = metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE
				case "AGGREGATION_TEMPORALITY_DELTA":
					aggTemporality = metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA
				default:
					aggTemporality = metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_UNSPECIFIED
				}
				metricPb.Data = &metricspb.Metric_Sum{Sum: &metricspb.Sum{
					IsMonotonic:            m.Data.IntSum.IsMonotonic,
					AggregationTemporality: aggTemporality,
					DataPoints:             dps,
				}}
			}
			// Metrics with neither variant set are dropped.
			if metricPb.Data != nil {
				metricsProto = append(metricsProto, metricPb)
			}
		}
		if len(metricsProto) > 0 {
			scopeMetricsProto = append(scopeMetricsProto, &metricspb.ScopeMetrics{
				Scope:   &commonpb.InstrumentationScope{Name: "otel-load-tester"},
				Metrics: metricsProto,
			})
			resourceMetrics = append(resourceMetrics, &metricspb.ResourceMetrics{
				Resource:     resourceProto,
				ScopeMetrics: scopeMetricsProto,
			})
		}
	}
	metricsReq = &colmetricspb.ExportMetricsServiceRequest{ResourceMetrics: resourceMetrics}
	// --- Convert Logs ---
	// Logs are likewise grouped under the synthetic "otel-load-tester" scope.
	resourceLogs := make([]*logspb.ResourceLogs, 0, 1)
	if len(payload.Logs) > 0 {
		scopeLogsProto := make([]*logspb.ScopeLogs, 0, 1)
		logsRecordsProto := make([]*logspb.LogRecord, 0, len(payload.Logs))
		for _, logr := range payload.Logs {
			logsRecordsProto = append(logsRecordsProto, &logspb.LogRecord{
				TimeUnixNano: MustParseUnixNano(logr.TimeUnixNano),
				// Observed time is stamped at conversion time, not generation time.
				ObservedTimeUnixNano: uint64(time.Now().UnixNano()),
				SeverityNumber:       logspb.SeverityNumber(logr.SeverityNumber),
				SeverityText:         logr.SeverityText,
				Body:                 convertValueUnion(logr.Body),
				Attributes:           convertKeyValues(logr.Attributes),
			})
		}
		if len(logsRecordsProto) > 0 {
			scopeLogsProto = append(scopeLogsProto, &logspb.ScopeLogs{
				Scope:      &commonpb.InstrumentationScope{Name: "otel-load-tester"},
				LogRecords: logsRecordsProto,
			})
			resourceLogs = append(resourceLogs, &logspb.ResourceLogs{
				Resource:  resourceProto,
				ScopeLogs: scopeLogsProto,
			})
		}
	}
	logsReq = &collogspb.ExportLogsServiceRequest{ResourceLogs: resourceLogs}
	return traceReq, metricsReq, logsReq
}
// sendHTTP attempts to send the generated telemetry payload to the configured
// OTLP/HTTP endpoint, POSTing traces, metrics, and logs to their respective
// /v1/* paths. All three sections are attempted even if an earlier one fails;
// only the FIRST error encountered is returned (wrapped with its section name).
// bytesCounter is updated only when all sections succeed; the success counter
// is the caller's responsibility.
// This function is only called when in HTTP network mode.
func sendHTTP(ctx context.Context, payload *TelemetryPayload) error {
	traceURL := strings.TrimSuffix(*otelHTTPEndpoint, "/") + "/v1/traces"
	metricsURL := strings.TrimSuffix(*otelHTTPEndpoint, "/") + "/v1/metrics"
	logsURL := strings.TrimSuffix(*otelHTTPEndpoint, "/") + "/v1/logs"
	var firstError error
	var totalBytes int
	// Send Traces: internal structs marshal directly to OTLP/JSON field names.
	if len(payload.ScopeSpans) > 0 {
		jsonData, err := json.Marshal(map[string]interface{}{"resourceSpans": []map[string]interface{}{{"resource": payload.Resource, "scopeSpans": payload.ScopeSpans}}})
		if err == nil {
			err = postJSON(ctx, traceURL, jsonData)
			if err == nil {
				totalBytes += len(jsonData)
			}
		}
		if err != nil && firstError == nil {
			firstError = fmt.Errorf("traces: %w", err)
		}
		if *verbose && err == nil {
			log.Printf("HTTP Trace Sent OK")
		}
	}
	// Send Metrics: wrapped in a single anonymous scope (empty scope object).
	if len(payload.Metrics) > 0 {
		scopeMetrics := map[string]interface{}{"scope": map[string]interface{}{}, "metrics": payload.Metrics}
		jsonData, err := json.Marshal(map[string]interface{}{"resourceMetrics": []map[string]interface{}{{"resource": payload.Resource, "scopeMetrics": []map[string]interface{}{scopeMetrics}}}})
		if err == nil {
			err = postJSON(ctx, metricsURL, jsonData)
			if err == nil {
				totalBytes += len(jsonData)
			}
		}
		if err != nil && firstError == nil {
			firstError = fmt.Errorf("metrics: %w", err)
		}
		if *verbose && err == nil {
			log.Printf("HTTP Metrics Sent OK")
		}
	}
	// Send Logs: wrapped in a single anonymous scope, like metrics.
	if len(payload.Logs) > 0 {
		scopeLogs := map[string]interface{}{"scope": map[string]interface{}{}, "logRecords": payload.Logs}
		jsonData, err := json.Marshal(map[string]interface{}{"resourceLogs": []map[string]interface{}{{"resource": payload.Resource, "scopeLogs": []map[string]interface{}{scopeLogs}}}})
		if err == nil {
			err = postJSON(ctx, logsURL, jsonData)
			if err == nil {
				totalBytes += len(jsonData)
			}
		}
		if err != nil && firstError == nil {
			firstError = fmt.Errorf("logs: %w", err)
		}
		if *verbose && err == nil {
			log.Printf("HTTP Logs Sent OK")
		}
	}
	// Update global counters (success counter updated by caller worker on no error).
	if firstError == nil {
		// successCounter.Add(1) // Now handled by worker
		bytesCounter.Add(uint64(totalBytes))
	} else if *verbose {
		log.Printf("HTTP Send Batch FAILED (first error): %v", firstError)
	}
	return firstError
}
// postJSON performs an HTTP POST request with the given JSON data to the specified URL. | |
func postJSON(ctx context.Context, url string, data []byte) error { | |
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(data)) | |
if err != nil { | |
return fmt.Errorf("create request: %w", err) | |
} | |
req.Header.Set("Content-Type", "application/json") | |
req.Header.Set("User-Agent", userAgent) | |
if *apiKey != "" { | |
req.Header.Set("Authorization", "Bearer "+*apiKey) | |
} | |
resp, err := httpClient.Do(req) | |
if err != nil { | |
if ctx.Err() != nil { | |
return ctx.Err() | |
} | |
return fmt.Errorf("http do: %w", err) | |
} | |
defer resp.Body.Close() | |
if resp.StatusCode < 200 || resp.StatusCode >= 300 { | |
return fmt.Errorf("http status %d %s", resp.StatusCode, resp.Status) | |
} | |
return nil | |
} | |
// sendGRPC attempts to send the generated telemetry payload to the configured | |
// OTLP/gRPC endpoint. | |
// This function is only called when in gRPC network mode. | |
func sendGRPC(ctx context.Context, payload *TelemetryPayload) error { | |
traceReq, metricsReq, logsReq := convertPayloadToProto(payload) | |
var firstError error | |
var totalBytes int | |
callCtx, callCancel := context.WithTimeout(ctx, 10*time.Second) | |
defer callCancel() | |
// Send Traces | |
if len(traceReq.ResourceSpans) > 0 { | |
_, err := traceClient.Export(callCtx, traceReq) | |
if err != nil && firstError == nil { | |
firstError = fmt.Errorf("traces export: %w", err) | |
} else if err == nil { | |
totalBytes += proto.Size(traceReq) | |
if *verbose { | |
log.Printf("gRPC Trace Sent OK") | |
} | |
} | |
} | |
// Send Metrics | |
if len(metricsReq.ResourceMetrics) > 0 { | |
_, err := metricsClient.Export(callCtx, metricsReq) | |
if err != nil && firstError == nil { | |
firstError = fmt.Errorf("metrics export: %w", err) | |
} else if err == nil { | |
totalBytes += proto.Size(metricsReq) | |
if *verbose { | |
log.Printf("gRPC Metrics Sent OK") | |
} | |
} | |
} | |
// Send Logs | |
if len(logsReq.ResourceLogs) > 0 { | |
_, err := logsClient.Export(callCtx, logsReq) | |
if err != nil && firstError == nil { | |
firstError = fmt.Errorf("logs export: %w", err) | |
} else if err == nil { | |
totalBytes += proto.Size(logsReq) | |
if *verbose { | |
log.Printf("gRPC Logs Sent OK") | |
} | |
} | |
} | |
if ctx.Err() != nil { | |
return ctx.Err() | |
} | |
// Update global counters (success counter updated by caller worker on no error). | |
if firstError == nil { | |
// successCounter.Add(1) // Now handled by worker | |
bytesCounter.Add(uint64(totalBytes)) | |
} else if *verbose { | |
log.Printf("gRPC Send Batch FAILED (first error): %v", firstError) | |
} | |
return firstError | |
} | |
// printStats calculates and prints the current performance statistics. | |
func printStats(final bool) { | |
endTime := time.Now() | |
duration := endTime.Sub(startTime) | |
if duration <= 0 { | |
return | |
} | |
sucCount := successCounter.Load() | |
errCount := errorCounter.Load() | |
byteCount := bytesCounter.Load() | |
totalCount := sucCount + errCount | |
rate := float64(totalCount) / duration.Seconds() | |
byteRate := float64(byteCount) / duration.Seconds() | |
status := "Running" | |
if final { | |
status = "Final" | |
} | |
mode := "Network" | |
if *outFile != "" { | |
mode = "File-Only" | |
} | |
fmt.Printf("[%s] Mode: %s | Time: %s | Total Generated: %d | Success (Sent/Written): %d | Errors: %d | Rate: %.2f req/s | Throughput: %.2f KB/s\n", | |
status, | |
mode, | |
duration.Round(time.Second), | |
totalCount, | |
sucCount, | |
errCount, | |
rate, | |
byteRate/1024.0, | |
) | |
} | |
// main is the application entry point. It parses flags, selects the operating
// mode (file-only vs. network, and HTTP vs. gRPC within network mode), wires
// up graceful shutdown, an optional rate limiter, and a periodic stats
// reporter, then drives a single producer + N worker pipeline until the
// requested number of payloads has been generated or a shutdown signal
// arrives.
func main() {
	flag.Parse()
	startTime = time.Now()
	// Determine operating mode based on -out-file flag.
	isFileOnlyMode := (*outFile != "")
	// Enforce -target-host only required if *not* in file-only mode.
	if !isFileOnlyMode && *targetHost == "" {
		fmt.Fprintf(os.Stderr, "Usage of %s:\n", os.Args[0])
		flag.PrintDefaults()
		log.Fatal("Error: -target-host flag is required when not using -out-file.")
	}
	// Setup context for graceful shutdown on signals. Cancelling ctx is the
	// single mechanism that stops the producer, workers, reporter, and (in
	// gRPC mode) closes the connection.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigCh
		fmt.Println("\nReceived shutdown signal...")
		cancel()
	}()
	// --- Mode-Specific Setup ---
	isHTTP := false // Only relevant for network mode.
	if isFileOnlyMode {
		// Configure for writing generated payloads to a file.
		log.Printf(">>> Running in FILE-ONLY mode. Outputting generated JSON payloads to: %s <<<", *outFile)
		if *targetHost != "" {
			log.Printf("Target Host specified (for resource attributes): %s", *targetHost)
		} else {
			log.Printf("Target Host not specified (optional in file-only mode).")
		}
		var err error
		outputFile, err = os.Create(*outFile) // Create or overwrite the output file.
		if err != nil {
			log.Fatalf("Failed to open out-file '%s': %v", *outFile, err)
		}
		defer outputFile.Close() // Ensure file closure on exit.
		// Warn if network flags were provided, as they are ignored.
		// NOTE(review): these literals are assumed to mirror the flag defaults
		// declared earlier in the file — confirm they stay in sync.
		if *otelHTTPEndpoint != "http://localhost:4318" {
			log.Printf("WARN: -http-endpoint provided but ignored in file-only mode.")
		}
		if *otelGRPCEndpoint != "localhost:4317" {
			log.Printf("WARN: -grpc-endpoint provided but ignored in file-only mode.")
		}
		if *format != "http" {
			log.Printf("WARN: -format provided but ignored in file-only mode.")
		}
		if *grpcInsecure != true {
			log.Printf("WARN: -grpc-insecure provided but ignored in file-only mode.")
		}
	} else {
		// Configure for sending payloads over the network.
		log.Printf(">>> Running in NETWORK mode. Target Host: %s <<<", *targetHost)
		isHTTP = strings.ToLower(*format) == "http"
		if isHTTP {
			// Initialize shared HTTP client for sending OTLP/JSON.
			// Idle-connection pool is sized to the worker count so each
			// worker can reuse a keep-alive connection.
			log.Printf("Network Format: HTTP. Sending telemetry to %s", *otelHTTPEndpoint)
			transport := &http.Transport{
				Proxy: http.ProxyFromEnvironment,
				DialContext: (&net.Dialer{
					Timeout:   30 * time.Second,
					KeepAlive: 30 * time.Second,
				}).DialContext,
				MaxIdleConns:        *workers * 2,
				MaxIdleConnsPerHost: *workers,
				IdleConnTimeout:     90 * time.Second,
				TLSHandshakeTimeout: 10 * time.Second,
			}
			httpClient = &http.Client{
				Transport: transport,
				Timeout:   15 * time.Second, // Whole-request timeout per POST.
			}
		} else {
			// Initialize shared gRPC client for sending OTLP/Protobuf.
			log.Printf("Network Format: gRPC.")
			var dialOpts []grpc.DialOption
			if *grpcInsecure {
				dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
				log.Printf("Using insecure gRPC connection.")
			} else {
				tlsConfig := &tls.Config{MinVersion: tls.VersionTLS12}
				dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
				log.Printf("Using secure gRPC connection (TLS).")
			}
			dialOpts = append(dialOpts, grpc.WithUserAgent(userAgent))
			// Prepare target address for gRPC dialing: strip an http(s)://
			// scheme down to host:port, pass other schemes (or bare
			// host:port) through unchanged.
			var grpcTarget string
			parsedURL, err := url.Parse(*otelGRPCEndpoint)
			if err == nil && (parsedURL.Scheme == "http" || parsedURL.Scheme == "https") {
				grpcTarget = parsedURL.Host
			} else if err == nil && parsedURL.Scheme != "" {
				grpcTarget = *otelGRPCEndpoint
			} else {
				grpcTarget = *otelGRPCEndpoint
			}
			if !strings.Contains(grpcTarget, ":") && !strings.Contains(grpcTarget, "/") {
				grpcTarget += ":4317" // Append default port if missing
			}
			log.Printf("Dialing gRPC endpoint: %s (Insecure Flag: %v)", grpcTarget, *grpcInsecure)
			dialCtx, dialCancel := context.WithTimeout(ctx, 10*time.Second)
			var grpcErr error
			grpcConn, grpcErr = grpc.DialContext(dialCtx, grpcTarget, dialOpts...)
			dialCancel()
			if grpcErr != nil {
				log.Fatalf("Failed to dial gRPC server at %s: %v", grpcTarget, grpcErr)
			}
			// Setup graceful closure of the connection on context cancellation.
			go func() {
				<-ctx.Done()
				log.Println("Shutting down gRPC connection...")
				if grpcConn != nil {
					grpcConn.Close()
				}
			}()
			// Create OTLP service clients from the connection. All three
			// share the single grpcConn.
			traceClient = coltracepb.NewTraceServiceClient(grpcConn)
			metricsClient = colmetricspb.NewMetricsServiceClient(grpcConn)
			logsClient = collogspb.NewLogsServiceClient(grpcConn)
			log.Printf("Sending telemetry via gRPC to %s", grpcTarget)
		}
	}
	// --- Setup Rate Limiter ---
	// limiter stays nil (unlimited) unless a sane positive rate was given.
	var limiter <-chan time.Time
	if *rateLimit > 0 {
		if *rateLimit < 1e-9 {
			log.Printf("WARN: Rate limit %.2f req/s is too low, running unlimited.", *rateLimit)
		} else {
			interval := time.Duration(1e9 / *rateLimit) * time.Nanosecond
			if interval <= 0 {
				log.Printf("WARN: Rate limit %.2f req/s is too high, running unlimited.", *rateLimit)
			} else {
				ticker := time.NewTicker(interval)
				limiter = ticker.C
				defer ticker.Stop() // Ensure ticker resources are released
				log.Printf("Rate limiting enabled: interval %s (~%.2f req/s)", interval, *rateLimit)
			}
		}
	}
	// --- Start Reporting Goroutine ---
	reportTicker := time.NewTicker(reportInterval)
	defer reportTicker.Stop() // Ensure ticker resources are released
	go func() {
		for {
			select {
			case <-reportTicker.C:
				printStats(false) // Periodically print stats
			case <-ctx.Done():
				log.Println("Stopping reporter.")
				return // Exit on main context cancellation
			}
		}
	}()
	// --- Start Workers ---
	// eventsCh acts as a work queue and semaphore for workers; its buffer
	// equals the worker count so the producer can stay one batch ahead.
	eventsCh := make(chan struct{}, *workers)
	log.Printf("Starting %d workers, Target Payloads: %d", *workers, *totalEvents)
	// --- Producer Goroutine ---
	// Responsible for pushing work signals onto eventsCh, respecting rate limits and total count.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(eventsCh) // Signal workers that no more work is coming
		count := 0
		for {
			// Check for shutdown signal *before* potentially blocking operations.
			select {
			case <-ctx.Done():
				log.Println("Producer stopping due to context cancellation.")
				return
			default: // Non-blocking check
			}
			// Check if total event limit reached (-1 means unlimited).
			if *totalEvents != -1 && count >= *totalEvents {
				log.Println("Producer reached total events limit.")
				return
			}
			// Wait for rate limiter if enabled.
			if limiter != nil {
				select {
				case <-limiter: // Wait for tick
				case <-ctx.Done():
					log.Println("Producer stopping during rate limit wait.")
					return
				}
			}
			// Send work signal to a worker; blocks if channel buffer is full.
			select {
			case eventsCh <- struct{}{}:
				count++
			case <-ctx.Done():
				log.Println("Producer stopping while trying to send to worker.")
				return
			}
		}
	}()
	// --- Consumer (Worker) Goroutines ---
	// Each worker pulls work signals from eventsCh, generates data, and processes it (sends or writes).
	for w := 0; w < *workers; w++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done() // Signal completion when exiting
			// Process work items from the channel until it's closed.
			for range eventsCh {
				// Check for shutdown signal *before* doing work.
				select {
				case <-ctx.Done():
					return // Context cancelled, exit worker
				default: // Non-blocking check
				}
				// Generate the telemetry data payload.
				payload := generateRandomTelemetry()
				var opErr error // Store error from send/write operation
				// Execute logic based on the operating mode (File-Only or Network).
				if isFileOnlyMode {
					// Marshal payload to indented JSON.
					jsonData, err := json.MarshalIndent(payload, "", "  ") // Use MarshalIndent for pretty printing
					if err != nil {
						opErr = fmt.Errorf("failed to marshal payload to JSON: %w", err)
					} else {
						// Write JSON data to the file, protected by a mutex
						// since all workers share one *os.File.
						outputFileMutex.Lock()
						_, writeErr := outputFile.Write(jsonData)
						if writeErr == nil {
							_, writeErr = outputFile.WriteString("\n") // Add newline separator
						}
						outputFileMutex.Unlock()
						opErr = writeErr // Store potential write error
					}
					// Update counters if the write was successful.
					if opErr == nil {
						successCounter.Add(1)
						bytesCounter.Add(uint64(len(jsonData))) // Track indented JSON size
						// Avoid excessive logging in verbose mode for file writes.
					}
				} else {
					// Send payload over the network (HTTP or gRPC).
					if isHTTP {
						opErr = sendHTTP(ctx, payload)
					} else {
						opErr = sendGRPC(ctx, payload)
					}
					// Success/byte counters are updated within send functions in network mode.
				}
				// Handle errors that occurred during the operation.
				if opErr != nil {
					// If the context was cancelled *during* the operation, don't count it as a failure.
					if ctx.Err() != nil {
						return // Exit worker cleanly on cancellation
					}
					// Otherwise, increment the error counter.
					errorCounter.Add(1)
					if *verbose {
						log.Printf("Worker %d ERROR: %v", workerID, opErr)
					}
				}
			} // Worker loop ends when eventsCh is closed
		}(w)
	}
	// Wait for the producer and all workers to finish their execution.
	wg.Wait()
	// Print the final summary statistics.
	log.Println("All workers finished.")
	printStats(true)
	log.Println("Shutdown complete.")
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
First commit