|
package main |
|
|
|
import ( |
|
"context" |
|
"flag" |
|
"log" |
|
"net/http" |
|
"os" |
|
"os/signal" |
|
"sync/atomic" |
|
"syscall" |
|
"time" |
|
|
|
"go.opentelemetry.io/otel/attribute" |
|
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric" |
|
pushclient "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" |
|
"go.opentelemetry.io/otel/metric/global" |
|
"go.opentelemetry.io/otel/metric/instrument" |
|
"go.opentelemetry.io/otel/metric/instrument/syncfloat64" |
|
"go.opentelemetry.io/otel/metric/instrument/syncint64" |
|
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" |
|
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" |
|
"go.opentelemetry.io/otel/sdk/metric/export/aggregation" |
|
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" |
|
selector "go.opentelemetry.io/otel/sdk/metric/selector/simple" |
|
"go.opentelemetry.io/otel/sdk/resource" |
|
) |
|
|
|
// Command-line flags. The vm.* flags configure where and how often samples
// are pushed; the metrics.* flags supply the values attached to the meter
// resource as the "job" and "instance" attributes.
var (
	// collectorEndpoint is the host:port the exporter sends metrics to.
	collectorEndpoint = flag.String("vm.endpoint", "localhost:8428", "VictoriaMetrics endpoint - host:port.")
	// collectorURL is the URL path on collectorEndpoint that accepts pushes.
	collectorURL = flag.String("vm.ingestPath", "/opentelemetry/api/v1/push", "url path for ingestion path.")
	// isSecure selects https for the push; when false the exporter is
	// configured as insecure.
	isSecure = flag.Bool("vm.isSecure", false, "enables https connection for metrics push.")
	// pushInterval is the collect period handed to the metric controller.
	pushInterval = flag.Duration("vm.pushInterval", 10*time.Second, "how often push samples, aka scrapeInterval at pull model.")
	// jobName is reported as the "job" resource attribute.
	jobName = flag.String("metrics.jobName", "otlp", "job name for web-application.")
	// instanceName is reported as the "instance" resource attribute.
	instanceName = flag.String("metrics.instance", "localhost", "hostname of web-application instance.")
)
|
|
|
var ( |
|
requestsCount syncint64.Counter |
|
requestsLatency syncfloat64.Histogram |
|
activeRequests int64 |
|
) |
|
|
|
// Initializes an OTLP exporter, and configures the corresponding |
|
// metric providers. |
|
func initMetrics(ctx context.Context) func(ctx context.Context) error { |
|
options := []pushclient.Option{ |
|
pushclient.WithEndpoint(*collectorEndpoint), |
|
pushclient.WithURLPath(*collectorURL), |
|
} |
|
if !*isSecure { |
|
options = append(options, pushclient.WithInsecure()) |
|
} |
|
c := pushclient.NewClient( |
|
options..., |
|
) |
|
metricExporter, err := otlpmetric.New(ctx, c) |
|
if err != nil { |
|
handleErr(err, "cannot create exporter") |
|
} |
|
|
|
resourceConfig, err := resource.New(ctx, resource.WithAttributes(attribute.String("job", *jobName), attribute.String("instance", *instanceName))) |
|
if err != nil { |
|
handleErr(err, "cannot create meter resource") |
|
} |
|
meterController := controller.New( |
|
processor.NewFactory( |
|
selector.NewWithHistogramDistribution( |
|
histogram.WithExplicitBoundaries([]float64{0.01, 0.05, 0.1, 0.5, 0.9, 1.0, 5.0, 10.0, 100.0}), |
|
), |
|
aggregation.CumulativeTemporalitySelector(), |
|
processor.WithMemory(true), |
|
), |
|
controller.WithExporter(metricExporter), |
|
controller.WithCollectPeriod(*pushInterval), |
|
controller.WithResource(resourceConfig), |
|
) |
|
if err := meterController.Start(ctx); err != nil { |
|
handleErr(err, "cannot start meter controller") |
|
} |
|
|
|
global.SetMeterProvider(meterController) |
|
|
|
prov := global.MeterProvider().Meter("") |
|
requestsLatency, err = prov.SyncFloat64().Histogram("http_request_latency_seconds") |
|
if err != nil { |
|
handleErr(err, "cannot create histogram") |
|
} |
|
requestsCount, err = prov.SyncInt64().Counter("http_requests_total") |
|
if err != nil { |
|
handleErr(err, "cannot create counter") |
|
} |
|
ar, err := prov.AsyncInt64().Gauge("http_active_requests") |
|
if err != nil { |
|
handleErr(err, "cannot create gauge") |
|
} |
|
if err := prov.RegisterCallback([]instrument.Asynchronous{ar}, func(ctx context.Context) { |
|
ar.Observe(ctx, atomic.LoadInt64(&activeRequests)) |
|
}); err != nil { |
|
handleErr(err, "cannot register callback") |
|
} |
|
return metricExporter.Shutdown |
|
} |
|
|
|
type metricMiddleWare struct { |
|
ctx context.Context |
|
h http.Handler |
|
} |
|
|
|
func (m *metricMiddleWare) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
|
t := time.Now() |
|
path := r.URL.Path |
|
requestsCount.Add(m.ctx, 1, attribute.String("path", path)) |
|
atomic.AddInt64(&activeRequests, 1) |
|
m.h.ServeHTTP(w, r) |
|
atomic.AddInt64(&activeRequests, -1) |
|
requestsLatency.Record(m.ctx, time.Since(t).Seconds(), attribute.String("path", path)) |
|
} |
|
|
|
func main() { |
|
flag.Parse() |
|
log.Printf("Starting web server...") |
|
ctx, cancel := context.WithCancel(context.Background()) |
|
defer cancel() |
|
shutdown := initMetrics(ctx) |
|
mux := http.NewServeMux() |
|
mux.HandleFunc("/api/fast", func(writer http.ResponseWriter, request *http.Request) { |
|
writer.WriteHeader(http.StatusOK) |
|
writer.Write([]byte(`fast ok`)) |
|
}) |
|
mux.HandleFunc("/api/slow", func(writer http.ResponseWriter, request *http.Request) { |
|
time.Sleep(time.Second * 2) |
|
writer.WriteHeader(http.StatusOK) |
|
writer.Write([]byte(`slow ok`)) |
|
}) |
|
m := &metricMiddleWare{ |
|
ctx: ctx, |
|
h: mux, |
|
} |
|
mustStop := make(chan os.Signal) |
|
signal.Notify(mustStop, os.Interrupt, syscall.SIGTERM) |
|
go func() { |
|
http.ListenAndServe("localhost:8081", m) |
|
}() |
|
<-mustStop |
|
log.Println("receive shutdown signal, stopping webserver") |
|
|
|
if err := shutdown(ctx); err != nil { |
|
log.Println("cannot shutdown metric provider ", err) |
|
} |
|
|
|
cancel() |
|
log.Printf("Done!") |
|
} |
|
|
|
// handleErr aborts the program via log.Fatalf, printing message followed by
// err, when err is non-nil. It does nothing for a nil err.
func handleErr(err error, message string) {
	if err == nil {
		return
	}
	log.Fatalf("%s: %v", message, err)
}